Compare commits

...

15 Commits
master ... tn

Author SHA1 Message Date
rav4kumar
bba203f755 fix(long): pass blended braking through accel personality 2026-06-08 13:17:54 -07:00
rav4kumar
89b72a23f4 feat(long): smooth eco accel ramp 2026-06-08 13:07:42 -07:00
rav4kumar
c02ca5f9b5 ref 2026-06-08 13:04:24 -07:00
rav4kumar
23a5f09cc2 feat(long): RadarDistance lead 2026-06-08 13:04:23 -07:00
rav4kumar
e8f02a1e4a feat(long): acceleration personality smooth decel 2026-06-08 13:04:22 -07:00
rav4kumar
83de89e253 feat(dec): rework dynamic experimental controller 2026-06-06 11:45:24 -07:00
rav4kumar
04224e8747 Add custom params to sunnylink settings 2026-06-06 10:31:17 -07:00
rav4kumar
6012ebb7c7 ref 2026-06-06 10:30:46 -07:00
rav4kumar
e91dbe351e ref 2026-06-05 11:29:37 -07:00
rav4kumar
e25061fd08 fix mapd scorll 2026-06-05 11:28:47 -07:00
rav4kumar
baf56ae324 feat/relc 2026-06-05 11:06:06 -07:00
rav4kumar
9badd3fa40 mici-sla-ui 2026-06-05 11:05:29 -07:00
rav4kumar
2220e7fc11 point the submodule 2026-06-05 11:04:53 -07:00
rav4kumar
e862935209 toyota sp link and drive mode btn support 2026-06-05 11:04:49 -07:00
rav4kumar
9bd504a5cb abh, bsm 2026-06-05 11:04:43 -07:00
50 changed files with 2539 additions and 382 deletions

1
.gitmodules vendored
View File

@@ -4,6 +4,7 @@
[submodule "opendbc"]
path = opendbc_repo
url = https://github.com/sunnypilot/opendbc.git
branch = tn
[submodule "msgq"]
path = msgq_repo
url = https://github.com/commaai/msgq.git

View File

@@ -1,6 +1,6 @@
Version 0.11.2 (2026-06-15)
========================
* Acceleration Profile: select Eco, Normal, or Sport personality to shape acceleration briskness and gentle (ACC) deceleration
Version 0.11.1 (2026-05-18)
========================

View File

@@ -194,6 +194,7 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
aTarget @5 :Float32;
events @6 :List(OnroadEventSP.Event);
e2eAlerts @7 :E2eAlerts;
acceleration @8 :Acceleration;
struct DynamicExperimentalControl {
state @0 :DynamicExperimentalControlState;
@@ -296,6 +297,18 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
greenLightAlert @0 :Bool;
leadDepartAlert @1 :Bool;
}
struct Acceleration {
personality @0 :AccelerationPersonality;
enabled @1 :Bool;
maxAccel @2 :Float32;
}
enum AccelerationPersonality {
eco @0;
normal @1;
sport @2;
}
}
struct OnroadEventSP @0xda96579883444c35 {
@@ -342,6 +355,7 @@ struct OnroadEventSP @0xda96579883444c35 {
speedLimitChanged @21;
speedLimitPending @22;
e2eChime @23;
laneChangeRoadEdge @24;
}
}
@@ -448,6 +462,8 @@ struct LiveMapDataSP @0xf416ec09499d9d19 {
struct ModelDataV2SP @0xa1680744031fdb2d {
laneTurnDirection @0 :TurnDirection;
leftLaneChangeEdgeBlock @1 :Bool;
rightLaneChangeEdgeBlock @2 :Bool;
enum TurnDirection {
none @0;

View File

@@ -179,12 +179,19 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"QuickBootToggle", {PERSISTENT | BACKUP, BOOL, "0"}},
{"QuietMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RainbowMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RoadEdgeLaneChangeEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RocketFuel", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowAdvancedControls", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowTurnSignals", {PERSISTENT | BACKUP, BOOL, "0"}},
{"StandstillTimer", {PERSISTENT | BACKUP, BOOL, "0"}},
{"TrueVEgoUI", {PERSISTENT | BACKUP, BOOL, "0"}},
// toyota specific params
{"ToyotaAutoHold", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaEnhancedBsm", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaTSS2Long", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaDriveMode", {PERSISTENT | BACKUP, BOOL, "0"}},
// MADS params
{"Mads", {PERSISTENT | BACKUP, BOOL, "1"}},
{"MadsMainCruiseAllowed", {PERSISTENT | BACKUP, BOOL, "1"}},
@@ -226,6 +233,9 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"ToyotaStopAndGoHack", {PERSISTENT | BACKUP, BOOL, "0"}},
{"DynamicExperimentalControl", {PERSISTENT | BACKUP, BOOL, "0"}},
{"AccelPersonalityEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"AccelPersonality", {PERSISTENT | BACKUP, INT, "1"}},
{"RadarDistance", {PERSISTENT | BACKUP, BOOL, "0"}},
{"BlindSpot", {PERSISTENT | BACKUP, BOOL, "0"}},
// sunnypilot model params

View File

@@ -10,7 +10,7 @@ from cereal import car, log, custom
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
from openpilot.common.swaglog import cloudlog, ForwardingHandler
from opendbc.safety import ALTERNATIVE_EXPERIENCE
from opendbc.car import DT_CTRL, structs
from opendbc.car.can_definitions import CanData, CanRecvCallable, CanSendCallable
from opendbc.car.carlog import carlog
@@ -121,7 +121,13 @@ class Car:
self.CI, self.CP, self.CP_SP = CI, CI.CP, CI.CP_SP
self.RI = RI
# set alternative experiences from parameters
sp_toyota_auto_brake_hold = self.params.get_bool("ToyotaAutoHold")
self.CP.alternativeExperience = 0
if sp_toyota_auto_brake_hold:
self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.ALLOW_AEB
# mads
set_alternative_experience(self.CP, self.CP_SP, self.params)
set_car_specific_params(self.CP, self.CP_SP, self.params)

View File

@@ -56,7 +56,7 @@ class DesireHelper:
def get_lane_change_direction(CS):
return LaneChangeDirection.left if CS.leftBlinker else LaneChangeDirection.right
def update(self, carstate, lateral_active, lane_change_prob):
def update(self, carstate, lateral_active, lane_change_prob, left_edge_detected=False, right_edge_detected=False):
self.alc.update_params()
self.lane_turn_controller.update_params()
v_ego = carstate.vEgo
@@ -88,8 +88,8 @@ class DesireHelper:
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = (((carstate.leftBlindspot or left_edge_detected) and self.lane_change_direction == LaneChangeDirection.left) or
((carstate.rightBlindspot or right_edge_detected) and self.lane_change_direction == LaneChangeDirection.right))
self.alc.update_lane_change(blindspot_detected, carstate.brakePressed)

View File

@@ -70,6 +70,9 @@ def get_jerk_factor(personality=log.LongitudinalPersonality.standard):
raise NotImplementedError("Longitudinal personality not supported")
T_FOLLOW_MIN = 0.9 # safety floor for follow time after any sunnypilot tier offset
def get_T_FOLLOW(personality=log.LongitudinalPersonality.standard):
if personality==log.LongitudinalPersonality.relaxed:
return 1.75

View File

@@ -110,7 +110,7 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
accel_clip = [ACCEL_MIN, get_max_accel(v_ego)]
accel_clip = [ACCEL_MIN, self.accel.get_max_accel(v_ego)]
steer_angle_without_offset = sm['carState'].steeringAngleDeg - sm['liveParameters'].angleOffsetDeg
accel_clip = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_clip, self.CP)
@@ -138,7 +138,8 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
self.mpc.set_weights(prev_accel_constraint, personality=sm['selfdriveState'].personality)
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
self.mpc.update(sm['radarState'], v_cruise, personality=sm['selfdriveState'].personality)
radarstate = self.smooth_radarstate(sm['radarState'])
self.mpc.update(radarstate, v_cruise, personality=sm['selfdriveState'].personality)
self.v_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
@@ -160,7 +161,8 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
output_a_target_e2e = sm['modelV2'].action.desiredAcceleration
output_should_stop_e2e = sm['modelV2'].action.shouldStop
if self.is_e2e(sm):
is_e2e = self.is_e2e(sm)
if is_e2e:
output_a_target = min(output_a_target_e2e, output_a_target_mpc)
self.output_should_stop = output_should_stop_e2e or output_should_stop_mpc
if output_a_target < output_a_target_mpc:
@@ -169,8 +171,13 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
output_a_target = output_a_target_mpc
self.output_should_stop = output_should_stop_mpc
for idx in range(2):
accel_clip[idx] = np.clip(accel_clip[idx], self.prev_accel_clip[idx] - 0.05, self.prev_accel_clip[idx] + 0.05)
output_a_target = self.accel.smooth_target_accel(output_a_target, self.a_desired_trajectory, CONTROL_N_T_IDX,
self.output_should_stop or force_slow_decel, reset=reset_state, stock_brake=is_e2e)
# Lower (braking) bound and the ceiling's downward slew stay at the stock rate; only the ceiling's
# upward slew is tier-dependent (Acceleration Personality).
accel_clip[0] = np.clip(accel_clip[0], self.prev_accel_clip[0] - 0.05, self.prev_accel_clip[0] + 0.05)
accel_clip[1] = np.clip(accel_clip[1], self.prev_accel_clip[1] - 0.05, self.prev_accel_clip[1] + self.accel.get_rise_rate())
self.output_a_target = np.clip(output_a_target, accel_clip[0], accel_clip[1])
self.prev_accel_clip = accel_clip

View File

@@ -321,9 +321,16 @@ class SelfdriveD(CruiseHelper):
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
mdv2sp = self.sm['modelDataV2SP']
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
(CS.rightBlindspot and direction == LaneChangeDirection.right):
self.events.add(EventName.laneChangeBlocked)
elif (mdv2sp.leftLaneChangeEdgeBlock and direction == LaneChangeDirection.left) or \
(mdv2sp.rightLaneChangeEdgeBlock and direction == LaneChangeDirection.right):
self.events_sp.add(custom.OnroadEventSP.EventName.laneChangeRoadEdge)
else:
if direction == LaneChangeDirection.left:
self.events.add(EventName.preLaneChangeLeft)

View File

@@ -35,6 +35,13 @@ DESCRIPTIONS = {
'RecordFront': tr_noop("Upload data from the driver facing camera and help improve the driver monitoring algorithm."),
"IsMetric": tr_noop("Display speed in km/h instead of mph."),
"RecordAudio": tr_noop("Record and store microphone audio while driving. The audio will be included in the dashcam video in comma connect."),
"AccelPersonalityEnabled": tr_noop(
"Enable the Acceleration Profile selector below. When off, sunnypilot uses its default (Normal) acceleration and deceleration behavior."
),
"AccelPersonality": tr_noop(
"Sets how sunnypilot accelerates and eases into comfort braking. Eco is gentler, Normal matches the default profile, " +
"and Sport responds more briskly while still smoothing non-emergency deceleration."
),
}
@@ -64,6 +71,12 @@ class TogglesLayout(Widget):
"disengage_on_accelerator.png",
False,
),
"AccelPersonalityEnabled": (
lambda: tr("Enable Acceleration Profile"),
DESCRIPTIONS["AccelPersonalityEnabled"],
"speed_limit.png",
False,
),
"IsLdwEnabled": (
lambda: tr("Enable Lane Departure Warnings"),
DESCRIPTIONS["IsLdwEnabled"],
@@ -106,6 +119,16 @@ class TogglesLayout(Widget):
icon="speed_limit.png"
)
self._accel_personality_setting = multiple_button_item(
lambda: tr("Acceleration Profile"),
lambda: tr(DESCRIPTIONS["AccelPersonality"]),
buttons=[lambda: tr("Eco"), lambda: tr("Normal"), lambda: tr("Sport")],
button_width=300,
callback=self._set_accel_personality,
selected_index=self._params.get("AccelPersonality", return_default=True),
icon="speed_limit.png"
)
self._toggles = {}
self._locked_toggles = set()
for param, (title, desc, icon, needs_restart) in self._toggle_defs.items():
@@ -139,6 +162,10 @@ class TogglesLayout(Widget):
if param == "DisengageOnAccelerator":
self._toggles["LongitudinalPersonality"] = self._long_personality_setting
# insert acceleration profile selector after its enable toggle
if param == "AccelPersonalityEnabled":
self._toggles["AccelPersonality"] = self._accel_personality_setting
self._update_experimental_mode_icon()
self._scroller = Scroller(list(self._toggles.values()), line_separator=True, spacing=0)
@@ -176,11 +203,13 @@ class TogglesLayout(Widget):
self._toggles["ExperimentalMode"].action_item.set_enabled(True)
self._toggles["ExperimentalMode"].set_description(e2e_description)
self._long_personality_setting.action_item.set_enabled(True)
self._accel_personality_setting.action_item.set_enabled(True)
else:
# no long for now
self._toggles["ExperimentalMode"].action_item.set_enabled(False)
self._toggles["ExperimentalMode"].action_item.set_state(False)
self._long_personality_setting.action_item.set_enabled(False)
self._accel_personality_setting.action_item.set_enabled(False)
self._params.remove("ExperimentalMode")
unavailable = tr("Experimental mode is currently unavailable on this car since the car's stock ACC is used for longitudinal control.")
@@ -247,3 +276,6 @@ class TogglesLayout(Widget):
def _set_longitudinal_personality(self, button_index: int):
self._params.put("LongitudinalPersonality", button_index, block=True)
def _set_accel_personality(self, button_index: int):
self._params.put("AccelPersonality", button_index, block=True)

View File

@@ -13,6 +13,7 @@ from openpilot.system.ui.lib.application import gui_app
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.settings import SettingsLayoutSP as SettingsLayout
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.onroad import OnroadViewContainerSP as AugmentedRoadView
ONROAD_DELAY = 2.5 # seconds
@@ -118,13 +119,15 @@ class MiciMainLayout(Scroller):
# FIXME: these two pops can interrupt user interacting in the settings
if self._onroad_time_delay is not None and rl.get_time() - self._onroad_time_delay >= ONROAD_DELAY:
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
if not gui_app.sunnypilot_ui() or self._should_auto_scroll_to_onroad():
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
self._onroad_time_delay = None
# When car leaves standstill, pop nav stack and scroll to onroad
CS = ui_state.sm["carState"]
if not CS.standstill and self._prev_standstill:
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
if not gui_app.sunnypilot_ui() or self._should_auto_scroll_to_onroad():
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
self._prev_standstill = CS.standstill
def _on_interactive_timeout(self):

View File

@@ -51,11 +51,17 @@ class LaneChangeSettingsLayout(Widget):
description=lambda: tr("Toggle to enable a delay timer for seamless lane changes when blind spot monitoring " +
"(BSM) detects a obstructing vehicle, ensuring safe maneuvering."),
)
self._road_edge_block = toggle_item_sp(
param="RoadEdgeLaneChangeEnabled",
title=lambda: tr("Block Lane Change: Road Edge Detection"),
description=lambda: tr("Blocks the lane change if the model sees a road edge on your signaled side."),
)
items = [
self._lane_change_timer,
LineSeparatorSP(40),
self._bsm_delay,
self._road_edge_block,
]
return items

