Compare commits

...

14 Commits

Author SHA1 Message Date
rav4kumar
903d83f34f ref 2026-06-07 13:07:37 -07:00
rav4kumar
c9a23c17ab feat(long): smooth brake 2026-06-07 13:06:46 -07:00
rav4kumar
82d39601e1 feat(long): RadarDistance lead 2026-06-06 23:21:33 -07:00
rav4kumar
77e93f4498 feat(long): acceleration personality (eco/normal/sport) 2026-06-06 12:01:02 -07:00
rav4kumar
83de89e253 feat(dec): rework dynamic experimental controller 2026-06-06 11:45:24 -07:00
rav4kumar
04224e8747 Add custom params to sunnylink settings 2026-06-06 10:31:17 -07:00
rav4kumar
6012ebb7c7 ref 2026-06-06 10:30:46 -07:00
rav4kumar
e91dbe351e ref 2026-06-05 11:29:37 -07:00
rav4kumar
e25061fd08 fix mapd scorll 2026-06-05 11:28:47 -07:00
rav4kumar
baf56ae324 feat/relc 2026-06-05 11:06:06 -07:00
rav4kumar
9badd3fa40 mici-sla-ui 2026-06-05 11:05:29 -07:00
rav4kumar
2220e7fc11 point the submodule 2026-06-05 11:04:53 -07:00
rav4kumar
e862935209 toyota sp link and drive mode btn support 2026-06-05 11:04:49 -07:00
rav4kumar
9bd504a5cb abh, bsm 2026-06-05 11:04:43 -07:00
53 changed files with 2579 additions and 386 deletions

1
.gitmodules vendored
View File

@@ -4,6 +4,7 @@
[submodule "opendbc"]
path = opendbc_repo
url = https://github.com/sunnypilot/opendbc.git
branch = tn
[submodule "msgq"]
path = msgq_repo
url = https://github.com/commaai/msgq.git

View File

@@ -1,6 +1,6 @@
Version 0.11.2 (2026-06-15)
========================
* Acceleration Profile: select Eco, Normal, or Sport personality to shape acceleration briskness and gentle (ACC) deceleration
Version 0.11.1 (2026-05-18)
========================

View File

@@ -194,6 +194,7 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
aTarget @5 :Float32;
events @6 :List(OnroadEventSP.Event);
e2eAlerts @7 :E2eAlerts;
acceleration @8 :Acceleration;
struct DynamicExperimentalControl {
state @0 :DynamicExperimentalControlState;
@@ -296,6 +297,20 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
greenLightAlert @0 :Bool;
leadDepartAlert @1 :Bool;
}
struct Acceleration {
personality @0 :AccelerationPersonality;
enabled @1 :Bool;
maxAccel @2 :Float32;
tFollowOffset @3 :Float32;
jerkFactorScale @4 :Float32;
}
enum AccelerationPersonality {
eco @0;
normal @1;
sport @2;
}
}
struct OnroadEventSP @0xda96579883444c35 {
@@ -342,6 +357,7 @@ struct OnroadEventSP @0xda96579883444c35 {
speedLimitChanged @21;
speedLimitPending @22;
e2eChime @23;
laneChangeRoadEdge @24;
}
}
@@ -448,6 +464,8 @@ struct LiveMapDataSP @0xf416ec09499d9d19 {
struct ModelDataV2SP @0xa1680744031fdb2d {
laneTurnDirection @0 :TurnDirection;
leftLaneChangeEdgeBlock @1 :Bool;
rightLaneChangeEdgeBlock @2 :Bool;
enum TurnDirection {
none @0;

View File

@@ -179,12 +179,19 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"QuickBootToggle", {PERSISTENT | BACKUP, BOOL, "0"}},
{"QuietMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RainbowMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RoadEdgeLaneChangeEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RocketFuel", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowAdvancedControls", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowTurnSignals", {PERSISTENT | BACKUP, BOOL, "0"}},
{"StandstillTimer", {PERSISTENT | BACKUP, BOOL, "0"}},
{"TrueVEgoUI", {PERSISTENT | BACKUP, BOOL, "0"}},
// toyota specific params
{"ToyotaAutoHold", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaEnhancedBsm", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaTSS2Long", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaDriveMode", {PERSISTENT | BACKUP, BOOL, "0"}},
// MADS params
{"Mads", {PERSISTENT | BACKUP, BOOL, "1"}},
{"MadsMainCruiseAllowed", {PERSISTENT | BACKUP, BOOL, "1"}},
@@ -226,6 +233,10 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"ToyotaStopAndGoHack", {PERSISTENT | BACKUP, BOOL, "0"}},
{"DynamicExperimentalControl", {PERSISTENT | BACKUP, BOOL, "0"}},
{"AccelPersonalityEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"AccelPersonality", {PERSISTENT | BACKUP, INT, "1"}},
{"RadarDistance", {PERSISTENT | BACKUP, BOOL, "0"}},
{"SmoothBraking", {PERSISTENT | BACKUP, BOOL, "0"}},
{"BlindSpot", {PERSISTENT | BACKUP, BOOL, "0"}},
// sunnypilot model params

View File

@@ -10,7 +10,7 @@ from cereal import car, log, custom
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
from openpilot.common.swaglog import cloudlog, ForwardingHandler
from opendbc.safety import ALTERNATIVE_EXPERIENCE
from opendbc.car import DT_CTRL, structs
from opendbc.car.can_definitions import CanData, CanRecvCallable, CanSendCallable
from opendbc.car.carlog import carlog
@@ -121,7 +121,13 @@ class Car:
self.CI, self.CP, self.CP_SP = CI, CI.CP, CI.CP_SP
self.RI = RI
# set alternative experiences from parameters
sp_toyota_auto_brake_hold = self.params.get_bool("ToyotaAutoHold")
self.CP.alternativeExperience = 0
if sp_toyota_auto_brake_hold:
self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.ALLOW_AEB
# mads
set_alternative_experience(self.CP, self.CP_SP, self.params)
set_car_specific_params(self.CP, self.CP_SP, self.params)

View File

@@ -56,7 +56,7 @@ class DesireHelper:
def get_lane_change_direction(CS):
return LaneChangeDirection.left if CS.leftBlinker else LaneChangeDirection.right
def update(self, carstate, lateral_active, lane_change_prob):
def update(self, carstate, lateral_active, lane_change_prob, left_edge_detected=False, right_edge_detected=False):
self.alc.update_params()
self.lane_turn_controller.update_params()
v_ego = carstate.vEgo
@@ -88,8 +88,8 @@ class DesireHelper:
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = (((carstate.leftBlindspot or left_edge_detected) and self.lane_change_direction == LaneChangeDirection.left) or
((carstate.rightBlindspot or right_edge_detected) and self.lane_change_direction == LaneChangeDirection.right))
self.alc.update_lane_change(blindspot_detected, carstate.brakePressed)

View File

@@ -4,6 +4,7 @@ from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N
from openpilot.common.pid import PIDController
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.sunnypilot.selfdrive.controls.lib.brake_smoother import BrakeOnsetSmoother
CONTROL_N_T_IDX = ModelConstants.T_IDXS[:CONTROL_N]
@@ -56,6 +57,7 @@ class LongControl:
(CP.longitudinalTuning.kiBP, CP.longitudinalTuning.kiV),
rate=1 / DT_CTRL)
self.last_output_accel = 0.0
self.brake_smoother = BrakeOnsetSmoother()
def reset(self):
self.pid.reset()
@@ -87,6 +89,8 @@ class LongControl:
error = a_target - CS.aEgo
output_accel = self.pid.update(error, speed=CS.vEgo,
feedforward=a_target)
# Ease the brake-onset command (comfort); bypassed for hard braking. See BrakeOnsetSmoother.
output_accel = self.brake_smoother.apply(output_accel, self.last_output_accel, a_target)
self.last_output_accel = np.clip(output_accel, accel_limits[0], accel_limits[1])
return self.last_output_accel

View File

@@ -70,6 +70,9 @@ def get_jerk_factor(personality=log.LongitudinalPersonality.standard):
raise NotImplementedError("Longitudinal personality not supported")
T_FOLLOW_MIN = 0.9 # safety floor for follow time after any sunnypilot tier offset
def get_T_FOLLOW(personality=log.LongitudinalPersonality.standard):
if personality==log.LongitudinalPersonality.relaxed:
return 1.75
@@ -267,8 +270,9 @@ class LongitudinalMpc:
for i in range(N):
self.solver.cost_set(i, 'Zl', Zl)
def set_weights(self, prev_accel_constraint=True, personality=log.LongitudinalPersonality.standard):
jerk_factor = get_jerk_factor(personality)
def set_weights(self, prev_accel_constraint=True, personality=log.LongitudinalPersonality.standard, jerk_factor_scale=1.0):
# jerk_factor_scale (sunnypilot Acceleration Personality): >1 smooths decel/accel, <1 crisps it.
jerk_factor = get_jerk_factor(personality) * jerk_factor_scale
a_change_cost = A_CHANGE_COST if prev_accel_constraint else 0
cost_weights = [X_EGO_OBSTACLE_COST, X_EGO_COST, V_EGO_COST, A_EGO_COST, jerk_factor * a_change_cost, jerk_factor * J_EGO_COST]
constraint_cost_weights = [LIMIT_COST, LIMIT_COST, LIMIT_COST, DANGER_ZONE_COST]
@@ -313,8 +317,10 @@ class LongitudinalMpc:
lead_xv = self.extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau)
return lead_xv
def update(self, radarstate, v_cruise, personality=log.LongitudinalPersonality.standard):
t_follow = get_T_FOLLOW(personality)
def update(self, radarstate, v_cruise, personality=log.LongitudinalPersonality.standard, t_follow_offset=0.0):
# t_follow_offset (sunnypilot Acceleration Personality): + = earlier/gentler decel (bigger gap).
# Clamped to a safe minimum so a tier offset can never produce an unsafe follow distance.
t_follow = max(T_FOLLOW_MIN, get_T_FOLLOW(personality) + t_follow_offset)
v_ego = self.x0[1]
self.status = radarstate.leadOne.status or radarstate.leadTwo.status

View File

@@ -110,7 +110,7 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
accel_clip = [ACCEL_MIN, get_max_accel(v_ego)]
accel_clip = [ACCEL_MIN, self.accel.get_max_accel(v_ego)]
steer_angle_without_offset = sm['carState'].steeringAngleDeg - sm['liveParameters'].angleOffsetDeg
accel_clip = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_clip, self.CP)
@@ -136,9 +136,12 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
if force_slow_decel:
v_cruise = 0.0
self.mpc.set_weights(prev_accel_constraint, personality=sm['selfdriveState'].personality)
# Acceleration Personality decel shaping (ACC path only; stock when disabled or e2e/blended).
jerk_factor_scale, t_follow_offset = self.accel.decel_mod(self.is_e2e(sm))
self.mpc.set_weights(prev_accel_constraint, personality=sm['selfdriveState'].personality, jerk_factor_scale=jerk_factor_scale)
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
self.mpc.update(sm['radarState'], v_cruise, personality=sm['selfdriveState'].personality)
radarstate = self.smooth_radarstate(sm['radarState'])
self.mpc.update(radarstate, v_cruise, personality=sm['selfdriveState'].personality, t_follow_offset=t_follow_offset)
self.v_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
@@ -169,8 +172,10 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
output_a_target = output_a_target_mpc
self.output_should_stop = output_should_stop_mpc
for idx in range(2):
accel_clip[idx] = np.clip(accel_clip[idx], self.prev_accel_clip[idx] - 0.05, self.prev_accel_clip[idx] + 0.05)
# Lower (braking) bound and the ceiling's downward slew stay at the stock rate; only the ceiling's
# upward slew is tier-dependent (Acceleration Personality).
accel_clip[0] = np.clip(accel_clip[0], self.prev_accel_clip[0] - 0.05, self.prev_accel_clip[0] + 0.05)
accel_clip[1] = np.clip(accel_clip[1], self.prev_accel_clip[1] - 0.05, self.prev_accel_clip[1] + self.accel.get_rise_rate())
self.output_a_target = np.clip(output_a_target, accel_clip[0], accel_clip[1])
self.prev_accel_clip = accel_clip

View File

@@ -321,9 +321,16 @@ class SelfdriveD(CruiseHelper):
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
mdv2sp = self.sm['modelDataV2SP']
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
(CS.rightBlindspot and direction == LaneChangeDirection.right):
self.events.add(EventName.laneChangeBlocked)
elif (mdv2sp.leftLaneChangeEdgeBlock and direction == LaneChangeDirection.left) or \
(mdv2sp.rightLaneChangeEdgeBlock and direction == LaneChangeDirection.right):
self.events_sp.add(custom.OnroadEventSP.EventName.laneChangeRoadEdge)
else:
if direction == LaneChangeDirection.left:
self.events.add(EventName.preLaneChangeLeft)

View File

