Compare commits

...

5 Commits

Author SHA1 Message Date
Jason Wen
7e698e8aba new cap states 2026-04-17 01:49:29 -04:00
Jason Wen
e6f6e32ccf ui 2026-04-16 17:39:17 -04:00
Jason Wen
6b9a9b9075 update cap 2026-04-16 17:00:58 -04:00
Jason Wen
f6b02693a1 slight cleanup 2026-04-15 18:27:32 -04:00
Jason Wen
e0a7c57fab init 2026-04-15 17:25:01 -04:00
16 changed files with 1745 additions and 138 deletions

View File

@@ -267,6 +267,9 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
active @2 :Bool;
vTarget @3 :Float32;
aTarget @4 :Float32;
capDelta @5 :Float32; # Difference between cluster set-speed and cap (m/s), positive = driver above cap
targetCap @6 :Float32; # Speed limit cap being enforced (m/s)
disableReason @7 :AssistDisableReason;
}
enum Source {
@@ -282,6 +285,19 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
pending @3; # Awaiting new speed limit.
adapting @4; # Reducing speed to match new speed limit.
active @5; # Cruising at speed limit.
capping @6; # Silently capping speed based on limit.
tempPaused @7; # Temporarily paused by user.
}
enum AssistDisableReason {
none @0;
userCancel @1;
userTempPause @2;
longOverride @3;
belowFloor @4;
autoResume @5;
mapGap @6;
gateDisabled @7;
}
}
@@ -342,6 +358,7 @@ struct OnroadEventSP @0xda96579883444c35 {
speedLimitChanged @21;
speedLimitPending @22;
e2eChime @23;
speedLimitCapActive @24;
}
}

View File

@@ -261,6 +261,9 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"SpeedLimitOffsetType", {PERSISTENT | BACKUP, INT, "0"}},
{"SpeedLimitPolicy", {PERSISTENT | BACKUP, INT, "3"}},
{"SpeedLimitValueOffset", {PERSISTENT | BACKUP, INT, "0"}},
{"SpeedLimitUpshiftAccept", {PERSISTENT | BACKUP, INT, "0"}},
{"SpeedLimitMinCapFloor", {PERSISTENT | BACKUP, INT, "25"}},
{"SpeedLimitCapAudioCue", {PERSISTENT | BACKUP, INT, "1"}},
// Smart Cruise Control
{"MapTargetVelocities", {CLEAR_ON_ONROAD_TRANSITION, STRING}},

View File