View File

@@ -0,0 +1,13 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout
class MiciMainLayoutSP(MiciMainLayout):
def _should_auto_scroll_to_onroad(self) -> bool:
return not self._onroad_layout.is_on_info_panel()

View File

@@ -0,0 +1,63 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.system.ui.lib.application import gui_app
from openpilot.selfdrive.ui.sunnypilot.mici.widgets.scroller_sp import ScrollerSP
from openpilot.selfdrive.ui.sunnypilot.mici.onroad.augmented_road_view import AugmentedRoadViewSP
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.onroad_info_panel import OnroadInfoPanel
CONFIDENCE_BALL_VISIBLE_RATIO = 0.4
HORIZONTAL_SETTLE_PX = 5
HORIZONTAL_RESET_RATIO = 0.5
class OnroadViewContainerSP(ScrollerSP):
def __init__(self, bookmark_callback=None):
super().__init__(horizontal=False, snap_items=True, spacing=0, pad=0, scroll_indicator=False, edge_shadows=False)
self.road_view = AugmentedRoadViewSP(bookmark_callback=bookmark_callback)
self.onroad_info_panel = OnroadInfoPanel(bookmark_callback=bookmark_callback)
self._scroller.add_widgets([
self.road_view,
self.onroad_info_panel,
])
self._scroller.set_reset_scroll_at_show(False)
self._scroller.set_scrolling_enabled(lambda: abs(self.rect.x) < HORIZONTAL_SETTLE_PX)
for child in (self.road_view, self.onroad_info_panel):
inner_touch_valid = child._touch_valid_callback
child.set_touch_valid_callback(
lambda inner=inner_touch_valid: self._touch_valid() and (inner() if inner else True)
)
def set_rect(self, rect: rl.Rectangle):
super().set_rect(rect)
self.road_view.set_rect(rect)
self.onroad_info_panel.set_rect(rect)
return self
def is_swiping_left(self) -> bool:
return self.road_view.is_swiping_left() or self.onroad_info_panel.is_swiping_left()
def set_click_callback(self, callback) -> None:
self.road_view.set_click_callback(callback)
self.onroad_info_panel.set_click_callback(callback)
def is_on_info_panel(self) -> bool:
"""True when scrolled past halfway toward onroad_info_panel (used by main layout
to skip auto-pop-back-to-camera while user is reading the info panel)."""
return abs(self._scroller.scroll_panel.get_offset()) > self._rect.height / 2
def _render(self, rect: rl.Rectangle):
if abs(self.rect.x) > gui_app.width * HORIZONTAL_RESET_RATIO:
self._scroller.scroll_panel.set_offset(0)
vertical_offset = self._scroller.scroll_panel.get_offset()
show_ball = abs(vertical_offset) < rect.height * CONFIDENCE_BALL_VISIBLE_RATIO
self.road_view.set_show_confidence_ball(show_ball)
super()._render(rect)

View File

@@ -0,0 +1,324 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from dataclasses import dataclass
from openpilot.common.constants import CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.application import MousePos
from openpilot.system.ui.widgets import Widget
from openpilot.selfdrive.ui.mici.onroad.alert_renderer import AlertRenderer
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import BookmarkIcon
METER_TO_KM = 0.001
METER_TO_MILE = 0.000621371
@dataclass(frozen=True)
class OnroadInfoPanelColors:
white: rl.Color = rl.WHITE
black: rl.Color = rl.BLACK
red: rl.Color = rl.Color(255, 0, 0, 255)
green: rl.Color = rl.Color(0, 255, 0, 255)
grey: rl.Color = rl.Color(190, 195, 190, 255)
light_grey: rl.Color = rl.Color(200, 200, 200, 255)
dark_grey: rl.Color = rl.Color(100, 100, 100, 255)
bg_dark: rl.Color = rl.Color(0, 0, 0, 255)
card_bg: rl.Color = rl.Color(50, 50, 50, 200)
badge_bg: rl.Color = rl.Color(60, 60, 60, 255)
COLORS = OnroadInfoPanelColors()
class OnroadInfoPanel(Widget):
def __init__(self, bookmark_callback=None):
super().__init__()
self.speed_limit: float = 0.0
self.speed_limit_valid: bool = False
self.speed_limit_offset: float = 0.0
self.next_speed_limit: float = 0.0
self.next_speed_limit_distance: float = 0.0
self.road_name: str = ""
self.current_speed: float = 0.0
self.set_speed: float = 0.0
self.cruise_enabled: bool = False
self._sign_slide: float = 0.0
self._font_bold: rl.Font = gui_app.font(FontWeight.BOLD)
self._font_semi_bold: rl.Font = gui_app.font(FontWeight.SEMI_BOLD)
self._font_medium: rl.Font = gui_app.font(FontWeight.MEDIUM)
self._marquee_offset: float = 0.0
self._marquee_direction: int = 1
self._marquee_pause_timer: float = 0.0
self._marquee_speed: float = 40.0
self._marquee_pause_duration: float = 1.5
self._alert_renderer = AlertRenderer()
self._alert_alpha_filter = FirstOrderFilter(0, 0.05, 1 / gui_app.target_fps)
self._bookmark_icon = BookmarkIcon(bookmark_callback)
def is_swiping_left(self) -> bool:
return self._bookmark_icon.is_swiping_left()
def _handle_mouse_release(self, mouse_pos: MousePos) -> None:
# Mirror stock AugmentedRoadView: suppress click while bookmark gesture active
if not self._bookmark_icon.interacting():
super()._handle_mouse_release(mouse_pos)
def _update_state(self) -> None:
sm = ui_state.sm
speed_conv = CV.MS_TO_KPH if ui_state.is_metric else CV.MS_TO_MPH
if sm.valid["longitudinalPlanSP"]:
lp_sp = sm["longitudinalPlanSP"]
resolver = lp_sp.speedLimit.resolver
self.speed_limit = resolver.speedLimit * speed_conv
self.speed_limit_valid = resolver.speedLimitValid
self.speed_limit_offset = resolver.speedLimitOffset * speed_conv
if sm.valid["liveMapDataSP"]:
lmd = sm["liveMapDataSP"]
self.next_speed_limit = lmd.speedLimitAhead * speed_conv
self.next_speed_limit_distance = lmd.speedLimitAheadDistance
self.road_name = lmd.roadName
if sm.updated["carState"]:
self.current_speed = sm["carState"].vEgo * speed_conv
if sm.valid["carState"] and sm.valid["controlsState"]:
self.cruise_enabled = sm["carState"].cruiseState.enabled
v_cruise_cluster = sm["carState"].vCruiseCluster
set_speed_kph = sm["controlsState"].vCruiseDEPRECATED if v_cruise_cluster == 0.0 else v_cruise_cluster
self.set_speed = set_speed_kph * (METER_TO_MILE / METER_TO_KM) if not ui_state.is_metric else set_speed_kph
def _render(self, rect: rl.Rectangle) -> None:
self._update_state()
rl.draw_rectangle(int(rect.x), int(rect.y), int(rect.width), int(rect.height), COLORS.bg_dark)
margin = 20
mid_y = rect.y + rect.height / 2
left_x = rect.x + margin
if self.cruise_enabled:
unit = tr("MAX")
display_speed = self.set_speed
else:
unit = tr("km/h") if ui_state.is_metric else tr("MPH")
display_speed = self.current_speed
speed_val = str(round(display_speed))
if self.speed_limit_valid and display_speed > self.speed_limit:
speed_color = COLORS.red
else:
speed_color = COLORS.white
rl.draw_text_ex(self._font_semi_bold, unit, rl.Vector2(left_x, mid_y - 95), 38, 0, COLORS.grey)
rl.draw_text_ex(self._font_bold, speed_val, rl.Vector2(left_x, mid_y - 60), 110, 0, speed_color)
sign_width = 135
sign_height = 135 if ui_state.is_metric else 175
has_next = self.next_speed_limit > 0 and self.next_speed_limit != self.speed_limit
target_slide = 1.0 if has_next else 0.0
slide_speed = 3.0 * rl.get_frame_time()
if self._sign_slide < target_slide:
self._sign_slide = min(self._sign_slide + slide_speed, target_slide)
elif self._sign_slide > target_slide:
self._sign_slide = max(self._sign_slide - slide_speed, target_slide)
next_w = int(sign_width * 0.7)
next_h = int(sign_height * 0.7)
next_peek = int(next_w * 0.85) + 5
centered_x = rect.x + rect.width - sign_width - margin
shifted_x = rect.x + rect.width - sign_width - margin - next_peek
sign_x = centered_x + (shifted_x - centered_x) * self._sign_slide
sign_y = rect.y + (rect.height - sign_height) / 2
road_y = mid_y + 55
road_width = sign_x - left_x - margin
self._draw_road_name(left_x, road_y, road_width)
if has_next and self._sign_slide > 0.01:
next_val = str(round(self.next_speed_limit))
dist_str = self._format_distance(self.next_speed_limit_distance)
next_x = sign_x + sign_width - int(next_w * 0.15)
next_y = sign_y + (sign_height - next_h) / 2
next_speed_color = COLORS.black
if ui_state.is_metric:
self._draw_vienna_sign(next_x, next_y, next_w, next_h, next_val, next_speed_color, is_upcoming=True)
else:
self._draw_mutcd_sign(next_x, next_y, next_w, next_h, next_val, next_speed_color, is_upcoming=True)
dist_size = measure_text_cached(self._font_medium, dist_str, 24)
rl.draw_text_ex(self._font_medium, dist_str, rl.Vector2(next_x + next_w / 2 - dist_size.x / 2, next_y + next_h + 4), 24, 0, COLORS.grey)
self._draw_speed_limit_sign(sign_x, sign_y, sign_width, sign_height)
if self.speed_limit_offset != 0 and self.speed_limit_valid:
offset_val = str(abs(round(self.speed_limit_offset)))
badge_sz = 42
badge_x = sign_x + sign_width - badge_sz * 0.85
badge_y = sign_y - badge_sz * 0.25
if ui_state.is_metric:
badge_r = badge_sz / 2
badge_cx = badge_x + badge_r
badge_cy = badge_y + badge_r
rl.draw_circle(int(badge_cx), int(badge_cy), badge_r + 2, COLORS.dark_grey)
rl.draw_circle(int(badge_cx), int(badge_cy), badge_r, COLORS.badge_bg)
self._draw_text_centered(self._font_bold, offset_val, 24, rl.Vector2(badge_cx, badge_cy), COLORS.white)
else:
mutcd_badge_x = sign_x + sign_width - badge_sz * 0.65
mutcd_badge_y = sign_y - badge_sz * 0.50
badge_rect = rl.Rectangle(mutcd_badge_x, mutcd_badge_y, badge_sz, badge_sz)
rl.draw_rectangle_rounded(badge_rect, 0.25, 10, COLORS.badge_bg)
rl.draw_rectangle_rounded_lines_ex(badge_rect, 0.25, 10, 2, COLORS.dark_grey)
self._draw_text_centered(self._font_bold, offset_val, 24, rl.Vector2(mutcd_badge_x + badge_sz / 2, mutcd_badge_y + badge_sz / 2), COLORS.white)
# SCC
speed_size = measure_text_cached(self._font_bold, speed_val, 110)
scc_x = left_x + speed_size.x + 30
scc_y = mid_y - 50
self._draw_scc_icons(scc_x, scc_y)
self._bookmark_icon.render(rect)
if ui_state.started:
alert_obj, no_alert = self._alert_renderer.will_render()
self._alert_alpha_filter.update(0 if no_alert else 1)
alpha = self._alert_alpha_filter.x
if alpha > 0.01:
rl.draw_rectangle(int(rect.x), int(rect.y), int(rect.width), int(rect.height), rl.Color(0, 0, 0, int(150 * alpha)))
self._alert_renderer.render(rect)
def _draw_scc_icons(self, x: float, y: float) -> None:
sm = ui_state.sm
if not sm.valid["longitudinalPlanSP"]:
return
scc = sm["longitudinalPlanSP"].smartCruiseControl
box_w, box_h = 100, 36
gap = 6
drawn = 0
for label, active in [("SCC-V", scc.vision.active), ("SCC-M", scc.map.active)]:
if not active:
continue
bx = x
by = y + drawn * (box_h + gap)
rl.draw_rectangle_rounded(rl.Rectangle(bx, by, box_w, box_h), 0.3, 10, COLORS.green)
self._draw_text_centered(self._font_bold, label, 20, rl.Vector2(bx + box_w / 2, by + box_h / 2), COLORS.black)
drawn += 1
def _draw_speed_limit_sign(self, x: float, y: float, sign_width: float, sign_height: float) -> None:
speed_str = str(round(self.speed_limit)) if self.speed_limit_valid and self.speed_limit > 0 else "--"
speed_color = COLORS.black if not self.speed_limit_valid or self.current_speed <= self.speed_limit else COLORS.red
if ui_state.is_metric:
self._draw_vienna_sign(x, y, sign_width, sign_height, speed_str, speed_color, is_upcoming=False)
else:
self._draw_mutcd_sign(x, y, sign_width, sign_height, speed_str, speed_color, is_upcoming=False)
def _draw_road_name(self, x: float, y: float, width: float) -> None:
road_display = self.road_name if self.road_name else "--"
font_size = 30
road_size = measure_text_cached(self._font_semi_bold, road_display, font_size)
text_width = road_size.x
if text_width <= width:
self._marquee_offset = 0.0
self._marquee_direction = 1
self._marquee_pause_timer = 0.0
rl.draw_text_ex(self._font_semi_bold, road_display, rl.Vector2(x, y), font_size, 0, COLORS.white)
else:
overflow = text_width - width
dt = rl.get_frame_time()
if self._marquee_pause_timer > 0:
self._marquee_pause_timer -= dt
else:
self._marquee_offset += self._marquee_direction * self._marquee_speed * dt
if self._marquee_offset >= overflow:
self._marquee_offset = overflow
self._marquee_direction = -1
self._marquee_pause_timer = self._marquee_pause_duration
elif self._marquee_offset <= 0:
self._marquee_offset = 0
self._marquee_direction = 1
self._marquee_pause_timer = self._marquee_pause_duration
rl.begin_scissor_mode(int(x), int(y), int(width), int(road_size.y + 4))
text_pos = rl.Vector2(x - self._marquee_offset, y)
rl.draw_text_ex(self._font_semi_bold, road_display, text_pos, font_size, 0, COLORS.white)
rl.end_scissor_mode()
def _draw_vienna_sign(self, x: float, y: float, width: float, height: float, speed_str: str, speed_color: rl.Color, is_upcoming: bool = False) -> None:
center = rl.Vector2(x + width / 2, y + height / 2)
outer_radius = min(width, height) / 2
rl.draw_circle_v(center, outer_radius, COLORS.white)
ring_width = outer_radius * 0.18
rl.draw_ring(center, outer_radius - ring_width, outer_radius, 0, 360, 36, COLORS.red)
font_size = outer_radius * (0.7 if len(speed_str) >= 3 else 0.9)
text_size = measure_text_cached(self._font_bold, speed_str, int(font_size))
text_pos = rl.Vector2(center.x - text_size.x / 2, center.y - text_size.y / 2)
rl.draw_text_ex(self._font_bold, speed_str, text_pos, font_size, 0, speed_color)
def _draw_mutcd_sign(self, x: float, y: float, width: float, height: float, speed_str: str, speed_color: rl.Color, is_upcoming: bool = False) -> None:
sign_rect = rl.Rectangle(x, y, width, height)
rl.draw_rectangle_rounded(sign_rect, 0.35, 10, COLORS.white)
inset = max(4, width * 0.05)
inner_rect = rl.Rectangle(x + inset, y + inset, width - inset * 2, height - inset * 2)
outer_radius = 0.35 * width / 2.0
inner_radius = outer_radius - inset
inner_roundness = inner_radius / (inner_rect.width / 2.0)
rl.draw_rectangle_rounded_lines_ex(inner_rect, inner_roundness, 10, 3, COLORS.black)
mid_x = x + width / 2
label_size = max(18, int(width * 0.26))
if is_upcoming:
self._draw_text_centered(self._font_bold, tr("AHEAD"), label_size, rl.Vector2(mid_x, y + height * 0.27), COLORS.black)
else:
self._draw_text_centered(self._font_bold, tr("SPEED"), label_size, rl.Vector2(mid_x, y + height * 0.20), COLORS.black)
self._draw_text_centered(self._font_bold, tr("LIMIT"), label_size, rl.Vector2(mid_x, y + height * 0.40), COLORS.black)
speed_font_size = int(width * 0.52) if len(speed_str) >= 3 else int(width * 0.62)
self._draw_text_centered(self._font_bold, speed_str, speed_font_size, rl.Vector2(mid_x, y + height * 0.72), speed_color)
def _draw_text_centered(self, font, text, size, pos_center, color):
sz = measure_text_cached(font, text, size)
rl.draw_text_ex(font, text, rl.Vector2(pos_center.x - sz.x / 2, pos_center.y - sz.y / 2), size, 0, color)
def _format_distance(self, distance: float) -> str:
if ui_state.is_metric:
if distance < 50:
return tr("Near")
if distance >= 1000:
return f"{distance * METER_TO_KM:.1f}" + tr("km")
if distance < 200:
rounded = max(10, int(distance / 10) * 10)
else:
rounded = int(distance / 100) * 100
return str(rounded) + tr("m")
else:
distance_mi = distance * METER_TO_MILE
if distance_mi < 0.1:
return tr("Near")
return f"{distance_mi:.1f}" + tr("mi")