@@ -35,6 +35,13 @@ DESCRIPTIONS = {
'RecordFront': tr_noop("Upload data from the driver facing camera and help improve the driver monitoring algorithm."),
"IsMetric": tr_noop("Display speed in km/h instead of mph."),
"RecordAudio": tr_noop("Record and store microphone audio while driving. The audio will be included in the dashcam video in comma connect."),
"AccelPersonalityEnabled": tr_noop(
"Enable the Acceleration Profile selector below. When off, sunnypilot uses its default (Normal) acceleration behavior."
),
"AccelPersonality": tr_noop(
"Sets how briskly sunnypilot accelerates. Eco is gentle with a soft launch, Normal matches the default behavior, " +
"and Sport accelerates more briskly. Braking behavior is unaffected."
),
}
@@ -64,6 +71,12 @@ class TogglesLayout(Widget):
"disengage_on_accelerator.png",
False,
),
"AccelPersonalityEnabled": (
lambda: tr("Enable Acceleration Profile"),
DESCRIPTIONS["AccelPersonalityEnabled"],
"speed_limit.png",
False,
),
"IsLdwEnabled": (
lambda: tr("Enable Lane Departure Warnings"),
DESCRIPTIONS["IsLdwEnabled"],
@@ -106,6 +119,16 @@ class TogglesLayout(Widget):
icon="speed_limit.png"
)
self._accel_personality_setting = multiple_button_item(
lambda: tr("Acceleration Profile"),
lambda: tr(DESCRIPTIONS["AccelPersonality"]),
buttons=[lambda: tr("Eco"), lambda: tr("Normal"), lambda: tr("Sport")],
button_width=300,
callback=self._set_accel_personality,
selected_index=self._params.get("AccelPersonality", return_default=True),
icon="speed_limit.png"
)
self._toggles = {}
self._locked_toggles = set()
for param, (title, desc, icon, needs_restart) in self._toggle_defs.items():
@@ -139,6 +162,10 @@ class TogglesLayout(Widget):
if param == "DisengageOnAccelerator":
self._toggles["LongitudinalPersonality"] = self._long_personality_setting
# insert acceleration profile selector after its enable toggle
if param == "AccelPersonalityEnabled":
self._toggles["AccelPersonality"] = self._accel_personality_setting
self._update_experimental_mode_icon()
self._scroller = Scroller(list(self._toggles.values()), line_separator=True, spacing=0)
@@ -176,11 +203,13 @@ class TogglesLayout(Widget):
self._toggles["ExperimentalMode"].action_item.set_enabled(True)
self._toggles["ExperimentalMode"].set_description(e2e_description)
self._long_personality_setting.action_item.set_enabled(True)
self._accel_personality_setting.action_item.set_enabled(True)
else:
# no long for now
self._toggles["ExperimentalMode"].action_item.set_enabled(False)
self._toggles["ExperimentalMode"].action_item.set_state(False)
self._long_personality_setting.action_item.set_enabled(False)
self._accel_personality_setting.action_item.set_enabled(False)
self._params.remove("ExperimentalMode")
unavailable = tr("Experimental mode is currently unavailable on this car since the car's stock ACC is used for longitudinal control.")
@@ -247,3 +276,6 @@ class TogglesLayout(Widget):
def _set_longitudinal_personality(self, button_index: int):
self._params.put("LongitudinalPersonality", button_index, block=True)
def _set_accel_personality(self, button_index: int):
self._params.put("AccelPersonality", button_index, block=True)

View File

@@ -13,6 +13,7 @@ from openpilot.system.ui.lib.application import gui_app
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.settings import SettingsLayoutSP as SettingsLayout
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.onroad import OnroadViewContainerSP as AugmentedRoadView
ONROAD_DELAY = 2.5 # seconds
@@ -118,13 +119,15 @@ class MiciMainLayout(Scroller):
# FIXME: these two pops can interrupt user interacting in the settings
if self._onroad_time_delay is not None and rl.get_time() - self._onroad_time_delay >= ONROAD_DELAY:
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
if not gui_app.sunnypilot_ui() or self._should_auto_scroll_to_onroad():
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
self._onroad_time_delay = None
# When car leaves standstill, pop nav stack and scroll to onroad
CS = ui_state.sm["carState"]
if not CS.standstill and self._prev_standstill:
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
if not gui_app.sunnypilot_ui() or self._should_auto_scroll_to_onroad():
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
self._prev_standstill = CS.standstill
def _on_interactive_timeout(self):

View File

@@ -51,11 +51,17 @@ class LaneChangeSettingsLayout(Widget):
description=lambda: tr("Toggle to enable a delay timer for seamless lane changes when blind spot monitoring " +
"(BSM) detects a obstructing vehicle, ensuring safe maneuvering."),
)
self._road_edge_block = toggle_item_sp(
param="RoadEdgeLaneChangeEnabled",
title=lambda: tr("Block Lane Change: Road Edge Detection"),
description=lambda: tr("Blocks the lane change if the model sees a road edge on your signaled side."),
)
items = [
self._lane_change_timer,
LineSeparatorSP(40),
self._bsm_delay,
self._road_edge_block,
]
return items

View File

@@ -0,0 +1,13 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout
class MiciMainLayoutSP(MiciMainLayout):
def _should_auto_scroll_to_onroad(self) -> bool:
return not self._onroad_layout.is_on_info_panel()

View File

@@ -0,0 +1,63 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.system.ui.lib.application import gui_app
from openpilot.selfdrive.ui.sunnypilot.mici.widgets.scroller_sp import ScrollerSP
from openpilot.selfdrive.ui.sunnypilot.mici.onroad.augmented_road_view import AugmentedRoadViewSP
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.onroad_info_panel import OnroadInfoPanel
CONFIDENCE_BALL_VISIBLE_RATIO = 0.4
HORIZONTAL_SETTLE_PX = 5
HORIZONTAL_RESET_RATIO = 0.5
class OnroadViewContainerSP(ScrollerSP):
def __init__(self, bookmark_callback=None):
super().__init__(horizontal=False, snap_items=True, spacing=0, pad=0, scroll_indicator=False, edge_shadows=False)
self.road_view = AugmentedRoadViewSP(bookmark_callback=bookmark_callback)
self.onroad_info_panel = OnroadInfoPanel(bookmark_callback=bookmark_callback)
self._scroller.add_widgets([
self.road_view,
self.onroad_info_panel,
])
self._scroller.set_reset_scroll_at_show(False)
self._scroller.set_scrolling_enabled(lambda: abs(self.rect.x) < HORIZONTAL_SETTLE_PX)
for child in (self.road_view, self.onroad_info_panel):
inner_touch_valid = child._touch_valid_callback
child.set_touch_valid_callback(
lambda inner=inner_touch_valid: self._touch_valid() and (inner() if inner else True)
)
def set_rect(self, rect: rl.Rectangle):
super().set_rect(rect)
self.road_view.set_rect(rect)
self.onroad_info_panel.set_rect(rect)
return self
def is_swiping_left(self) -> bool:
return self.road_view.is_swiping_left() or self.onroad_info_panel.is_swiping_left()
def set_click_callback(self, callback) -> None:
self.road_view.set_click_callback(callback)
self.onroad_info_panel.set_click_callback(callback)
def is_on_info_panel(self) -> bool:
"""True when scrolled past halfway toward onroad_info_panel (used by main layout
to skip auto-pop-back-to-camera while user is reading the info panel)."""
return abs(self._scroller.scroll_panel.get_offset()) > self._rect.height / 2
def _render(self, rect: rl.Rectangle):
if abs(self.rect.x) > gui_app.width * HORIZONTAL_RESET_RATIO:
self._scroller.scroll_panel.set_offset(0)
vertical_offset = self._scroller.scroll_panel.get_offset()
show_ball = abs(vertical_offset) < rect.height * CONFIDENCE_BALL_VISIBLE_RATIO
self.road_view.set_show_confidence_ball(show_ball)
super()._render(rect)

View File

@@ -0,0 +1,324 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from dataclasses import dataclass
from openpilot.common.constants import CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.application import MousePos
from openpilot.system.ui.widgets import Widget
from openpilot.selfdrive.ui.mici.onroad.alert_renderer import AlertRenderer
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import BookmarkIcon
METER_TO_KM = 0.001
METER_TO_MILE = 0.000621371
@dataclass(frozen=True)
class OnroadInfoPanelColors:
white: rl.Color = rl.WHITE
black: rl.Color = rl.BLACK
red: rl.Color = rl.Color(255, 0, 0, 255)
green: rl.Color = rl.Color(0, 255, 0, 255)
grey: rl.Color = rl.Color(190, 195, 190, 255)
light_grey: rl.Color = rl.Color(200, 200, 200, 255)
dark_grey: rl.Color = rl.Color(100, 100, 100, 255)
bg_dark: rl.Color = rl.Color(0, 0, 0, 255)
card_bg: rl.Color = rl.Color(50, 50, 50, 200)
badge_bg: rl.Color = rl.Color(60, 60, 60, 255)
COLORS = OnroadInfoPanelColors()
class OnroadInfoPanel(Widget):
def __init__(self, bookmark_callback=None):
super().__init__()
self.speed_limit: float = 0.0
self.speed_limit_valid: bool = False
self.speed_limit_offset: float = 0.0
self.next_speed_limit: float = 0.0
self.next_speed_limit_distance: float = 0.0
self.road_name: str = ""
self.current_speed: float = 0.0
self.set_speed: float = 0.0
self.cruise_enabled: bool = False
self._sign_slide: float = 0.0
self._font_bold: rl.Font = gui_app.font(FontWeight.BOLD)
self._font_semi_bold: rl.Font = gui_app.font(FontWeight.SEMI_BOLD)
self._font_medium: rl.Font = gui_app.font(FontWeight.MEDIUM)
self._marquee_offset: float = 0.0
self._marquee_direction: int = 1
self._marquee_pause_timer: float = 0.0
self._marquee_speed: float = 40.0
self._marquee_pause_duration: float = 1.5
self._alert_renderer = AlertRenderer()
self._alert_alpha_filter = FirstOrderFilter(0, 0.05, 1 / gui_app.target_fps)
self._bookmark_icon = BookmarkIcon(bookmark_callback)
def is_swiping_left(self) -> bool:
return self._bookmark_icon.is_swiping_left()
def _handle_mouse_release(self, mouse_pos: MousePos) -> None:
# Mirror stock AugmentedRoadView: suppress click while bookmark gesture active
if not self._bookmark_icon.interacting():
super()._handle_mouse_release(mouse_pos)
def _update_state(self) -> None:
sm = ui_state.sm
speed_conv = CV.MS_TO_KPH if ui_state.is_metric else CV.MS_TO_MPH
if sm.valid["longitudinalPlanSP"]:
lp_sp = sm["longitudinalPlanSP"]
resolver = lp_sp.speedLimit.resolver
self.speed_limit = resolver.speedLimit * speed_conv
self.speed_limit_valid = resolver.speedLimitValid
self.speed_limit_offset = resolver.speedLimitOffset * speed_conv
if sm.valid["liveMapDataSP"]:
lmd = sm["liveMapDataSP"]
self.next_speed_limit = lmd.speedLimitAhead * speed_conv
self.next_speed_limit_distance = lmd.speedLimitAheadDistance
self.road_name = lmd.roadName
if sm.updated["carState"]:
self.current_speed = sm["carState"].vEgo * speed_conv
if sm.valid["carState"] and sm.valid["controlsState"]:
self.cruise_enabled = sm["carState"].cruiseState.enabled
v_cruise_cluster = sm["carState"].vCruiseCluster
set_speed_kph = sm["controlsState"].vCruiseDEPRECATED if v_cruise_cluster == 0.0 else v_cruise_cluster
self.set_speed = set_speed_kph * (METER_TO_MILE / METER_TO_KM) if not ui_state.is_metric else set_speed_kph
def _render(self, rect: rl.Rectangle) -> None:
self._update_state()
rl.draw_rectangle(int(rect.x), int(rect.y), int(rect.width), int(rect.height), COLORS.bg_dark)
margin = 20
mid_y = rect.y + rect.height / 2
left_x = rect.x + margin
if self.cruise_enabled:
unit = tr("MAX")
display_speed = self.set_speed
else:
unit = tr("km/h") if ui_state.is_metric else tr("MPH")
display_speed = self.current_speed
speed_val = str(round(display_speed))
if self.speed_limit_valid and display_speed > self.speed_limit:
speed_color = COLORS.red
else:
speed_color = COLORS.white
rl.draw_text_ex(self._font_semi_bold, unit, rl.Vector2(left_x, mid_y - 95), 38, 0, COLORS.grey)
rl.draw_text_ex(self._font_bold, speed_val, rl.Vector2(left_x, mid_y - 60), 110, 0, speed_color)
sign_width = 135
sign_height = 135 if ui_state.is_metric else 175
has_next = self.next_speed_limit > 0 and self.next_speed_limit != self.speed_limit
target_slide = 1.0 if has_next else 0.0
slide_speed = 3.0 * rl.get_frame_time()
if self._sign_slide < target_slide:
self._sign_slide = min(self._sign_slide + slide_speed, target_slide)
elif self._sign_slide > target_slide:
self._sign_slide = max(self._sign_slide - slide_speed, target_slide)
next_w = int(sign_width * 0.7)
next_h = int(sign_height * 0.7)
next_peek = int(next_w * 0.85) + 5
centered_x = rect.x + rect.width - sign_width - margin
shifted_x = rect.x + rect.width - sign_width - margin - next_peek
sign_x = centered_x + (shifted_x - centered_x) * self._sign_slide
sign_y = rect.y + (rect.height - sign_height) / 2
road_y = mid_y + 55
road_width = sign_x - left_x - margin
self._draw_road_name(left_x, road_y, road_width)
if has_next and self._sign_slide > 0.01:
next_val = str(round(self.next_speed_limit))
dist_str = self._format_distance(self.next_speed_limit_distance)
next_x = sign_x + sign_width - int(next_w * 0.15)
next_y = sign_y + (sign_height - next_h) / 2
next_speed_color = COLORS.black
if ui_state.is_metric:
self._draw_vienna_sign(next_x, next_y, next_w, next_h, next_val, next_speed_color, is_upcoming=True)
else:
self._draw_mutcd_sign(next_x, next_y, next_w, next_h, next_val, next_speed_color, is_upcoming=True)
dist_size = measure_text_cached(self._font_medium, dist_str, 24)
rl.draw_text_ex(self._font_medium, dist_str, rl.Vector2(next_x + next_w / 2 - dist_size.x / 2, next_y + next_h + 4), 24, 0, COLORS.grey)
self._draw_speed_limit_sign(sign_x, sign_y, sign_width, sign_height)
if self.speed_limit_offset != 0 and self.speed_limit_valid:
offset_val = str(abs(round(self.speed_limit_offset)))
badge_sz = 42
badge_x = sign_x + sign_width - badge_sz * 0.85
badge_y = sign_y - badge_sz * 0.25
if ui_state.is_metric:
badge_r = badge_sz / 2
badge_cx = badge_x + badge_r
badge_cy = badge_y + badge_r
rl.draw_circle(int(badge_cx), int(badge_cy), badge_r + 2, COLORS.dark_grey)
rl.draw_circle(int(badge_cx), int(badge_cy), badge_r, COLORS.badge_bg)
self._draw_text_centered(self._font_bold, offset_val, 24, rl.Vector2(badge_cx, badge_cy), COLORS.white)
else:
mutcd_badge_x = sign_x + sign_width - badge_sz * 0.65
mutcd_badge_y = sign_y - badge_sz * 0.50
badge_rect = rl.Rectangle(mutcd_badge_x, mutcd_badge_y, badge_sz, badge_sz)
rl.draw_rectangle_rounded(badge_rect, 0.25, 10, COLORS.badge_bg)
rl.draw_rectangle_rounded_lines_ex(badge_rect, 0.25, 10, 2, COLORS.dark_grey)
self._draw_text_centered(self._font_bold, offset_val, 24, rl.Vector2(mutcd_badge_x + badge_sz / 2, mutcd_badge_y + badge_sz / 2), COLORS.white)
# SCC
speed_size = measure_text_cached(self._font_bold, speed_val, 110)
scc_x = left_x + speed_size.x + 30
scc_y = mid_y - 50
self._draw_scc_icons(scc_x, scc_y)
self._bookmark_icon.render(rect)
if ui_state.started:
alert_obj, no_alert = self._alert_renderer.will_render()
self._alert_alpha_filter.update(0 if no_alert else 1)
alpha = self._alert_alpha_filter.x
if alpha > 0.01:
rl.draw_rectangle(int(rect.x), int(rect.y), int(rect.width), int(rect.height), rl.Color(0, 0, 0, int(150 * alpha)))
self._alert_renderer.render(rect)
def _draw_scc_icons(self, x: float, y: float) -> None:
sm = ui_state.sm
if not sm.valid["longitudinalPlanSP"]:
return
scc = sm["longitudinalPlanSP"].smartCruiseControl
box_w, box_h = 100, 36
gap = 6
drawn = 0
for label, active in [("SCC-V", scc.vision.active), ("SCC-M", scc.map.active)]:
if not active:
continue
bx = x
by = y + drawn * (box_h + gap)
rl.draw_rectangle_rounded(rl.Rectangle(bx, by, box_w, box_h), 0.3, 10, COLORS.green)
self._draw_text_centered(self._font_bold, label, 20, rl.Vector2(bx + box_w / 2, by + box_h / 2), COLORS.black)
drawn += 1
def _draw_speed_limit_sign(self, x: float, y: float, sign_width: float, sign_height: float) -> None:
speed_str = str(round(self.speed_limit)) if self.speed_limit_valid and self.speed_limit > 0 else "--"
speed_color = COLORS.black if not self.speed_limit_valid or self.current_speed <= self.speed_limit else COLORS.red
if ui_state.is_metric:
self._draw_vienna_sign(x, y, sign_width, sign_height, speed_str, speed_color, is_upcoming=False)
else:
self._draw_mutcd_sign(x, y, sign_width, sign_height, speed_str, speed_color, is_upcoming=False)
def _draw_road_name(self, x: float, y: float, width: float) -> None:
road_display = self.road_name if self.road_name else "--"
font_size = 30
road_size = measure_text_cached(self._font_semi_bold, road_display, font_size)
text_width = road_size.x
if text_width <= width:
self._marquee_offset = 0.0
self._marquee_direction = 1
self._marquee_pause_timer = 0.0
rl.draw_text_ex(self._font_semi_bold, road_display, rl.Vector2(x, y), font_size, 0, COLORS.white)
else:
overflow = text_width - width
dt = rl.get_frame_time()
if self._marquee_pause_timer > 0:
self._marquee_pause_timer -= dt
else:
self._marquee_offset += self._marquee_direction * self._marquee_speed * dt
if self._marquee_offset >= overflow:
self._marquee_offset = overflow
self._marquee_direction = -1
self._marquee_pause_timer = self._marquee_pause_duration
elif self._marquee_offset <= 0:
self._marquee_offset = 0
self._marquee_direction = 1
self._marquee_pause_timer = self._marquee_pause_duration
rl.begin_scissor_mode(int(x), int(y), int(width), int(road_size.y + 4))
text_pos = rl.Vector2(x - self._marquee_offset, y)
rl.draw_text_ex(self._font_semi_bold, road_display, text_pos, font_size, 0, COLORS.white)
rl.end_scissor_mode()
def _draw_vienna_sign(self, x: float, y: float, width: float, height: float, speed_str: str, speed_color: rl.Color, is_upcoming: bool = False) -> None:
center = rl.Vector2(x + width / 2, y + height / 2)
outer_radius = min(width, height) / 2
rl.draw_circle_v(center, outer_radius, COLORS.white)
ring_width = outer_radius * 0.18
rl.draw_ring(center, outer_radius - ring_width, outer_radius, 0, 360, 36, COLORS.red)
font_size = outer_radius * (0.7 if len(speed_str) >= 3 else 0.9)
text_size = measure_text_cached(self._font_bold, speed_str, int(font_size))
text_pos = rl.Vector2(center.x - text_size.x / 2, center.y - text_size.y / 2)
rl.draw_text_ex(self._font_bold, speed_str, text_pos, font_size, 0, speed_color)
def _draw_mutcd_sign(self, x: float, y: float, width: float, height: float, speed_str: str, speed_color: rl.Color, is_upcoming: bool = False) -> None:
sign_rect = rl.Rectangle(x, y, width, height)
rl.draw_rectangle_rounded(sign_rect, 0.35, 10, COLORS.white)
inset = max(4, width * 0.05)
inner_rect = rl.Rectangle(x + inset, y + inset, width - inset * 2, height - inset * 2)
outer_radius = 0.35 * width / 2.0
inner_radius = outer_radius - inset
inner_roundness = inner_radius / (inner_rect.width / 2.0)
rl.draw_rectangle_rounded_lines_ex(inner_rect, inner_roundness, 10, 3, COLORS.black)
mid_x = x + width / 2
label_size = max(18, int(width * 0.26))
if is_upcoming:
self._draw_text_centered(self._font_bold, tr("AHEAD"), label_size, rl.Vector2(mid_x, y + height * 0.27), COLORS.black)
else:
self._draw_text_centered(self._font_bold, tr("SPEED"), label_size, rl.Vector2(mid_x, y + height * 0.20), COLORS.black)
self._draw_text_centered(self._font_bold, tr("LIMIT"), label_size, rl.Vector2(mid_x, y + height * 0.40), COLORS.black)
speed_font_size = int(width * 0.52) if len(speed_str) >= 3 else int(width * 0.62)
self._draw_text_centered(self._font_bold, speed_str, speed_font_size, rl.Vector2(mid_x, y + height * 0.72), speed_color)
def _draw_text_centered(self, font, text, size, pos_center, color):
sz = measure_text_cached(font, text, size)
rl.draw_text_ex(font, text, rl.Vector2(pos_center.x - sz.x / 2, pos_center.y - sz.y / 2), size, 0, color)
def _format_distance(self, distance: float) -> str:
if ui_state.is_metric:
if distance < 50:
return tr("Near")
if distance >= 1000:
return f"{distance * METER_TO_KM:.1f}" + tr("km")
if distance < 200:
rounded = max(10, int(distance / 10) * 10)
else:
rounded = int(distance / 100) * 100
return str(rounded) + tr("m")
else:
distance_mi = distance * METER_TO_MILE
if distance_mi < 0.1:
return tr("Near")
return f"{distance_mi:.1f}" + tr("mi")