@@ -21,6 +21,8 @@ from openpilot.system.ui.widgets.scroller_tici import Scroller
SPEED_LIMIT_MODE_BUTTONS = [tr("Off"), tr("Info"), tr("Warning"), tr("Assist")]
SPEED_LIMIT_OFFSET_TYPE_BUTTONS = [tr("None"), tr("Fixed"), tr("%")]
SPEED_LIMIT_UPSHIFT_ACCEPT_BUTTONS = [tr("Never Raise"), tr("Accel Pedal Confirm")]
SPEED_LIMIT_CAP_AUDIO_CUE_BUTTONS = [tr("Off"), tr("On")]
SPEED_LIMIT_MODE_DESCRIPTIONS = [
tr("Off: Disables the Speed Limit functions."),
@@ -35,6 +37,16 @@ SPEED_LIMIT_OFFSET_DESCRIPTIONS = [
tr("Percent: Adds a percent offset [Speed Limit + (Offset % Speed Limit)]"),
]
SPEED_LIMIT_UPSHIFT_ACCEPT_DESCRIPTIONS = [
tr("Never Raise: Keeps the current cap when the speed limit changes."),
tr("Accel Pedal Confirm: Accepts new speed limit cap when you release the accelerator pedal."),
]
SPEED_LIMIT_CAP_AUDIO_CUE_DESCRIPTIONS = [
tr("Off: No audio cue when entering speed limit capping mode."),
tr("On: Plays a low chime when entering speed limit capping mode."),
]
class PanelType(IntEnum):
SETTINGS = 0
@@ -86,13 +98,42 @@ class SpeedLimitSettingsLayout(Widget):
label_callback=self._get_offset_label,
)
self._speed_limit_upshift_accept = multiple_button_item_sp(
title=lambda: tr("Speed Limit Cap Upshift"),
description=self._get_upshift_accept_description,
buttons=SPEED_LIMIT_UPSHIFT_ACCEPT_BUTTONS,
param="SpeedLimitUpshiftAccept",
button_width=500,
)
self._speed_limit_min_cap_floor = option_item_sp(
title=lambda: tr("Speed Limit Cap Floor"),
param="SpeedLimitMinCapFloor",
min_value=0,
max_value=40,
description=self._get_min_cap_floor_description,
label_callback=self._get_min_cap_floor_label,
)
self._speed_limit_cap_audio_cue = multiple_button_item_sp(
title=lambda: tr("Speed Limit Cap Audio Cue"),
description=self._get_cap_audio_cue_description,
buttons=SPEED_LIMIT_CAP_AUDIO_CUE_BUTTONS,
param="SpeedLimitCapAudioCue",
button_width=450,
)
items = [
self._speed_limit_mode,
LineSeparatorSP(40),
self._source_button,
LineSeparatorSP(40),
self._speed_limit_offset_type,
self._speed_limit_value_offset
self._speed_limit_value_offset,
LineSeparatorSP(40),
self._speed_limit_upshift_accept,
self._speed_limit_min_cap_floor,
self._speed_limit_cap_audio_cue,
]
return items
@@ -120,6 +161,23 @@ class SpeedLimitSettingsLayout(Widget):
return f"{value} {unit}"
return str(value)
@staticmethod
def _get_upshift_accept_description():
return get_highlighted_description(ui_state.params, "SpeedLimitUpshiftAccept", SPEED_LIMIT_UPSHIFT_ACCEPT_DESCRIPTIONS)
@staticmethod
def _get_min_cap_floor_description():
return ""
@staticmethod
def _get_min_cap_floor_label(value):
unit = tr("km/h") if ui_state.is_metric else tr("mph")
return f"{value} {unit}"
@staticmethod
def _get_cap_audio_cue_description():
return get_highlighted_description(ui_state.params, "SpeedLimitCapAudioCue", SPEED_LIMIT_CAP_AUDIO_CUE_DESCRIPTIONS)
def _update_state(self):
super()._update_state()
@@ -128,6 +186,7 @@ class SpeedLimitSettingsLayout(Widget):
brand = ui_state.CP.brand
has_long = ui_state.has_longitudinal_control
has_icbm = ui_state.has_icbm
pcm_op_long = has_long and ui_state.CP.pcmCruise
"""
Speed Limit Assist is available when:
@@ -144,6 +203,7 @@ class SpeedLimitSettingsLayout(Widget):
else:
sla_available = False
pcm_op_long = False
if not sla_available:
self._speed_limit_mode.action_item.set_enabled_buttons({
@@ -157,6 +217,10 @@ class SpeedLimitSettingsLayout(Widget):
offset_type = ui_state.params.get("SpeedLimitOffsetType", return_default=True)
self._speed_limit_value_offset.set_visible(offset_type != int(SpeedLimitOffsetType.off))
self._speed_limit_upshift_accept.set_visible(pcm_op_long)
self._speed_limit_min_cap_floor.set_visible(pcm_op_long)
self._speed_limit_cap_audio_cue.set_visible(pcm_op_long)
def _render(self, rect):
if self._current_panel == PanelType.POLICY:
self._policy_layout.render(rect)

View File

@@ -6,6 +6,7 @@ See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from cereal import custom
from openpilot.common.constants import CV
from openpilot.selfdrive.ui.mici.onroad.torque_bar import TorqueBar
from openpilot.selfdrive.ui.sunnypilot.onroad.developer_ui import DeveloperUiRenderer, DeveloperUiState, get_bottom_dev_ui_offset
@@ -23,6 +24,7 @@ from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.lib.text_measure import measure_text_cached
SLA_ACTIVE_COLOR = rl.Color(0x91, 0x9b, 0x95, 0xff)
AssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
class HudRendererSP(HudRenderer):
@@ -89,9 +91,14 @@ class HudRendererSP(HudRenderer):
set_speed_color = COLORS.DARK_GREY
if self.is_cruise_set:
set_speed_color = COLORS.WHITE
if long_plan_sp.speedLimit.assist.active:
assist_state = long_plan_sp.speedLimit.assist.state
# Green for active/adapting/capping states, grey for tempPaused when override, else normal
if assist_state in (AssistState.active, AssistState.adapting, AssistState.capping):
set_speed_color = SLA_ACTIVE_COLOR if long_override else rl.Color(0, 0xff, 0, 0xff)
max_color = SLA_ACTIVE_COLOR if long_override else rl.Color(0x80, 0xd8, 0xa6, 0xff)
elif assist_state == AssistState.tempPaused and long_override:
set_speed_color = SLA_ACTIVE_COLOR
max_color = SLA_ACTIVE_COLOR
else:
if ui_state.status == UIStatus.ENGAGED:
max_color = COLORS.ENGAGED

View File

@@ -7,6 +7,7 @@ See the LICENSE.md file in the root directory for more details.
from dataclasses import dataclass
from enum import StrEnum
import time
import pyray as rl
from cereal import custom
@@ -28,6 +29,7 @@ SET_SPEED_NA = 255
KM_TO_MILE = 0.621371
AssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
AssistDisableReason = custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason
SpeedLimitSource = custom.LongitudinalPlanSP.SpeedLimit.Source
@@ -40,6 +42,7 @@ class Colors:
DARK_GREY = rl.Color(77, 77, 77, 255)
SUB_BG = rl.Color(0, 0, 0, 180)
MUTCD_LINES = rl.Color(255, 255, 255, 100)
AMBER = rl.Color(255, 176, 0, 255)
class IconSide(StrEnum):
@@ -110,6 +113,11 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
self.speed_limit_ahead_valid = False
self.speed_limit_ahead_frame = 0
self.cap_delta = 0.0
self.target_cap = 0.0
self.disable_reason = AssistDisableReason.none
self.disable_reason_timestamp = 0.0
self.is_cruise_set: bool = False
self.is_cruise_available: bool = True
self.set_speed: float = SET_SPEED_NA
@@ -145,6 +153,10 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
self.speed_limit_final_last = resolver.speedLimitFinalLast * self.speed_conv
self.speed_limit_source = resolver.source
self.speed_limit_assist_state = assist.state
self.cap_delta = assist.capDelta
self.target_cap = assist.targetCap * self.speed_conv
self.disable_reason = assist.disableReason
self.disable_reason_timestamp = time.monotonic()
if sm.updated["liveMapDataSP"]:
lmd = sm["liveMapDataSP"]
@@ -194,6 +206,15 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
self._draw_sign_main(sign_rect, alpha)
if self.speed_limit_assist_state == AssistState.preActive:
self._draw_pre_active_arrow(sign_rect)
elif self.speed_limit_assist_state == AssistState.tempPaused:
self._draw_temp_paused_icon(sign_rect)
elif self.speed_limit_assist_state == AssistState.capping:
self._draw_cap_badge(sign_rect)
# Also draw ahead info if valid and different from cap (mutual exclusion fix)
if self.speed_limit_ahead_valid and round(self.speed_limit_ahead) != round(self.target_cap):
ahead_info_y = sign_rect.y + sign_rect.height + 10 + 160 + 10
ahead_rect = rl.Rectangle(sign_rect.x, ahead_info_y, sign_rect.width, 100)
self._draw_ahead_info(ahead_rect)
else:
self._draw_ahead_info(sign_rect)
@@ -229,6 +250,38 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
color = rl.Color(255, 255, 255, int(icon_alpha))
rl.draw_texture_ex(txt_icon, rl.Vector2(arrow_x, arrow_y), 0.0, 1.0, color)
def _draw_temp_paused_icon(self, sign_rect):
"""Draw greyed preActive icon when tempPaused."""
# Reuse preActive icon with grey alpha
icon_alpha = 128 # 50% opacity for paused state
txt_icon = self.arrow_blank # Use blank/greyed version
sign_margin = 12
arrow_spacing = int(sign_margin * 1.4)
arrow_x = sign_rect.x + sign_rect.width + arrow_spacing
arrow_y = sign_rect.y + (sign_rect.height - txt_icon.height) / 2
color = rl.Color(145, 155, 149, icon_alpha) # GREY color with alpha
rl.draw_texture_ex(txt_icon, rl.Vector2(arrow_x, arrow_y), 0.0, 1.0, color)
def _draw_cap_badge(self, sign_rect):
"""Draw CAP info panel below speed limit sign during capping."""
rect = rl.Rectangle(sign_rect.x + (sign_rect.width - 170) / 2, sign_rect.y + sign_rect.height + 10, 170, 160)
rl.draw_rectangle_rounded(rect, 0.35, 10, Colors.SUB_BG)
rl.draw_rectangle_rounded_lines_ex(rect, 0.35, 10, 3, Colors.MUTCD_LINES)
mid_x = rect.x + rect.width / 2
label_color = Colors.AMBER if self.cap_delta > 0.5 else Colors.GREY
self._draw_text_centered(self.font_demi, "CAP", 40, rl.Vector2(mid_x, rect.y + 28), label_color)
cap_speed = round(self.target_cap)
self._draw_text_centered(self.font_bold, str(cap_speed), 70, rl.Vector2(mid_x, rect.y + 82), Colors.WHITE)
if self.cap_delta > 0.5:
delta_display = round(self.cap_delta * self.speed_conv)
delta_unit = 'km/h' if ui_state.is_metric else 'mph'
delta_text = f'-{delta_display} {delta_unit}'
self._draw_text_centered(self.font_norm, delta_text, 36, rl.Vector2(mid_x, rect.y + 134), Colors.GREY)
def _render_vienna(self, rect, val, sub, color, has_limit, alpha=1.0):
center = rl.Vector2(rect.x + rect.width / 2, rect.y + rect.height / 2)
radius = (rect.width + 18) / 2

View File

@@ -60,7 +60,7 @@ class LongitudinalPlannerSP:
# Speed Limit Assist
has_speed_limit = self.resolver.speed_limit_valid or self.resolver.speed_limit_last_valid
self.sla.update(long_enabled, long_override, v_ego, a_ego, v_cruise_cluster, self.resolver.speed_limit,
self.resolver.speed_limit_final_last, has_speed_limit, self.resolver.distance, self.events_sp)
self.resolver.speed_limit_final_last, has_speed_limit, self.resolver.distance, self.events_sp, CS.gasPressed)
targets = {
LongitudinalPlanSource.cruise: (v_cruise, a_ego),
@@ -132,6 +132,9 @@ class LongitudinalPlannerSP:
assist.active = self.sla.is_active
assist.vTarget = float(self.sla.output_v_target)
assist.aTarget = float(self.sla.output_a_target)
assist.capDelta = float(self.sla.cap_delta)
assist.targetCap = float(self.sla._target_cap)
assist.disableReason = self.sla.disable_reason
# E2E Alerts
e2eAlerts = longitudinalPlanSP.e2eAlerts

View File

@@ -17,3 +17,8 @@ CONFIRM_SPEED_THRESHOLD = {
True: 80, # km/h
False: 50, # mph
}
MIN_CAP_FLOOR_MAX = {
True: 64, # km/h
False: 40, # mph
}

View File

@@ -27,3 +27,8 @@ class Mode(IntEnumBase):
information = 1
warning = 2
assist = 3
class UpshiftAccept(IntEnumBase):
NEVER_RAISE = 0
ACCEL_PEDAL = 1

View File

@@ -9,6 +9,7 @@ from cereal import custom, car
from openpilot.common.constants import CV
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode as SpeedLimitMode
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit import MIN_CAP_FLOOR_MAX
def compare_cluster_target(v_cruise_cluster: float, target_set_speed: float, is_metric: bool) -> tuple[bool, bool]:
@@ -42,3 +43,9 @@ def set_speed_limit_assist_availability(CP: car.CarParams, CP_SP: custom.CarPara
params.put("SpeedLimitMode", int(SpeedLimitMode.warning))
return allowed
def get_min_cap_floor(params: Params, is_metric: bool) -> float:
value = params.get("SpeedLimitMinCapFloor", return_default=True)
value = max(0, min(value, MIN_CAP_FLOOR_MAX[is_metric]))
return value * (CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS)

View File

@@ -15,16 +15,20 @@ from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.sunnypilot import PARAMS_UPDATE_PERIOD
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit import PCM_LONG_REQUIRED_MAX_SET_SPEED, CONFIRM_SPEED_THRESHOLD
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import compare_cluster_target, set_speed_limit_assist_availability
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode, UpshiftAccept
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import compare_cluster_target, set_speed_limit_assist_availability, \
get_min_cap_floor
ButtonType = car.CarState.ButtonEvent.Type
EventNameSP = custom.OnroadEventSP.EventName
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
AssistDisableReason = custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason
SpeedLimitSource = custom.LongitudinalPlanSP.SpeedLimit.Source
ACTIVE_STATES = (SpeedLimitAssistState.active, SpeedLimitAssistState.adapting)
ENABLED_STATES = (SpeedLimitAssistState.preActive, SpeedLimitAssistState.pending, *ACTIVE_STATES)
CAP_ACTIVE_STATES = (SpeedLimitAssistState.capping,)
CAP_ENABLED_STATES = CAP_ACTIVE_STATES # cap mode has no partially-engaged state
DISABLED_GUARD_PERIOD = 0.5 # secs.
# secs. Time to wait after activation before considering temp deactivation signal.
@@ -33,6 +37,10 @@ PRE_ACTIVE_GUARD_PERIOD = {
False: 5,
}
SPEED_LIMIT_CHANGED_HOLD_PERIOD = 1 # secs. Time to wait after speed limit change before switching to preActive.
CAP_RAISE_HOLD_PERIOD = 0.2 # secs. Time to confirm limit raise before upshifting.
CAP_SUSPEND_GUARD_PERIOD = 1.0 # secs. Time to hold cap disabled after long_override release.
USER_PAUSE_TIMEOUT_TICKS = 6000 # 5 min / DT_MDL (0.05 s) = 6000 ticks
RESUME_CLUSTER_DELTA_THRESHOLD = 1 # integer display-unit delta (kph or mph)
LIMIT_MIN_ACC = -1.5 # m/s^2 Maximum deceleration allowed for limit controllers to provide.
LIMIT_MAX_ACC = 1.0 # m/s^2 Maximum acceleration allowed for limit controllers to provide while active.
@@ -51,6 +59,7 @@ class SpeedLimitAssist:
v_ego: float
a_ego: float
v_offset: float
cap_delta: float
def __init__(self, CP: car.CarParams, CP_SP: custom.CarParamsSP):
self.params = Params()
@@ -64,10 +73,12 @@ class SpeedLimitAssist:
self.enabled = self.params.get("SpeedLimitMode", return_default=True) == Mode.assist
self.long_enabled = False
self.long_enabled_prev = False
self.long_override = False
self.is_enabled = False
self.is_active = False
self.output_v_target = V_CRUISE_UNSET
self.output_a_target = 0.
self.cap_delta = 0.0
self.v_ego = 0.
self.a_ego = 0.
self.v_offset = 0.
@@ -92,8 +103,28 @@ class SpeedLimitAssist:
self._minus_hold = 0.
self._last_carstate_ts = 0.
self._cap_change_timer = 0
self._cap_suspended_timer = 0
self._cap_below_floor = False
self._target_cap = 0.0
self._cap_upshift_pressed = False
self._cap_upshift_release_timer = 0
self._cap_audio_cue_fired = False
self._cap_raise_accepted = False
self._accel_pressed = False
self._was_cap_suspended = False
self._override_active_last = False
self._min_cap_floor = get_min_cap_floor(self.params, self.is_metric)
self._cap_upshift_accept = self.params.get("SpeedLimitUpshiftAccept", return_default=True)
self._cap_audio_cue_enabled = bool(self.params.get("SpeedLimitCapAudioCue", return_default=True))
self._user_paused: bool = False
self._user_paused_timer: int = 0
self._disable_reason = AssistDisableReason.none
self._speed_limit_final_last_at_pause = 0.
self.tempPaused_count = 0 # diagnostic counter for tests
# TODO-SP: SLA's own output_a_target for planner
# Solution functions mapped to respective states
self.acceleration_solutions = {
SpeedLimitAssistState.disabled: self.get_current_acceleration_as_target,
SpeedLimitAssistState.inactive: self.get_current_acceleration_as_target,
@@ -101,8 +132,26 @@ class SpeedLimitAssist:
SpeedLimitAssistState.pending: self.get_current_acceleration_as_target,
SpeedLimitAssistState.adapting: self.get_adapting_state_target_acceleration,
SpeedLimitAssistState.active: self.get_active_state_target_acceleration,
SpeedLimitAssistState.capping: self.get_current_acceleration_as_target,
SpeedLimitAssistState.tempPaused: self.get_current_acceleration_as_target,
}
@property
def disable_reason(self):
return self._disable_reason
@property
def _gates_pass(self) -> bool:
return self.long_enabled and self.enabled
@property
def _cap_gates_pass(self) -> bool:
return self._gates_pass and not self.long_override and self._cap_suspended_timer <= 0
@property
def _cap_entry_ready(self) -> bool:
return self._has_speed_limit and not self._cap_below_floor
@property
def speed_limit_changed(self) -> bool:
return self._has_speed_limit and bool(self._speed_limit != self.speed_limit_prev)
@@ -126,13 +175,13 @@ class SpeedLimitAssist:
events_sp.add(EventNameSP.speedLimitActive)
def get_v_target_from_control(self) -> float:
if self._has_speed_limit:
if self.pcm_op_long and self.is_enabled:
return self._speed_limit_final_last
if not self.pcm_op_long and self.is_active:
if self.pcm_op_long:
if self.state == SpeedLimitAssistState.capping:
return min(self.v_cruise_cluster, self._target_cap)
else:
if self._has_speed_limit and self.is_active:
return self._speed_limit_final_last
# Fallback
return V_CRUISE_UNSET
# TODO-SP: SLA's own output_a_target for planner
@@ -144,6 +193,9 @@ class SpeedLimitAssist:
self.is_metric = self.params.get_bool("IsMetric")
set_speed_limit_assist_availability(self.CP, self.CP_SP, self.params)
self.enabled = self.params.get("SpeedLimitMode", return_default=True) == Mode.assist
self._min_cap_floor = get_min_cap_floor(self.params, self.is_metric)
self._cap_upshift_accept = self.params.get("SpeedLimitUpshiftAccept", return_default=True)
self._cap_audio_cue_enabled = bool(self.params.get("SpeedLimitCapAudioCue", return_default=True))
def update_car_state(self, CS: car.CarState) -> None:
now = time.monotonic()
@@ -187,7 +239,12 @@ class SpeedLimitAssist:
cst_high
pcm_long_required_max_set_speed_conv = round(pcm_long_required_max * speed_conv)
self.target_set_speed_conv = pcm_long_required_max_set_speed_conv if self.pcm_op_long else self.speed_limit_final_last_conv
if self.pcm_op_long and self.state not in CAP_ACTIVE_STATES:
self.target_set_speed_conv = pcm_long_required_max_set_speed_conv
elif not self.pcm_op_long:
self.target_set_speed_conv = self.speed_limit_final_last_conv
else:
self.target_set_speed_conv = self.v_cruise_cluster_conv
@property
def apply_confirm_speed_threshold(self) -> bool:
@@ -232,74 +289,142 @@ class SpeedLimitAssist:
return self._get_button_release(req_plus, req_minus)
def update_state_machine_pcm_op_long(self):
self.long_engaged_timer = max(0, self.long_engaged_timer - 1)
self.pre_active_timer = max(0, self.pre_active_timer - 1)
def _cap_limit_change_held(self) -> bool:
"""Return True when limit-change hold period has elapsed."""
return self._cap_change_timer >= int(SPEED_LIMIT_CHANGED_HOLD_PERIOD / DT_MDL)
# ACTIVE, ADAPTING, PENDING, PRE_ACTIVE, INACTIVE
def _cap_upshift_release_edge(self) -> bool:
"""Return True when limit-raise hold period elapsed after gas release edge."""
if self._cap_upshift_pressed and not self._accel_pressed:
self._cap_upshift_release_timer = int(CAP_RAISE_HOLD_PERIOD / DT_MDL)
self._cap_upshift_pressed = self._accel_pressed
if self._cap_upshift_release_timer > 0:
self._cap_upshift_release_timer = max(0, self._cap_upshift_release_timer - 1)
if self._cap_upshift_release_timer <= 0:
return True
return False
def _go_disabled(self, reason: 'AssistDisableReason') -> None:
"""Transition to disabled state with given reason."""
self._cap_raise_accepted = False
self.state = SpeedLimitAssistState.disabled
self._disable_reason = reason
def _should_exit_temp_pause(self) -> bool:
"""Check if conditions warrant exiting temp pause state."""
limit_changed = self._speed_limit_final_last != self._speed_limit_final_last_at_pause
timer_expired = self._user_paused_timer <= 0
cluster_realigned = abs(self.v_cruise_cluster_conv - self.speed_limit_final_last_conv) <= RESUME_CLUSTER_DELTA_THRESHOLD
return limit_changed or timer_expired or cluster_realigned
def update_state_machine_cap(self, events_sp: EventsSP) -> tuple[bool, bool]:
"""Cap mode FSM for pcm_op_long cars. Returns (enabled, active)."""
# Bookkeeping: timers, override flags (unchanged)
self._cap_change_timer = min(self._cap_change_timer + 1,
int((SPEED_LIMIT_CHANGED_HOLD_PERIOD + 1) / DT_MDL))
self._cap_upshift_release_timer = max(0, self._cap_upshift_release_timer - 1)
self._user_paused_timer = max(0, self._user_paused_timer - 1)
if self._override_active_last and not self.long_override and self._was_cap_suspended:
self._cap_suspended_timer = int(CAP_SUSPEND_GUARD_PERIOD / DT_MDL)
elif not self.long_override:
self._cap_suspended_timer = max(0, self._cap_suspended_timer - 1)
self._override_active_last = self.long_override
self._cap_below_floor = self._has_speed_limit and self._speed_limit_final_last < self._min_cap_floor
# Gate checks FIRST: apply to all non-disabled states (including tempPaused)
if self.state != SpeedLimitAssistState.disabled:
if not self.long_enabled or not self.enabled:
self.state = SpeedLimitAssistState.disabled
if not self._gates_pass:
self._go_disabled(AssistDisableReason.gateDisabled)
self._was_cap_suspended = False
self._cap_suspended_timer = 0
elif self.long_override:
self._go_disabled(AssistDisableReason.longOverride)
self._was_cap_suspended = True
else:
# ACTIVE
if self.state == SpeedLimitAssistState.active:
if self.v_cruise_cluster_changed:
self.state = SpeedLimitAssistState.inactive
elif self.speed_limit_changed and self.apply_confirm_speed_threshold:
self.state = SpeedLimitAssistState.preActive
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
elif self._has_speed_limit and self.v_offset < LIMIT_SPEED_OFFSET_TH:
self.state = SpeedLimitAssistState.adapting
# Sub-state dispatch (only if gates passed)
elif self.state == SpeedLimitAssistState.tempPaused:
# Exit conditions: speed limit changed, timer expired, or cluster delta returns
if self._should_exit_temp_pause():
self._user_paused = False
self._user_paused_timer = 0
self._go_disabled(AssistDisableReason.autoResume)
# ADAPTING
elif self.state == SpeedLimitAssistState.adapting:
if self.v_cruise_cluster_changed:
self.state = SpeedLimitAssistState.inactive
elif self.speed_limit_changed and self.apply_confirm_speed_threshold:
self.state = SpeedLimitAssistState.preActive
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
elif self.v_offset >= LIMIT_SPEED_OFFSET_TH:
self.state = SpeedLimitAssistState.active
# PENDING
elif self.state == SpeedLimitAssistState.pending:
if self.target_set_speed_confirmed:
self._update_confirmed_state()
elif self.speed_limit_changed:
self.state = SpeedLimitAssistState.preActive
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
# PRE_ACTIVE
elif self.state == SpeedLimitAssistState.preActive:
if self.target_set_speed_confirmed:
self._update_confirmed_state()
elif self.pre_active_timer <= 0:
# Timeout - session ended
self.state = SpeedLimitAssistState.inactive
# INACTIVE
elif self.state == SpeedLimitAssistState.inactive:
pass
# DISABLED
elif self.state == SpeedLimitAssistState.disabled:
if self.long_enabled and self.enabled:
# start or reset preActive timer if initially enabled or manual set speed change detected
if not self.long_enabled_prev or self.v_cruise_cluster_changed:
self.long_engaged_timer = int(DISABLED_GUARD_PERIOD / DT_MDL)
elif self.long_engaged_timer <= 0:
if self.target_set_speed_confirmed:
self._update_confirmed_state()
elif self._has_speed_limit:
self.state = SpeedLimitAssistState.preActive
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
elif self.state == SpeedLimitAssistState.capping:
# Cluster delta entry: user nudged cruise control away from limit
if self.v_cruise_cluster_changed:
self._user_paused = True
self._user_paused_timer = USER_PAUSE_TIMEOUT_TICKS
self._speed_limit_final_last_at_pause = self._speed_limit_final_last
self.state = SpeedLimitAssistState.tempPaused
self._disable_reason = AssistDisableReason.userTempPause
elif not self._has_speed_limit:
self._cap_raise_accepted = False
self.state = SpeedLimitAssistState.pending
self._disable_reason = AssistDisableReason.mapGap
elif self._cap_below_floor:
self._cap_raise_accepted = False
self.state = SpeedLimitAssistState.pending
self._disable_reason = AssistDisableReason.belowFloor
elif self._speed_limit_final_last != self._target_cap and self._cap_limit_change_held():
old_cap = self._target_cap
self._target_cap = self._speed_limit_final_last
self._cap_change_timer = 0
self._cap_raise_accepted = False
if self._target_cap > old_cap:
if self._cap_upshift_accept == UpshiftAccept.NEVER_RAISE:
self._target_cap = old_cap
elif self._cap_upshift_accept == UpshiftAccept.ACCEL_PEDAL:
if not self._accel_pressed:
self._cap_raise_accepted = True
else:
self._target_cap = old_cap
else:
self._cap_upshift_release_edge()
else:
self.state = SpeedLimitAssistState.pending
self._cap_upshift_release_edge()
else:
self._cap_upshift_release_edge()
enabled = self.state in ENABLED_STATES
active = self.state in ACTIVE_STATES
elif self.state == SpeedLimitAssistState.pending:
if self._cap_entry_ready:
if not self._was_cap_suspended:
self._target_cap = self._speed_limit_final_last
self._cap_change_timer = 0
self._cap_audio_cue_fired = False
self._cap_raise_accepted = False
self._disable_reason = AssistDisableReason.none
self.state = SpeedLimitAssistState.capping
else:
# Disabled-entry logic: if gates pass + cap_suspended_timer clear, enter capping/pending
if self._cap_gates_pass:
if self._cap_entry_ready:
if not self._was_cap_suspended:
self._target_cap = self._speed_limit_final_last
self._cap_change_timer = 0
self._cap_audio_cue_fired = False
self._disable_reason = AssistDisableReason.none
self.state = SpeedLimitAssistState.capping
else:
self._disable_reason = AssistDisableReason.mapGap if not self._has_speed_limit else AssistDisableReason.belowFloor
self.state = SpeedLimitAssistState.pending
# Audio cue on capping entry
if self.state == SpeedLimitAssistState.capping and self._state_prev != SpeedLimitAssistState.capping:
# suppress audio cue on override-release re-entry
if self._cap_audio_cue_enabled and not self._was_cap_suspended:
events_sp.add(EventNameSP.speedLimitCapActive)
self._cap_audio_cue_fired = True
self._was_cap_suspended = False
enabled = self.state in CAP_ENABLED_STATES
active = self.state in CAP_ACTIVE_STATES
return enabled, active
@@ -360,13 +485,13 @@ class SpeedLimitAssist:
return enabled, active
def update_events(self, events_sp: EventsSP) -> None:
if self.state == SpeedLimitAssistState.preActive:
if not self.pcm_op_long and self.state == SpeedLimitAssistState.preActive:
events_sp.add(EventNameSP.speedLimitPreActive)
if self.state == SpeedLimitAssistState.pending and self._state_prev != SpeedLimitAssistState.pending:
events_sp.add(EventNameSP.speedLimitPending)
if self.is_active:
if not self.pcm_op_long and self.is_active:
if self._state_prev not in ACTIVE_STATES:
self.update_active_event(events_sp)
@@ -379,8 +504,9 @@ class SpeedLimitAssist:
self.update_active_event(events_sp)
def update(self, long_enabled: bool, long_override: bool, v_ego: float, a_ego: float, v_cruise_cluster: float, speed_limit: float,
speed_limit_final_last: float, has_speed_limit: bool, distance: float, events_sp: EventsSP) -> None:
speed_limit_final_last: float, has_speed_limit: bool, distance: float, events_sp: EventsSP, accel_pressed: bool = False) -> None:
self.long_enabled = long_enabled
self.long_override = long_override
self.v_ego = v_ego
self.a_ego = a_ego
@@ -388,13 +514,14 @@ class SpeedLimitAssist:
self._speed_limit = speed_limit
self._speed_limit_final_last = speed_limit_final_last
self._distance = distance
self._accel_pressed = accel_pressed
self.update_params()
self.update_calculations(v_cruise_cluster)
self._state_prev = self.state
if self.pcm_op_long:
self.is_enabled, self.is_active = self.update_state_machine_pcm_op_long()
self.is_enabled, self.is_active = self.update_state_machine_cap(events_sp)
else:
self.is_enabled, self.is_active = self.update_state_machine_non_pcm_long()
@@ -411,4 +538,9 @@ class SpeedLimitAssist:
self.output_v_target = self.get_v_target_from_control()
self.output_a_target = self.get_a_target_from_control()
if self.pcm_op_long and self.state == SpeedLimitAssistState.capping:
self.cap_delta = max(0.0, self.v_cruise_cluster - self._target_cap)
else:
self.cap_delta = 0.0
self.frame += 1

View File

@@ -0,0 +1,136 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from cereal import custom, car
from opendbc.car.car_helpers import interfaces
from opendbc.car.toyota.values import CAR as TOYOTA
from openpilot.common.constants import CV
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.car import interfaces as sunnypilot_interfaces
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import get_min_cap_floor
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
class CarParamsFactory:
@staticmethod
def create_car_interface(car_name: str = TOYOTA.TOYOTA_RAV4_TSS2) -> tuple[car.CarParams, custom.CarParamsSP, object]:
params = Params()
CarInterface = interfaces[car_name]
CP = CarInterface.get_non_essential_params(car_name)
CP_SP = CarInterface.get_non_essential_params_sp(CP, car_name)
CI = CarInterface(CP, CP_SP)
CI.CP.openpilotLongitudinalControl = True
sunnypilot_interfaces.setup_interfaces(CI, params)
return CI.CP, CI.CP_SP, CI
class SpeedLimitAssistScenario:
def __init__(self, CP: car.CarParams, CP_SP: custom.CarParamsSP, params: Params = None):
if params is None:
params = Params()
self.params = params
self.CP = CP
self.CP_SP = CP_SP
self.params.put("SpeedLimitMode", int(Mode.assist))
self.sla = SpeedLimitAssist(CP, CP_SP)
self.sla.update_params()
self.events_sp = EventsSP()
self.speed_conv = CV.MS_TO_KPH if self.sla.is_metric else CV.MS_TO_MPH
def set_state(self, state: int) -> "SpeedLimitAssistScenario":
self.sla.state = state
return self
def set_speed_limits(self, speed_limit: float, distance: float = 0, speed_limit_final_last: float = 0,
speed_limit_prev: float = 0) -> "SpeedLimitAssistScenario":
self.sla._speed_limit = speed_limit
self.sla._distance = distance
self.sla.speed_limit_prev = speed_limit_prev
return self
def set_cruise_speeds(self, v_cruise_cluster: float, v_cruise_cluster_prev: float = None) -> "SpeedLimitAssistScenario":
self.sla.v_cruise_cluster = v_cruise_cluster
if v_cruise_cluster_prev is None:
v_cruise_cluster_prev = v_cruise_cluster
self.sla.v_cruise_cluster_prev = v_cruise_cluster_prev
self.sla.prev_v_cruise_cluster_conv = round(v_cruise_cluster_prev * self.speed_conv)
return self
def set_engaged(self, op_engaged: bool) -> "SpeedLimitAssistScenario":
self.sla.op_engaged = op_engaged
return self
def set_param(self, key: str, value) -> "SpeedLimitAssistScenario":
# IntEnum instances carry a .value the Params API does not accept directly
if hasattr(value, 'value'):
value = value.value
if isinstance(value, bool):
self.params.put_bool(key, value)
elif isinstance(value, int):
self.params.put(key, value)
else:
self.params.put(key, str(value) if not isinstance(value, str) else value)
# Runtime caches these behind PARAMS_UPDATE_PERIOD; force-sync for tests
if key == "SpeedLimitMinCapFloor":
self.sla._min_cap_floor = get_min_cap_floor(self.sla.params, self.sla.is_metric)
elif key == "SpeedLimitUpshiftAccept":
self.sla._cap_upshift_accept = self.sla.params.get("SpeedLimitUpshiftAccept", return_default=True)
elif key == "SpeedLimitCapAudioCue":
self.sla._cap_audio_cue_enabled = bool(self.sla.params.get("SpeedLimitCapAudioCue", return_default=True))
elif key == "SpeedLimitMode":
self.sla.enabled = self.sla.params.get("SpeedLimitMode", return_default=True) == Mode.assist
elif key == "IsMetric":
self.sla.is_metric = self.sla.params.get_bool("IsMetric")
return self
def clear_events(self) -> "SpeedLimitAssistScenario":
self.events_sp.clear()
return self
def reset_state(self) -> "SpeedLimitAssistScenario":
self.sla.state = SpeedLimitAssistState.disabled
self.sla.frame = -1
self.sla.long_enabled = False
self.sla.long_enabled_prev = False
self.sla._speed_limit = 0.0
self.sla.speed_limit_prev = 0.0
self.sla._speed_limit_final_last = 0.0
self.sla._distance = 0.0
self.sla.long_engaged_timer = 0
self.sla.pre_active_timer = 0
self.events_sp.clear()
return self
def build(self) -> "SpeedLimitAssistScenario":
return self
@pytest.fixture
def params():
p = Params()
yield p
@pytest.fixture
def car_params_factory():
return CarParamsFactory()
@pytest.fixture
def scenario_builder(params):
def builder(car_name: str = TOYOTA.TOYOTA_RAV4_TSS2) -> SpeedLimitAssistScenario:
CP, CP_SP, _ = CarParamsFactory.create_car_interface(car_name)
return SpeedLimitAssistScenario(CP, CP_SP, params)
return builder

View File

@@ -0,0 +1,362 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from cereal import car, custom
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist, \
SpeedLimitAssistState
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
def make_mock_car_params(mocker, pcm_op_long: bool = True) -> car.CarParams:
"""Create a minimal CarParams mock with pcm_op_long setting."""
cp = mocker.MagicMock(spec=car.CarParams)
cp.openpilotLongitudinalControl = pcm_op_long
cp.pcmCruise = pcm_op_long
cp.brand = "generic"
return cp
def make_mock_car_params_sp(mocker) -> custom.CarParamsSP:
"""Create a minimal CarParamsSP mock."""
cp_sp = mocker.MagicMock(spec=custom.CarParamsSP)
return cp_sp
def get_event_name_sp():
"""Get EventNameSP enum."""
return custom.OnroadEventSP.EventName
def make_sla_factory(mocker, pcm_op_long: bool = True) -> SpeedLimitAssist:
"""Factory: create a SpeedLimitAssist instance with mocked params."""
cp = make_mock_car_params(mocker, pcm_op_long)
cp_sp = make_mock_car_params_sp(mocker)
mock_params = mocker.MagicMock()
mock_params.get_bool.return_value = True
mock_params.get.return_value = True
mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist.Params",
return_value=mock_params)
mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers.set_speed_limit_assist_availability")
mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers.get_min_cap_floor", return_value=5.0)
sla = SpeedLimitAssist(cp, cp_sp)
return sla
class TestBug1OverrideNoOscillation:
"""Override falling edge arms suspend timer, preventing oscillation."""
def test_sustained_override_stays_disabled(self, mocker):
"""5s sustained override: state never enters capping."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.state = SpeedLimitAssistState.capping
sla._state_prev = SpeedLimitAssistState.capping
sla._target_cap = 25.0
events_sp = EventsSP()
num_ticks = int(5.0 / DT_MDL)
for _ in range(num_ticks):
sla.long_override = True
sla.update_state_machine_cap(events_sp)
assert (sla.state != SpeedLimitAssistState.capping), "State entered capping during sustained override"
assert sla.state == SpeedLimitAssistState.disabled
def test_override_release_enters_capping_after_guard(self, mocker):
"""Release override: state enters capping after guard period."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.state = SpeedLimitAssistState.disabled
sla._state_prev = SpeedLimitAssistState.disabled
sla._target_cap = 25.0
sla._was_cap_suspended = True
sla._override_active_last = True
sla.long_override = True
events_sp = EventsSP()
sla.update_state_machine_cap(events_sp)
sla.long_override = False
guard_ticks = int(1.0 / DT_MDL)
for i in range(guard_ticks + 1):
sla.update_state_machine_cap(events_sp)
if i < guard_ticks:
assert sla.state == SpeedLimitAssistState.disabled
assert sla.state == SpeedLimitAssistState.capping
class TestBug2TargetCapPreserved:
"""Target cap preserved across override-release cycles."""
def test_target_cap_preserved_on_override_suspension(self, mocker):
"""Override pulse: _target_cap preserved, not reset to new limit."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.state = SpeedLimitAssistState.capping
sla._state_prev = SpeedLimitAssistState.capping
sla._target_cap = 25.0
sla._cap_change_timer = 0
events_sp = EventsSP()
sla._speed_limit_final_last = 30.0
sla.long_override = True
sla.update_state_machine_cap(events_sp)
assert sla.state == SpeedLimitAssistState.disabled
assert sla._was_cap_suspended is True
sla.long_override = False
guard_ticks = int(1.0 / DT_MDL)
for _ in range(guard_ticks + 1):
sla.update_state_machine_cap(events_sp)
assert sla.state == SpeedLimitAssistState.capping
assert sla._target_cap == 25.0, f"Expected 25.0, got {sla._target_cap}"
class TestBug3AccelPressedWired:
"""Accel_pressed parameter wired to sla.update()."""
def test_accel_pressed_parameter_received(self, mocker):
"""sla.update() receives accel_pressed=True."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla.v_ego = 20.0
sla.a_ego = 0.0
sla.v_cruise_cluster = 30.0
events_sp = EventsSP()
sla.update(
long_enabled=True,
long_override=False,
v_ego=20.0,
a_ego=0.0,
v_cruise_cluster=30.0,
speed_limit=30.0,
speed_limit_final_last=30.0,
has_speed_limit=True,
distance=100.0,
events_sp=events_sp,
accel_pressed=True,
)
assert sla._accel_pressed is True
def test_accel_pressed_false_by_default(self, mocker):
"""sla.update() with accel_pressed=False (default)."""
sla = make_sla_factory(mocker, pcm_op_long=True)
events_sp = EventsSP()
sla.update(
long_enabled=True,
long_override=False,
v_ego=20.0,
a_ego=0.0,
v_cruise_cluster=30.0,
speed_limit=30.0,
speed_limit_final_last=30.0,
has_speed_limit=True,
distance=100.0,
events_sp=events_sp,
)
assert sla._accel_pressed is False
class TestBug4NoAudioCueOnOverrideReentry:
"""Audio cue suppressed on override-release re-entry."""
def test_cue_fires_on_cold_entry(self, mocker):
"""Fresh engagement: audio cue fires exactly once."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.state = SpeedLimitAssistState.disabled
sla._state_prev = SpeedLimitAssistState.disabled
sla._was_cap_suspended = False
sla._cap_audio_cue_enabled = True
events_sp = EventsSP()
sla.update_state_machine_cap(events_sp)
assert sla.state == SpeedLimitAssistState.capping
assert sla._cap_audio_cue_fired is True
event_name_sp = get_event_name_sp()
assert event_name_sp.speedLimitCapActive in events_sp.events, "Audio cue event not fired"
def test_no_cue_on_override_reentry(self, mocker):
"""Override-release re-entry: audio cue NOT fired."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.state = SpeedLimitAssistState.disabled
sla._state_prev = SpeedLimitAssistState.capping
sla._target_cap = 25.0
sla._was_cap_suspended = True
sla._override_active_last = False
sla._cap_audio_cue_enabled = True
sla._cap_audio_cue_fired = True
events_sp = EventsSP()
sla._cap_suspended_timer = int(1.0 / DT_MDL)
guard_ticks = int(1.0 / DT_MDL) + 1
for _ in range(guard_ticks):
sla.update_state_machine_cap(events_sp)
sla._state_prev = sla.state
assert sla.state == SpeedLimitAssistState.capping
event_name_sp = get_event_name_sp()
assert (event_name_sp.speedLimitCapActive not in events_sp.events), "Audio cue should not fire on override re-entry"
class TestEdgeACases:
"""Edge A: No spurious timer on non-capping override."""
def test_edge_a_no_spurious_timer_on_disabled_override(self, mocker):
"""Override during disabled state should not arm timer."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = False
sla._was_cap_suspended = False
sla.state = SpeedLimitAssistState.disabled
sla._state_prev = SpeedLimitAssistState.disabled
events_sp = EventsSP()
sla.long_override = True
sla.update_state_machine_cap(events_sp)
sla.long_override = False
sla.update_state_machine_cap(events_sp)
assert sla._cap_suspended_timer == 0
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.update_state_machine_cap(events_sp)
assert sla.state == SpeedLimitAssistState.capping
class TestEdgeBCases:
"""Edge B: _target_cap preserved via pending."""
def test_edge_b_target_cap_preserved_via_pending(self, mocker):
"""Target cap preserved when transiting pending->capping after suspension."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla.state = SpeedLimitAssistState.capping
sla._state_prev = SpeedLimitAssistState.capping
sla._target_cap = 25.0
sla._was_cap_suspended = True
events_sp = EventsSP()
sla._has_speed_limit = False
sla.update_state_machine_cap(events_sp)
assert sla.state == SpeedLimitAssistState.pending
sla._has_speed_limit = True
sla._speed_limit_final_last = 30.0
sla.update_state_machine_cap(events_sp)
assert sla.state == SpeedLimitAssistState.capping
assert sla._target_cap == 25.0
class TestEdgeCCases:
"""Edge C: Timer cleared on disengage."""
def test_edge_c_timer_cleared_on_disengage(self, mocker):
"""Timer cleared when long_enabled=False."""
sla = make_sla_factory(mocker, pcm_op_long=True)
sla.long_enabled = True
sla.enabled = True
sla._has_speed_limit = True
sla._speed_limit_final_last = 25.0
sla._min_cap_floor = 5.0
sla._cap_suspended_timer = 50
sla.state = SpeedLimitAssistState.capping
sla._state_prev = SpeedLimitAssistState.capping
events_sp = EventsSP()
sla.long_enabled = False
sla.update_state_machine_cap(events_sp)
assert sla._cap_suspended_timer == 0
assert sla.state == SpeedLimitAssistState.disabled
class TestTargetCapPublished:
"""Target cap published in cereal message."""
def test_target_cap_written_to_assist(self, mocker):
"""_write_assist_fields writes sla._target_cap to assist.targetCap."""
from openpilot.sunnypilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlannerSP
sla = make_sla_factory(mocker, pcm_op_long=True)
sla._target_cap = 22.35
sla.output_v_target = 10.0
sla.output_a_target = 0.0
sla.cap_delta = 0.0
sla.is_enabled = True
sla.is_active = False
sla.state = SpeedLimitAssistState.disabled
planner = mocker.MagicMock()
planner.sla = sla
msg = custom.LongitudinalPlanSP.new_message()
assist = msg.speedLimit.assist
LongitudinalPlannerSP._write_assist_fields(planner, assist)
assert assist.targetCap == pytest.approx(22.35)
assert assist.enabled is True
assert assist.active is False
assert assist.vTarget == pytest.approx(10.0)
assert assist.aTarget == pytest.approx(0.0)
assert assist.capDelta == pytest.approx(0.0)

View File

@@ -7,7 +7,7 @@ See the LICENSE.md file in the root directory for more details.
import pytest
from cereal import custom
from cereal import custom, car
from opendbc.car.car_helpers import interfaces
from opendbc.car.rivian.values import CAR as RIVIAN
from opendbc.car.tesla.values import CAR as TESLA
@@ -25,6 +25,7 @@ from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
ButtonType = car.CarState.ButtonEvent.Type
ALL_STATES = tuple(SpeedLimitAssistState.schema.enumerants.values())
@@ -71,6 +72,7 @@ class TestSpeedLimitAssist:
CP_SP = CarInterface.get_non_essential_params_sp(CP, car_name)
CI = CarInterface(CP, CP_SP)
CI.CP.openpilotLongitudinalControl = True # always assume it's openpilot longitudinal
CI.CP.pcmCruise = False # test non-PCM FSM path (preActive, pending, adapting, active)
sunnypilot_interfaces.setup_interfaces(CI, self.params)
return CI
@@ -83,13 +85,17 @@ class TestSpeedLimitAssist:
def reset_state(self):
self.sla.state = SpeedLimitAssistState.disabled
self.sla._state_prev = SpeedLimitAssistState.disabled
self.sla.frame = -1
self.sla.last_op_engaged_frame = 0
self.sla.op_engaged = False
self.sla.op_engaged_prev = False
self.sla.long_enabled = False
self.sla.long_enabled_prev = False
self.sla.long_engaged_timer = 0
self.sla.pre_active_timer = 0
self.sla._speed_limit = 0.
self.sla._speed_limit_final_last = 0.
self.sla.speed_limit_prev = 0.
self.sla.last_valid_speed_limit_offsetted = 0.
self.sla.v_cruise_cluster = 0.
self.sla.v_cruise_cluster_prev = 0.
self.sla._distance = 0.
self.events_sp.clear()
@@ -133,20 +139,6 @@ class TestSpeedLimitAssist:
assert self.sla.state == SpeedLimitAssistState.preActive
assert self.sla.is_enabled and not self.sla.is_active
def test_transition_disabled_to_pending_no_speed_limit_not_max_initial_set_speed(self):
for _ in range(int(3. / DT_MDL)):
self.sla.update(True, False, SPEED_LIMITS['highway'], 0, SPEED_LIMITS['city'], 0, 0, False, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.pending
assert self.sla.is_enabled and not self.sla.is_active
def test_preactive_to_active_with_max_speed_confirmation(self):
self.sla.state = SpeedLimitAssistState.preActive
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed, SPEED_LIMITS['highway'],
SPEED_LIMITS['highway'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.active
assert self.sla.is_enabled and self.sla.is_active
assert self.sla.output_v_target == SPEED_LIMITS['highway']
def test_preactive_timeout_to_inactive(self):
self.sla.state = SpeedLimitAssistState.preActive
self.sla.update(True, False, SPEED_LIMITS['city'], 0, SPEED_LIMITS['highway'], SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
@@ -155,47 +147,6 @@ class TestSpeedLimitAssist:
self.sla.update(True, False, SPEED_LIMITS['city'], 0, SPEED_LIMITS['highway'], SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.inactive
def test_preactive_to_pending_no_speed_limit(self):
self.sla.state = SpeedLimitAssistState.preActive
self.sla.update(True, False, SPEED_LIMITS['highway'], 0, self.pcm_long_max_set_speed, 0, 0, False, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.pending
assert self.sla.is_enabled and not self.sla.is_active
def test_pending_to_active_when_speed_limit_available(self):
self.sla.state = SpeedLimitAssistState.pending
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
self.sla.update(True, False, SPEED_LIMITS['highway'], 0, self.pcm_long_max_set_speed,
SPEED_LIMITS['highway'], SPEED_LIMITS['highway'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.active
def test_pending_to_adapting_when_below_speed_limit(self):
self.sla.state = SpeedLimitAssistState.pending
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
self.sla.update(True, False, SPEED_LIMITS['highway'] + 5, 0, self.pcm_long_max_set_speed,
SPEED_LIMITS['highway'], SPEED_LIMITS['highway'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.adapting
assert self.sla.is_enabled and self.sla.is_active
def test_active_to_adapting_transition(self):
self.initialize_active_state(self.pcm_long_max_set_speed)
self.sla.update(True, False, SPEED_LIMITS['highway'] + 2, 0, self.pcm_long_max_set_speed, SPEED_LIMITS['highway'],
SPEED_LIMITS['highway'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.adapting
def test_adapting_to_active_transition(self):
self.sla.state = SpeedLimitAssistState.adapting
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed, SPEED_LIMITS['highway'],
SPEED_LIMITS['highway'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.active
def test_manual_cruise_change_detection(self):
self.sla.state = SpeedLimitAssistState.active
expected_cruise = SPEED_LIMITS['highway']
@@ -203,6 +154,7 @@ class TestSpeedLimitAssist:
different_cruise = SPEED_LIMITS['highway'] + 5
self.sla.update(True, False, SPEED_LIMITS['city'], 0, different_cruise, SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
# In non-pcm mode, manual cruise change transitions to inactive (not tempPaused)
assert self.sla.state == SpeedLimitAssistState.inactive
# TODO-SP: test lower CST cases
@@ -276,3 +228,236 @@ class TestSpeedLimitAssist:
assert self.sla.state in [SpeedLimitAssistState.preActive, SpeedLimitAssistState.active]
elif initial_state in ACTIVE_STATES:
assert self.sla.state in ACTIVE_STATES
def test_non_pcm_regression_method_exists(self):
"""Regression: update_state_machine_non_pcm_long() method exists unchanged."""
assert hasattr(self.sla, "update_state_machine_non_pcm_long")
# Verify it is callable
assert callable(self.sla.update_state_machine_non_pcm_long)
# Verify method is not affected by cap mode refactor (signature check)
import inspect
inspect.signature(self.sla.update_state_machine_non_pcm_long)
# Non-PCM method should have expected parameters (varies by impl, just verify it's present)
class TestSpeedLimitAssistTempPaused:
"""Tests for tempPaused state functionality (cap mode)."""
def setup_method(self, method):
self.params = Params()
self.reset_custom_params()
self.events_sp = EventsSP()
CI = self._setup_platform(DEFAULT_CAR)
self.sla = SpeedLimitAssist(CI.CP, CI.CP_SP)
self.sla.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.sla.pcm_op_long] / DT_MDL)
self.pcm_long_max_set_speed = PCM_LONG_REQUIRED_MAX_SET_SPEED[self.sla.is_metric][1] # use 80 MPH for now
self.speed_conv = CV.MS_TO_KPH if self.sla.is_metric else CV.MS_TO_MPH
# For temp paused tests, use pcm_op_long = True
self.sla.pcm_op_long = True
self.sla.enabled = True
def teardown_method(self, method):
self.reset_state()
def _setup_platform(self, car_name):
CarInterface = interfaces[car_name]
CP = CarInterface.get_non_essential_params(car_name)
CP_SP = CarInterface.get_non_essential_params_sp(CP, car_name)
CI = CarInterface(CP, CP_SP)
CI.CP.openpilotLongitudinalControl = True # always assume it's openpilot longitudinal
CI.CP.pcmCruise = False # test non-PCM FSM path (preActive, pending, adapting, active)
sunnypilot_interfaces.setup_interfaces(CI, self.params)
return CI
def reset_custom_params(self):
self.params.put("IsReleaseSpBranch", True)
self.params.put("SpeedLimitMode", int(Mode.assist))
self.params.put_bool("IsMetric", False)
self.params.put("SpeedLimitOffsetType", 0)
self.params.put("SpeedLimitValueOffset", 0)
def reset_state(self):
self.sla.state = SpeedLimitAssistState.disabled
self.sla._state_prev = SpeedLimitAssistState.disabled
self.sla.frame = -1
self.sla.long_enabled = False
self.sla.long_enabled_prev = False
self.sla.long_engaged_timer = 0
self.sla.pre_active_timer = 0
self.sla._speed_limit = 0.
self.sla._speed_limit_final_last = 0.
self.sla.speed_limit_prev = 0.
self.sla.v_cruise_cluster = 0.
self.sla.v_cruise_cluster_prev = 0.
self.sla._distance = 0.
self.events_sp.clear()
def test_temp_paused_entry_capping_state(self):
"""Test that tempPaused entry occurs during state machine when user paused."""
self.sla.state = SpeedLimitAssistState.capping
self.sla._has_speed_limit = True
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
self.sla._user_paused = True
self.sla._user_paused_timer = 1000
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.tempPaused
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.userTempPause
def test_temp_paused_exit_on_speed_limit_change(self):
"""Test exiting tempPaused when speed limit changes."""
self.sla.state = SpeedLimitAssistState.tempPaused
self.sla._user_paused = True
self.sla._user_paused_timer = 1000
self.sla._speed_limit_final_last_at_pause = SPEED_LIMITS['city']
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
self.sla.long_enabled = True
self.sla.enabled = True
# Change speed limit
new_limit = SPEED_LIMITS['highway']
self.sla.update(True, False, new_limit, 0, self.pcm_long_max_set_speed,
new_limit, new_limit, True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.disabled
assert self.sla._user_paused == False
def test_temp_paused_exit_on_timer_expiry(self):
"""Test exiting tempPaused when 5-minute timer expires."""
self.sla.state = SpeedLimitAssistState.tempPaused
self.sla._user_paused = True
self.sla._user_paused_timer = 1
self.sla._speed_limit_final_last_at_pause = SPEED_LIMITS['city']
# Trigger update with timer expiry
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.disabled
assert self.sla._user_paused_timer <= 0
def test_disable_reason_user_cancel(self):
"""Test disable_reason set to userCancel on pause."""
self.sla.state = SpeedLimitAssistState.capping
self.sla._user_paused = True
self.sla._user_paused_timer = 1000
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.tempPaused
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.userTempPause
def test_disable_reason_long_override(self):
"""Test disable_reason set to longOverride."""
self.sla.state = SpeedLimitAssistState.capping
self.sla._has_speed_limit = True
self.sla._target_cap = SPEED_LIMITS['city']
self.sla.update(True, True, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.disabled
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.longOverride
def test_disable_reason_below_floor(self):
"""Test disable_reason set to belowFloor."""
min_floor = self.sla._min_cap_floor
self.sla.state = SpeedLimitAssistState.capping
self.sla._has_speed_limit = True
self.sla._speed_limit_final_last = min_floor - 1
self.sla.long_enabled = True
self.sla.enabled = True
self.sla.v_cruise_cluster = self.pcm_long_max_set_speed
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
self.sla.update(True, False, min_floor - 1, 0, self.pcm_long_max_set_speed,
min_floor - 1, min_floor - 1, True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.pending
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.belowFloor
def test_temp_paused_entry_cap_cluster_nudge_plus(self):
"""Test tempPaused entry in capping state when cluster nudged above limit."""
self.sla.state = SpeedLimitAssistState.capping
self.sla._has_speed_limit = True
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
self.sla._target_cap = SPEED_LIMITS['city']
self.sla.long_enabled = True
self.sla.enabled = True
self.sla.v_cruise_cluster = SPEED_LIMITS['city']
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city']
self.sla.prev_v_cruise_cluster_conv = round(SPEED_LIMITS['city'] * self.speed_conv)
# Nudge cluster up (simulate user pressing accel)
nudged_cruise = SPEED_LIMITS['city'] + 1.0
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.tempPaused
assert self.sla._user_paused == True
assert self.sla._user_paused_timer > 0
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.userTempPause
def test_temp_paused_entry_cap_cluster_nudge_minus(self):
"""Test tempPaused entry in capping state when cluster nudged below limit."""
self.sla.state = SpeedLimitAssistState.capping
self.sla._has_speed_limit = True
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
self.sla._target_cap = SPEED_LIMITS['city']
self.sla.long_enabled = True
self.sla.enabled = True
self.sla.v_cruise_cluster = SPEED_LIMITS['city']
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city']
self.sla.prev_v_cruise_cluster_conv = round(SPEED_LIMITS['city'] * self.speed_conv)
# Nudge cluster down (simulate user pressing brake/decel)
nudged_cruise = SPEED_LIMITS['city'] - 1.0
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.tempPaused
assert self.sla._user_paused == True
assert self.sla._user_paused_timer > 0
def test_temp_paused_exit_cluster_returns_to_limit(self):
"""Test exiting tempPaused when cluster returns within ±1 of limit."""
self.sla.state = SpeedLimitAssistState.tempPaused
self.sla._user_paused = True
self.sla._user_paused_timer = 1000
self.sla._speed_limit_final_last_at_pause = SPEED_LIMITS['city']
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
self.sla.long_enabled = True
self.sla.enabled = True
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city'] + 5
self.sla.prev_v_cruise_cluster_conv = round((SPEED_LIMITS['city'] + 5) * self.speed_conv)
# Return cluster to within ±1 of limit
returned_cruise = SPEED_LIMITS['city'] + 0.5
self.sla.update(True, False, SPEED_LIMITS['city'], 0, returned_cruise,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.disabled
assert self.sla._user_paused == False
def test_temp_paused_sticky_double_nudge(self):
"""Test that multiple nudges keep state in tempPaused."""
self.sla.state = SpeedLimitAssistState.capping
self.sla._has_speed_limit = True
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
self.sla._target_cap = SPEED_LIMITS['city']
self.sla.long_enabled = True
self.sla.enabled = True
self.sla.v_cruise_cluster = SPEED_LIMITS['city']
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city']
self.sla.prev_v_cruise_cluster_conv = round(SPEED_LIMITS['city'] * self.speed_conv)
# First nudge
nudged_cruise_1 = SPEED_LIMITS['city'] + 2.0
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise_1,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.tempPaused
assert self.sla._user_paused_timer > 0
saved_timer_1 = self.sla._user_paused_timer
# Second nudge while in tempPaused (shouldn't trigger another entry)
nudged_cruise_2 = SPEED_LIMITS['city'] + 3.0
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise_2,
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
assert self.sla.state == SpeedLimitAssistState.tempPaused
# Timer should have been decremented by one update call
assert self.sla._user_paused_timer == saved_timer_1 - 1

View File

@@ -0,0 +1,585 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
Test suite for Speed Limit Assist cap mode (pcm_op_long only).
Covers 20 edge cases for FSM, debounce, upshift, pedal release, and audio cue.
"""
import pytest
from cereal import custom
from opendbc.car.toyota.values import CAR as TOYOTA
from openpilot.common.constants import CV
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.car.cruise import V_CRUISE_UNSET
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import UpshiftAccept
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist, \
CAP_ACTIVE_STATES
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
class TestSpeedLimitCapMode:
def test_cap_mode_applies_to_pcm_op_long(self, scenario_builder):
"""Cap mode applies to pcm_op_long cars (pcmCruise=True + openpilotLongitudinalControl=True)."""
scenario = scenario_builder(TOYOTA.TOYOTA_RAV4_TSS2)
scenario.set_state(SpeedLimitAssistState.disabled)
assert scenario.sla.pcm_op_long is True
assert scenario.sla.state == SpeedLimitAssistState.disabled
def test_disabled_to_capping_transition(self, scenario_builder):
"""FSM: disabled -> capping when speed limit available and engaged."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.disabled)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Update to enter capping
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
assert scenario.sla.is_active
def test_capping_to_disabled_on_disengagement(self, scenario_builder):
"""FSM: capping -> disabled when user disengages (manual override)."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Simulate manual override (long_override=True)
scenario.sla.update(
True, True, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
assert scenario.sla.state == SpeedLimitAssistState.disabled
assert scenario.sla.output_v_target == V_CRUISE_UNSET
def test_below_floor_pause_transition(self, scenario_builder):
"""FSM: capping exits to pending (no cap emitted) when posted limit is below min cap floor."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitMinCapFloor", 25) # 25 mph floor
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
# Posted limit 20 mph (below 25 mph floor) -> cap must pause
low_limit_ms = 20 * CV.MPH_TO_MS
scenario.set_speed_limits(low_limit_ms, 0)
scenario.set_cruise_speeds(30 * CV.MPH_TO_MS)
for _ in range(int(0.5 / DT_MDL)):
scenario.sla.update(
True, False, 30 * CV.MPH_TO_MS, 0,
30 * CV.MPH_TO_MS, low_limit_ms, low_limit_ms,
True, 0, scenario.events_sp
)
# Below-floor condition pauses cap: state is not capping, v_target is unset (no cap)
assert scenario.sla.state != SpeedLimitAssistState.capping
assert scenario.sla.output_v_target == V_CRUISE_UNSET
def test_resume_from_pause_above_floor(self, scenario_builder):
"""FSM: pending (below-floor pause) -> capping when vehicle speed rises above min cap floor."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitMinCapFloor", 25) # 25 mph
scenario.set_state(SpeedLimitAssistState.pending)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Update with speed above floor
scenario.sla.update(
True, False, 30 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Should resume capping if conditions allow
assert scenario.sla.state in CAP_ACTIVE_STATES or scenario.sla.state == SpeedLimitAssistState.pending
def test_change_debounce_hold_new_limit(self, scenario_builder):
"""FSM: New speed limit held for 1s before accepting (debounce)."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.ACCEL_PEDAL)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Start in capping with initial cap at 25 mph
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
scenario.sla._has_speed_limit = True
# First: establish baseline with 25 mph and pressed pedal (to establish state for edge detection)
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=True
)
# Then: change limit to 35 mph, press pedal briefly, then release and wait for debounce to expire
# Press pedal for first 0.5s (speed_limit=25 but speed_limit_final_last=35 simulates detection)
for _ in range(int(0.5 / DT_MDL)):
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=True
)
# Release pedal and wait for cap debounce + accel debounce to complete (0.7s more = 1.2s total)
for _ in range(int(0.7 / DT_MDL)):
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=False
)
# Cap should now be 35 mph after debounce and pedal release edge
assert abs(scenario.sla._target_cap - 35 * CV.MPH_TO_MS) < 0.1
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_upshift_never_raise_keeps_old_cap(self, scenario_builder):
"""FSM: NEVER_RAISE mode keeps cap unchanged when limit increases."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.NEVER_RAISE)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Start with cap at 25 mph
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Increase limit to 35 mph, wait for debounce to expire
for _ in range(int(1.2 / DT_MDL)):
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# In NEVER_RAISE mode, cap stays at 25 mph (old value)
assert abs(scenario.sla._target_cap - 25 * CV.MPH_TO_MS) < 0.1
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_upshift_accel_pedal_requires_release(self, scenario_builder):
"""FSM: ACCEL_PEDAL mode accepts new cap only on pedal release."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.ACCEL_PEDAL)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Start with cap at 25 mph
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Pedal pressed with new limit 35 mph, wait 1.1s (exceeds cap debounce) but pedal still pressed
for _ in range(int(1.1 / DT_MDL)):
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=True
)
# Cap should still be 25 mph (pedal pressed, upshift rejected despite debounce)
assert abs(scenario.sla._target_cap - 25 * CV.MPH_TO_MS) < 0.1
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_pedal_release_debounce_200ms(self, scenario_builder):
"""FSM: Accel pedal release edge requires 200ms debounce."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.ACCEL_PEDAL)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# First establish the limit change and wait for cap change debounce to start
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=True
)
# Pedal pressed, wait 0.6s, then release and wait 0.5s (total 1.1s > 1.0s cap debounce + 0.2s accel debounce)
for _ in range(int(0.6 / DT_MDL)):
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=True
)
# Release and wait for both debounces to complete
for _ in range(int(0.5 / DT_MDL)):
scenario.sla.update(
True, False, 35 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
True, 0, scenario.events_sp, accel_pressed=False
)
# Now cap should be updated to 35 mph (cap debounce complete + accel release edge detected and debounced)
assert abs(scenario.sla._target_cap - 35 * CV.MPH_TO_MS) < 0.1
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_suspended_timer_after_long_override(self, scenario_builder):
"""FSM: 1s suspension window after manual override release."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
scenario.sla.update(
True, True, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
assert scenario.sla.state == SpeedLimitAssistState.disabled
assert scenario.sla._cap_suspended_timer > 0 # frame counter
# Attempt to re-engage within suspension window (0.5s)
for _ in range(int(0.5 / DT_MDL)):
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Should still be disabled (suspension active)
assert scenario.sla.state == SpeedLimitAssistState.disabled
# Wait for suspension to expire (1.1s total)
for _ in range(int(0.7 / DT_MDL)):
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Now cap can re-engage and transition to capping
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_audio_cue_fires_once_on_capping_entry(self, scenario_builder):
"""Event: speedLimitCapActive fires once on entry to capping state."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitCapAudioCue", True)
scenario.set_state(SpeedLimitAssistState.disabled)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Enter capping state
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Verify we transitioned to capping
assert scenario.sla.state == SpeedLimitAssistState.capping
# Audio cue flag should be set
assert scenario.sla._cap_audio_cue_fired is True
def test_audio_cue_disabled_no_fire(self, scenario_builder):
"""Event: speedLimitCapActive suppressed when audio cue disabled."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitCapAudioCue", False)
scenario.set_state(SpeedLimitAssistState.disabled)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
# Enter capping state
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Verify we transitioned to capping
assert scenario.sla.state == SpeedLimitAssistState.capping
# Audio cue flag should NOT be set (disabled)
assert scenario.sla._cap_audio_cue_fired is False
def test_cap_delta_ui_feedback(self, scenario_builder):
"""UI: target_cap tracks posted limit, delta = v_cruise_cluster - target_cap (positive when cap below driver intent)."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
limit_ms = 25 * CV.MPH_TO_MS
cruise_ms = 35 * CV.MPH_TO_MS
scenario.set_speed_limits(limit_ms, 0)
scenario.set_cruise_speeds(cruise_ms)
scenario.sla._target_cap = limit_ms
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
cruise_ms, limit_ms, limit_ms,
True, 0, scenario.events_sp
)
assert abs(scenario.sla.output_v_target - limit_ms) < 0.1
assert abs(scenario.sla._target_cap - limit_ms) < 0.1
cap_delta = max(0., cruise_ms - scenario.sla._target_cap)
assert cap_delta > 0
assert abs(cap_delta - 10 * CV.MPH_TO_MS) < 0.1
assert abs(scenario.sla.cap_delta - cap_delta) < 0.1
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_min_cap_floor_zero_disables_pause(self, scenario_builder):
"""FSM: min cap floor 0 disables pause-on-low-speed behavior."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitMinCapFloor", 0)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(5 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Very low speed (5 mph), but floor is 0
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
5 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Should continue capping (floor 0 means no pause even at low speed)
assert scenario.sla.state == SpeedLimitAssistState.capping
assert scenario.sla._cap_below_floor is False
def test_min_cap_floor_max_value_40_mph(self, scenario_builder):
"""FSM: min cap floor clamped to 40 mph (reasonable max)."""
scenario = scenario_builder()
scenario.set_param("IsMetric", False)
scenario.set_param("SpeedLimitMinCapFloor", 40)
scenario.sla.is_metric = False
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(30 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Speed 25 mph (limit) is below floor 40 mph, should pause
for _ in range(int(0.5 / DT_MDL)):
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
30 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# Should transition to pending (below floor)
assert scenario.sla.state == SpeedLimitAssistState.pending
assert scenario.sla._cap_below_floor is True
def test_v_target_clamped_to_cruise_when_capping(self, scenario_builder):
"""Output: v_target = min(v_cruise, cap) when actively capping."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
# v_target should be min(20, 25) = 20 m/s (driver cruise is limiting factor)
assert abs(scenario.sla.output_v_target - 20 * CV.MPH_TO_MS) < 0.1
assert scenario.sla.state == SpeedLimitAssistState.capping
# Missing edge cases from matrix (Issues #1, #2, #5, #7, #16, #18, #19)
def test_school_zone_false_positive_25_mph_floor(self, scenario_builder):
"""Edge case #1: OSM school zone 25 mph at inactive hours blocked by floor."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitMinCapFloor", 25)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(11.18, 0) # 25 mph -> m/s
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 11.18
# Speed limit 25 mph = 11.18 m/s is at floor (not strictly above)
scenario.sla.update(
True, False, 11.18, 0,
20 * CV.MPH_TO_MS, 11.18, 11.18,
True, 0, scenario.events_sp
)
# Should not pause at exactly floor (must be strictly above)
assert scenario.sla.state == SpeedLimitAssistState.capping
def test_construction_stale_25_mph_pauses(self, scenario_builder):
"""Edge case #2: Construction temp speed limit 25 mph pauses on stale data."""
scenario = scenario_builder()
scenario.set_param("SpeedLimitMinCapFloor", 25)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
# Stale construction limit (25 mph, below floor by resolver check)
scenario.set_speed_limits(11.18, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 11.18
# Simulate old data with has_speed_limit=False (resolver cleared it)
scenario.sla.update(
True, False, 11.18, 0,
20 * CV.MPH_TO_MS, 11.18, 11.18,
False, 0, scenario.events_sp
)
# Should transition to pending (lost limit)
assert scenario.sla.state == SpeedLimitAssistState.pending
@pytest.mark.parametrize("is_metric", [True, False])
def test_kmh_mph_unit_conversion_border(self, scenario_builder, is_metric):
"""Edge case #5: km/h ↔ mph border crossing unit conversion."""
scenario = scenario_builder()
scenario.params.put_bool("IsMetric", is_metric)
# Reload SLA to pick up metric setting
from opendbc.car.toyota.values import CAR as TOYOTA
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.tests.conftest import CarParamsFactory
CP, CP_SP, _ = CarParamsFactory.create_car_interface(TOYOTA.TOYOTA_RAV4_TSS2)
scenario.sla = SpeedLimitAssist(CP, CP_SP)
scenario.sla.params.put_bool("IsMetric", is_metric)
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
if is_metric:
# 65 km/h ≈ 40.4 mph
scenario.set_speed_limits(65 * CV.KPH_TO_MS, 0)
scenario.set_cruise_speeds(60 * CV.KPH_TO_MS)
scenario.sla._target_cap = 65 * CV.KPH_TO_MS
else:
# 40 mph ≈ 64.4 km/h
scenario.set_speed_limits(40 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(35 * CV.MPH_TO_MS)
scenario.sla._target_cap = 40 * CV.MPH_TO_MS
scenario.sla.update(
True, False, scenario.sla._target_cap, 0,
scenario.sla.v_cruise_cluster, scenario.sla._speed_limit,
scenario.sla._speed_limit, True, 0, scenario.events_sp
)
# Should remain in capping with correct unit-aware cap
assert scenario.sla.state == SpeedLimitAssistState.capping
assert scenario.sla._has_speed_limit is True
def test_gps_tunnel_fade_10s_age_limit(self, scenario_builder):
"""Edge case #7: GPS tunnel fade stale limit via LIMIT_MAX_MAP_DATA_AGE."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Simulate has_speed_limit=False (resolver expired data after 10s)
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
False, 0, scenario.events_sp
)
# Should transition to pending (lost limit)
assert scenario.sla.state == SpeedLimitAssistState.pending
assert scenario.sla.output_v_target == V_CRUISE_UNSET
def test_lost_speed_limit_mid_cap_to_pending(self, scenario_builder):
"""Edge case #16: Lost speed limit mid-cap transitions to pending."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# Simulate limit suddenly unavailable
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
False, 0, scenario.events_sp
)
# Should transition to pending, emit V_CRUISE_UNSET
assert scenario.sla.state == SpeedLimitAssistState.pending
assert scenario.sla.output_v_target == V_CRUISE_UNSET
def test_mode_param_off_disables_sla_on_pcm_op_long(self, scenario_builder):
"""Edge case #18: SpeedLimitMode controls engagement on pcm_op_long cars."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.disabled)
scenario.set_engaged(True)
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.set_param("SpeedLimitMode", 0)
scenario.sla.update(
True, False, 25 * CV.MPH_TO_MS, 0,
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
True, 0, scenario.events_sp
)
assert scenario.sla.state == SpeedLimitAssistState.disabled
def test_speed_limit_final_zero_with_has_limit_edge(self, scenario_builder):
"""Edge case #19: has_speed_limit=true but speed_limit_final=0 -> pending."""
scenario = scenario_builder()
scenario.set_state(SpeedLimitAssistState.capping)
scenario.set_engaged(True)
scenario.set_speed_limits(0, 0) # Zero speed limit (bad data)
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
# has_speed_limit=True but speed_limit_final_last=0
scenario.sla.update(
True, False, 0, 0,
20 * CV.MPH_TO_MS, 0, 0,
True, 0, scenario.events_sp
)
# Should transition to pending (bad/zero limit treated as no limit)
assert scenario.sla.state == SpeedLimitAssistState.pending
def test_non_pcm_path_unchanged(self, scenario_builder):
"""Regression: Non-PCM update_state_machine_non_pcm_long() unchanged."""
scenario = scenario_builder()
# Verify method exists and has correct signature
assert hasattr(scenario.sla, "update_state_machine_non_pcm_long")
# Method signature check (internal implementation detail)

View File

@@ -216,6 +216,14 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
Priority.LOW, VisualAlert.none, AudibleAlertSP.promptSingleHigh, 5.),
},
EventNameSP.speedLimitCapActive: {
ET.WARNING: Alert(
"Speed Limit Capping",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlertSP.promptSingleHigh, 5.),
},
EventNameSP.speedLimitChanged: {
ET.WARNING: Alert(
"Set speed changed",

View File

@@ -1227,6 +1227,41 @@
"max": 30,
"step": 1
},
"SpeedLimitUpshiftAccept": {
"title": "Speed Limit Cap Upshift",
"description": "Mode for accepting new speed limit changes in cap mode.",
"options": [
{
"value": 0,
"label": "Never Raise"
},
{
"value": 1,
"label": "Accel Pedal Confirm"
}
]
},
"SpeedLimitMinCapFloor": {
"title": "Speed Limit Cap Floor",
"description": "Minimum speed below which speed limit capping is paused.",
"min": 0,
"max": 40,
"step": 1
},
"SpeedLimitCapAudioCue": {
"title": "Speed Limit Cap Audio Cue",
"description": "Enable audio cue when entering speed limit capping mode.",
"options": [
{
"value": 0,
"label": "Off"
},
{
"value": 1,
"label": "On"
}
]
},
"SshEnabled": {
"title": "Enable SSH",
"description": ""