View File

@@ -0,0 +1,30 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import AugmentedRoadView
class _SuppressedConfidenceBall:
def render(self, *_):
pass
class AugmentedRoadViewSP(AugmentedRoadView):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._show_confidence_ball: bool = True
self._real_confidence_ball = self._confidence_ball
self._confidence_ball = _SuppressedConfidenceBall()
def set_show_confidence_ball(self, show: bool) -> None:
self._show_confidence_ball = show
def _render(self, rect: rl.Rectangle) -> None:
super()._render(rect)
if self._show_confidence_ball:
self._real_confidence_ball.render(self.rect)

View File

@@ -0,0 +1,34 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.system.ui.lib.application import MouseEvent
from openpilot.system.ui.lib.scroll_panel2 import GuiScrollPanel2, ScrollState
class GuiScrollPanel2SP(GuiScrollPanel2):
"""Reject orthogonal-dominant drags so nested scrollers (outer horizontal +
inner vertical) don't both engage on a slightly diagonal swipe.
Implemented as a post-super state rollback rather than reimplementing the
PRESSED state machine — keeps stock behaviour authoritative."""
def _handle_mouse_event(self, mouse_event: MouseEvent, bounds: rl.Rectangle, bounds_size: float,
content_size: float) -> None:
pre_state = self._state
super()._handle_mouse_event(mouse_event, bounds, bounds_size, content_size)
if self._state == ScrollState.MANUAL_SCROLL and pre_state == ScrollState.PRESSED and \
self._initial_click_event is not None:
diff_x = abs(mouse_event.pos.x - self._initial_click_event.pos.x)
diff_y = abs(mouse_event.pos.y - self._initial_click_event.pos.y)
along = diff_x if self._horizontal else diff_y
anti = diff_y if self._horizontal else diff_x
if anti > along:
self._state = ScrollState.STEADY
self._velocity = 0.0
self._velocity_buffer.clear()

View File

@@ -0,0 +1,16 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.system.ui.widgets.scroller import Scroller
from openpilot.selfdrive.ui.sunnypilot.mici.widgets.scroll_panel_sp import GuiScrollPanel2SP
class ScrollerSP(Scroller):
def __init__(self, **kwargs):
super().__init__(**kwargs)
inner = self._scroller
inner.scroll_panel = GuiScrollPanel2SP(inner._horizontal, handle_out_of_bounds=not inner._snap_items)

View File

@@ -10,6 +10,9 @@ from openpilot.selfdrive.ui.layouts.main import MainLayout
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout
from openpilot.selfdrive.ui.ui_state import ui_state
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.main import MiciMainLayoutSP as MiciMainLayout
BIG_UI = gui_app.big_ui()

View File

@@ -40,6 +40,7 @@ from openpilot.sunnypilot.modeld_v2.camera_offset_helper import CameraOffsetHelp
from openpilot.sunnypilot.livedelay.helpers import get_lat_delay
from openpilot.sunnypilot.modeld_v2.modeld_base import ModelStateBase
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
PROCESS_NAME = "selfdrive.modeld.modeld_tinygrad"
@@ -329,6 +330,7 @@ def main(demo=False):
prev_action = log.ModelDataV2.Action()
DH = DesireHelper()
RELC = RoadEdgeLaneChangeController(DH)
meta_constants = load_meta_constants()
while True:
@@ -433,7 +435,10 @@ def main(demo=False):
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
RELC.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs, v_ego)
mdv2sp_send.modelDataV2SP.leftLaneChangeEdgeBlock = RELC.left_edge_detected
mdv2sp_send.modelDataV2SP.rightLaneChangeEdgeBlock = RELC.right_edge_detected
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RELC.left_edge_detected, RELC.right_edge_detected)
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
mdv2sp_send.modelDataV2SP.laneTurnDirection = DH.lane_turn_direction

View File

@@ -0,0 +1,146 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from collections.abc import Sequence
import numpy as np
from cereal import messaging
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot import get_sanitize_int_param
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.constants import \
NORMAL, PERSONALITY_MIN, PERSONALITY_MAX, A_CRUISE_MAX_BP, A_CRUISE_MAX_V, RISE_RATE, SMOOTH_DECEL_BP, SMOOTH_DECEL_V, \
BRAKE_DEEPENING_JERK, BRAKE_RELEASE_JERK, ACCEL_RISE_JERK, SMOOTH_DECEL_LOOKAHEAD_T, MIN_SMOOTH_BRAKE_NEED, \
HARD_BRAKE_TARGET_ACCEL, HARD_BRAKE_NEED
_ZERO_ACCEL_EPS = 1e-6
class AccelController:
def __init__(self, CP: structs.CarParams, mpc, params=None):
self._CP = CP
self._mpc = mpc
self._params = params or Params()
self._frame = 0
self._enabled: bool = self._params.get_bool("AccelPersonalityEnabled")
self._personality = NORMAL # cereal AccelerationPersonality ordinal
self._v_ego = 0.0
self._last_target_accel = 0.0
self._brake_need = 0.0
self._smooth_active = False
self._bypassed = False
self._read_params()
def _read_params(self) -> None:
self._enabled = self._params.get_bool("AccelPersonalityEnabled")
if not self._enabled:
self._personality = NORMAL
return
self._personality = get_sanitize_int_param("AccelPersonality", PERSONALITY_MIN, PERSONALITY_MAX, self._params)
def update(self, sm: messaging.SubMaster) -> None:
if self._frame % int(1. / DT_MDL) == 0:
self._read_params()
self._v_ego = sm['carState'].vEgo
self._frame += 1
def get_max_accel(self, v_ego: float) -> float:
return float(np.interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_V[self._personality]))
def get_rise_rate(self) -> float:
return RISE_RATE[self._personality]
def get_decel_target(self, brake_need: float) -> float:
return float(np.interp(max(0.0, float(brake_need)), SMOOTH_DECEL_BP, SMOOTH_DECEL_V[self._personality]))
def smooth_target_accel(self, raw_target_accel: float, accel_trajectory: Sequence[float], t_idxs: Sequence[float],
should_stop: bool, reset: bool = False, stock_brake: bool = False) -> float:
raw_target_accel = float(raw_target_accel)
self._brake_need = self._compute_brake_need(raw_target_accel, accel_trajectory, t_idxs)
if reset or not self._enabled:
self._bypassed = False
return self._passthrough(raw_target_accel)
if stock_brake and (raw_target_accel < 0.0 or self._brake_need >= MIN_SMOOTH_BRAKE_NEED):
self._bypassed = False
return self._passthrough(raw_target_accel)
self._bypassed = self._emergency_bypass(raw_target_accel, should_stop)
if self._bypassed:
return self._passthrough(raw_target_accel)
if self._brake_need < MIN_SMOOTH_BRAKE_NEED:
self._smooth_active = False
return self._slew(raw_target_accel)
self._smooth_active = True
return self._slew(self.get_decel_target(self._brake_need))
def _compute_brake_need(self, raw_target_accel: float, accel_trajectory: Sequence[float], t_idxs: Sequence[float]) -> float:
min_accel = float(raw_target_accel)
for accel, t in zip(accel_trajectory, t_idxs, strict=False):
if float(t) <= SMOOTH_DECEL_LOOKAHEAD_T:
min_accel = min(min_accel, float(accel))
return max(0.0, -min_accel)
def _emergency_bypass(self, raw_target_accel: float, should_stop: bool) -> bool:
return (self._mpc.crash_cnt > 0 or should_stop or
raw_target_accel <= HARD_BRAKE_TARGET_ACCEL or self._brake_need >= HARD_BRAKE_NEED)
def _passthrough(self, target_accel: float) -> float:
target_accel = self._clean_accel(target_accel)
self._last_target_accel = target_accel
self._smooth_active = False
return target_accel
def _slew(self, target_accel: float) -> float:
target_accel = float(target_accel)
if target_accel > self._last_target_accel:
smoothed = self._slew_up(target_accel)
else:
step = BRAKE_DEEPENING_JERK[self._personality] * DT_MDL
smoothed = self._clean_accel(max(target_accel, self._last_target_accel - step))
self._last_target_accel = smoothed
return smoothed
def _slew_up(self, target_accel: float) -> float:
if self._last_target_accel < 0.0:
released = min(target_accel, self._last_target_accel + BRAKE_RELEASE_JERK * DT_MDL)
if released <= 0.0:
return self._clean_accel(released)
return self._clean_accel(min(target_accel, ACCEL_RISE_JERK[self._personality] * DT_MDL))
step = ACCEL_RISE_JERK[self._personality] * DT_MDL
return self._clean_accel(min(target_accel, self._last_target_accel + step))
@staticmethod
def _clean_accel(accel: float) -> float:
accel = float(accel)
return 0.0 if abs(accel) < _ZERO_ACCEL_EPS else accel
def enabled(self) -> bool:
return self._enabled
def personality(self):
return self._personality # cereal AccelerationPersonality ordinal
def max_accel(self) -> float:
# Cached value for publishing; publish_longitudinal_plan_sp has no v_ego in scope.
return self.get_max_accel(self._v_ego)
def brake_need(self) -> float:
return self._brake_need
def smooth_active(self) -> bool:
return self._smooth_active
def bypassed(self) -> bool:
return self._bypassed