View File

@@ -0,0 +1,30 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import AugmentedRoadView
class _SuppressedConfidenceBall:
def render(self, *_):
pass
class AugmentedRoadViewSP(AugmentedRoadView):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._show_confidence_ball: bool = True
self._real_confidence_ball = self._confidence_ball
self._confidence_ball = _SuppressedConfidenceBall()
def set_show_confidence_ball(self, show: bool) -> None:
self._show_confidence_ball = show
def _render(self, rect: rl.Rectangle) -> None:
super()._render(rect)
if self._show_confidence_ball:
self._real_confidence_ball.render(self.rect)

View File

@@ -0,0 +1,34 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.system.ui.lib.application import MouseEvent
from openpilot.system.ui.lib.scroll_panel2 import GuiScrollPanel2, ScrollState
class GuiScrollPanel2SP(GuiScrollPanel2):
"""Reject orthogonal-dominant drags so nested scrollers (outer horizontal +
inner vertical) don't both engage on a slightly diagonal swipe.
Implemented as a post-super state rollback rather than reimplementing the
PRESSED state machine — keeps stock behaviour authoritative."""
def _handle_mouse_event(self, mouse_event: MouseEvent, bounds: rl.Rectangle, bounds_size: float,
content_size: float) -> None:
pre_state = self._state
super()._handle_mouse_event(mouse_event, bounds, bounds_size, content_size)
if self._state == ScrollState.MANUAL_SCROLL and pre_state == ScrollState.PRESSED and \
self._initial_click_event is not None:
diff_x = abs(mouse_event.pos.x - self._initial_click_event.pos.x)
diff_y = abs(mouse_event.pos.y - self._initial_click_event.pos.y)
along = diff_x if self._horizontal else diff_y
anti = diff_y if self._horizontal else diff_x
if anti > along:
self._state = ScrollState.STEADY
self._velocity = 0.0
self._velocity_buffer.clear()

View File

@@ -0,0 +1,16 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.system.ui.widgets.scroller import Scroller
from openpilot.selfdrive.ui.sunnypilot.mici.widgets.scroll_panel_sp import GuiScrollPanel2SP
class ScrollerSP(Scroller):
def __init__(self, **kwargs):
super().__init__(**kwargs)
inner = self._scroller
inner.scroll_panel = GuiScrollPanel2SP(inner._horizontal, handle_out_of_bounds=not inner._snap_items)

View File

@@ -10,6 +10,9 @@ from openpilot.selfdrive.ui.layouts.main import MainLayout
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout
from openpilot.selfdrive.ui.ui_state import ui_state
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.main import MiciMainLayoutSP as MiciMainLayout
BIG_UI = gui_app.big_ui()

View File

@@ -40,6 +40,7 @@ from openpilot.sunnypilot.modeld_v2.camera_offset_helper import CameraOffsetHelp
from openpilot.sunnypilot.livedelay.helpers import get_lat_delay
from openpilot.sunnypilot.modeld_v2.modeld_base import ModelStateBase
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
PROCESS_NAME = "selfdrive.modeld.modeld_tinygrad"
@@ -329,6 +330,7 @@ def main(demo=False):
prev_action = log.ModelDataV2.Action()
DH = DesireHelper()
RELC = RoadEdgeLaneChangeController(DH)
meta_constants = load_meta_constants()
while True:
@@ -433,7 +435,10 @@ def main(demo=False):
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
RELC.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs, v_ego)
mdv2sp_send.modelDataV2SP.leftLaneChangeEdgeBlock = RELC.left_edge_detected
mdv2sp_send.modelDataV2SP.rightLaneChangeEdgeBlock = RELC.right_edge_detected
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RELC.left_edge_detected, RELC.right_edge_detected)
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
mdv2sp_send.modelDataV2SP.laneTurnDirection = DH.lane_turn_direction

View File

@@ -0,0 +1,81 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import numpy as np
from cereal import messaging
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot import get_sanitize_int_param
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.constants import \
NORMAL, PERSONALITY_MIN, PERSONALITY_MAX, A_CRUISE_MAX_BP, A_CRUISE_MAX_V, RISE_RATE, T_FOLLOW_OFFSET, JERK_FACTOR_SCALE
class AccelController:
def __init__(self, CP: structs.CarParams, mpc, params=None):
self._CP = CP
self._mpc = mpc
self._params = params or Params()
self._frame = 0
self._enabled: bool = self._params.get_bool("AccelPersonalityEnabled")
self._personality = NORMAL # cereal AccelerationPersonality ordinal
self._v_ego = 0.0
self._jerk_scale = 1.0
self._t_follow_offset = 0.0
self._read_params()
def _read_params(self) -> None:
self._enabled = self._params.get_bool("AccelPersonalityEnabled")
# When disabled, resolve to normal so the disabled path and the normal tier share one numeric path
# that is identical to stock openpilot.
if not self._enabled:
self._personality = NORMAL
return
self._personality = get_sanitize_int_param("AccelPersonality", PERSONALITY_MIN, PERSONALITY_MAX, self._params)
def update(self, sm: messaging.SubMaster) -> None:
if self._frame % int(1. / DT_MDL) == 0:
self._read_params()
self._v_ego = sm['carState'].vEgo
self._frame += 1
def get_max_accel(self, v_ego: float) -> float:
return float(np.interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_V[self._personality]))
def get_rise_rate(self) -> float:
return RISE_RATE[self._personality]
def decel_mod(self, is_e2e: bool) -> tuple[float, float]:
# Decel-side comfort shaping applies only to the comfort MPC (ACC) path. When disabled or when the
# plan is e2e/blended, fall back to stock (no offset/scale) so e2e and blended braking are untouched.
# The MPC's ACCEL_MIN floor and FCW crash detection do not depend on these comfort weights, so
# emergency braking authority is preserved regardless of tier.
if not self._enabled or is_e2e:
self._jerk_scale = 1.0
self._t_follow_offset = 0.0
else:
self._jerk_scale = JERK_FACTOR_SCALE[self._personality]
self._t_follow_offset = T_FOLLOW_OFFSET[self._personality]
return self._jerk_scale, self._t_follow_offset
def jerk_factor_scale(self) -> float:
return self._jerk_scale
def t_follow_offset(self) -> float:
return self._t_follow_offset
def enabled(self) -> bool:
return self._enabled
def personality(self):
return self._personality # cereal AccelerationPersonality ordinal
def max_accel(self) -> float:
# Cached value for publishing; publish_longitudinal_plan_sp has no v_ego in scope.
return self.get_max_accel(self._v_ego)

View File

@@ -0,0 +1,60 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from cereal import custom
# Single source of truth for the tiers: the cereal enum (eco @0, normal @1, sport @2).
AccelerationPersonality = custom.LongitudinalPlanSP.AccelerationPersonality
ECO = AccelerationPersonality.eco
NORMAL = AccelerationPersonality.normal
SPORT = AccelerationPersonality.sport
# Valid range for the AccelPersonality int param (used for clamping).
PERSONALITY_MIN = min(AccelerationPersonality.schema.enumerants.values())
PERSONALITY_MAX = max(AccelerationPersonality.schema.enumerants.values())
# Speed-dependent max acceleration ceiling (m/s^2), interpolated over A_CRUISE_MAX_BP (m/s).
A_CRUISE_MAX_BP = [0., 10., 25., 40.]
# Stock openpilot values (selfdrive/controls/lib/longitudinal_planner.py). Single source of truth so the
# normal tier (and the disabled path, which resolves to normal) is identical to stock by construction.
STOCK_A_CRUISE_MAX_V = [1.6, 1.2, 0.8, 0.6]
STOCK_RISE_RATE = 0.05 # m/s^2 per planner cycle (DT_MDL=0.05s -> 1.0 m/s^2/s); stock accel_clip upper-bound slew
# Eco launch (v<=10) is kept ~stock so departing a stop/green light is not sluggish (no getting honked at);
# the eco character is in the cruise/highway roll-on (>=25 m/s), not off-the-line.
A_CRUISE_MAX_V = {
ECO: [1.6, 1.10, 0.45, 0.30],
NORMAL: STOCK_A_CRUISE_MAX_V,
SPORT: [1.8, 1.40, 1.00, 0.75],
}
# How fast the max-accel ceiling is allowed to rise, in m/s^2 per planner cycle (only the upward slew;
# the downward/braking slew stays at the stock rate for safety).
RISE_RATE = {
ECO: 0.025,
NORMAL: STOCK_RISE_RATE,
SPORT: 0.10,
}
# --- Decel-side comfort, applied to the comfort MPC (non-e2e ACC) path only ---
# e2e/blended keep stock braking, and the MPC's ACCEL_MIN floor + FCW crash detection are unaffected
# by these comfort weights (they only shape gentle decel onset/smoothness when following/cruising).
# Offset (s) added to the Driving Personality's T_FOLLOW: + = earlier and gentler decel (bigger gap).
T_FOLLOW_OFFSET = {
ECO: 0.15,
NORMAL: 0.0,
SPORT: -0.10,
}
# Multiplies the Driving Personality's jerk factor: >1 = smoother decel/accel (more jerk cost), <1 = crisper.
JERK_FACTOR_SCALE = {
ECO: 1.2,
NORMAL: 1.0,
SPORT: 0.85,
}