View File

@@ -0,0 +1,61 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from cereal import custom
# Profile ids come from cereal: eco @0, normal @1, sport @2.
AccelerationPersonality = custom.LongitudinalPlanSP.AccelerationPersonality
ECO = AccelerationPersonality.eco
NORMAL = AccelerationPersonality.normal
SPORT = AccelerationPersonality.sport
PERSONALITY_MIN = min(AccelerationPersonality.schema.enumerants.values())
PERSONALITY_MAX = max(AccelerationPersonality.schema.enumerants.values())
A_CRUISE_MAX_BP = [0., 10., 25., 40.]
# Stock openpilot acceleration ceiling. Normal and disabled mode intentionally match this path.
STOCK_A_CRUISE_MAX_V = [1.6, 1.2, 0.8, 0.6]
STOCK_RISE_RATE = 0.05
# Eco softens launch and cruise/highway roll-on.
A_CRUISE_MAX_V = {
ECO: [1.25, 0.90, 0.45, 0.30],
NORMAL: STOCK_A_CRUISE_MAX_V,
SPORT: [1.8, 1.40, 1.00, 0.75],
}
RISE_RATE = {
ECO: 0.02,
NORMAL: STOCK_RISE_RATE,
SPORT: 0.10,
}
# Positive predicted brake need -> comfort accel target.
SMOOTH_DECEL_BP = [0.0, 0.4, 0.8, 1.2, 1.6, 2.0, 2.4]
SMOOTH_DECEL_V = {
ECO: [0.00, -0.12, -0.28, -0.50, -0.78, -1.05, -1.30],
NORMAL: [0.00, -0.15, -0.35, -0.65, -0.95, -1.25, -1.55],
SPORT: [0.00, -0.18, -0.45, -0.80, -1.15, -1.45, -1.75],
}
BRAKE_DEEPENING_JERK = {
ECO: 0.7,
NORMAL: 0.9,
SPORT: 1.1,
}
BRAKE_RELEASE_JERK = 2.0
ACCEL_RISE_JERK = {
ECO: 0.6,
NORMAL: BRAKE_RELEASE_JERK,
SPORT: BRAKE_RELEASE_JERK,
}
SMOOTH_DECEL_LOOKAHEAD_T = 2.5
MIN_SMOOTH_BRAKE_NEED = 0.05
HARD_BRAKE_TARGET_ACCEL = -2.0
HARD_BRAKE_NEED = 2.6

View File

@@ -0,0 +1,215 @@
import numpy as np
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.accel_controller import AccelController
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.constants import \
AccelerationPersonality, ECO, NORMAL, SPORT, STOCK_RISE_RATE, SMOOTH_DECEL_BP, SMOOTH_DECEL_V, BRAKE_DEEPENING_JERK, \
BRAKE_RELEASE_JERK, ACCEL_RISE_JERK, HARD_BRAKE_TARGET_ACCEL, HARD_BRAKE_NEED
from openpilot.common.realtime import DT_MDL
# Stock openpilot accel ceiling, duplicated independently here so the test fails if the normal tier ever drifts.
STOCK_BP = [0., 10., 25., 40.]
STOCK_V = [1.6, 1.2, 0.8, 0.6]
V_EGO_GRID = [0.0, 5.0, 10.0, 17.0, 25.0, 33.0, 40.0, 50.0]
class MockParams:
def __init__(self, enabled=False, personality=NORMAL):
self._vals = {"AccelPersonalityEnabled": enabled, "AccelPersonality": int(personality)}
def get_bool(self, key):
return bool(self._vals.get(key, False))
def get(self, key, return_default=False):
return self._vals.get(key, 0)
def put(self, key, val, block=False):
self._vals[key] = int(val)
class MockCarState:
def __init__(self, vEgo=0.0):
self.vEgo = vEgo
def make_sm(v_ego=0.0):
return {'carState': MockCarState(vEgo=v_ego)}
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
def stock_max_accel(v_ego):
return float(np.interp(v_ego, STOCK_BP, STOCK_V))
def test_disabled_matches_stock(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=False, personality=SPORT))
for v in V_EGO_GRID:
assert c.get_max_accel(v) == pytest.approx(stock_max_accel(v))
assert c.get_rise_rate() == STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.normal
assert not c.enabled()
assert c.smooth_target_accel(-0.5, [-2.0], [1.0], should_stop=False) == -0.5
def test_normal_matches_stock(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
for v in V_EGO_GRID:
assert c.get_max_accel(v) == pytest.approx(stock_max_accel(v))
assert c.get_rise_rate() == STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.normal
def test_eco_is_gentler(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
for v in V_EGO_GRID:
assert c.get_max_accel(v) <= stock_max_accel(v) + 1e-6
assert c.get_rise_rate() < STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.eco
def test_sport_is_brisker(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=SPORT))
for v in V_EGO_GRID:
assert c.get_max_accel(v) >= stock_max_accel(v) - 1e-6
assert c.get_rise_rate() > STOCK_RISE_RATE
assert c.personality() == AccelerationPersonality.sport
def test_param_clamp(mock_cp, mock_mpc):
# Out-of-range int must clamp to the max tier (sport), not raise.
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=9))
assert c.personality() == AccelerationPersonality.sport
@pytest.mark.parametrize("personality", [ECO, NORMAL, SPORT])
def test_profile_decel_tables(mock_cp, mock_mpc, personality):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=personality))
for bp, expected in zip(SMOOTH_DECEL_BP, SMOOTH_DECEL_V[personality], strict=True):
assert c.get_decel_target(bp) == pytest.approx(expected)
def test_profile_jerk_limits_match_plan():
assert BRAKE_DEEPENING_JERK[ECO] == pytest.approx(0.7)
assert BRAKE_DEEPENING_JERK[NORMAL] == pytest.approx(0.9)
assert BRAKE_DEEPENING_JERK[SPORT] == pytest.approx(1.1)
assert BRAKE_RELEASE_JERK == pytest.approx(2.0)
assert ACCEL_RISE_JERK[ECO] < ACCEL_RISE_JERK[NORMAL]
def test_zero_accel_outputs_positive_zero(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
out = c.smooth_target_accel(-0.0, [-0.0], [0.0], should_stop=False, reset=True)
assert out == 0.0
assert not np.signbit(out)
def test_future_brake_need_starts_decel_before_raw_target_brakes(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
c.smooth_target_accel(0.2, [0.2], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(0.2, [0.2, -1.2], [0.0, 1.0], should_stop=False)
assert c.brake_need() == pytest.approx(1.2)
assert c.smooth_active()
assert out < 0.2
def test_brake_deepening_is_jerk_limited(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
c.smooth_target_accel(0.0, [0.0], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(0.0, [-1.2], [1.0], should_stop=False)
assert out == pytest.approx(-BRAKE_DEEPENING_JERK[NORMAL] * DT_MDL)
def test_brake_release_is_faster_than_deepening(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
c.smooth_target_accel(-1.0, [-1.0], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(0.5, [0.5], [0.0], should_stop=False)
assert out == pytest.approx(-1.0 + BRAKE_RELEASE_JERK * DT_MDL)
assert BRAKE_RELEASE_JERK > BRAKE_DEEPENING_JERK[NORMAL]
def test_eco_positive_accel_rise_is_smooth(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
c.smooth_target_accel(0.0, [0.0], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(1.0, [1.0], [0.0], should_stop=False)
assert out == pytest.approx(ACCEL_RISE_JERK[ECO] * DT_MDL)
def test_eco_brake_release_does_not_jump_to_gas(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
c.smooth_target_accel(-0.01, [-0.01], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(1.0, [1.0], [0.0], should_stop=False)
assert out == pytest.approx(ACCEL_RISE_JERK[ECO] * DT_MDL)
def test_blended_stock_brake_passthrough(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
c.smooth_target_accel(0.0, [0.0], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(-1.0, [-1.0], [0.0], should_stop=False, stock_brake=True)
assert out == pytest.approx(-1.0)
assert not c.smooth_active()
def test_blended_future_brake_need_passthrough(mock_cp, mock_mpc):
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=ECO))
c.smooth_target_accel(0.0, [0.0], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(0.1, [0.1, -1.0], [0.0, 1.0], should_stop=False, stock_brake=True)
assert out == pytest.approx(0.1)
assert not c.smooth_active()
@pytest.mark.parametrize("raw_target, accel_trajectory, should_stop, crash_cnt", [
(HARD_BRAKE_TARGET_ACCEL - 0.1, [-0.5], False, 0),
(-0.5, [-HARD_BRAKE_NEED], False, 0),
(-0.5, [-0.5], True, 0),
(-0.5, [-0.5], False, 1),
])
def test_emergency_bypass_passthrough(mock_cp, mock_mpc, raw_target, accel_trajectory, should_stop, crash_cnt):
mock_mpc.crash_cnt = crash_cnt
c = AccelController(mock_cp, mock_mpc, params=MockParams(enabled=True, personality=NORMAL))
c.smooth_target_accel(0.0, [0.0], [0.0], should_stop=False, reset=True)
out = c.smooth_target_accel(raw_target, accel_trajectory, [1.0], should_stop=should_stop)
assert out == raw_target
assert c.bypassed()
def test_frame_gated_read(mock_cp, mock_mpc):
params = MockParams(enabled=True, personality=NORMAL)
c = AccelController(mock_cp, mock_mpc, params=params)
c.update(make_sm()) # frame 0 -> reads, picks up normal
params._vals["AccelPersonality"] = int(SPORT)
for _ in range(19): # frames 1..19, none on the 20-frame gate boundary
c.update(make_sm())
assert c.personality() == AccelerationPersonality.normal
c.update(make_sm()) # frame 20 -> reads, picks up sport
assert c.personality() == AccelerationPersonality.sport

View File

@@ -1,17 +1,46 @@
from openpilot.common.realtime import DT_MDL
class WMACConstants:
# Lead detection parameters
LEAD_WINDOW_SIZE = 6 # Stable detection window
LEAD_PROB = 0.45 # Balanced threshold for lead detection
TRAJECTORY_SIZE = 33
PARAM_READ_FRAMES = max(1, int(round(1.0 / DT_MDL)))
# Slow down detection parameters
SLOW_DOWN_WINDOW_SIZE = 5 # Responsive but stable
SLOW_DOWN_PROB = 0.3 # Balanced threshold for slow down scenarios
EMERGENCY_HOLD_FRAMES = max(1, int(round(0.75 / DT_MDL)))
MIN_MODE_DURATION = {'acc': max(1, int(round(0.6 / DT_MDL))), 'blended': max(1, int(round(0.5 / DT_MDL)))}
ENTER_BLENDED_FRAMES = max(1, int(round(0.4 / DT_MDL)))
EXIT_BLENDED_FRAMES = max(1, int(round(0.35 / DT_MDL)))
STANDSTILL_FRAMES = max(1, int(round(0.2 / DT_MDL)))
# Optimized slow down distance curve - smooth and progressive
LEAD_PROB = 0.45
LEAD_EXIT_PROB = 0.25
LEAD_RISE_RATE = 1.0
LEAD_FALL_RATE = 0.35
RADAR_LEAD_ACC_PROB = 0.5
RADAR_LEAD_ACC_EXIT_PROB = 0.4
RADAR_LEAD_ACC_RISE_RATE = 1.0
RADAR_LEAD_ACC_FALL_RATE = 0.25
RADAR_LEAD_ACC_MAX_DREL = 80.0
RADAR_LEAD_ACC_MAX_TTC = 6.0
RADAR_LEAD_ACC_MIN_CLOSING_SPEED = -0.5
SLOW_DOWN_PROB = 0.5
SLOW_DOWN_EXIT_PROB = 0.4
SLOW_DOWN_RISE_RATE = 0.65
SLOW_DOWN_FALL_RATE = 0.15
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.]
SLOW_DOWN_DIST = [32., 46., 64., 86., 108., 130., 145., 165.]
URGENT_SLOW_DOWN_PROB = 0.85
# Slowness detection parameters
SLOWNESS_WINDOW_SIZE = 10 # Stable slowness detection
SLOWNESS_PROB = 0.55 # Clear threshold for slowness
SLOWNESS_CRUISE_OFFSET = 1.025 # Conservative cruise speed offset
MODEL_DECEL_START = -0.5
MODEL_DECEL_RANGE = 2.0
ENDPOINT_URGENCY_GAIN = 1.3
CRITICAL_ENDPOINT_FACTOR = 0.3
CRITICAL_URGENCY_GAIN = 1.5
SPEED_URGENCY_MIN = 25.0
SPEED_URGENCY_RANGE = 80.0
SLOWNESS_PROB = 0.55
SLOWNESS_EXIT_PROB = 0.45
SLOWNESS_RISE_RATE = 0.35
SLOWNESS_FALL_RATE = 0.5
SLOWNESS_CRUISE_OFFSET = 1.025

View File

@@ -6,129 +6,116 @@ See the LICENSE.md file in the root directory for more details.
"""
# Version = 2025-6-30
from cereal import messaging
from opendbc.car import structs
from numpy import interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
from typing import Literal
# d-e2e, from modeldata.h
TRAJECTORY_SIZE = 33
SET_MODE_TIMEOUT = 15
from cereal import messaging
from numpy import interp
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
# Define the valid mode types
ModeType = Literal['acc', 'blended']
class SmoothKalmanFilter:
"""Enhanced Kalman filter with smoothing for stable decision making."""
def clip01(value: float) -> float:
return max(0.0, min(1.0, float(value)))
def __init__(self, initial_value=0, measurement_noise=0.1, process_noise=0.01,
alpha=1.0, smoothing_factor=0.85):
self.x = initial_value
self.P = 1.0
self.R = measurement_noise
self.Q = process_noise
self.alpha = alpha
self.smoothing_factor = smoothing_factor
self.initialized = False
self.history = []
self.max_history = 10
self.confidence = 0.0
def add_data(self, measurement):
if len(self.history) >= self.max_history:
self.history.pop(0)
self.history.append(measurement)
class SmoothedSignal:
def __init__(self, rise_rate: float, fall_rate: float, initial_value: float = 0.0):
self.rise_rate = clip01(rise_rate)
self.fall_rate = clip01(fall_rate)
self.value = clip01(initial_value)
if not self.initialized:
self.x = measurement
self.initialized = True
self.confidence = 0.1
return
def update(self, measurement: float) -> float:
measurement = clip01(measurement)
rate = self.rise_rate if measurement > self.value else self.fall_rate
self.value += (measurement - self.value) * rate
return self.value
self.P = self.alpha * self.P + self.Q
def reset(self, value: float = 0.0) -> None:
self.value = clip01(value)
K = self.P / (self.P + self.R)
effective_K = K * (1.0 - self.smoothing_factor) + self.smoothing_factor * 0.1
innovation = measurement - self.x
self.x = self.x + effective_K * innovation
self.P = (1 - effective_K) * self.P
class HysteresisSignal:
def __init__(self, enter_threshold: float, exit_threshold: float, rise_rate: float, fall_rate: float):
self.enter_threshold = clip01(enter_threshold)
self.exit_threshold = clip01(exit_threshold)
self.filter = SmoothedSignal(rise_rate, fall_rate)
self.active = False
if abs(innovation) < 0.1:
self.confidence = min(1.0, self.confidence + 0.05)
else:
self.confidence = max(0.1, self.confidence - 0.02)
def update(self, measurement: float) -> bool:
value = self.filter.update(measurement)
threshold = self.exit_threshold if self.active else self.enter_threshold
self.active = value > threshold
return self.active
def get_value(self):
return self.x if self.initialized else None
def reset(self) -> None:
self.filter.reset()
self.active = False
def get_confidence(self):
return self.confidence
def reset_data(self):
self.initialized = False
self.history = []
self.confidence = 0.0
@property
def value(self) -> float:
return self.filter.value
class ModeTransitionManager:
"""Manages smooth transitions between driving modes with hysteresis."""
def __init__(self):
self.current_mode: ModeType = 'acc'
self.mode_confidence = {'acc': 1.0, 'blended': 0.0}
self.transition_timeout = 0
self.min_mode_duration = 10
self.mode_duration = 0
self.emergency_override = False
self._pending_mode: ModeType = 'acc'
self._pending_count = 0
self._blended_hold_frames = 0
def request_mode(self, mode: ModeType, confidence: float = 1.0, emergency: bool = False):
# Emergency override for critical situations (stops, collisions)
if emergency:
self.emergency_override = True
self.current_mode = mode
self.transition_timeout = SET_MODE_TIMEOUT
self.mode_duration = 0
def request_mode(self, mode: ModeType, immediate: bool = False, hold_frames: int = 0, cancel_hold: bool = False) -> None:
if immediate:
self._blended_hold_frames = max(self._blended_hold_frames, hold_frames)
self._pending_mode = mode
self._pending_count = 0
self._switch_mode(mode)
return
self.mode_confidence[mode] = min(1.0, self.mode_confidence[mode] + 0.1 * confidence)
for m in self.mode_confidence:
if m != mode:
self.mode_confidence[m] = max(0.0, self.mode_confidence[m] - 0.05)
if cancel_hold and mode == 'acc':
self._blended_hold_frames = 0
# Require minimum duration in current mode (unless emergency)
if self.mode_duration < self.min_mode_duration and not self.emergency_override:
if self._blended_hold_frames > 0:
mode = 'blended'
if mode == self.current_mode:
self._pending_mode = mode
self._pending_count = 0
return
# Hysteresis: higher threshold for mode changes
confidence_threshold = 0.6 if mode != self.current_mode else 0.3 # Lower threshold for faster response
if mode != self._pending_mode:
self._pending_mode = mode
self._pending_count = 1
else:
self._pending_count += 1
if self.mode_confidence[mode] > confidence_threshold:
if mode != self.current_mode and self.transition_timeout == 0:
self.transition_timeout = SET_MODE_TIMEOUT
self.current_mode = mode
self.mode_duration = 0
if self.mode_duration < WMACConstants.MIN_MODE_DURATION[self.current_mode]:
return
def update(self):
if self.transition_timeout > 0:
self.transition_timeout -= 1
required_count = WMACConstants.ENTER_BLENDED_FRAMES if mode == 'blended' else WMACConstants.EXIT_BLENDED_FRAMES
if self._pending_count >= required_count:
self._switch_mode(mode)
def update(self) -> None:
if self._blended_hold_frames > 0:
self._blended_hold_frames -= 1
self.mode_duration += 1
# Reset emergency override after some time
if self.emergency_override and self.mode_duration > 20:
self.emergency_override = False
# Gradual confidence decay
for mode in self.mode_confidence:
self.mode_confidence[mode] *= 0.98
def get_mode(self) -> ModeType:
return self.current_mode
def _switch_mode(self, mode: ModeType) -> None:
if mode == self.current_mode:
return
self.current_mode = mode
self.mode_duration = 0
self._pending_mode = mode
self._pending_count = 0
class DynamicExperimentalController:
def __init__(self, CP: structs.CarParams, mpc, params=None):
@@ -142,35 +129,33 @@ class DynamicExperimentalController:
self._mode_manager = ModeTransitionManager()
# Smooth filters for stable decision making with faster response for critical scenarios
self._lead_filter = SmoothKalmanFilter(
measurement_noise=0.15,
process_noise=0.05,
alpha=1.02,
smoothing_factor=0.8
self._lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.LEAD_PROB,
exit_threshold=WMACConstants.LEAD_EXIT_PROB,
rise_rate=WMACConstants.LEAD_RISE_RATE,
fall_rate=WMACConstants.LEAD_FALL_RATE,
)
self._radar_acc_lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.RADAR_LEAD_ACC_PROB,
exit_threshold=WMACConstants.RADAR_LEAD_ACC_EXIT_PROB,
rise_rate=WMACConstants.RADAR_LEAD_ACC_RISE_RATE,
fall_rate=WMACConstants.RADAR_LEAD_ACC_FALL_RATE,
)
self._slow_down_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOW_DOWN_PROB,
exit_threshold=WMACConstants.SLOW_DOWN_EXIT_PROB,
rise_rate=WMACConstants.SLOW_DOWN_RISE_RATE,
fall_rate=WMACConstants.SLOW_DOWN_FALL_RATE,
)
self._slowness_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOWNESS_PROB,
exit_threshold=WMACConstants.SLOWNESS_EXIT_PROB,
rise_rate=WMACConstants.SLOWNESS_RISE_RATE,
fall_rate=WMACConstants.SLOWNESS_FALL_RATE,
)
self._slow_down_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.1,
alpha=1.05,
smoothing_factor=0.7
)
self._slowness_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.06,
alpha=1.015,
smoothing_factor=0.92
)
self._mpc_fcw_filter = SmoothKalmanFilter(
measurement_noise=0.2,
process_noise=0.1,
alpha=1.1,
smoothing_factor=0.5
)
self._has_lead_filtered = False
self._has_radar_acc_lead = False
self._has_slow_down = False
self._has_slowness = False
self._has_mpc_fcw = False
@@ -179,13 +164,14 @@ class DynamicExperimentalController:
self._has_standstill = False
self._mpc_fcw_crash_cnt = 0
self._standstill_count = 0
# debug
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
self._raw_urgency = 0.0
def _read_params(self) -> None:
if self._frame % int(1. / DT_MDL) == 0:
if self._frame % WMACConstants.PARAM_READ_FRAMES == 0:
self._enabled = self._params.get_bool("DynamicExperimentalControl")
def mode(self) -> str:
@@ -198,7 +184,6 @@ class DynamicExperimentalController:
return self._active
def set_mpc_fcw_crash_cnt(self) -> None:
"""Set MPC FCW crash count"""
self._mpc_fcw_crash_cnt = self._mpc.crash_cnt
def _update_calculations(self, sm: messaging.SubMaster) -> None:
@@ -210,179 +195,109 @@ class DynamicExperimentalController:
self._v_cruise_kph = car_state.vCruise
self._has_standstill = car_state.standstill
# standstill detection
if self._has_standstill:
self._standstill_count = min(20, self._standstill_count + 1)
self._standstill_count = min(WMACConstants.STANDSTILL_FRAMES * 3, self._standstill_count + 1)
else:
self._standstill_count = max(0, self._standstill_count - 1)
# Lead detection
self._lead_filter.add_data(float(lead_one.status))
lead_value = self._lead_filter.get_value() or 0.0
self._has_lead_filtered = lead_value > WMACConstants.LEAD_PROB
# MPC FCW detection
fcw_filtered_value = self._mpc_fcw_filter.get_value() or 0.0
self._mpc_fcw_filter.add_data(float(self._mpc_fcw_crash_cnt > 0))
self._has_mpc_fcw = fcw_filtered_value > 0.5
# Slow down detection
self._has_lead_filtered = self._lead_tracker.update(float(lead_one.status))
self._has_radar_acc_lead = self._radar_acc_lead_tracker.update(self._radar_acc_lead_score(lead_one))
self._has_mpc_fcw = self._mpc_fcw_crash_cnt > 0
self._calculate_slow_down(md)
# Slowness detection
if not (self._standstill_count > 5) and not self._has_slow_down:
if self._standstill_count > WMACConstants.STANDSTILL_FRAMES or self._has_slow_down:
self._slowness_tracker.reset()
self._has_slowness = False
else:
current_slowness = float(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET))
self._slowness_filter.add_data(current_slowness)
slowness_value = self._slowness_filter.get_value() or 0.0
self._has_slowness = self._slowness_tracker.update(current_slowness)
# Hysteresis for slowness
threshold = WMACConstants.SLOWNESS_PROB * (0.8 if self._has_slowness else 1.1)
self._has_slowness = slowness_value > threshold
def _calculate_slow_down(self, md):
"""Calculate urgency based on trajectory endpoint vs expected distance."""
# Reset to safe defaults
urgency = 0.0
def _calculate_slow_down(self, md) -> None:
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
#Require exact trajectory size
position_valid = len(md.position.x) == TRAJECTORY_SIZE
orientation_valid = len(md.orientation.x) == TRAJECTORY_SIZE
urgency = self._model_action_urgency(md)
position_valid = len(md.position.x) == WMACConstants.TRAJECTORY_SIZE
if not (position_valid and orientation_valid):
# Invalid trajectory - this itself might indicate a stop scenario
# Apply moderate urgency for incomplete trajectories at speed
if self._v_ego_kph > 20.0:
urgency = 0.3
if position_valid:
self._trajectory_valid = True
self._endpoint_x = md.position.x[WMACConstants.TRAJECTORY_SIZE - 1]
self._expected_distance = interp(self._v_ego_kph, WMACConstants.SLOW_DOWN_BP, WMACConstants.SLOW_DOWN_DIST)
urgency = max(urgency, self._endpoint_urgency(self._endpoint_x, self._expected_distance))
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
self._has_slow_down = urgency_filtered > WMACConstants.SLOW_DOWN_PROB
self._urgency = urgency_filtered
return
self._raw_urgency = clip01(urgency)
self._has_slow_down = self._slow_down_tracker.update(self._raw_urgency)
self._urgency = self._slow_down_tracker.value
# We have a valid full trajectory
self._trajectory_valid = True
def _radar_acc_lead_score(self, lead_one) -> float:
if not lead_one.status:
return 0.0
# Use the exact endpoint (33rd point, index 32)
endpoint_x = md.position.x[TRAJECTORY_SIZE - 1]
self._endpoint_x = endpoint_x
d_rel = float(getattr(lead_one, 'dRel', float('inf')))
v_rel = float(getattr(lead_one, 'vRel', 0.0))
if d_rel <= WMACConstants.RADAR_LEAD_ACC_MAX_DREL:
return 1.0
if v_rel <= WMACConstants.RADAR_LEAD_ACC_MIN_CLOSING_SPEED and d_rel / max(-v_rel, 0.1) <= WMACConstants.RADAR_LEAD_ACC_MAX_TTC:
return 1.0
return 0.0
# Get expected distance based on current speed using tuned constants
expected_distance = interp(self._v_ego_kph,
WMACConstants.SLOW_DOWN_BP,
WMACConstants.SLOW_DOWN_DIST)
self._expected_distance = expected_distance
def _model_action_urgency(self, md) -> float:
action = getattr(md, 'action', None)
if action is None:
return 0.0
# Calculate urgency based on trajectory shortage
if endpoint_x < expected_distance:
shortage = expected_distance - endpoint_x
shortage_ratio = shortage / expected_distance
urgency = 1.0 if getattr(action, 'shouldStop', False) else 0.0
desired_accel = getattr(action, 'desiredAcceleration', 0.0)
if desired_accel < WMACConstants.MODEL_DECEL_START:
urgency = max(urgency, min(1.0, (WMACConstants.MODEL_DECEL_START - desired_accel) / WMACConstants.MODEL_DECEL_RANGE))
return urgency
# Base urgency on shortage ratio
urgency = min(1.0, shortage_ratio * 2.0)
def _endpoint_urgency(self, endpoint_x: float, expected_distance: float) -> float:
if endpoint_x >= expected_distance:
return 0.0
# Increase urgency for very short trajectories (imminent stops)
critical_distance = expected_distance * 0.3
if endpoint_x < critical_distance:
urgency = min(1.0, urgency * 2.0)
shortage_ratio = (expected_distance - endpoint_x) / expected_distance
urgency = min(1.0, shortage_ratio * WMACConstants.ENDPOINT_URGENCY_GAIN)
# Speed-based urgency adjustment
if self._v_ego_kph > 25.0:
speed_factor = 1.0 + (self._v_ego_kph - 25.0) / 80.0
urgency = min(1.0, urgency * speed_factor)
if endpoint_x < expected_distance * WMACConstants.CRITICAL_ENDPOINT_FACTOR:
urgency = min(1.0, urgency * WMACConstants.CRITICAL_URGENCY_GAIN)
# Apply filtering but with less smoothing for stops
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
if self._v_ego_kph > WMACConstants.SPEED_URGENCY_MIN:
speed_factor = 1.0 + (self._v_ego_kph - WMACConstants.SPEED_URGENCY_MIN) / WMACConstants.SPEED_URGENCY_RANGE
urgency = min(1.0, urgency * speed_factor)
# Update state with lower threshold for better stop detection
self._has_slow_down = urgency_filtered > (WMACConstants.SLOW_DOWN_PROB * 0.8)
self._urgency = urgency_filtered
return urgency
def _radarless_mode(self) -> None:
"""Radarless mode decision logic with emergency handling."""
def _desired_mode(self) -> tuple[ModeType, bool]:
if not self._CP.radarUnavailable and self._has_radar_acc_lead:
return 'acc', False
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
return 'blended', True
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
standstill = self._standstill_count > WMACConstants.STANDSTILL_FRAMES
urgent_slow_down = self._has_slow_down and self._raw_urgency > WMACConstants.URGENT_SLOW_DOWN_PROB
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.5)
self._mode_manager.request_mode('blended', confidence=confidence)
return
if self._CP.radarUnavailable:
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
return 'acc', False
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
def _radar_mode(self) -> None:
"""Radar mode with emergency handling."""
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
# If lead detected and not in standstill: always use ACC
if self._has_lead_filtered and not (self._standstill_count > 3):
self._mode_manager.request_mode('acc', confidence=1.0)
return
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.3)
self._mode_manager.request_mode('blended', confidence=confidence)
return
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
return 'acc', False
def update(self, sm: messaging.SubMaster) -> None:
self._read_params()
self.set_mpc_fcw_crash_cnt()
self._update_calculations(sm)
if self._CP.radarUnavailable:
self._radarless_mode()
else:
self._radar_mode()
mode, immediate = self._desired_mode()
self._mode_manager.request_mode(mode, immediate=immediate, hold_frames=WMACConstants.EMERGENCY_HOLD_FRAMES,
cancel_hold=self._has_radar_acc_lead)
self._mode_manager.update()
self._active = sm['selfdriveState'].experimentalMode and self._enabled
self._frame += 1

View File

@@ -1,94 +0,0 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
class MockLeadOne:
def __init__(self, status=0.0):
self.status = status
class MockRadarState:
def __init__(self, status=0.0):
self.leadOne = MockLeadOne(status=status)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockModelData:
def __init__(self, valid=True):
size = 33 if valid else 10 # incomplete if invalid
self.position = type("Pos", (), {"x": [0.0] * size})()
self.orientation = type("Ori", (), {"x": [0.0] * size})()
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
# Fake Kalman Filter that always returns a given value
class FakeKalman:
def __init__(self, value=1.0):
self.value = value
def add_data(self, v): pass
def get_value(self): return self.value
def get_confidence(self): return 1.0
def reset_data(self): pass
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['carState'].standstill = True
for _ in range(10):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
mock_mpc.crash_cnt = 1 # simulate FCW
for _ in range(2):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
# Force conditions to simulate slowdown
controller._slow_down_filter = FakeKalman(value=1.0) # Ensure urgency triggers slowdown
controller._v_ego_kph = 35.0
default_sm['modelV2'] = MockModelData(valid=False) # Incomplete trajectory
for _ in range(3):
controller.update(default_sm)
assert controller.mode() == "blended"

View File

@@ -0,0 +1,235 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController, HysteresisSignal
class MockLeadOne:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.status = status
self.dRel = dRel
self.vRel = vRel
class MockRadarState:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.leadOne = MockLeadOne(status=status, dRel=dRel, vRel=vRel)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockAction:
def __init__(self, desiredAcceleration=0.0, shouldStop=False):
self.desiredAcceleration = desiredAcceleration
self.shouldStop = shouldStop
class MockModelData:
def __init__(self, valid=True, endpoint_x=200.0, orientation_valid=None, desired_acceleration=0.0, should_stop=False):
position_size = 33 if valid else 10
orientation_size = position_size if orientation_valid is None else (33 if orientation_valid else 10)
position_x = [0.0] * position_size
if position_x:
position_x[-1] = endpoint_x
self.position = type("Pos", (), {"x": position_x})()
self.orientation = type("Ori", (), {"x": [0.0] * orientation_size})()
self.acceleration = type("Accel", (), {"x": [0.0] * position_size})()
self.action = MockAction(desired_acceleration, should_stop)
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['carState'].standstill = True
for _ in range(20):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
mock_mpc.crash_cnt = 1
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
def test_valid_position_with_missing_orientation_can_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, orientation_valid=False)
controller.update(default_sm)
assert controller._trajectory_valid
assert controller.mode() == "blended"
def test_incomplete_position_does_not_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert not controller._trajectory_valid
assert not controller._has_slow_down
assert controller.mode() == "acc"
def test_slowdown_hysteresis_prevents_threshold_chatter():
signal = HysteresisSignal(enter_threshold=0.5, exit_threshold=0.4, rise_rate=1.0, fall_rate=1.0)
assert signal.update(0.55)
assert signal.update(0.45)
assert not signal.update(0.35)
def test_model_should_stop_triggers_blended_without_valid_trajectory(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, should_stop=True)
controller.update(default_sm)
assert not controller._trajectory_valid
assert controller.mode() == "blended"
def test_radar_lead_keeps_acc_over_model_slowdown(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert controller._has_slow_down
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_far_radar_lead_allows_blended_until_acc_relevant(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert not controller._has_radar_acc_lead
assert controller.mode() == "blended"
def test_relevant_radar_lead_smoothly_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
default_sm['radarState'] = MockRadarState(status=1.0, dRel=45.0, vRel=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_closing_far_radar_lead_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=-25.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_radar_lead_keeps_acc_over_fcw_and_standstill(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['carState'].standstill = True
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, should_stop=True)
mock_mpc.crash_cnt = 1
for _ in range(10):
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller._has_mpc_fcw
assert controller.mode() == "acc"
def test_lead_flicker_hold_prevents_one_frame_mode_flip(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
controller.update(default_sm)
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller.mode() == "acc"

View File

@@ -0,0 +1,203 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from collections import deque
from dataclasses import dataclass
_HOLD_FRAMES = 16
_STATUS_WINDOW = 20
_STABILITY_FLIPS_FULL = 6.0
# Fresh low-confidence close leads only get masked when they are not urgent.
_URGENT_TTC_MIN = 4.0 # s, below this = urgent, no masking
_URGENT_CLOSING_VREL = -8.0 # m/s, faster closing = no masking
# Phantom-lead mask: a fresh low-modelProb close lead is usually a radar ghost; mask it
# so the MPC ignores it. Established/held leads are never masked (see smooth()).
_PHANTOM_MODELPROB_MAX = 0.5
_PHANTOM_DREL_MAX = 5.0
@dataclass
class _LeadSnap:
dRel: float = 0.0
yRel: float = 0.0
vRel: float = 0.0
vLead: float = 0.0
aLeadK: float = 0.0
aLeadTau: float = 0.0
modelProb: float = 0.0
aRel: float = 0.0
fcw: bool = False
class _LeadProxy:
__slots__ = ('status', 'dRel', 'yRel', 'vRel', 'vLead', 'aLeadK', 'aLeadTau',
'modelProb', 'aRel', 'fcw')
def __init__(self, snap: _LeadSnap):
self.status = True
self.dRel = snap.dRel
self.yRel = snap.yRel
self.vRel = snap.vRel
self.vLead = snap.vLead
self.aLeadK = snap.aLeadK
self.aLeadTau = snap.aLeadTau
self.modelProb = snap.modelProb
self.aRel = snap.aRel
self.fcw = snap.fcw
class _LeadProxyMasked:
__slots__ = ('status', 'dRel', 'yRel', 'vRel', 'vLead', 'aLeadK', 'aLeadTau',
'modelProb', 'aRel', 'fcw')
def __init__(self):
self.status = False
self.dRel = 0.0
self.yRel = 0.0
self.vRel = 0.0
self.vLead = 0.0
self.aLeadK = 0.0
self.aLeadTau = 0.0
self.modelProb = 0.0
self.aRel = 0.0
self.fcw = False
class _RadarStateProxy:
__slots__ = ('_raw', '_lead_one', '_lead_two')
def __init__(self, raw, lead_one, lead_two):
self._raw = raw
self._lead_one = lead_one
self._lead_two = lead_two
@property
def leadOne(self):
return self._lead_one if self._lead_one is not None else self._raw.leadOne
@property
def leadTwo(self):
return self._lead_two if self._lead_two is not None else self._raw.leadTwo
def __getattr__(self, name):
return getattr(self._raw, name)
class LeadPersistence:
"""Internal helper. Hold last-known leadOne/leadTwo alive for HOLD_FRAMES
after a status drop, so the MPC view of radarState ignores brief flicker.
Also masks freshly-acquired close-range low-confidence phantom leads so the
planner doesn't demand emergency brake on radar ghosts.
No own param — owner (RadarDistanceController) gates via force_enabled."""
def __init__(self):
self._last_one: _LeadSnap | None = None
self._last_two: _LeadSnap | None = None
self._alive_one = 0
self._alive_two = 0
self._status_hist: deque[bool] = deque(maxlen=_STATUS_WINDOW)
self._stability = 1.0
@property
def stability(self) -> float:
return self._stability
def reset(self) -> None:
self._last_one = None
self._last_two = None
self._alive_one = 0
self._alive_two = 0
self._status_hist.clear()
self._stability = 1.0
def update(self, radarstate, force_enabled: bool = True) -> None:
if radarstate is None:
return
if not force_enabled:
self.reset()
return
one = radarstate.leadOne
two = radarstate.leadTwo
# phantom counts as not-valid so hold-alive doesn't re-inject the ghost
one_valid = bool(one.status) and not self._is_phantom(one)
two_valid = bool(two.status) and not self._is_phantom(two)
if one_valid:
self._last_one = self._snap(one)
self._alive_one = _HOLD_FRAMES
elif self._alive_one > 0:
self._alive_one -= 1
if two_valid:
self._last_two = self._snap(two)
self._alive_two = _HOLD_FRAMES
elif self._alive_two > 0:
self._alive_two -= 1
self._status_hist.append(bool(one.status))
if len(self._status_hist) >= 5:
flips = sum(1 for i in range(1, len(self._status_hist))
if self._status_hist[i] != self._status_hist[i - 1])
self._stability = max(0.0, 1.0 - min(1.0, flips / _STABILITY_FLIPS_FULL))
else:
self._stability = 1.0
@staticmethod
def _is_phantom(lead) -> bool:
if not (bool(lead.status)
and float(lead.modelProb) < _PHANTOM_MODELPROB_MAX
and float(lead.dRel) < _PHANTOM_DREL_MAX):
return False
# don't mask an urgently-closing lead
v_rel = float(lead.vRel)
if v_rel <= _URGENT_CLOSING_VREL:
return False
ttc = float(lead.dRel) / max(0.1, -v_rel) if v_rel < 0.0 else float('inf')
return ttc > _URGENT_TTC_MIN
def smooth(self, radarstate, force_enabled: bool = True):
if not force_enabled or radarstate is None:
return radarstate
l1 = None
l2 = None
# mask only a fresh ghost (alive==0); never drop an established/held lead
if self._is_phantom(radarstate.leadOne) and self._alive_one == 0:
l1 = _LeadProxyMasked()
elif not radarstate.leadOne.status and self._alive_one > 0 and self._last_one is not None:
l1 = _LeadProxy(self._last_one)
if self._is_phantom(radarstate.leadTwo) and self._alive_two == 0:
l2 = _LeadProxyMasked()
elif not radarstate.leadTwo.status and self._alive_two > 0 and self._last_two is not None:
l2 = _LeadProxy(self._last_two)
if l1 is None and l2 is None:
return radarstate
return _RadarStateProxy(radarstate, l1, l2)
@staticmethod
def _snap(lead) -> _LeadSnap:
return _LeadSnap(
dRel=float(lead.dRel),
yRel=float(lead.yRel),
vRel=float(lead.vRel),
vLead=float(lead.vLead),
aLeadK=float(lead.aLeadK),
aLeadTau=float(lead.aLeadTau),
modelProb=float(lead.modelProb),
aRel=float(lead.aRel),
fcw=bool(lead.fcw),
)

View File

@@ -9,8 +9,10 @@ from cereal import messaging, custom
from opendbc.car import structs
from openpilot.common.constants import CV
from openpilot.selfdrive.car.cruise import V_CRUISE_MAX
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.accel_controller import AccelController
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
from openpilot.sunnypilot.selfdrive.controls.lib.e2e_alerts_helper import E2EAlertsHelper
from openpilot.sunnypilot.selfdrive.controls.lib.radar_distance.radar_distance import RadarDistanceController
from openpilot.sunnypilot.selfdrive.controls.lib.smart_cruise_control.smart_cruise_control import SmartCruiseControl
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_resolver import SpeedLimitResolver
@@ -26,6 +28,8 @@ class LongitudinalPlannerSP:
self.events_sp = EventsSP()
self.resolver = SpeedLimitResolver()
self.dec = DynamicExperimentalController(CP, mpc)
self.accel = AccelController(CP, mpc)
self.radar_distance = RadarDistanceController()
self.scc = SmartCruiseControl()
self.resolver = SpeedLimitResolver()
self.sla = SpeedLimitAssist(CP, CP_SP)
@@ -35,6 +39,14 @@ class LongitudinalPlannerSP:
self.output_v_target = 0.
self.output_a_target = 0.
self._smoothed_radarstate = None
def smooth_radarstate(self, radarstate):
# RadarDistance: hold leads through flicker, mask fresh close phantoms, and slew dRel across
# lead target-switches so the MPC doesn't demand a catch-up hard brake. Cached per cycle.
if self._smoothed_radarstate is None:
self._smoothed_radarstate = self.radar_distance.smooth_radarstate(radarstate)
return self._smoothed_radarstate
def is_e2e(self, sm: messaging.SubMaster) -> bool:
experimental_mode = sm['selfdriveState'].experimentalMode
@@ -74,8 +86,11 @@ class LongitudinalPlannerSP:
return self.output_v_target, self.output_a_target
def update(self, sm: messaging.SubMaster) -> None:
self._smoothed_radarstate = None
self.events_sp.clear()
self.dec.update(sm)
self.accel.update(sm)
self.radar_distance.update(sm)
self.e2e_alerts_helper.update(sm, self.events_sp)
def publish_longitudinal_plan_sp(self, sm: messaging.SubMaster, pm: messaging.PubMaster) -> None:
@@ -95,6 +110,12 @@ class LongitudinalPlannerSP:
dec.enabled = self.dec.enabled()
dec.active = self.dec.active()
# Acceleration Personality
acceleration = longitudinalPlanSP.acceleration
acceleration.personality = self.accel.personality()
acceleration.enabled = self.accel.enabled()
acceleration.maxAccel = float(self.accel.max_accel())
# Smart Cruise Control
smartCruiseControl = longitudinalPlanSP.smartCruiseControl
# Vision Control

View File

@@ -0,0 +1,58 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.lead_persistence.lead_persistence import LeadPersistence
_PARAM_REFRESH_FRAMES = max(1, int(1.0 / DT_MDL))
class RadarDistanceController:
"""Holds last-known radar leads alive through brief flicker and masks close-range
phantom leads, so the MPC view of radarState stays stable. The behavior lives in
LeadPersistence; this owns the RadarDistance param gate."""
def __init__(self):
self.params = Params()
self._frame = 0
self._enabled = self.params.get_bool('RadarDistance')
self._lead_persistence = LeadPersistence()
def is_enabled(self) -> bool:
return self._enabled
def set_enabled(self, enabled: bool):
self._enabled = bool(enabled)
self.params.put_bool('RadarDistance', self._enabled)
def toggle(self) -> bool:
self.set_enabled(not self._enabled)
return self._enabled
def update(self, sm=None, sm_sp=None) -> None:
self._frame += 1
if self._frame % _PARAM_REFRESH_FRAMES == 0:
self._enabled = self.params.get_bool('RadarDistance')
radarstate = None
if sm is not None:
try:
radarstate = sm['radarState']
except (KeyError, AttributeError, TypeError):
radarstate = None
self._lead_persistence.update(radarstate, force_enabled=self._enabled)
def smooth_radarstate(self, radarstate):
if not self._enabled:
return radarstate
return self._lead_persistence.smooth(radarstate, force_enabled=True)
def reset(self) -> None:
self._lead_persistence.reset()

View File

@@ -0,0 +1,84 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import numpy as np
from openpilot.common.constants import CV
from openpilot.common.realtime import DT_MDL
from openpilot.common.params import Params
NEARSIDE_PROB = 0.2
EDGE_PROB = 0.35
EDGE_REACTION_TIME = 1.0
EDGE_CLEAR_TIME = 0.3
MIN_SPEED = 20 * CV.MPH_TO_MS
class RoadEdgeLaneChangeController:
def __init__(self, desire_helper):
self.DH = desire_helper
self.params = Params()
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
self.param_read_counter = 0
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def read_params(self) -> None:
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
def update_params(self) -> None:
if self.param_read_counter % 50 == 0:
self.read_params()
self.param_read_counter += 1
def reset(self) -> None:
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def update(self, road_edge_stds, lane_line_probs, v_ego: float) -> None:
self.update_params()
if not self.enabled or v_ego < MIN_SPEED:
self.reset()
return
left_edge_prob = np.clip(1.0 - road_edge_stds[0], 0.0, 1.0)
right_edge_prob = np.clip(1.0 - road_edge_stds[1], 0.0, 1.0)
left_lane_prob = lane_line_probs[0]
right_lane_prob = lane_line_probs[3]
left_cond = left_edge_prob > EDGE_PROB and left_lane_prob < NEARSIDE_PROB and right_lane_prob >= left_lane_prob
right_cond = right_edge_prob > EDGE_PROB and right_lane_prob < NEARSIDE_PROB and left_lane_prob >= right_lane_prob
if left_cond:
self.left_edge_timer = min(self.left_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.left_clear_timer = 0.0
if self.left_edge_timer > EDGE_REACTION_TIME:
self.left_edge_detected = True
else:
self.left_clear_timer += DT_MDL
if self.left_clear_timer > EDGE_CLEAR_TIME:
self.left_edge_timer = 0.0
self.left_edge_detected = False
if right_cond:
self.right_edge_timer = min(self.right_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.right_clear_timer = 0.0
if self.right_edge_timer > EDGE_REACTION_TIME:
self.right_edge_detected = True
else:
self.right_clear_timer += DT_MDL
if self.right_clear_timer > EDGE_CLEAR_TIME:
self.right_edge_timer = 0.0
self.right_edge_detected = False

View File

@@ -5,6 +5,8 @@ from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.lane_turn_desire import LaneTurnController, LANE_CHANGE_SPEED_MIN
from openpilot.sunnypilot.selfdrive.controls.lib.auto_lane_change import AutoLaneChangeMode
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
TurnDirection = custom.ModelDataV2SP.TurnDirection
@@ -107,7 +109,11 @@ def set_lane_turn_params():
])
def test_desire_helper_integration(carstate, lateral_active, lane_change_prob, expected_desire, set_lane_turn_params):
dh = DesireHelper()
relc = RoadEdgeLaneChangeController(dh)
relc.enabled = True
dh.alc.lane_change_set_timer = AutoLaneChangeMode.NUDGE
for _ in range(10):
dh.update(carstate, lateral_active, lane_change_prob)
dh.update(carstate, lateral_active, lane_change_prob,
left_edge_detected=relc.left_edge_detected, right_edge_detected=relc.right_edge_detected)
assert dh.desire == expected_desire # The first four tests were unit tests to test the controller, where this tests the integration in desire helpers

View File

@@ -0,0 +1,232 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.sunnypilot.selfdrive.controls.lib.lead_persistence.lead_persistence import (
LeadPersistence,
_HOLD_FRAMES,
)
class FakeLead:
def __init__(self, status=False, d_rel=0.0, v_rel=0.0, v_lead=0.0,
a_lead=0.0, a_tau=0.0, model_prob=0.0, y_rel=0.0, a_rel=0.0, fcw=False):
self.status = status
self.dRel = d_rel
self.yRel = y_rel
self.vRel = v_rel
self.vLead = v_lead
self.aLeadK = a_lead
self.aLeadTau = a_tau
self.modelProb = model_prob
self.aRel = a_rel
self.fcw = fcw
class FakeRadarState:
def __init__(self, lead_one=None, lead_two=None, extra='ok'):
self.leadOne = lead_one or FakeLead()
self.leadTwo = lead_two or FakeLead()
self.extra = extra
def _make():
return LeadPersistence()
class TestLeadPersistence:
def test_disabled_passthrough(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
lp.update(raw, force_enabled=False)
out = lp.smooth(raw, force_enabled=False)
assert out is raw
def test_status_true_passthrough(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0, v_lead=20.0))
for _ in range(3):
lp.update(raw)
out = lp.smooth(raw)
# leadOne still truly present → passthrough (or wrapper with same .status=True)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
def test_dropout_held(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0, v_lead=20.0, a_lead=-1.0))
for _ in range(5):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
assert out.leadOne.aLeadK == -1.0
def test_dropout_expires(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(_HOLD_FRAMES + 2):
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.leadOne.status is False
def test_reappearance_resets_hold(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(3):
lp.update(raw_off)
raw_on2 = FakeRadarState(lead_one=FakeLead(status=True, d_rel=28.0))
lp.update(raw_on2)
out = lp.smooth(raw_on2)
assert out.leadOne.status is True
assert out.leadOne.dRel == 28.0
def test_other_attrs_passthrough(self):
lp = _make()
raw_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0), extra='special')
for _ in range(3):
lp.update(raw_on)
raw_off = FakeRadarState(lead_one=FakeLead(status=False), extra='special2')
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.extra == 'special2'
def test_stability_high_on_solid_lead(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(10):
lp.update(raw)
assert lp.stability >= 0.9
def test_stability_low_on_churn(self):
lp = _make()
on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(10):
lp.update(on)
lp.update(off)
assert lp.stability < 0.5
def test_leadtwo_independent(self):
lp = _make()
raw_on = FakeRadarState(
lead_one=FakeLead(status=True, d_rel=30.0),
lead_two=FakeLead(status=True, d_rel=80.0),
)
for _ in range(5):
lp.update(raw_on)
raw_off2 = FakeRadarState(
lead_one=FakeLead(status=True, d_rel=30.0),
lead_two=FakeLead(status=False),
)
lp.update(raw_off2)
out = lp.smooth(raw_off2)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
assert out.leadTwo.status is True
assert out.leadTwo.dRel == 80.0
def test_reset(self):
lp = _make()
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
lp.update(raw)
lp.reset()
raw_off = FakeRadarState(lead_one=FakeLead(status=False))
lp.update(raw_off)
out = lp.smooth(raw_off)
assert out.leadOne.status is False
def _step(lp, lead):
raw = FakeRadarState(lead_one=lead)
lp.update(raw)
return lp.smooth(raw)
class TestPhantomMask:
def test_masked_when_not_urgent(self):
lp = _make()
# fresh, low modelProb, close, slow closing (TTC 8s) -> phantom ghost -> masked
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=4.0, v_rel=-0.5, model_prob=0.3))
lp.update(raw)
out = lp.smooth(raw)
assert out.leadOne.status is False
def test_not_masked_when_urgent_ttc(self):
lp = _make()
# fresh, low modelProb, close, TTC 3s <= 4 -> real cut-in risk -> NOT masked
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=3.0, v_rel=-1.0, model_prob=0.3))
lp.update(raw)
out = lp.smooth(raw)
assert out.leadOne.status is True
assert out.leadOne.dRel == 3.0
def test_not_masked_when_fast_closing(self):
lp = _make()
# fresh, low modelProb, close, vRel -10 <= -8 -> NOT masked
raw = FakeRadarState(lead_one=FakeLead(status=True, d_rel=4.0, v_rel=-10.0, model_prob=0.3))
lp.update(raw)
out = lp.smooth(raw)
assert out.leadOne.status is True
class TestLeadSwitchPassthrough:
def _settle_far(self, lp, d=45.0):
out = None
for _ in range(6):
out = _step(lp, FakeLead(status=True, d_rel=d, v_rel=-3.0, v_lead=22.0))
return out
def test_steady_lead_passthrough(self):
lp = _make()
out = self._settle_far(lp, d=45.0)
assert out.leadOne.dRel == 45.0
def test_closer_valid_switch_passes_through(self):
lp = _make()
self._settle_far(lp, d=45.0)
# The accel controller smooths braking after MPC; RadarDistance must not hide a valid closer lead from MPC.
out = _step(lp, FakeLead(status=True, d_rel=28.0, v_rel=-5.0, v_lead=19.0))
assert out.leadOne.dRel == 28.0
assert out.leadOne.vRel == -5.0
assert out.leadOne.vLead == 19.0
def test_urgent_switch_passes_through(self):
lp = _make()
self._settle_far(lp, d=45.0)
# nearer car at 12 m closing -5 -> TTC 2.4 s < 4 -> urgent, no masking
out = _step(lp, FakeLead(status=True, d_rel=12.0, v_rel=-5.0, v_lead=10.0))
assert out.leadOne.dRel == 12.0
def test_fast_closing_switch_passes_through(self):
lp = _make()
self._settle_far(lp, d=60.0)
# vRel -10 <= -8 -> emergency closing, no masking even though TTC ok
out = _step(lp, FakeLead(status=True, d_rel=40.0, v_rel=-10.0, v_lead=15.0))
assert out.leadOne.dRel == 40.0
def test_normal_closing_passthrough(self):
lp = _make()
d = 45.0
self._settle_far(lp, d=d)
for _ in range(20):
d -= 0.25 # ~5 m/s closing at 20 Hz, continuous (not a switch)
out = _step(lp, FakeLead(status=True, d_rel=d, v_rel=-5.0, v_lead=20.0))
assert out.leadOne.dRel == d
def test_farther_switch_passthrough(self):
lp = _make()
self._settle_far(lp, d=30.0)
out = _step(lp, FakeLead(status=True, d_rel=60.0, v_rel=-1.0, v_lead=24.0))
assert out.leadOne.dRel == 60.0

View File

@@ -0,0 +1,96 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.sunnypilot.selfdrive.controls.lib.radar_distance.radar_distance import RadarDistanceController
from openpilot.sunnypilot.selfdrive.controls.lib.lead_persistence.lead_persistence import _HOLD_FRAMES
class FakeLead:
def __init__(self, status=False, d_rel=0.0, v_rel=0.0, v_lead=0.0, a_lead=0.0,
a_tau=0.0, model_prob=0.95, y_rel=0.0, a_rel=0.0, fcw=False):
self.status = status
self.dRel = d_rel
self.yRel = y_rel
self.vRel = v_rel
self.vLead = v_lead
self.aLeadK = a_lead
self.aLeadTau = a_tau
self.modelProb = model_prob
self.aRel = a_rel
self.fcw = fcw
class FakeRadarState:
def __init__(self, lead_one=None, lead_two=None):
self.leadOne = lead_one or FakeLead()
self.leadTwo = lead_two or FakeLead()
class FakeSM:
def __init__(self, radarstate):
self._data = {'radarState': radarstate}
def __getitem__(self, k):
return self._data[k]
def _make(enabled=True):
c = RadarDistanceController()
c.set_enabled(enabled)
return c
class TestRadarDistanceController:
def test_toggle(self):
c = _make(enabled=False)
assert c.is_enabled() is False
assert c.toggle() is True
assert c.is_enabled() is True
assert c.toggle() is False
def test_disabled_passthrough(self):
c = _make(enabled=False)
rs = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
c.update(FakeSM(rs))
assert c.smooth_radarstate(rs) is rs
def test_no_sm_safe(self):
c = _make()
c.update(None, None) # must not raise
def test_holds_dropped_lead(self):
c = _make()
rs_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0, v_lead=20.0, a_lead=-1.0))
for _ in range(5):
c.update(FakeSM(rs_on))
rs_off = FakeRadarState(lead_one=FakeLead(status=False))
c.update(FakeSM(rs_off))
out = c.smooth_radarstate(rs_off)
assert out.leadOne.status is True
assert out.leadOne.dRel == 30.0
assert out.leadOne.aLeadK == -1.0
def test_hold_expires(self):
c = _make()
rs_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
c.update(FakeSM(rs_on))
rs_off = FakeRadarState(lead_one=FakeLead(status=False))
for _ in range(_HOLD_FRAMES + 2):
c.update(FakeSM(rs_off))
out = c.smooth_radarstate(rs_off)
assert out.leadOne.status is False
def test_reset_clears_hold(self):
c = _make()
rs_on = FakeRadarState(lead_one=FakeLead(status=True, d_rel=30.0))
for _ in range(5):
c.update(FakeSM(rs_on))
c.reset()
rs_off = FakeRadarState(lead_one=FakeLead(status=False))
c.update(FakeSM(rs_off))
out = c.smooth_radarstate(rs_off)
assert out.leadOne.status is False

View File

@@ -0,0 +1,99 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.relc import (
RoadEdgeLaneChangeController, EDGE_REACTION_TIME, EDGE_CLEAR_TIME, MIN_SPEED,
)
V_HIGH = MIN_SPEED + 2.0
V_LOW = MIN_SPEED - 1.0
@pytest.fixture
def relc(mocker):
mock_params = mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.relc.Params")
mock_params.return_value.get_bool.return_value = True
controller = RoadEdgeLaneChangeController(DesireHelper())
controller.enabled = True
return controller
def drive(controller, road_edge_stds, lane_line_probs, seconds, v_ego=V_HIGH):
for _ in range(int(seconds / DT_MDL) + 1):
controller.update(road_edge_stds, lane_line_probs, v_ego)
@pytest.mark.parametrize("road_edge_stds,lane_line_probs,attr", [
([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], "left_edge_detected"),
([0.9, 0.0], [0.8, 0.8, 0.8, 0.0], "right_edge_detected"),
])
def test_edge_detection(relc, road_edge_stds, lane_line_probs, attr):
drive(relc, road_edge_stds, lane_line_probs, EDGE_REACTION_TIME + 0.1)
assert getattr(relc, attr)
def test_edge_detection_requires_time(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME - 0.05)
assert not relc.left_edge_detected
def test_both_edges_detected(relc):
drive(relc, [0.0, 0.0], [0.0, 0.8, 0.8, 0.0], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
assert relc.right_edge_detected
def test_noise_doesnt_clear(relc):
edge = ([0.0, 0.9], [0.0, 0.8, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update(*clear, V_HIGH)
relc.update(*edge, V_HIGH)
assert relc.left_edge_detected
def test_clears_after_window(relc):
edge = ([0.0, 0.9], [0.0, 0.8, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
drive(relc, *clear, EDGE_CLEAR_TIME + 0.05)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_low_speed_skips(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1, v_ego=V_LOW)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_speed_drop_resets(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], V_LOW)
assert not relc.left_edge_detected
def test_param_off_resets(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.params.get_bool.return_value = False
relc.read_params()
relc.update([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], V_HIGH)
assert not relc.left_edge_detected
assert not relc.right_edge_detected

View File

@@ -243,4 +243,12 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none, AudibleAlert.prompt, 3.),
},
EventNameSP.laneChangeRoadEdge: {
ET.WARNING: Alert(
"Lane Change Unavailable: Road Edge",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 0.1),
},
}

View File

@@ -1118,6 +1118,10 @@
"title": "Record Front Lock",
"description": ""
},
"RoadEdgeLaneChangeEnabled": {
"title": "Block Lane Change: Road Edge Detection",
"description": ""
},
"RoadName": {
"title": "Road Name",
"description": ""
@@ -1323,6 +1327,22 @@
"max": 5.0,
"step": 0.1,
"unit": "m/s\u00b2"
},
"ToyotaAutoHold": {
"title": "Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS",
"description": ""
},
"ToyotaEnhancedBsm": {
"title": "Toyota: Prius TSS2 BSM and some tssp",
"description": ""
},
"ToyotaTSS2Long": {
"title": "Toyota: custom longitudinal for TSS2",
"description": ""
},
"ToyotaDriveMode": {
"title": "Enable drive mode btn link",
"description": ""
},
"ToyotaEnforceStockLongitudinal": {
"title": "Toyota: Enforce Factory Longitudinal Control",

View File

@@ -519,6 +519,12 @@
}
]
},
{
"key": "RoadEdgeLaneChangeEnabled",
"widget": "toggle",
"title": "Block Lane Change: Road Edge Detection",
"description": "Blocks lane change when the model sees a road edge on the side you signal."
},
{
"key": "AutoLaneChangeBsmDelay",
"widget": "toggle",
@@ -620,6 +626,59 @@
}
]
},
{
"key": "AccelPersonalityEnabled",
"widget": "toggle",
"title": "Enable Acceleration Profile",
"description": "When off, sunnypilot uses its default (Normal) acceleration and deceleration behavior. Enable to use the Acceleration Profile below.",
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "AccelPersonality",
"widget": "multiple_button",
"title": "Acceleration Profile",
"description": "Controls how sunnypilot accelerates and eases into non-emergency deceleration.",
"options": [
{
"value": 0,
"label": "Eco"
},
{
"value": 1,
"label": "Normal"
},
{
"value": 2,
"label": "Sport"
}
],
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "RadarDistance",
"widget": "toggle",
"title": "Radar Lead Smoothing",
"description": "Holds radar leads through brief dropouts, ignores close-range radar ghosts, and eases braking when the radar lead target jumps, for smoother lead following. Emergency braking is unaffected.",
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "IntelligentCruiseButtonManagement",
"widget": "toggle",
@@ -2001,6 +2060,22 @@
"equals": true
}
]
},
{
"key": "PlanplusControl",
"widget": "option",
"title": "Plan Plus Controls",
"description": "Adjust planplus model recentering strength. The higher this number the more aggressively the model will recover to lane center; too high and it will ping-pong.",
"min": 0.0,
"max": 2.0,
"step": 0.1,
"enablement": [
{
"type": "param",
"key": "ShowAdvancedControls",
"equals": true
}
]
}
]
},
@@ -2168,6 +2243,50 @@
"title": "Toyota / Lexus Settings",
"description": "",
"items": [
{
"key": "ToyotaAutoHold",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaEnhancedBsm",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: Prius TSS2 BSM and some tssp",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaTSS2Long",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: custom longitudinal for TSS2",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaDriveMode",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Enable drive mode btn link",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaEnforceStockLongitudinal",
"widget": "toggle",

View File

@@ -43,6 +43,31 @@ sections:
label: Relaxed
enablement:
- $ref: '#/macros/longitudinal'
- key: AccelPersonalityEnabled
widget: toggle
title: Enable Acceleration Profile
description: When off, sunnypilot uses its default (Normal) acceleration and deceleration behavior. Enable to use the Acceleration Profile below.
enablement:
- $ref: '#/macros/longitudinal'
- key: AccelPersonality
widget: multiple_button
title: Acceleration Profile
description: Controls how sunnypilot accelerates and eases into non-emergency deceleration.
options:
- value: 0
label: Eco
- value: 1
label: Normal
- value: 2
label: Sport
enablement:
- $ref: '#/macros/longitudinal'
- key: RadarDistance
widget: toggle
title: Radar Lead Smoothing
description: Holds radar leads through brief dropouts, ignores close-range radar ghosts, and eases braking when the radar lead target jumps, for smoother lead following. Emergency braking is unaffected.
enablement:
- $ref: '#/macros/longitudinal'
- key: IntelligentCruiseButtonManagement
widget: toggle
title: Intelligent Cruise Button Management (ICBM) (Alpha)

View File

@@ -51,6 +51,16 @@ sections:
key: LagdToggle
equals: true
- $ref: '#/macros/advanced_only'
- key: PlanplusControl
widget: option
title: Plan Plus Controls
description: Adjust planplus model recentering strength. The higher this number the more aggressively the model will recover
to lane center; too high and it will ping-pong.
min: 0.0
max: 2.0
step: 0.1
enablement:
- $ref: '#/macros/advanced_only'
- id: lateral_control
title: Lateral Control
description: Neural network lateral control for supported models

View File

@@ -242,6 +242,10 @@ sections:
label: 2 seconds
- value: 5
label: 3 seconds
- key: RoadEdgeLaneChangeEnabled
widget: toggle
title: 'Block Lane Change: Road Edge Detection'
description: Blocks lane change when the model sees a road edge on the side you signal.
- key: AutoLaneChangeBsmDelay
widget: toggle
title: 'Auto Lane Change: Delay with Blind Spot'

View File

@@ -60,6 +60,30 @@ sections:
title: Toyota / Lexus Settings
description: ''
items:
- key: ToyotaAutoHold
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaEnhancedBsm
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: Prius TSS2 BSM and some tssp'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaTSS2Long
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: custom longitudinal for TSS2'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaDriveMode
widget: toggle
needs_onroad_cycle: true
title: Enable drive mode btn link
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaEnforceStockLongitudinal
widget: toggle
needs_onroad_cycle: true

View File

@@ -45,8 +45,9 @@ class ScrollState(Enum):
class GuiScrollPanel2:
def __init__(self, horizontal: bool = True) -> None:
def __init__(self, horizontal: bool = True, handle_out_of_bounds: bool = True) -> None:
self._horizontal = horizontal
self._handle_out_of_bounds = handle_out_of_bounds
self._state = ScrollState.STEADY
self._offset: rl.Vector2 = rl.Vector2(0, 0)
self._initial_click_event: MouseEvent | None = None
@@ -98,7 +99,7 @@ class GuiScrollPanel2:
# simple exponential return if out of bounds
# out of bounds is handled by snapping, so skip if set
out_of_bounds = self.get_offset() > max_offset or self.get_offset() < min_offset
if out_of_bounds and snap_target is None:
if out_of_bounds and snap_target is None and self._handle_out_of_bounds:
target = max_offset if self.get_offset() > max_offset else min_offset
dt = rl.get_frame_time() or 1e-6

View File

@@ -75,7 +75,6 @@ class _Scroller(Widget):
self._items: list[Widget] = []
self._horizontal = horizontal
self._snap_items = snap_items
assert not self._snap_items or self._horizontal, "Snapping is only supported for horizontal scrolling"
self._spacing = spacing
self._pad = pad
@@ -191,8 +190,12 @@ class _Scroller(Widget):
snap_target: float | None = None
if self._snap_items and visible_items and self._scrolling_to[0] is None:
# TODO: this doesn't handle two small buttons at the edges well
center_pos = self._rect.x + self._rect.width / 2
closest_delta_pos = min((((item.rect.x + item.rect.width / 2) - center_pos) for item in visible_items), key=abs)
if self._horizontal:
center_pos = self._rect.x + self._rect.width / 2
closest_delta_pos = min((((item.rect.x + item.rect.width / 2) - center_pos) for item in visible_items), key=abs)
else:
center_pos = self._rect.y + self._rect.height / 2
closest_delta_pos = min((((item.rect.y + item.rect.height / 2) - center_pos) for item in visible_items), key=abs)
snap_target = self.scroll_panel.get_offset() - closest_delta_pos
return self.scroll_panel.update(self._rect, content_size, snap_target=snap_target)

View File

@@ -0,0 +1,58 @@
<?xml version='1.0' encoding='UTF-8'?>
<root>
<tabbed_widget name="Main Window" parent="main_window">
<Tab tab_name="Acceleration Personality" containers="1">
<Container>
<DockSplitter orientation="-" sizes="0.25;0.25;0.25;0.25" count="4">
<DockArea name="accel / decel">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="2.000000" left="150.000000" bottom="-4.000000"/>
<limitY/>
<curve name="/longitudinalPlan/aTarget" color="#1f77b4"/>
<curve name="/carState/aEgo" color="#f14cc1"/>
<curve name="/longitudinalPlanSP/acceleration/maxAccel" color="#1ac938"/>
</plot>
</DockArea>
<DockArea name="accel personality (gating)">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="1.300000" left="150.000000" bottom="-0.200000"/>
<limitY/>
<curve name="/longitudinalPlanSP/acceleration/jerkFactorScale" color="#9467bd"/>
<curve name="/longitudinalPlanSP/acceleration/tFollowOffset" color="#ff7f0e"/>
<curve name="/longitudinalPlanSP/acceleration/personality" color="#d62728"/>
<curve name="/longitudinalPlanSP/acceleration/enabled" color="#17becf"/>
</plot>
</DockArea>
<DockArea name="lead">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="150.000000" left="150.000000" bottom="-25.000000"/>
<limitY/>
<curve name="/radarState/leadOne/dRel" color="#1f77b4"/>
<curve name="/radarState/leadOne/vRel" color="#d62728"/>
</plot>
</DockArea>
<DockArea name="speed">
<plot flip_y="false" flip_x="false" mode="TimeSeries" style="Lines">
<range right="175.000000" top="25.000000" left="150.000000" bottom="0.000000"/>
<limitY/>
<curve name="/carState/vEgo" color="#1ac938"/>
</plot>
</DockArea>
</DockSplitter>
</Container>
</Tab>
<currentTabIndex index="0"/>
</tabbed_widget>
<use_relative_time_offset enabled="1"/>
<!-- - - - - - - - - - - - - - - -->
<!-- - - - - - - - - - - - - - - -->
<Plugins>
<plugin ID="DataLoad Rlog"/>
<plugin ID="Cereal Subscriber"/>
</Plugins>
<!-- - - - - - - - - - - - - - - -->
<!-- - - - - - - - - - - - - - - -->
<customMathEquations/>
<snippets/>
<!-- - - - - - - - - - - - - - - -->
</root>