View File

@@ -0,0 +1,130 @@
import numpy as np
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.accel_controller import AccelController
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.constants import \
AccelerationPersonality, ECO, NORMAL, SPORT, STOCK_RISE_RATE
# Stock openpilot accel ceiling, duplicated independently here so the test fails if the normal tier ever drifts.
STOCK_BP = [0., 10., 25., 40.]
STOCK_V = [1.6, 1.2, 0.8, 0.6]
V_EGO_GRID = [0.0, 5.0, 10.0, 17.0, 25.0, 33.0, 40.0, 50.0]
class MockParams:
def __init__(self, enabled=False, personality=NORMAL):
self._vals = {"AccelPersonalityEnabled": enabled, "AccelPersonality": int(personality)}
def get_bool(self, key):
return bool(self._vals.get(key, False))
def get(self, key, return_default=False):
return self._vals.get(key, 0)
def put(self, key, val, block=False):
self._vals[key] = int(val)
class MockCarState:
def __init__(self, vEgo=0.0):
self.vEgo = vEgo
def make_sm(v_ego=0.0):
return {'carState': MockCarState(vEgo=v_ego)}
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
def stock_max_accel(v_ego):
return float(np.interp(v_ego, STOCK_BP, STOCK_V))
def test_disabled_matches_stock(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=False, personality=SPORT))
for v in V_EGO_GRID:
assert c.get_max_accel(v) == pytest.approx(stock_max_accel(v))
assert c.get_rise_rate() == STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.normal
assert not c.enabled()
def test_normal_matches_stock(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
for v in V_EGO_GRID:
assert c.get_max_accel(v) == pytest.approx(stock_max_accel(v))
assert c.get_rise_rate() == STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.normal
def test_eco_is_gentler(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
for v in V_EGO_GRID:
assert c.get_max_accel(v) <= stock_max_accel(v) + 1e-6
assert c.get_rise_rate() < STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.eco
def test_sport_is_brisker(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=SPORT))
for v in V_EGO_GRID:
assert c.get_max_accel(v) >= stock_max_accel(v) - 1e-6
assert c.get_rise_rate() > STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.sport
def test_param_clamp(mock_cp, mock_mpc):
# Out-of-range int must clamp to the max tier (sport), not raise.
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=9))
assert c.personality() == AccelerationPersonality.sport
def test_decel_mod_disabled_is_stock(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=False, personality=SPORT))
assert c.decel_mod(is_e2e=False) == (1.0, 0.0)
assert c.jerk_factor_scale() == 1.0
assert c.t_follow_offset() == 0.0
def test_decel_mod_e2e_bypass_is_stock(mock_cp, mock_mpc):
# Even enabled + sport, an e2e/blended plan must keep stock braking.
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=SPORT))
assert c.decel_mod(is_e2e=True) == (1.0, 0.0)
def test_decel_mod_eco_gentler(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
jerk_scale, t_follow_offset = c.decel_mod(is_e2e=False)
assert jerk_scale > 1.0 # smoother decel jerk
assert t_follow_offset > 0.0 # earlier / gentler decel (bigger gap)
def test_decel_mod_sport_crisper(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=SPORT))
jerk_scale, t_follow_offset = c.decel_mod(is_e2e=False)
assert jerk_scale < 1.0 # crisper decel jerk
assert t_follow_offset < 0.0 # firmer / later decel
def test_frame_gated_read(mock_cp, mock_mpc):
params = MockParams(enabled=True, personality=NORMAL)
c = AccelController(mock_cp, mock_mpc, params=params)
c.update(make_sm()) # frame 0 -> reads, picks up normal
params._vals["AccelPersonality"] = int(SPORT)
for _ in range(19): # frames 1..19, none on the 20-frame gate boundary
c.update(make_sm())
assert c.personality() == AccelerationPersonality.normal
c.update(make_sm()) # frame 20 -> reads, picks up sport
assert c.personality() == AccelerationPersonality.sport

View File

@@ -0,0 +1,40 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.common.params import Params
from openpilot.common.realtime import DT_CTRL
_PARAM_REFRESH_FRAMES = max(1, int(1.0 / DT_CTRL))
# Cap how fast the longitudinal command may DEEPEN a brake at onset, so braking eases in gradually
# instead of biting sharply (the planner aTarget is already smooth; the PID proportional term reacting
# to the aEgo lag at onset is what makes the command jerk). Comfort only:
# - applied solely while the brake command is getting more negative (onset / deepening),
# - bypassed entirely when aTarget demands a real hard brake, so emergencies are never delayed.
BRAKE_ONSET_JERK = 2.5 # m/s^3, max rate the brake command may deepen at onset
HARD_BRAKE_THRESH = -1.8 # m/s^2, aTarget at/below this = real hard brake -> pass through unmodified
class BrakeOnsetSmoother:
def __init__(self, params=None):
self._params = params or Params()
self._enabled = self._params.get_bool("SmoothBraking")
self._frame = 0
def apply(self, output_accel: float, last_output_accel: float, a_target: float) -> float:
if self._frame % _PARAM_REFRESH_FRAMES == 0:
self._enabled = self._params.get_bool("SmoothBraking")
self._frame += 1
if not self._enabled:
return output_accel
# Only ease the onset of a comfort brake: command deepening AND the plan isn't asking for a hard brake.
if output_accel < last_output_accel and a_target > HARD_BRAKE_THRESH:
return max(output_accel, last_output_accel - BRAKE_ONSET_JERK * DT_CTRL)
return output_accel

View File

@@ -1,17 +1,46 @@
from openpilot.common.realtime import DT_MDL
class WMACConstants:
# Lead detection parameters
LEAD_WINDOW_SIZE = 6 # Stable detection window
LEAD_PROB = 0.45 # Balanced threshold for lead detection
TRAJECTORY_SIZE = 33
PARAM_READ_FRAMES = max(1, int(round(1.0 / DT_MDL)))
# Slow down detection parameters
SLOW_DOWN_WINDOW_SIZE = 5 # Responsive but stable
SLOW_DOWN_PROB = 0.3 # Balanced threshold for slow down scenarios
EMERGENCY_HOLD_FRAMES = max(1, int(round(0.75 / DT_MDL)))
MIN_MODE_DURATION = {'acc': max(1, int(round(0.6 / DT_MDL))), 'blended': max(1, int(round(0.5 / DT_MDL)))}
ENTER_BLENDED_FRAMES = max(1, int(round(0.4 / DT_MDL)))
EXIT_BLENDED_FRAMES = max(1, int(round(0.35 / DT_MDL)))
STANDSTILL_FRAMES = max(1, int(round(0.2 / DT_MDL)))
# Optimized slow down distance curve - smooth and progressive
LEAD_PROB = 0.45
LEAD_EXIT_PROB = 0.25
LEAD_RISE_RATE = 1.0
LEAD_FALL_RATE = 0.35
RADAR_LEAD_ACC_PROB = 0.5
RADAR_LEAD_ACC_EXIT_PROB = 0.4
RADAR_LEAD_ACC_RISE_RATE = 1.0
RADAR_LEAD_ACC_FALL_RATE = 0.25
RADAR_LEAD_ACC_MAX_DREL = 80.0
RADAR_LEAD_ACC_MAX_TTC = 6.0
RADAR_LEAD_ACC_MIN_CLOSING_SPEED = -0.5
SLOW_DOWN_PROB = 0.5
SLOW_DOWN_EXIT_PROB = 0.4
SLOW_DOWN_RISE_RATE = 0.65
SLOW_DOWN_FALL_RATE = 0.15
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.]
SLOW_DOWN_DIST = [32., 46., 64., 86., 108., 130., 145., 165.]
URGENT_SLOW_DOWN_PROB = 0.85
# Slowness detection parameters
SLOWNESS_WINDOW_SIZE = 10 # Stable slowness detection
SLOWNESS_PROB = 0.55 # Clear threshold for slowness
SLOWNESS_CRUISE_OFFSET = 1.025 # Conservative cruise speed offset
MODEL_DECEL_START = -0.5
MODEL_DECEL_RANGE = 2.0
ENDPOINT_URGENCY_GAIN = 1.3
CRITICAL_ENDPOINT_FACTOR = 0.3
CRITICAL_URGENCY_GAIN = 1.5
SPEED_URGENCY_MIN = 25.0
SPEED_URGENCY_RANGE = 80.0
SLOWNESS_PROB = 0.55
SLOWNESS_EXIT_PROB = 0.45
SLOWNESS_RISE_RATE = 0.35
SLOWNESS_FALL_RATE = 0.5
SLOWNESS_CRUISE_OFFSET = 1.025

View File

@@ -6,129 +6,116 @@ See the LICENSE.md file in the root directory for more details.
"""
# Version = 2025-6-30
from cereal import messaging
from opendbc.car import structs
from numpy import interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
from typing import Literal
# d-e2e, from modeldata.h
TRAJECTORY_SIZE = 33
SET_MODE_TIMEOUT = 15
from cereal import messaging
from numpy import interp
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
# Define the valid mode types
ModeType = Literal['acc', 'blended']
class SmoothKalmanFilter:
"""Enhanced Kalman filter with smoothing for stable decision making."""
def clip01(value: float) -> float:
return max(0.0, min(1.0, float(value)))
def __init__(self, initial_value=0, measurement_noise=0.1, process_noise=0.01,
alpha=1.0, smoothing_factor=0.85):
self.x = initial_value
self.P = 1.0
self.R = measurement_noise
self.Q = process_noise
self.alpha = alpha
self.smoothing_factor = smoothing_factor
self.initialized = False
self.history = []
self.max_history = 10
self.confidence = 0.0
def add_data(self, measurement):
if len(self.history) >= self.max_history:
self.history.pop(0)
self.history.append(measurement)
class SmoothedSignal:
def __init__(self, rise_rate: float, fall_rate: float, initial_value: float = 0.0):
self.rise_rate = clip01(rise_rate)
self.fall_rate = clip01(fall_rate)
self.value = clip01(initial_value)
if not self.initialized:
self.x = measurement
self.initialized = True
self.confidence = 0.1
return
def update(self, measurement: float) -> float:
measurement = clip01(measurement)
rate = self.rise_rate if measurement > self.value else self.fall_rate
self.value += (measurement - self.value) * rate
return self.value
self.P = self.alpha * self.P + self.Q
def reset(self, value: float = 0.0) -> None:
self.value = clip01(value)
K = self.P / (self.P + self.R)
effective_K = K * (1.0 - self.smoothing_factor) + self.smoothing_factor * 0.1
innovation = measurement - self.x
self.x = self.x + effective_K * innovation
self.P = (1 - effective_K) * self.P
class HysteresisSignal:
def __init__(self, enter_threshold: float, exit_threshold: float, rise_rate: float, fall_rate: float):
self.enter_threshold = clip01(enter_threshold)
self.exit_threshold = clip01(exit_threshold)
self.filter = SmoothedSignal(rise_rate, fall_rate)
self.active = False
if abs(innovation) < 0.1:
self.confidence = min(1.0, self.confidence + 0.05)
else:
self.confidence = max(0.1, self.confidence - 0.02)
def update(self, measurement: float) -> bool:
value = self.filter.update(measurement)
threshold = self.exit_threshold if self.active else self.enter_threshold
self.active = value > threshold
return self.active
def get_value(self):
return self.x if self.initialized else None
def reset(self) -> None:
self.filter.reset()
self.active = False
def get_confidence(self):
return self.confidence
def reset_data(self):
self.initialized = False
self.history = []
self.confidence = 0.0
@property
def value(self) -> float:
return self.filter.value
class ModeTransitionManager:
"""Manages smooth transitions between driving modes with hysteresis."""
def __init__(self):
self.current_mode: ModeType = 'acc'
self.mode_confidence = {'acc': 1.0, 'blended': 0.0}
self.transition_timeout = 0
self.min_mode_duration = 10
self.mode_duration = 0
self.emergency_override = False
self._pending_mode: ModeType = 'acc'
self._pending_count = 0
self._blended_hold_frames = 0
def request_mode(self, mode: ModeType, confidence: float = 1.0, emergency: bool = False):
# Emergency override for critical situations (stops, collisions)
if emergency:
self.emergency_override = True
self.current_mode = mode
self.transition_timeout = SET_MODE_TIMEOUT
self.mode_duration = 0
def request_mode(self, mode: ModeType, immediate: bool = False, hold_frames: int = 0, cancel_hold: bool = False) -> None:
if immediate:
self._blended_hold_frames = max(self._blended_hold_frames, hold_frames)
self._pending_mode = mode
self._pending_count = 0
self._switch_mode(mode)
return
self.mode_confidence[mode] = min(1.0, self.mode_confidence[mode] + 0.1 * confidence)
for m in self.mode_confidence:
if m != mode:
self.mode_confidence[m] = max(0.0, self.mode_confidence[m] - 0.05)
if cancel_hold and mode == 'acc':
self._blended_hold_frames = 0
# Require minimum duration in current mode (unless emergency)
if self.mode_duration < self.min_mode_duration and not self.emergency_override:
if self._blended_hold_frames > 0:
mode = 'blended'
if mode == self.current_mode:
self._pending_mode = mode
self._pending_count = 0
return
# Hysteresis: higher threshold for mode changes
confidence_threshold = 0.6 if mode != self.current_mode else 0.3 # Lower threshold for faster response
if mode != self._pending_mode:
self._pending_mode = mode
self._pending_count = 1
else:
self._pending_count += 1
if self.mode_confidence[mode] > confidence_threshold:
if mode != self.current_mode and self.transition_timeout == 0:
self.transition_timeout = SET_MODE_TIMEOUT
self.current_mode = mode
self.mode_duration = 0
if self.mode_duration < WMACConstants.MIN_MODE_DURATION[self.current_mode]:
return
def update(self):
if self.transition_timeout > 0:
self.transition_timeout -= 1
required_count = WMACConstants.ENTER_BLENDED_FRAMES if mode == 'blended' else WMACConstants.EXIT_BLENDED_FRAMES
if self._pending_count >= required_count:
self._switch_mode(mode)
def update(self) -> None:
if self._blended_hold_frames > 0:
self._blended_hold_frames -= 1
self.mode_duration += 1
# Reset emergency override after some time
if self.emergency_override and self.mode_duration > 20:
self.emergency_override = False
# Gradual confidence decay
for mode in self.mode_confidence:
self.mode_confidence[mode] *= 0.98
def get_mode(self) -> ModeType:
return self.current_mode
def _switch_mode(self, mode: ModeType) -> None:
if mode == self.current_mode:
return
self.current_mode = mode
self.mode_duration = 0
self._pending_mode = mode
self._pending_count = 0
class DynamicExperimentalController:
def __init__(self, CP: structs.CarParams, mpc, params=None):
@@ -142,35 +129,33 @@ class DynamicExperimentalController:
self._mode_manager = ModeTransitionManager()
# Smooth filters for stable decision making with faster response for critical scenarios
self._lead_filter = SmoothKalmanFilter(
measurement_noise=0.15,
process_noise=0.05,
alpha=1.02,
smoothing_factor=0.8
self._lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.LEAD_PROB,
exit_threshold=WMACConstants.LEAD_EXIT_PROB,
rise_rate=WMACConstants.LEAD_RISE_RATE,
fall_rate=WMACConstants.LEAD_FALL_RATE,
)
self._radar_acc_lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.RADAR_LEAD_ACC_PROB,
exit_threshold=WMACConstants.RADAR_LEAD_ACC_EXIT_PROB,
rise_rate=WMACConstants.RADAR_LEAD_ACC_RISE_RATE,
fall_rate=WMACConstants.RADAR_LEAD_ACC_FALL_RATE,
)
self._slow_down_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOW_DOWN_PROB,
exit_threshold=WMACConstants.SLOW_DOWN_EXIT_PROB,
rise_rate=WMACConstants.SLOW_DOWN_RISE_RATE,
fall_rate=WMACConstants.SLOW_DOWN_FALL_RATE,
)
self._slowness_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOWNESS_PROB,
exit_threshold=WMACConstants.SLOWNESS_EXIT_PROB,
rise_rate=WMACConstants.SLOWNESS_RISE_RATE,
fall_rate=WMACConstants.SLOWNESS_FALL_RATE,
)
self._slow_down_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.1,
alpha=1.05,
smoothing_factor=0.7
)
self._slowness_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.06,
alpha=1.015,
smoothing_factor=0.92
)
self._mpc_fcw_filter = SmoothKalmanFilter(
measurement_noise=0.2,
process_noise=0.1,
alpha=1.1,
smoothing_factor=0.5
)
self._has_lead_filtered = False
self._has_radar_acc_lead = False
self._has_slow_down = False
self._has_slowness = False
self._has_mpc_fcw = False
@@ -179,13 +164,14 @@ class DynamicExperimentalController:
self._has_standstill = False
self._mpc_fcw_crash_cnt = 0
self._standstill_count = 0
# debug
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
self._raw_urgency = 0.0
def _read_params(self) -> None:
if self._frame % int(1. / DT_MDL) == 0:
if self._frame % WMACConstants.PARAM_READ_FRAMES == 0:
self._enabled = self._params.get_bool("DynamicExperimentalControl")
def mode(self) -> str:
@@ -198,7 +184,6 @@ class DynamicExperimentalController:
return self._active
def set_mpc_fcw_crash_cnt(self) -> None:
"""Set MPC FCW crash count"""
self._mpc_fcw_crash_cnt = self._mpc.crash_cnt
def _update_calculations(self, sm: messaging.SubMaster) -> None:
@@ -210,179 +195,109 @@ class DynamicExperimentalController:
self._v_cruise_kph = car_state.vCruise
self._has_standstill = car_state.standstill
# standstill detection
if self._has_standstill:
self._standstill_count = min(20, self._standstill_count + 1)
self._standstill_count = min(WMACConstants.STANDSTILL_FRAMES * 3, self._standstill_count + 1)
else:
self._standstill_count = max(0, self._standstill_count - 1)
# Lead detection
self._lead_filter.add_data(float(lead_one.status))
lead_value = self._lead_filter.get_value() or 0.0
self._has_lead_filtered = lead_value > WMACConstants.LEAD_PROB
# MPC FCW detection
fcw_filtered_value = self._mpc_fcw_filter.get_value() or 0.0
self._mpc_fcw_filter.add_data(float(self._mpc_fcw_crash_cnt > 0))
self._has_mpc_fcw = fcw_filtered_value > 0.5
# Slow down detection
self._has_lead_filtered = self._lead_tracker.update(float(lead_one.status))
self._has_radar_acc_lead = self._radar_acc_lead_tracker.update(self._radar_acc_lead_score(lead_one))
self._has_mpc_fcw = self._mpc_fcw_crash_cnt > 0
self._calculate_slow_down(md)
# Slowness detection
if not (self._standstill_count > 5) and not self._has_slow_down:
if self._standstill_count > WMACConstants.STANDSTILL_FRAMES or self._has_slow_down:
self._slowness_tracker.reset()
self._has_slowness = False
else:
current_slowness = float(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET))
self._slowness_filter.add_data(current_slowness)
slowness_value = self._slowness_filter.get_value() or 0.0
self._has_slowness = self._slowness_tracker.update(current_slowness)
# Hysteresis for slowness
threshold = WMACConstants.SLOWNESS_PROB * (0.8 if self._has_slowness else 1.1)
self._has_slowness = slowness_value > threshold
def _calculate_slow_down(self, md):
"""Calculate urgency based on trajectory endpoint vs expected distance."""
# Reset to safe defaults
urgency = 0.0
def _calculate_slow_down(self, md) -> None:
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
#Require exact trajectory size
position_valid = len(md.position.x) == TRAJECTORY_SIZE
orientation_valid = len(md.orientation.x) == TRAJECTORY_SIZE
urgency = self._model_action_urgency(md)
position_valid = len(md.position.x) == WMACConstants.TRAJECTORY_SIZE
if not (position_valid and orientation_valid):
# Invalid trajectory - this itself might indicate a stop scenario
# Apply moderate urgency for incomplete trajectories at speed
if self._v_ego_kph > 20.0:
urgency = 0.3
if position_valid:
self._trajectory_valid = True
self._endpoint_x = md.position.x[WMACConstants.TRAJECTORY_SIZE - 1]
self._expected_distance = interp(self._v_ego_kph, WMACConstants.SLOW_DOWN_BP, WMACConstants.SLOW_DOWN_DIST)
urgency = max(urgency, self._endpoint_urgency(self._endpoint_x, self._expected_distance))
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
self._has_slow_down = urgency_filtered > WMACConstants.SLOW_DOWN_PROB
self._urgency = urgency_filtered
return
self._raw_urgency = clip01(urgency)
self._has_slow_down = self._slow_down_tracker.update(self._raw_urgency)
self._urgency = self._slow_down_tracker.value
# We have a valid full trajectory
self._trajectory_valid = True
def _radar_acc_lead_score(self, lead_one) -> float:
if not lead_one.status:
return 0.0
# Use the exact endpoint (33rd point, index 32)
endpoint_x = md.position.x[TRAJECTORY_SIZE - 1]
self._endpoint_x = endpoint_x
d_rel = float(getattr(lead_one, 'dRel', float('inf')))
v_rel = float(getattr(lead_one, 'vRel', 0.0))
if d_rel <= WMACConstants.RADAR_LEAD_ACC_MAX_DREL:
return 1.0
if v_rel <= WMACConstants.RADAR_LEAD_ACC_MIN_CLOSING_SPEED and d_rel / max(-v_rel, 0.1) <= WMACConstants.RADAR_LEAD_ACC_MAX_TTC:
return 1.0
return 0.0
# Get expected distance based on current speed using tuned constants
expected_distance = interp(self._v_ego_kph,
WMACConstants.SLOW_DOWN_BP,
WMACConstants.SLOW_DOWN_DIST)
self._expected_distance = expected_distance
def _model_action_urgency(self, md) -> float:
action = getattr(md, 'action', None)
if action is None:
return 0.0
# Calculate urgency based on trajectory shortage
if endpoint_x < expected_distance:
shortage = expected_distance - endpoint_x
shortage_ratio = shortage / expected_distance
urgency = 1.0 if getattr(action, 'shouldStop', False) else 0.0
desired_accel = getattr(action, 'desiredAcceleration', 0.0)
if desired_accel < WMACConstants.MODEL_DECEL_START:
urgency = max(urgency, min(1.0, (WMACConstants.MODEL_DECEL_START - desired_accel) / WMACConstants.MODEL_DECEL_RANGE))
return urgency
# Base urgency on shortage ratio
urgency = min(1.0, shortage_ratio * 2.0)
def _endpoint_urgency(self, endpoint_x: float, expected_distance: float) -> float:
if endpoint_x >= expected_distance:
return 0.0
# Increase urgency for very short trajectories (imminent stops)
critical_distance = expected_distance * 0.3
if endpoint_x < critical_distance:
urgency = min(1.0, urgency * 2.0)
shortage_ratio = (expected_distance - endpoint_x) / expected_distance
urgency = min(1.0, shortage_ratio * WMACConstants.ENDPOINT_URGENCY_GAIN)
# Speed-based urgency adjustment
if self._v_ego_kph > 25.0:
speed_factor = 1.0 + (self._v_ego_kph - 25.0) / 80.0
urgency = min(1.0, urgency * speed_factor)
if endpoint_x < expected_distance * WMACConstants.CRITICAL_ENDPOINT_FACTOR:
urgency = min(1.0, urgency * WMACConstants.CRITICAL_URGENCY_GAIN)
# Apply filtering but with less smoothing for stops
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
if self._v_ego_kph > WMACConstants.SPEED_URGENCY_MIN:
speed_factor = 1.0 + (self._v_ego_kph - WMACConstants.SPEED_URGENCY_MIN) / WMACConstants.SPEED_URGENCY_RANGE
urgency = min(1.0, urgency * speed_factor)
# Update state with lower threshold for better stop detection
self._has_slow_down = urgency_filtered > (WMACConstants.SLOW_DOWN_PROB * 0.8)
self._urgency = urgency_filtered
return urgency
def _radarless_mode(self) -> None:
"""Radarless mode decision logic with emergency handling."""
def _desired_mode(self) -> tuple[ModeType, bool]:
if not self._CP.radarUnavailable and self._has_radar_acc_lead:
return 'acc', False
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
return 'blended', True
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
standstill = self._standstill_count > WMACConstants.STANDSTILL_FRAMES
urgent_slow_down = self._has_slow_down and self._raw_urgency > WMACConstants.URGENT_SLOW_DOWN_PROB
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.5)
self._mode_manager.request_mode('blended', confidence=confidence)
return
if self._CP.radarUnavailable:
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
return 'acc', False
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
def _radar_mode(self) -> None:
"""Radar mode with emergency handling."""
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
# If lead detected and not in standstill: always use ACC
if self._has_lead_filtered and not (self._standstill_count > 3):
self._mode_manager.request_mode('acc', confidence=1.0)
return
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.3)
self._mode_manager.request_mode('blended', confidence=confidence)
return
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
return 'acc', False
def update(self, sm: messaging.SubMaster) -> None:
self._read_params()
self.set_mpc_fcw_crash_cnt()
self._update_calculations(sm)
if self._CP.radarUnavailable:
self._radarless_mode()
else:
self._radar_mode()
mode, immediate = self._desired_mode()
self._mode_manager.request_mode(mode, immediate=immediate, hold_frames=WMACConstants.EMERGENCY_HOLD_FRAMES,
cancel_hold=self._has_radar_acc_lead)
self._mode_manager.update()
self._active = sm['selfdriveState'].experimentalMode and self._enabled
self._frame += 1

View File

@@ -1,94 +0,0 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
class MockLeadOne:
def __init__(self, status=0.0):
self.status = status
class MockRadarState:
def __init__(self, status=0.0):
self.leadOne = MockLeadOne(status=status)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockModelData:
def __init__(self, valid=True):
size = 33 if valid else 10 # incomplete if invalid
self.position = type("Pos", (), {"x": [0.0] * size})()
self.orientation = type("Ori", (), {"x": [0.0] * size})()
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
# Fake Kalman Filter that always returns a given value
class FakeKalman:
def __init__(self, value=1.0):
self.value = value
def add_data(self, v): pass
def get_value(self): return self.value
def get_confidence(self): return 1.0
def reset_data(self): pass
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['carState'].standstill = True
for _ in range(10):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
mock_mpc.crash_cnt = 1 # simulate FCW
for _ in range(2):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
# Force conditions to simulate slowdown
controller._slow_down_filter = FakeKalman(value=1.0) # Ensure urgency triggers slowdown
controller._v_ego_kph = 35.0
default_sm['modelV2'] = MockModelData(valid=False) # Incomplete trajectory
for _ in range(3):
controller.update(default_sm)
assert controller.mode() == "blended"

View File

@@ -0,0 +1,235 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController, HysteresisSignal
class MockLeadOne:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.status = status
self.dRel = dRel
self.vRel = vRel
class MockRadarState:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.leadOne = MockLeadOne(status=status, dRel=dRel, vRel=vRel)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockAction:
def __init__(self, desiredAcceleration=0.0, shouldStop=False):
self.desiredAcceleration = desiredAcceleration
self.shouldStop = shouldStop
class MockModelData:
def __init__(self, valid=True, endpoint_x=200.0, orientation_valid=None, desired_acceleration=0.0, should_stop=False):
position_size = 33 if valid else 10
orientation_size = position_size if orientation_valid is None else (33 if orientation_valid else 10)
position_x = [0.0] * position_size
if position_x:
position_x[-1] = endpoint_x
self.position = type("Pos", (), {"x": position_x})()
self.orientation = type("Ori", (), {"x": [0.0] * orientation_size})()
self.acceleration = type("Accel", (), {"x": [0.0] * position_size})()
self.action = MockAction(desired_acceleration, should_stop)
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['carState'].standstill = True
for _ in range(20):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
mock_mpc.crash_cnt = 1
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
def test_valid_position_with_missing_orientation_can_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, orientation_valid=False)
controller.update(default_sm)
assert controller._trajectory_valid
assert controller.mode() == "blended"
def test_incomplete_position_does_not_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert not controller._trajectory_valid
assert not controller._has_slow_down
assert controller.mode() == "acc"
def test_slowdown_hysteresis_prevents_threshold_chatter():
signal = HysteresisSignal(enter_threshold=0.5, exit_threshold=0.4, rise_rate=1.0, fall_rate=1.0)
assert signal.update(0.55)
assert signal.update(0.45)
assert not signal.update(0.35)
def test_model_should_stop_triggers_blended_without_valid_trajectory(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, should_stop=True)
controller.update(default_sm)
assert not controller._trajectory_valid
assert controller.mode() == "blended"
def test_radar_lead_keeps_acc_over_model_slowdown(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert controller._has_slow_down
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_far_radar_lead_allows_blended_until_acc_relevant(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert not controller._has_radar_acc_lead
assert controller.mode() == "blended"
def test_relevant_radar_lead_smoothly_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
default_sm['radarState'] = MockRadarState(status=1.0, dRel=45.0, vRel=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_closing_far_radar_lead_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=-25.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_radar_lead_keeps_acc_over_fcw_and_standstill(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['carState'].standstill = True
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, should_stop=True)
mock_mpc.crash_cnt = 1
for _ in range(10):
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller._has_mpc_fcw
assert controller.mode() == "acc"
def test_lead_flicker_hold_prevents_one_frame_mode_flip(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
controller.update(default_sm)
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller.mode() == "acc"

View File

@@ -0,0 +1,259 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from collections import deque
from dataclasses import dataclass
from openpilot.common.realtime import DT_MDL
_HOLD_FRAMES = 16
_STATUS_WINDOW = 20
_STABILITY_FLIPS_FULL = 6.0
# Lead target-switch slew: on a sudden closer dRel jump, lag reported dRel and decay to
# truth (true vRel/vLead/aLeadK pass through); urgent switches pass through.
_SWITCH_STEP_THRESH = 5.0 # m, single-frame closer jump = switch
_SWITCH_TTC_MIN = 4.0 # s, below this = urgent, no masking
_SWITCH_PASSTHROUGH_VREL = -8.0 # m/s, faster closing = no masking
_SLEW_DECAY = 0.88 # per-frame offset decay
_SLEW_OFFSET_MAX = 25.0 # m
_SLEW_OFFSET_EPS = 0.5 # m, snap to zero below this
# Phantom-lead mask: a fresh low-modelProb close lead is usually a radar ghost; mask it
# so the MPC ignores it. Established/held leads are never masked (see smooth()).
_PHANTOM_MODELPROB_MAX = 0.5
_PHANTOM_DREL_MAX = 5.0
@dataclass
class _LeadSnap:
dRel: float = 0.0
yRel: float = 0.0
vRel: float = 0.0
vLead: float = 0.0
aLeadK: float = 0.0
aLeadTau: float = 0.0
modelProb: float = 0.0
aRel: float = 0.0
fcw: bool = False
class _LeadProxy:
__slots__ = ('status', 'dRel', 'yRel', 'vRel', 'vLead', 'aLeadK', 'aLeadTau',
'modelProb', 'aRel', 'fcw')
def __init__(self, snap: _LeadSnap):
self.status = True
self.dRel = snap.dRel
self.yRel = snap.yRel
self.vRel = snap.vRel
self.vLead = snap.vLead
self.aLeadK = snap.aLeadK
self.aLeadTau = snap.aLeadTau
self.modelProb = snap.modelProb
self.aRel = snap.aRel
self.fcw = snap.fcw
class _LeadProxyMasked:
__slots__ = ('status', 'dRel', 'yRel', 'vRel', 'vLead', 'aLeadK', 'aLeadTau',
'modelProb', 'aRel', 'fcw')
def __init__(self):
self.status = False
self.dRel = 0.0
self.yRel = 0.0
self.vRel = 0.0
self.vLead = 0.0
self.aLeadK = 0.0
self.aLeadTau = 0.0
self.modelProb = 0.0
self.aRel = 0.0
self.fcw = False
class _RadarStateProxy:
__slots__ = ('_raw', '_lead_one', '_lead_two')
def __init__(self, raw, lead_one, lead_two):
self._raw = raw
self._lead_one = lead_one
self._lead_two = lead_two
@property
def leadOne(self):
return self._lead_one if self._lead_one is not None else self._raw.leadOne
@property
def leadTwo(self):
return self._lead_two if self._lead_two is not None else self._raw.leadTwo
def __getattr__(self, name):
return getattr(self._raw, name)
class LeadPersistence:
"""Internal helper. Hold last-known leadOne/leadTwo alive for HOLD_FRAMES
after a status drop, so the MPC view of radarState ignores brief flicker.
Also masks freshly-acquired close-range low-confidence phantom leads so the
planner doesn't demand emergency brake on radar ghosts.
No own param — owner (RadarDistanceController) gates via force_enabled."""
def __init__(self):
self._last_one: _LeadSnap | None = None
self._last_two: _LeadSnap | None = None
self._alive_one = 0
self._alive_two = 0
self._status_hist: deque[bool] = deque(maxlen=_STATUS_WINDOW)
self._stability = 1.0
# lead target-switch slew state (leadOne only)
self._slew_offset = 0.0
self._reported_one_dRel: float | None = None
self._prev_raw_one_dRel: float | None = None
self._prev_raw_one_vRel = 0.0
@property
def stability(self) -> float:
return self._stability
@property
def slew_offset(self) -> float:
return self._slew_offset
def reset(self) -> None:
self._last_one = None
self._last_two = None
self._alive_one = 0
self._alive_two = 0
self._status_hist.clear()
self._stability = 1.0
self._slew_offset = 0.0
self._reported_one_dRel = None
self._prev_raw_one_dRel = None
self._prev_raw_one_vRel = 0.0
def update(self, radarstate, force_enabled: bool = True) -> None:
if radarstate is None:
return
if not force_enabled:
self.reset()
return
one = radarstate.leadOne
two = radarstate.leadTwo
# phantom counts as not-valid so hold-alive doesn't re-inject the ghost
one_valid = bool(one.status) and not self._is_phantom(one)
two_valid = bool(two.status) and not self._is_phantom(two)
self._compute_switch_slew(one, one_valid)
if one_valid:
self._last_one = self._snap(one)
self._alive_one = _HOLD_FRAMES
elif self._alive_one > 0:
self._alive_one -= 1
if two_valid:
self._last_two = self._snap(two)
self._alive_two = _HOLD_FRAMES
elif self._alive_two > 0:
self._alive_two -= 1
self._status_hist.append(bool(one.status))
if len(self._status_hist) >= 5:
flips = sum(1 for i in range(1, len(self._status_hist))
if self._status_hist[i] != self._status_hist[i - 1])
self._stability = max(0.0, 1.0 - min(1.0, flips / _STABILITY_FLIPS_FULL))
else:
self._stability = 1.0
def _compute_switch_slew(self, one, one_valid: bool) -> None:
"""Carry a decaying dRel lag across a leadOne target-switch; urgent switches pass through."""
if not one_valid:
self._slew_offset = 0.0
self._reported_one_dRel = None
self._prev_raw_one_dRel = None
self._prev_raw_one_vRel = 0.0
return
raw_d = float(one.dRel)
raw_v = float(one.vRel)
ttc = raw_d / max(0.1, -raw_v) if raw_v < 0.0 else float('inf')
urgent = ttc <= _SWITCH_TTC_MIN or raw_v <= _SWITCH_PASSTHROUGH_VREL
if urgent:
self._slew_offset = 0.0
else:
self._slew_offset = self._slew_offset * _SLEW_DECAY if self._slew_offset > _SLEW_OFFSET_EPS else 0.0
if self._prev_raw_one_dRel is not None and self._reported_one_dRel is not None:
expected = self._prev_raw_one_dRel + self._prev_raw_one_vRel * DT_MDL
if raw_d < expected - _SWITCH_STEP_THRESH: # closer jump = switch
new_offset = self._reported_one_dRel - raw_d
self._slew_offset = min(_SLEW_OFFSET_MAX, max(self._slew_offset, new_offset))
self._reported_one_dRel = raw_d + self._slew_offset
self._prev_raw_one_dRel = raw_d
self._prev_raw_one_vRel = raw_v
@staticmethod
def _is_phantom(lead) -> bool:
if not (bool(lead.status)
and float(lead.modelProb) < _PHANTOM_MODELPROB_MAX
and float(lead.dRel) < _PHANTOM_DREL_MAX):
return False
# don't mask an urgently-closing lead
v_rel = float(lead.vRel)
if v_rel <= _SWITCH_PASSTHROUGH_VREL:
return False
ttc = float(lead.dRel) / max(0.1, -v_rel) if v_rel < 0.0 else float('inf')
return ttc > _SWITCH_TTC_MIN
def smooth(self, radarstate, force_enabled: bool = True):
if not force_enabled or radarstate is None:
return radarstate
l1 = None
l2 = None
# mask only a fresh ghost (alive==0); never drop an established/held lead
if self._is_phantom(radarstate.leadOne) and self._alive_one == 0:
l1 = _LeadProxyMasked()
elif not radarstate.leadOne.status and self._alive_one > 0 and self._last_one is not None:
l1 = _LeadProxy(self._last_one)
elif radarstate.leadOne.status and self._slew_offset > _SLEW_OFFSET_EPS:
# target-switch slew: lag dRel only, true vRel/vLead/aLeadK pass through
snap = self._snap(radarstate.leadOne)
snap.dRel = snap.dRel + self._slew_offset
l1 = _LeadProxy(snap)
if self._is_phantom(radarstate.leadTwo) and self._alive_two == 0:
l2 = _LeadProxyMasked()
elif not radarstate.leadTwo.status and self._alive_two > 0 and self._last_two is not None:
l2 = _LeadProxy(self._last_two)
if l1 is None and l2 is None:
return radarstate
return _RadarStateProxy(radarstate, l1, l2)
@staticmethod
def _snap(lead) -> _LeadSnap:
return _LeadSnap(
dRel=float(lead.dRel),
yRel=float(lead.yRel),
vRel=float(lead.vRel),
vLead=float(lead.vLead),
aLeadK=float(lead.aLeadK),
aLeadTau=float(lead.aLeadTau),
modelProb=float(lead.modelProb),
aRel=float(lead.aRel),
fcw=bool(lead.fcw),
)

View File

@@ -9,8 +9,10 @@ from cereal import messaging, custom
from opendbc.car import structs
from openpilot.common.constants import CV
from openpilot.selfdrive.car.cruise import V_CRUISE_MAX
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.accel_controller import AccelController
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
from openpilot.sunnypilot.selfdrive.controls.lib.e2e_alerts_helper import E2EAlertsHelper
from openpilot.sunnypilot.selfdrive.controls.lib.radar_distance.radar_distance import RadarDistanceController
from openpilot.sunnypilot.selfdrive.controls.lib.smart_cruise_control.smart_cruise_control import SmartCruiseControl
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_resolver import SpeedLimitResolver
@@ -26,6 +28,8 @@ class LongitudinalPlannerSP:
self.events_sp = EventsSP()
self.resolver = SpeedLimitResolver()
self.dec = DynamicExperimentalController(CP, mpc)
self.accel = AccelController(CP, mpc)
self.radar_distance = RadarDistanceController()
self.scc = SmartCruiseControl()
self.resolver = SpeedLimitResolver()
self.sla = SpeedLimitAssist(CP, CP_SP)
@@ -35,6 +39,14 @@ class LongitudinalPlannerSP:
self.output_v_target = 0.
self.output_a_target = 0.
self._smoothed_radarstate = None
def smooth_radarstate(self, radarstate):
# RadarDistance: hold leads through flicker, mask fresh close phantoms, and slew dRel across
# lead target-switches so the MPC doesn't demand a catch-up hard brake. Cached per cycle.
if self._smoothed_radarstate is None:
self._smoothed_radarstate = self.radar_distance.smooth_radarstate(radarstate)
return self._smoothed_radarstate
def is_e2e(self, sm: messaging.SubMaster) -> bool:
experimental_mode = sm['selfdriveState'].experimentalMode
@@ -74,8 +86,11 @@ class LongitudinalPlannerSP:
return self.output_v_target, self.output_a_target
def update(self, sm: messaging.SubMaster) -> None:
self._smoothed_radarstate = None
self.events_sp.clear()
self.dec.update(sm)
self.accel.update(sm)
self.radar_distance.update(sm)
self.e2e_alerts_helper.update(sm, self.events_sp)
def publish_longitudinal_plan_sp(self, sm: messaging.SubMaster, pm: messaging.PubMaster) -> None:
@@ -95,6 +110,14 @@ class LongitudinalPlannerSP:
dec.enabled = self.dec.enabled()
dec.active = self.dec.active()
# Acceleration Personality
acceleration = longitudinalPlanSP.acceleration
acceleration.personality = self.accel.personality()
acceleration.enabled = self.accel.enabled()
acceleration.maxAccel = float(self.accel.max_accel())
acceleration.tFollowOffset = float(self.accel.t_follow_offset())
acceleration.jerkFactorScale = float(self.accel.jerk_factor_scale())
# Smart Cruise Control
smartCruiseControl = longitudinalPlanSP.smartCruiseControl
# Vision Control

View File

@@ -0,0 +1,58 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.lead_persistence.lead_persistence import LeadPersistence
_PARAM_REFRESH_FRAMES = max(1, int(1.0 / DT_MDL))
class RadarDistanceController:
"""Holds last-known radar leads alive through brief flicker and masks close-range
phantom leads, so the MPC view of radarState stays stable. The behavior lives in
LeadPersistence; this owns the RadarDistance param gate."""
def __init__(self):
self.params = Params()
self._frame = 0
self._enabled = self.params.get_bool('RadarDistance')
self._lead_persistence = LeadPersistence()
def is_enabled(self) -> bool:
return self._enabled
def set_enabled(self, enabled: bool):
self._enabled = bool(enabled)
self.params.put_bool('RadarDistance', self._enabled)
def toggle(self) -> bool:
self.set_enabled(not self._enabled)
return self._enabled
def update(self, sm=None, sm_sp=None) -> None:
self._frame += 1
if self._frame % _PARAM_REFRESH_FRAMES == 0:
self._enabled = self.params.get_bool('RadarDistance')
radarstate = None
if sm is not None:
try:
radarstate = sm['radarState']
except (KeyError, AttributeError, TypeError):
radarstate = None
self._lead_persistence.update(radarstate, force_enabled=self._enabled)
def smooth_radarstate(self, radarstate):
if not self._enabled:
return radarstate
return self._lead_persistence.smooth(radarstate, force_enabled=True)
def reset(self) -> None:
self._lead_persistence.reset()

View File

@@ -0,0 +1,84 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import numpy as np
from openpilot.common.constants import CV
from openpilot.common.realtime import DT_MDL
from openpilot.common.params import Params
NEARSIDE_PROB = 0.2
EDGE_PROB = 0.35
EDGE_REACTION_TIME = 1.0
EDGE_CLEAR_TIME = 0.3
MIN_SPEED = 20 * CV.MPH_TO_MS
class RoadEdgeLaneChangeController:
def __init__(self, desire_helper):
self.DH = desire_helper
self.params = Params()
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
self.param_read_counter = 0
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def read_params(self) -> None:
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
def update_params(self) -> None:
if self.param_read_counter % 50 == 0:
self.read_params()
self.param_read_counter += 1
def reset(self) -> None:
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def update(self, road_edge_stds, lane_line_probs, v_ego: float) -> None:
self.update_params()
if not self.enabled or v_ego < MIN_SPEED:
self.reset()
return
left_edge_prob = np.clip(1.0 - road_edge_stds[0], 0.0, 1.0)
right_edge_prob = np.clip(1.0 - road_edge_stds[1], 0.0, 1.0)
left_lane_prob = lane_line_probs[0]
right_lane_prob = lane_line_probs[3]
left_cond = left_edge_prob > EDGE_PROB and left_lane_prob < NEARSIDE_PROB and right_lane_prob >= left_lane_prob
right_cond = right_edge_prob > EDGE_PROB and right_lane_prob < NEARSIDE_PROB and left_lane_prob >= right_lane_prob
if left_cond:
self.left_edge_timer = min(self.left_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.left_clear_timer = 0.0
if self.left_edge_timer > EDGE_REACTION_TIME:
self.left_edge_detected = True
else:
self.left_clear_timer += DT_MDL
if self.left_clear_timer > EDGE_CLEAR_TIME:
self.left_edge_timer = 0.0
self.left_edge_detected = False
if right_cond:
self.right_edge_timer = min(self.right_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.right_clear_timer = 0.0
if self.right_edge_timer > EDGE_REACTION_TIME:
self.right_edge_detected = True
else:
self.right_clear_timer += DT_MDL
if self.right_clear_timer > EDGE_CLEAR_TIME:
self.right_edge_timer = 0.0
self.right_edge_detected = False

View File

@@ -0,0 +1,43 @@
from openpilot.common.realtime import DT_CTRL
from openpilot.sunnypilot.selfdrive.controls.lib.brake_smoother import BrakeOnsetSmoother, BRAKE_ONSET_JERK, HARD_BRAKE_THRESH
MAX_STEP = BRAKE_ONSET_JERK * DT_CTRL
class MockParams:
def __init__(self, enabled):
self._v = {"SmoothBraking": enabled}
def get_bool(self, k):
return bool(self._v.get(k, False))
def test_disabled_passthrough():
s = BrakeOnsetSmoother(params=MockParams(False))
assert s.apply(-1.0, 0.0, -0.5) == -1.0
def test_comfort_onset_rate_limited():
s = BrakeOnsetSmoother(params=MockParams(True))
# deepening comfort brake (a_target above the hard threshold) -> clamped to last - max_step
out = s.apply(-0.5, 0.0, -0.5)
assert out == -MAX_STEP
def test_hard_brake_passthrough():
s = BrakeOnsetSmoother(params=MockParams(True))
# a_target at/below hard threshold -> no limiting, full brake passes through
assert s.apply(-1.0, 0.0, HARD_BRAKE_THRESH - 0.1) == -1.0
def test_release_passthrough():
s = BrakeOnsetSmoother(params=MockParams(True))
# output less negative than last (releasing brake) -> not limited
assert s.apply(-0.2, -0.5, -0.3) == -0.2
def test_already_within_rate():
s = BrakeOnsetSmoother(params=MockParams(True))
# small deepening within the cap is unchanged
out = s.apply(-MAX_STEP * 0.5, 0.0, -0.3)
assert out == -MAX_STEP * 0.5

View File

@@ -5,6 +5,8 @@ from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.lane_turn_desire import LaneTurnController, LANE_CHANGE_SPEED_MIN
from openpilot.sunnypilot.selfdrive.controls.lib.auto_lane_change import AutoLaneChangeMode
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
TurnDirection = custom.ModelDataV2SP.TurnDirection
@@ -107,7 +109,11 @@ def set_lane_turn_params():
])
def test_desire_helper_integration(carstate, lateral_active, lane_change_prob, expected_desire, set_lane_turn_params):
dh = DesireHelper()
relc = RoadEdgeLaneChangeController(dh)
relc.enabled = True
dh.alc.lane_change_set_timer = AutoLaneChangeMode.NUDGE
for _ in range(10):
dh.update(carstate, lateral_active, lane_change_prob)
dh.update(carstate, lateral_active, lane_change_prob,
left_edge_detected=relc.left_edge_detected, right_edge_detected=relc.right_edge_detected)
assert dh.desire == expected_desire # The first four tests were unit tests to test the controller, where this tests the integration in desire helpers

View File

@@ -0,0 +1,251 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.sunnypilot.selfdrive.controls.lib.lead_persistence.lead_persistence import (
LeadPersistence,
_HOLD_FRAMES,
)
class FakeLead:
def __init__(self, status=False, d_rel=0.0, v_rel=0.0, v_lead=0.0,
a_lead=0.0, a_tau=0.0, model_prob=0.0, y_rel=0.0, a_rel=0.0, fcw=False):
self.status = status
self.dRel = d_rel
self.yRel = y_rel
self.vRel = v_rel
self.vLead = v_lead
self.aLeadK = a_lead
self.aLeadTau = a_tau
self.modelProb = model_prob
self.aRel = a_rel
self.fcw = fcw
class FakeRadarState:
def __init__(self, lead_one=None, lead_two=None, extra='ok'):
self.leadOne = lead_one or FakeLead()
self.leadTwo = lead_two or FakeLead()
self.extra = extra
def _make():
return LeadPersistence()
class TestLeadPersistence:
def test_disabled_passthrough(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
lp.update(raw, force_enabled=False)
out = lp.smooth(raw, force_enabled=False)
assert out is raw
def test_status_true_passthrough(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0, v_lead=20.0))
for _ in range(3):
lp.update(raw)
out = lp.smooth(raw)
# leadOne still truly present → passthrough (or wrapper with same .status=True)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
def test_dropout_held(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0, v_lead=20.0, a_lead=-1.0))
for _ in range(5):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
assert out.leadOne.aLeadK == -1.0
def test_dropout_expires(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(_HOLD_FRAMES + 2):
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.leadOne.status is False
def test_reappearance_resets_hold(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(3):
lp.update(raw_off)
raw_on2 = FakeRadarState(lead_one=FakeLead(status=True, d_rel=28.0))
lp.update(raw_on2)
out = lp.smooth(raw_on2)
assert out.leadOne.status is True
assert out.leadOne.dRel == 28.0
def test_other_attrs_passthrough(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0), extra='special')
for _ in range(3):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False), extra='special2')
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.extra == 'special2'
def test_stability_high_on_solid_lead(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(10):
lp.update(raw)
assert lp.stability >= 0.9
def test_stability_low_on_churn(self):
lp = _make()
on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(10):
lp.update(on)
lp.update(off)
assert lp.stability < 0.5
def test_leadtwo_independent(self):
lp = _make()
raw_on = FakeRadarState(
lead_one=FakeLead(status=True, d_rel=30.0),
lead_two=FakeLead(status=True, d_rel=80.0),
)
for _ in range(5):
lp.update(raw_on)
raw_off2 = FakeRadarState(
lead_one=FakeLead(status=True, d_rel=30.0),
lead_two=FakeLead(status=False),
)
lp.update(raw_off2)
out = lp.smooth(raw_off2)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
assert out.leadTwo.status is True
assert out.leadTwo.dRel == 80.0
def test_reset(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
lp.update(raw)
lp.reset()
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.leadOne.status is False
def _step(lp, lead):
raw = FakeRadarState(lead_one=lead)
lp.update(raw)
return lp.smooth(raw)
class TestPhantomMask:
def test_masked_when_not_urgent(self):
lp = _make()
# fresh, low modelProb, close, slow closing (TTC 8s) -> phantom ghost -> masked
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=4.0, v_rel=-0.5, model_prob=0.3))
lp.update(raw)
out = lp.smooth(raw)
assert out.leadOne.status is False
def test_not_masked_when_urgent_ttc(self):
lp = _make()
# fresh, low modelProb, close, TTC 3s <= 4 -> real cut-in risk -> NOT masked
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=3.0, v_rel=-1.0, model_prob=0.3))
lp.update(raw)
out = lp.smooth(raw)
assert out.leadOne.status is True
assert out.leadOne.dRel == 3.0
def test_not_masked_when_fast_closing(self):
lp = _make()
# fresh, low modelProb, close, vRel -10 <= -8 -> NOT masked
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=4.0, v_rel=-10.0, model_prob=0.3))
lp.update(raw)
out = lp.smooth(raw)
assert out.leadOne.status is True
class TestLeadSwitchSlew:
def _settle_far(self, lp, d=45.0):
out = None
for _ in range(6):
out = _step(lp, FakeLead(status=True, d_rel=d, v_rel=-3.0, v_lead=22.0))
return out
def test_steady_lead_no_slew(self):
lp = _make()
out = self._settle_far(lp, d=45.0)
assert out.leadOne.dRel == 45.0
assert lp.slew_offset == 0.0
def test_switch_lags_dRel_keeps_true_vrel(self):
lp = _make()
self._settle_far(lp, d=45.0)
# switch to a nearer car: 28 m, closing -5 (TTC 5.6 s > 4) -> non-urgent
out = _step(lp, FakeLead(status=True, d_rel=28.0, v_rel=-5.0, v_lead=19.0))
assert out.leadOne.dRel > 40.0 # reported stays continuous, not stepped to 28
assert out.leadOne.dRel >= 28.0 # never under-reports distance
assert out.leadOne.vRel == -5.0 # true closing speed passes through
assert out.leadOne.vLead == 19.0
def test_switch_converges_to_truth(self):
lp = _make()
self._settle_far(lp, d=45.0)
out = _step(lp, FakeLead(status=True, d_rel=28.0, v_rel=-5.0, v_lead=19.0))
prev = out.leadOne.dRel
for _ in range(40):
out = _step(lp, FakeLead(status=True, d_rel=28.0, v_rel=-5.0, v_lead=19.0))
assert out.leadOne.dRel <= prev + 1e-6 # monotonically eases down
assert out.leadOne.dRel >= 28.0 - 1e-6 # never below truth
prev = out.leadOne.dRel
assert abs(out.leadOne.dRel - 28.0) < 1.0 # converged to real distance
def test_urgent_switch_passes_through(self):
lp = _make()
self._settle_far(lp, d=45.0)
# nearer car at 12 m closing -5 -> TTC 2.4 s < 4 -> urgent, no masking
out = _step(lp, FakeLead(status=True, d_rel=12.0, v_rel=-5.0, v_lead=10.0))
assert out.leadOne.dRel == 12.0
assert lp.slew_offset == 0.0
def test_fast_closing_switch_passes_through(self):
lp = _make()
self._settle_far(lp, d=60.0)
# vRel -10 <= -8 -> emergency closing, no masking even though TTC ok
out = _step(lp, FakeLead(status=True, d_rel=40.0, v_rel=-10.0, v_lead=15.0))
assert out.leadOne.dRel == 40.0
assert lp.slew_offset == 0.0
def test_normal_closing_not_slewed(self):
lp = _make()
d = 45.0
self._settle_far(lp, d=d)
for _ in range(20):
d -= 0.25 # ~5 m/s closing at 20 Hz, continuous (not a switch)
out = _step(lp, FakeLead(status=True, d_rel=d, v_rel=-5.0, v_lead=20.0))
assert out.leadOne.dRel == d
assert lp.slew_offset == 0.0
def test_lead_moving_away_not_slewed(self):
lp = _make()
self._settle_far(lp, d=30.0)
# sudden farther jump (lead drops out / switch to far car) -> no brake risk, no lag
out = _step(lp, FakeLead(status=True, d_rel=60.0, v_rel=-1.0, v_lead=24.0))
assert out.leadOne.dRel == 60.0
assert lp.slew_offset == 0.0

View File

@@ -0,0 +1,96 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.sunnypilot.selfdrive.controls.lib.radar_distance.radar_distance import RadarDistanceController
from openpilot.sunnypilot.selfdrive.controls.lib.lead_persistence.lead_persistence import _HOLD_FRAMES
class FakeLead:
def __init__(self, status=False, d_rel=0.0, v_rel=0.0, v_lead=0.0, a_lead=0.0,
a_tau=0.0, model_prob=0.95, y_rel=0.0, a_rel=0.0, fcw=False):
self.status = status
self.dRel = d_rel
self.yRel = y_rel
self.vRel = v_rel
self.vLead = v_lead
self.aLeadK = a_lead
self.aLeadTau = a_tau
self.modelProb = model_prob
self.aRel = a_rel
self.fcw = fcw
class FakeRadarState:
def __init__(self, lead_one=None, lead_two=None):
self.leadOne = lead_one or FakeLead()
self.leadTwo = lead_two or FakeLead()
class FakeSM:
def __init__(self, radarstate):
self._data = {'radarState': radarstate}
def __getitem__(self, k):
return self._data[k]
def _make(enabled=True):
c = RadarDistanceController()
c.set_enabled(enabled)
return c
class TestRadarDistanceController:
def test_toggle(self):
c = _make(enabled=False)
assert c.is_enabled() is False
assert c.toggle() is True
assert c.is_enabled() is True
assert c.toggle() is False
def test_disabled_passthrough(self):
c = _make(enabled=False)
rs = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
c.update(FakeSM(rs))
assert c.smooth_radarstate(rs) is rs
def test_no_sm_safe(self):
c = _make()
c.update(None, None) # must not raise
def test_holds_dropped_lead(self):
c = _make()
rs_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0, v_lead=20.0, a_lead=-1.0))
for _ in range(5):
c.update(FakeSM(rs_on))
rs_off = FakeRadarState(lead_one=FakeLead(status=False))
c.update(FakeSM(rs_off))
out = c.smooth_radarstate(rs_off)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
assert out.leadOne.aLeadK == -1.0
def test_hold_expires(self):
c = _make()
rs_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
c.update(FakeSM(rs_on))
rs_off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(_HOLD_FRAMES + 2):
c.update(FakeSM(rs_off))
out = c.smooth_radarstate(rs_off)
assert out.leadOne.status is False
def test_reset_clears_hold(self):
c = _make()
rs_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
c.update(FakeSM(rs_on))
c.reset()
rs_off = FakeRadarState(lead_one=FakeLead(status=False))
c.update(FakeSM(rs_off))
out = c.smooth_radarstate(rs_off)
assert out.leadOne.status is False

View File

@@ -0,0 +1,99 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.relc import (
RoadEdgeLaneChangeController, EDGE_REACTION_TIME, EDGE_CLEAR_TIME, MIN_SPEED,
)
V_HIGH = MIN_SPEED + 2.0
V_LOW = MIN_SPEED - 1.0
@pytest.fixture
def relc(mocker):
mock_params = mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.relc.Params")
mock_params.return_value.get_bool.return_value = True
controller = RoadEdgeLaneChangeController(DesireHelper())
controller.enabled = True
return controller
def drive(controller, road_edge_stds, lane_line_probs, seconds, v_ego=V_HIGH):
for _ in range(int(seconds / DT_MDL) + 1):
controller.update(road_edge_stds, lane_line_probs, v_ego)
@pytest.mark.parametrize("road_edge_stds,lane_line_probs,attr", [
([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], "left_edge_detected"),
([0.9, 0.0], [0.8, 0.8, 0.8, 0.0], "right_edge_detected"),
])
def test_edge_detection(relc, road_edge_stds, lane_line_probs, attr):
drive(relc, road_edge_stds, lane_line_probs, EDGE_REACTION_TIME + 0.1)
assert getattr(relc, attr)
def test_edge_detection_requires_time(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME - 0.05)
assert not relc.left_edge_detected
def test_both_edges_detected(relc):
drive(relc, [0.0, 0.0], [0.0, 0.8, 0.8, 0.0], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
assert relc.right_edge_detected
def test_noise_doesnt_clear(relc):
edge = ([0.0, 0.9], [0.0, 0.8, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update(*clear, V_HIGH)
relc.update(*edge, V_HIGH)
assert relc.left_edge_detected
def test_clears_after_window(relc):
edge = ([0.0, 0.9], [0.0, 0.8, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
drive(relc, *clear, EDGE_CLEAR_TIME + 0.05)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_low_speed_skips(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1, v_ego=V_LOW)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_speed_drop_resets(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], V_LOW)
assert not relc.left_edge_detected
def test_param_off_resets(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.params.get_bool.return_value = False
relc.read_params()
relc.update([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], V_HIGH)
assert not relc.left_edge_detected
assert not relc.right_edge_detected

View File

@@ -243,4 +243,12 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none, AudibleAlert.prompt, 3.),
},
EventNameSP.laneChangeRoadEdge: {
ET.WARNING: Alert(
"Lane Change Unavailable: Road Edge",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 0.1),
},
}

View File

@@ -1118,6 +1118,10 @@
"title": "Record Front Lock",
"description": ""
},
"RoadEdgeLaneChangeEnabled": {
"title": "Block Lane Change: Road Edge Detection",
"description": ""
},
"RoadName": {
"title": "Road Name",
"description": ""
@@ -1323,6 +1327,22 @@
"max": 5.0,
"step": 0.1,
"unit": "m/s\u00b2"
},
"ToyotaAutoHold": {
"title": "Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS",
"description": ""
},
"ToyotaEnhancedBsm": {
"title": "Toyota: Prius TSS2 BSM and some tssp",
"description": ""
},
"ToyotaTSS2Long": {
"title": "Toyota: custom longitudinal for TSS2",
"description": ""
},
"ToyotaDriveMode": {
"title": "Enable drive mode btn link",
"description": ""
},
"ToyotaEnforceStockLongitudinal": {
"title": "Toyota: Enforce Factory Longitudinal Control",

View File

@@ -519,6 +519,12 @@
}
]
},
{
"key": "RoadEdgeLaneChangeEnabled",
"widget": "toggle",
"title": "Block Lane Change: Road Edge Detection",
"description": "Blocks lane change when the model sees a road edge on the side you signal."
},
{
"key": "AutoLaneChangeBsmDelay",
"widget": "toggle",
@@ -620,6 +626,72 @@
}
]
},
{
"key": "AccelPersonalityEnabled",
"widget": "toggle",
"title": "Enable Acceleration Profile",
"description": "When off, sunnypilot uses its default (Normal) acceleration behavior. Enable to use the Acceleration Profile below.",
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "AccelPersonality",
"widget": "multiple_button",
"title": "Acceleration Profile",
"description": "Controls how quickly sunnypilot accelerates while preserving braking and stop behavior.",
"options": [
{
"value": 0,
"label": "Eco"
},
{
"value": 1,
"label": "Normal"
},
{
"value": 2,
"label": "Sport"
}
],
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "RadarDistance",
"widget": "toggle",
"title": "Radar Lead Smoothing",
"description": "Holds radar leads through brief dropouts, ignores close-range radar ghosts, and eases braking when the radar lead target jumps, for smoother lead following. Emergency braking is unaffected.",
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "SmoothBraking",
"widget": "toggle",
"title": "Smooth Brake Onset",
"description": "Eases the start of light braking so it builds in gradually instead of biting sharply. Hard and emergency braking are unaffected.",
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "IntelligentCruiseButtonManagement",
"widget": "toggle",
@@ -2001,6 +2073,22 @@
"equals": true
}
]
},
{
"key": "PlanplusControl",
"widget": "option",
"title": "Plan Plus Controls",
"description": "Adjust planplus model recentering strength. The higher this number the more aggressively the model will recover to lane center; too high and it will ping-pong.",
"min": 0.0,
"max": 2.0,
"step": 0.1,
"enablement": [
{
"type": "param",
"key": "ShowAdvancedControls",
"equals": true
}
]
}
]
},
@@ -2168,6 +2256,50 @@
"title": "Toyota / Lexus Settings",
"description": "",
"items": [
{
"key": "ToyotaAutoHold",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaEnhancedBsm",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: Prius TSS2 BSM and some tssp",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaTSS2Long",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: custom longitudinal for TSS2",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaDriveMode",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Enable drive mode btn link",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaEnforceStockLongitudinal",
"widget": "toggle",

View File

@@ -43,6 +43,37 @@ sections:
label: Relaxed
enablement:
- $ref: '#/macros/longitudinal'
- key: AccelPersonalityEnabled
widget: toggle
title: Enable Acceleration Profile
description: When off, sunnypilot uses its default (Normal) acceleration behavior. Enable to use the Acceleration Profile below.
enablement:
- $ref: '#/macros/longitudinal'
- key: AccelPersonality
widget: multiple_button
title: Acceleration Profile
description: Controls how quickly sunnypilot accelerates while preserving braking and stop behavior.
options:
- value: 0
label: Eco
- value: 1
label: Normal
- value: 2
label: Sport
enablement:
- $ref: '#/macros/longitudinal'
- key: RadarDistance
widget: toggle
title: Radar Lead Smoothing
description: Holds radar leads through brief dropouts, ignores close-range radar ghosts, and eases braking when the radar lead target jumps, for smoother lead following. Emergency braking is unaffected.
enablement:
- $ref: '#/macros/longitudinal'
- key: SmoothBraking
widget: toggle
title: Smooth Brake Onset
description: Eases the start of light braking so it builds in gradually instead of biting sharply. Hard and emergency braking are unaffected.
enablement:
- $ref: '#/macros/longitudinal'
- key: IntelligentCruiseButtonManagement
widget: toggle
title: Intelligent Cruise Button Management (ICBM) (Alpha)

View File

@@ -51,6 +51,16 @@ sections:
key: LagdToggle
equals: true
- $ref: '#/macros/advanced_only'
- key: PlanplusControl
widget: option
title: Plan Plus Controls
description: Adjust planplus model recentering strength. The higher this number the more aggressively the model will recover
to lane center; too high and it will ping-pong.
min: 0.0
max: 2.0
step: 0.1
enablement:
- $ref: '#/macros/advanced_only'
- id: lateral_control
title: Lateral Control
description: Neural network lateral control for supported models

View File

@@ -242,6 +242,10 @@ sections:
label: 2 seconds
- value: 5
label: 3 seconds
- key: RoadEdgeLaneChangeEnabled
widget: toggle
title: 'Block Lane Change: Road Edge Detection'
description: Blocks lane change when the model sees a road edge on the side you signal.
- key: AutoLaneChangeBsmDelay
widget: toggle
title: 'Auto Lane Change: Delay with Blind Spot'

View File

@@ -60,6 +60,30 @@ sections:
title: Toyota / Lexus Settings
description: ''
items:
- key: ToyotaAutoHold
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaEnhancedBsm
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: Prius TSS2 BSM and some tssp'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaTSS2Long
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: custom longitudinal for TSS2'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaDriveMode
widget: toggle
needs_onroad_cycle: true
title: Enable drive mode btn link
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaEnforceStockLongitudinal
widget: toggle
needs_onroad_cycle: true

View File

@@ -45,8 +45,9 @@ class ScrollState(Enum):
class GuiScrollPanel2:
def __init__(self, horizontal: bool = True) -> None:
def __init__(self, horizontal: bool = True, handle_out_of_bounds: bool = True) -> None:
self._horizontal = horizontal
self._handle_out_of_bounds = handle_out_of_bounds
self._state = ScrollState.STEADY
self._offset: rl.Vector2 = rl.Vector2(0, 0)
self._initial_click_event: MouseEvent | None = None
@@ -98,7 +99,7 @@ class GuiScrollPanel2:
# simple exponential return if out of bounds
# out of bounds is handled by snapping, so skip if set
out_of_bounds = self.get_offset() > max_offset or self.get_offset() < min_offset
if out_of_bounds and snap_target is None:
if out_of_bounds and snap_target is None and self._handle_out_of_bounds:
target = max_offset if self.get_offset() > max_offset else min_offset
dt = rl.get_frame_time() or 1e-6

View File

@@ -75,7 +75,6 @@ class _Scroller(Widget):
self._items: list[Widget] = []
self._horizontal = horizontal
self._snap_items = snap_items
assert not self._snap_items or self._horizontal, "Snapping is only supported for horizontal scrolling"
self._spacing = spacing
self._pad = pad
@@ -191,8 +190,12 @@ class _Scroller(Widget):
snap_target: float | None = None
if self._snap_items and visible_items and self._scrolling_to[0] is None:
# TODO: this doesn't handle two small buttons at the edges well
center_pos = self._rect.x + self._rect.width / 2
closest_delta_pos = min((((item.rect.x + item.rect.width / 2) - center_pos) for item in visible_items), key=abs)
if self._horizontal:
center_pos = self._rect.x + self._rect.width / 2
closest_delta_pos = min((((item.rect.x + item.rect.width / 2) - center_pos) for item in visible_items), key=abs)
else:
center_pos = self._rect.y + self._rect.height / 2
closest_delta_pos = min((((item.rect.y + item.rect.height / 2) - center_pos) for item in visible_items), key=abs)
snap_target = self.scroll_panel.get_offset() - closest_delta_pos
return self.scroll_panel.update(self._rect, content_size, snap_target=snap_target)

View File

@@ -0,0 +1,58 @@
<?xml version='1.0' encoding='UTF-8'?>
<root>
<tabbed_widget name="Main Window" parent="main_window">
<Tab tab_name="Acceleration Personality" containers="1">
<Container>
<DockSplitter orientation="-" sizes="0.25;0.25;0.25;0.25" count="4">
<DockArea name="accel / decel">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="2.000000" left="150.000000" bottom="-4.000000"/>
<limitY/>
<curve name="/longitudinalPlan/aTarget" color="#1f77b4"/>
<curve name="/carState/aEgo" color="#f14cc1"/>
<curve name="/longitudinalPlanSP/acceleration/maxAccel" color="#1ac938"/>
</plot>
</DockArea>
<DockArea name="accel personality (gating)">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="1.300000" left="150.000000" bottom="-0.200000"/>
<limitY/>
<curve name="/longitudinalPlanSP/acceleration/jerkFactorScale" color="#9467bd"/>
<curve name="/longitudinalPlanSP/acceleration/tFollowOffset" color="#ff7f0e"/>
<curve name="/longitudinalPlanSP/acceleration/personality" color="#d62728"/>
<curve name="/longitudinalPlanSP/acceleration/enabled" color="#17becf"/>
</plot>
</DockArea>
<DockArea name="lead">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="150.000000" left="150.000000" bottom="-25.000000"/>
<limitY/>
<curve name="/radarState/leadOne/dRel" color="#1f77b4"/>
<curve name="/radarState/leadOne/vRel" color="#d62728"/>
</plot>
</DockArea>
<DockArea name="speed">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="25.000000" left="150.000000" bottom="0.000000"/>
<limitY/>
<curve name="/carState/vEgo" color="#1ac938"/>
</plot>
</DockArea>
</DockSplitter>
</Container>
</Tab>
<currentTabIndex index="0"/>
</tabbed_widget>
<use_relative_time_offset enabled="1"/>
<!-- - - - - - - - - - - - - - - -->
<!-- - - - - - - - - - - - - - - -->
<Plugins>
<plugin ID="DataLoad Rlog"/>
<plugin ID="Cereal Subscriber"/>
</Plugins>
<!-- - - - - - - - - - - - - - - -->
<!-- - - - - - - - - - - - - - - -->
<customMathEquations/>
<snippets/>
<!-- - - - - - - - - - - - - - - -->
</root>