mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-10 06:35:03 +08:00
Compare commits
5 Commits
accel-cont
...
sla-3.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e698e8aba | ||
|
|
e6f6e32ccf | ||
|
|
6b9a9b9075 | ||
|
|
f6b02693a1 | ||
|
|
e0a7c57fab |
@@ -267,6 +267,9 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
|
||||
active @2 :Bool;
|
||||
vTarget @3 :Float32;
|
||||
aTarget @4 :Float32;
|
||||
capDelta @5 :Float32; # Difference between cluster set-speed and cap (m/s), positive = driver above cap
|
||||
targetCap @6 :Float32; # Speed limit cap being enforced (m/s)
|
||||
disableReason @7 :AssistDisableReason;
|
||||
}
|
||||
|
||||
enum Source {
|
||||
@@ -282,6 +285,19 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
|
||||
pending @3; # Awaiting new speed limit.
|
||||
adapting @4; # Reducing speed to match new speed limit.
|
||||
active @5; # Cruising at speed limit.
|
||||
capping @6; # Silently capping speed based on limit.
|
||||
tempPaused @7; # Temporarily paused by user.
|
||||
}
|
||||
|
||||
enum AssistDisableReason {
|
||||
none @0;
|
||||
userCancel @1;
|
||||
userTempPause @2;
|
||||
longOverride @3;
|
||||
belowFloor @4;
|
||||
autoResume @5;
|
||||
mapGap @6;
|
||||
gateDisabled @7;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,6 +358,7 @@ struct OnroadEventSP @0xda96579883444c35 {
|
||||
speedLimitChanged @21;
|
||||
speedLimitPending @22;
|
||||
e2eChime @23;
|
||||
speedLimitCapActive @24;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -261,6 +261,9 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
|
||||
{"SpeedLimitOffsetType", {PERSISTENT | BACKUP, INT, "0"}},
|
||||
{"SpeedLimitPolicy", {PERSISTENT | BACKUP, INT, "3"}},
|
||||
{"SpeedLimitValueOffset", {PERSISTENT | BACKUP, INT, "0"}},
|
||||
{"SpeedLimitUpshiftAccept", {PERSISTENT | BACKUP, INT, "0"}},
|
||||
{"SpeedLimitMinCapFloor", {PERSISTENT | BACKUP, INT, "25"}},
|
||||
{"SpeedLimitCapAudioCue", {PERSISTENT | BACKUP, INT, "1"}},
|
||||
|
||||
// Smart Cruise Control
|
||||
{"MapTargetVelocities", {CLEAR_ON_ONROAD_TRANSITION, STRING}},
|
||||
|
||||
@@ -21,6 +21,8 @@ from openpilot.system.ui.widgets.scroller_tici import Scroller
|
||||
|
||||
SPEED_LIMIT_MODE_BUTTONS = [tr("Off"), tr("Info"), tr("Warning"), tr("Assist")]
|
||||
SPEED_LIMIT_OFFSET_TYPE_BUTTONS = [tr("None"), tr("Fixed"), tr("%")]
|
||||
SPEED_LIMIT_UPSHIFT_ACCEPT_BUTTONS = [tr("Never Raise"), tr("Accel Pedal Confirm")]
|
||||
SPEED_LIMIT_CAP_AUDIO_CUE_BUTTONS = [tr("Off"), tr("On")]
|
||||
|
||||
SPEED_LIMIT_MODE_DESCRIPTIONS = [
|
||||
tr("Off: Disables the Speed Limit functions."),
|
||||
@@ -35,6 +37,16 @@ SPEED_LIMIT_OFFSET_DESCRIPTIONS = [
|
||||
tr("Percent: Adds a percent offset [Speed Limit + (Offset % Speed Limit)]"),
|
||||
]
|
||||
|
||||
SPEED_LIMIT_UPSHIFT_ACCEPT_DESCRIPTIONS = [
|
||||
tr("Never Raise: Keeps the current cap when the speed limit changes."),
|
||||
tr("Accel Pedal Confirm: Accepts new speed limit cap when you release the accelerator pedal."),
|
||||
]
|
||||
|
||||
SPEED_LIMIT_CAP_AUDIO_CUE_DESCRIPTIONS = [
|
||||
tr("Off: No audio cue when entering speed limit capping mode."),
|
||||
tr("On: Plays a low chime when entering speed limit capping mode."),
|
||||
]
|
||||
|
||||
|
||||
class PanelType(IntEnum):
|
||||
SETTINGS = 0
|
||||
@@ -86,13 +98,42 @@ class SpeedLimitSettingsLayout(Widget):
|
||||
label_callback=self._get_offset_label,
|
||||
)
|
||||
|
||||
self._speed_limit_upshift_accept = multiple_button_item_sp(
|
||||
title=lambda: tr("Speed Limit Cap Upshift"),
|
||||
description=self._get_upshift_accept_description,
|
||||
buttons=SPEED_LIMIT_UPSHIFT_ACCEPT_BUTTONS,
|
||||
param="SpeedLimitUpshiftAccept",
|
||||
button_width=500,
|
||||
)
|
||||
|
||||
self._speed_limit_min_cap_floor = option_item_sp(
|
||||
title=lambda: tr("Speed Limit Cap Floor"),
|
||||
param="SpeedLimitMinCapFloor",
|
||||
min_value=0,
|
||||
max_value=40,
|
||||
description=self._get_min_cap_floor_description,
|
||||
label_callback=self._get_min_cap_floor_label,
|
||||
)
|
||||
|
||||
self._speed_limit_cap_audio_cue = multiple_button_item_sp(
|
||||
title=lambda: tr("Speed Limit Cap Audio Cue"),
|
||||
description=self._get_cap_audio_cue_description,
|
||||
buttons=SPEED_LIMIT_CAP_AUDIO_CUE_BUTTONS,
|
||||
param="SpeedLimitCapAudioCue",
|
||||
button_width=450,
|
||||
)
|
||||
|
||||
items = [
|
||||
self._speed_limit_mode,
|
||||
LineSeparatorSP(40),
|
||||
self._source_button,
|
||||
LineSeparatorSP(40),
|
||||
self._speed_limit_offset_type,
|
||||
self._speed_limit_value_offset
|
||||
self._speed_limit_value_offset,
|
||||
LineSeparatorSP(40),
|
||||
self._speed_limit_upshift_accept,
|
||||
self._speed_limit_min_cap_floor,
|
||||
self._speed_limit_cap_audio_cue,
|
||||
]
|
||||
return items
|
||||
|
||||
@@ -120,6 +161,23 @@ class SpeedLimitSettingsLayout(Widget):
|
||||
return f"{value} {unit}"
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def _get_upshift_accept_description():
|
||||
return get_highlighted_description(ui_state.params, "SpeedLimitUpshiftAccept", SPEED_LIMIT_UPSHIFT_ACCEPT_DESCRIPTIONS)
|
||||
|
||||
@staticmethod
|
||||
def _get_min_cap_floor_description():
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def _get_min_cap_floor_label(value):
|
||||
unit = tr("km/h") if ui_state.is_metric else tr("mph")
|
||||
return f"{value} {unit}"
|
||||
|
||||
@staticmethod
|
||||
def _get_cap_audio_cue_description():
|
||||
return get_highlighted_description(ui_state.params, "SpeedLimitCapAudioCue", SPEED_LIMIT_CAP_AUDIO_CUE_DESCRIPTIONS)
|
||||
|
||||
def _update_state(self):
|
||||
super()._update_state()
|
||||
|
||||
@@ -128,6 +186,7 @@ class SpeedLimitSettingsLayout(Widget):
|
||||
brand = ui_state.CP.brand
|
||||
has_long = ui_state.has_longitudinal_control
|
||||
has_icbm = ui_state.has_icbm
|
||||
pcm_op_long = has_long and ui_state.CP.pcmCruise
|
||||
|
||||
"""
|
||||
Speed Limit Assist is available when:
|
||||
@@ -144,6 +203,7 @@ class SpeedLimitSettingsLayout(Widget):
|
||||
|
||||
else:
|
||||
sla_available = False
|
||||
pcm_op_long = False
|
||||
|
||||
if not sla_available:
|
||||
self._speed_limit_mode.action_item.set_enabled_buttons({
|
||||
@@ -157,6 +217,10 @@ class SpeedLimitSettingsLayout(Widget):
|
||||
offset_type = ui_state.params.get("SpeedLimitOffsetType", return_default=True)
|
||||
self._speed_limit_value_offset.set_visible(offset_type != int(SpeedLimitOffsetType.off))
|
||||
|
||||
self._speed_limit_upshift_accept.set_visible(pcm_op_long)
|
||||
self._speed_limit_min_cap_floor.set_visible(pcm_op_long)
|
||||
self._speed_limit_cap_audio_cue.set_visible(pcm_op_long)
|
||||
|
||||
def _render(self, rect):
|
||||
if self._current_panel == PanelType.POLICY:
|
||||
self._policy_layout.render(rect)
|
||||
|
||||
@@ -6,6 +6,7 @@ See the LICENSE.md file in the root directory for more details.
|
||||
"""
|
||||
import pyray as rl
|
||||
|
||||
from cereal import custom
|
||||
from openpilot.common.constants import CV
|
||||
from openpilot.selfdrive.ui.mici.onroad.torque_bar import TorqueBar
|
||||
from openpilot.selfdrive.ui.sunnypilot.onroad.developer_ui import DeveloperUiRenderer, DeveloperUiState, get_bottom_dev_ui_offset
|
||||
@@ -23,6 +24,7 @@ from openpilot.system.ui.lib.multilang import tr
|
||||
from openpilot.system.ui.lib.text_measure import measure_text_cached
|
||||
|
||||
SLA_ACTIVE_COLOR = rl.Color(0x91, 0x9b, 0x95, 0xff)
|
||||
AssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
|
||||
|
||||
|
||||
class HudRendererSP(HudRenderer):
|
||||
@@ -89,9 +91,14 @@ class HudRendererSP(HudRenderer):
|
||||
set_speed_color = COLORS.DARK_GREY
|
||||
if self.is_cruise_set:
|
||||
set_speed_color = COLORS.WHITE
|
||||
if long_plan_sp.speedLimit.assist.active:
|
||||
assist_state = long_plan_sp.speedLimit.assist.state
|
||||
# Green for active/adapting/capping states, grey for tempPaused when override, else normal
|
||||
if assist_state in (AssistState.active, AssistState.adapting, AssistState.capping):
|
||||
set_speed_color = SLA_ACTIVE_COLOR if long_override else rl.Color(0, 0xff, 0, 0xff)
|
||||
max_color = SLA_ACTIVE_COLOR if long_override else rl.Color(0x80, 0xd8, 0xa6, 0xff)
|
||||
elif assist_state == AssistState.tempPaused and long_override:
|
||||
set_speed_color = SLA_ACTIVE_COLOR
|
||||
max_color = SLA_ACTIVE_COLOR
|
||||
else:
|
||||
if ui_state.status == UIStatus.ENGAGED:
|
||||
max_color = COLORS.ENGAGED
|
||||
|
||||
@@ -7,6 +7,7 @@ See the LICENSE.md file in the root directory for more details.
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
import time
|
||||
import pyray as rl
|
||||
|
||||
from cereal import custom
|
||||
@@ -28,6 +29,7 @@ SET_SPEED_NA = 255
|
||||
KM_TO_MILE = 0.621371
|
||||
|
||||
AssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
|
||||
AssistDisableReason = custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason
|
||||
SpeedLimitSource = custom.LongitudinalPlanSP.SpeedLimit.Source
|
||||
|
||||
|
||||
@@ -40,6 +42,7 @@ class Colors:
|
||||
DARK_GREY = rl.Color(77, 77, 77, 255)
|
||||
SUB_BG = rl.Color(0, 0, 0, 180)
|
||||
MUTCD_LINES = rl.Color(255, 255, 255, 100)
|
||||
AMBER = rl.Color(255, 176, 0, 255)
|
||||
|
||||
|
||||
class IconSide(StrEnum):
|
||||
@@ -110,6 +113,11 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
|
||||
self.speed_limit_ahead_valid = False
|
||||
self.speed_limit_ahead_frame = 0
|
||||
|
||||
self.cap_delta = 0.0
|
||||
self.target_cap = 0.0
|
||||
self.disable_reason = AssistDisableReason.none
|
||||
self.disable_reason_timestamp = 0.0
|
||||
|
||||
self.is_cruise_set: bool = False
|
||||
self.is_cruise_available: bool = True
|
||||
self.set_speed: float = SET_SPEED_NA
|
||||
@@ -145,6 +153,10 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
|
||||
self.speed_limit_final_last = resolver.speedLimitFinalLast * self.speed_conv
|
||||
self.speed_limit_source = resolver.source
|
||||
self.speed_limit_assist_state = assist.state
|
||||
self.cap_delta = assist.capDelta
|
||||
self.target_cap = assist.targetCap * self.speed_conv
|
||||
self.disable_reason = assist.disableReason
|
||||
self.disable_reason_timestamp = time.monotonic()
|
||||
|
||||
if sm.updated["liveMapDataSP"]:
|
||||
lmd = sm["liveMapDataSP"]
|
||||
@@ -194,6 +206,15 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
|
||||
self._draw_sign_main(sign_rect, alpha)
|
||||
if self.speed_limit_assist_state == AssistState.preActive:
|
||||
self._draw_pre_active_arrow(sign_rect)
|
||||
elif self.speed_limit_assist_state == AssistState.tempPaused:
|
||||
self._draw_temp_paused_icon(sign_rect)
|
||||
elif self.speed_limit_assist_state == AssistState.capping:
|
||||
self._draw_cap_badge(sign_rect)
|
||||
# Also draw ahead info if valid and different from cap (mutual exclusion fix)
|
||||
if self.speed_limit_ahead_valid and round(self.speed_limit_ahead) != round(self.target_cap):
|
||||
ahead_info_y = sign_rect.y + sign_rect.height + 10 + 160 + 10
|
||||
ahead_rect = rl.Rectangle(sign_rect.x, ahead_info_y, sign_rect.width, 100)
|
||||
self._draw_ahead_info(ahead_rect)
|
||||
else:
|
||||
self._draw_ahead_info(sign_rect)
|
||||
|
||||
@@ -229,6 +250,38 @@ class SpeedLimitRenderer(Widget, SpeedLimitAlertRenderer):
|
||||
color = rl.Color(255, 255, 255, int(icon_alpha))
|
||||
rl.draw_texture_ex(txt_icon, rl.Vector2(arrow_x, arrow_y), 0.0, 1.0, color)
|
||||
|
||||
def _draw_temp_paused_icon(self, sign_rect):
|
||||
"""Draw greyed preActive icon when tempPaused."""
|
||||
# Reuse preActive icon with grey alpha
|
||||
icon_alpha = 128 # 50% opacity for paused state
|
||||
txt_icon = self.arrow_blank # Use blank/greyed version
|
||||
sign_margin = 12
|
||||
arrow_spacing = int(sign_margin * 1.4)
|
||||
arrow_x = sign_rect.x + sign_rect.width + arrow_spacing
|
||||
arrow_y = sign_rect.y + (sign_rect.height - txt_icon.height) / 2
|
||||
color = rl.Color(145, 155, 149, icon_alpha) # GREY color with alpha
|
||||
rl.draw_texture_ex(txt_icon, rl.Vector2(arrow_x, arrow_y), 0.0, 1.0, color)
|
||||
|
||||
def _draw_cap_badge(self, sign_rect):
|
||||
"""Draw CAP info panel below speed limit sign during capping."""
|
||||
rect = rl.Rectangle(sign_rect.x + (sign_rect.width - 170) / 2, sign_rect.y + sign_rect.height + 10, 170, 160)
|
||||
rl.draw_rectangle_rounded(rect, 0.35, 10, Colors.SUB_BG)
|
||||
rl.draw_rectangle_rounded_lines_ex(rect, 0.35, 10, 3, Colors.MUTCD_LINES)
|
||||
|
||||
mid_x = rect.x + rect.width / 2
|
||||
|
||||
label_color = Colors.AMBER if self.cap_delta > 0.5 else Colors.GREY
|
||||
self._draw_text_centered(self.font_demi, "CAP", 40, rl.Vector2(mid_x, rect.y + 28), label_color)
|
||||
|
||||
cap_speed = round(self.target_cap)
|
||||
self._draw_text_centered(self.font_bold, str(cap_speed), 70, rl.Vector2(mid_x, rect.y + 82), Colors.WHITE)
|
||||
|
||||
if self.cap_delta > 0.5:
|
||||
delta_display = round(self.cap_delta * self.speed_conv)
|
||||
delta_unit = 'km/h' if ui_state.is_metric else 'mph'
|
||||
delta_text = f'-{delta_display} {delta_unit}'
|
||||
self._draw_text_centered(self.font_norm, delta_text, 36, rl.Vector2(mid_x, rect.y + 134), Colors.GREY)
|
||||
|
||||
def _render_vienna(self, rect, val, sub, color, has_limit, alpha=1.0):
|
||||
center = rl.Vector2(rect.x + rect.width / 2, rect.y + rect.height / 2)
|
||||
radius = (rect.width + 18) / 2
|
||||
|
||||
@@ -60,7 +60,7 @@ class LongitudinalPlannerSP:
|
||||
# Speed Limit Assist
|
||||
has_speed_limit = self.resolver.speed_limit_valid or self.resolver.speed_limit_last_valid
|
||||
self.sla.update(long_enabled, long_override, v_ego, a_ego, v_cruise_cluster, self.resolver.speed_limit,
|
||||
self.resolver.speed_limit_final_last, has_speed_limit, self.resolver.distance, self.events_sp)
|
||||
self.resolver.speed_limit_final_last, has_speed_limit, self.resolver.distance, self.events_sp, CS.gasPressed)
|
||||
|
||||
targets = {
|
||||
LongitudinalPlanSource.cruise: (v_cruise, a_ego),
|
||||
@@ -132,6 +132,9 @@ class LongitudinalPlannerSP:
|
||||
assist.active = self.sla.is_active
|
||||
assist.vTarget = float(self.sla.output_v_target)
|
||||
assist.aTarget = float(self.sla.output_a_target)
|
||||
assist.capDelta = float(self.sla.cap_delta)
|
||||
assist.targetCap = float(self.sla._target_cap)
|
||||
assist.disableReason = self.sla.disable_reason
|
||||
|
||||
# E2E Alerts
|
||||
e2eAlerts = longitudinalPlanSP.e2eAlerts
|
||||
|
||||
@@ -17,3 +17,8 @@ CONFIRM_SPEED_THRESHOLD = {
|
||||
True: 80, # km/h
|
||||
False: 50, # mph
|
||||
}
|
||||
|
||||
MIN_CAP_FLOOR_MAX = {
|
||||
True: 64, # km/h
|
||||
False: 40, # mph
|
||||
}
|
||||
|
||||
@@ -27,3 +27,8 @@ class Mode(IntEnumBase):
|
||||
information = 1
|
||||
warning = 2
|
||||
assist = 3
|
||||
|
||||
|
||||
class UpshiftAccept(IntEnumBase):
|
||||
NEVER_RAISE = 0
|
||||
ACCEL_PEDAL = 1
|
||||
|
||||
@@ -9,6 +9,7 @@ from cereal import custom, car
|
||||
from openpilot.common.constants import CV
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode as SpeedLimitMode
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit import MIN_CAP_FLOOR_MAX
|
||||
|
||||
|
||||
def compare_cluster_target(v_cruise_cluster: float, target_set_speed: float, is_metric: bool) -> tuple[bool, bool]:
|
||||
@@ -42,3 +43,9 @@ def set_speed_limit_assist_availability(CP: car.CarParams, CP_SP: custom.CarPara
|
||||
params.put("SpeedLimitMode", int(SpeedLimitMode.warning))
|
||||
|
||||
return allowed
|
||||
|
||||
|
||||
def get_min_cap_floor(params: Params, is_metric: bool) -> float:
|
||||
value = params.get("SpeedLimitMinCapFloor", return_default=True)
|
||||
value = max(0, min(value, MIN_CAP_FLOOR_MAX[is_metric]))
|
||||
return value * (CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS)
|
||||
|
||||
@@ -15,16 +15,20 @@ from openpilot.selfdrive.modeld.constants import ModelConstants
|
||||
from openpilot.sunnypilot import PARAMS_UPDATE_PERIOD
|
||||
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit import PCM_LONG_REQUIRED_MAX_SET_SPEED, CONFIRM_SPEED_THRESHOLD
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import compare_cluster_target, set_speed_limit_assist_availability
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode, UpshiftAccept
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import compare_cluster_target, set_speed_limit_assist_availability, \
|
||||
get_min_cap_floor
|
||||
|
||||
ButtonType = car.CarState.ButtonEvent.Type
|
||||
EventNameSP = custom.OnroadEventSP.EventName
|
||||
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
|
||||
AssistDisableReason = custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason
|
||||
SpeedLimitSource = custom.LongitudinalPlanSP.SpeedLimit.Source
|
||||
|
||||
ACTIVE_STATES = (SpeedLimitAssistState.active, SpeedLimitAssistState.adapting)
|
||||
ENABLED_STATES = (SpeedLimitAssistState.preActive, SpeedLimitAssistState.pending, *ACTIVE_STATES)
|
||||
CAP_ACTIVE_STATES = (SpeedLimitAssistState.capping,)
|
||||
CAP_ENABLED_STATES = CAP_ACTIVE_STATES # cap mode has no partially-engaged state
|
||||
|
||||
DISABLED_GUARD_PERIOD = 0.5 # secs.
|
||||
# secs. Time to wait after activation before considering temp deactivation signal.
|
||||
@@ -33,6 +37,10 @@ PRE_ACTIVE_GUARD_PERIOD = {
|
||||
False: 5,
|
||||
}
|
||||
SPEED_LIMIT_CHANGED_HOLD_PERIOD = 1 # secs. Time to wait after speed limit change before switching to preActive.
|
||||
CAP_RAISE_HOLD_PERIOD = 0.2 # secs. Time to confirm limit raise before upshifting.
|
||||
CAP_SUSPEND_GUARD_PERIOD = 1.0 # secs. Time to hold cap disabled after long_override release.
|
||||
USER_PAUSE_TIMEOUT_TICKS = 6000 # 5 min / DT_MDL (0.05 s) = 6000 ticks
|
||||
RESUME_CLUSTER_DELTA_THRESHOLD = 1 # integer display-unit delta (kph or mph)
|
||||
|
||||
LIMIT_MIN_ACC = -1.5 # m/s^2 Maximum deceleration allowed for limit controllers to provide.
|
||||
LIMIT_MAX_ACC = 1.0 # m/s^2 Maximum acceleration allowed for limit controllers to provide while active.
|
||||
@@ -51,6 +59,7 @@ class SpeedLimitAssist:
|
||||
v_ego: float
|
||||
a_ego: float
|
||||
v_offset: float
|
||||
cap_delta: float
|
||||
|
||||
def __init__(self, CP: car.CarParams, CP_SP: custom.CarParamsSP):
|
||||
self.params = Params()
|
||||
@@ -64,10 +73,12 @@ class SpeedLimitAssist:
|
||||
self.enabled = self.params.get("SpeedLimitMode", return_default=True) == Mode.assist
|
||||
self.long_enabled = False
|
||||
self.long_enabled_prev = False
|
||||
self.long_override = False
|
||||
self.is_enabled = False
|
||||
self.is_active = False
|
||||
self.output_v_target = V_CRUISE_UNSET
|
||||
self.output_a_target = 0.
|
||||
self.cap_delta = 0.0
|
||||
self.v_ego = 0.
|
||||
self.a_ego = 0.
|
||||
self.v_offset = 0.
|
||||
@@ -92,8 +103,28 @@ class SpeedLimitAssist:
|
||||
self._minus_hold = 0.
|
||||
self._last_carstate_ts = 0.
|
||||
|
||||
self._cap_change_timer = 0
|
||||
self._cap_suspended_timer = 0
|
||||
self._cap_below_floor = False
|
||||
self._target_cap = 0.0
|
||||
self._cap_upshift_pressed = False
|
||||
self._cap_upshift_release_timer = 0
|
||||
self._cap_audio_cue_fired = False
|
||||
self._cap_raise_accepted = False
|
||||
self._accel_pressed = False
|
||||
self._was_cap_suspended = False
|
||||
self._override_active_last = False
|
||||
self._min_cap_floor = get_min_cap_floor(self.params, self.is_metric)
|
||||
self._cap_upshift_accept = self.params.get("SpeedLimitUpshiftAccept", return_default=True)
|
||||
self._cap_audio_cue_enabled = bool(self.params.get("SpeedLimitCapAudioCue", return_default=True))
|
||||
|
||||
self._user_paused: bool = False
|
||||
self._user_paused_timer: int = 0
|
||||
self._disable_reason = AssistDisableReason.none
|
||||
self._speed_limit_final_last_at_pause = 0.
|
||||
self.tempPaused_count = 0 # diagnostic counter for tests
|
||||
|
||||
# TODO-SP: SLA's own output_a_target for planner
|
||||
# Solution functions mapped to respective states
|
||||
self.acceleration_solutions = {
|
||||
SpeedLimitAssistState.disabled: self.get_current_acceleration_as_target,
|
||||
SpeedLimitAssistState.inactive: self.get_current_acceleration_as_target,
|
||||
@@ -101,8 +132,26 @@ class SpeedLimitAssist:
|
||||
SpeedLimitAssistState.pending: self.get_current_acceleration_as_target,
|
||||
SpeedLimitAssistState.adapting: self.get_adapting_state_target_acceleration,
|
||||
SpeedLimitAssistState.active: self.get_active_state_target_acceleration,
|
||||
SpeedLimitAssistState.capping: self.get_current_acceleration_as_target,
|
||||
SpeedLimitAssistState.tempPaused: self.get_current_acceleration_as_target,
|
||||
}
|
||||
|
||||
@property
|
||||
def disable_reason(self):
|
||||
return self._disable_reason
|
||||
|
||||
@property
|
||||
def _gates_pass(self) -> bool:
|
||||
return self.long_enabled and self.enabled
|
||||
|
||||
@property
|
||||
def _cap_gates_pass(self) -> bool:
|
||||
return self._gates_pass and not self.long_override and self._cap_suspended_timer <= 0
|
||||
|
||||
@property
|
||||
def _cap_entry_ready(self) -> bool:
|
||||
return self._has_speed_limit and not self._cap_below_floor
|
||||
|
||||
@property
|
||||
def speed_limit_changed(self) -> bool:
|
||||
return self._has_speed_limit and bool(self._speed_limit != self.speed_limit_prev)
|
||||
@@ -126,13 +175,13 @@ class SpeedLimitAssist:
|
||||
events_sp.add(EventNameSP.speedLimitActive)
|
||||
|
||||
def get_v_target_from_control(self) -> float:
|
||||
if self._has_speed_limit:
|
||||
if self.pcm_op_long and self.is_enabled:
|
||||
return self._speed_limit_final_last
|
||||
if not self.pcm_op_long and self.is_active:
|
||||
if self.pcm_op_long:
|
||||
if self.state == SpeedLimitAssistState.capping:
|
||||
return min(self.v_cruise_cluster, self._target_cap)
|
||||
else:
|
||||
if self._has_speed_limit and self.is_active:
|
||||
return self._speed_limit_final_last
|
||||
|
||||
# Fallback
|
||||
return V_CRUISE_UNSET
|
||||
|
||||
# TODO-SP: SLA's own output_a_target for planner
|
||||
@@ -144,6 +193,9 @@ class SpeedLimitAssist:
|
||||
self.is_metric = self.params.get_bool("IsMetric")
|
||||
set_speed_limit_assist_availability(self.CP, self.CP_SP, self.params)
|
||||
self.enabled = self.params.get("SpeedLimitMode", return_default=True) == Mode.assist
|
||||
self._min_cap_floor = get_min_cap_floor(self.params, self.is_metric)
|
||||
self._cap_upshift_accept = self.params.get("SpeedLimitUpshiftAccept", return_default=True)
|
||||
self._cap_audio_cue_enabled = bool(self.params.get("SpeedLimitCapAudioCue", return_default=True))
|
||||
|
||||
def update_car_state(self, CS: car.CarState) -> None:
|
||||
now = time.monotonic()
|
||||
@@ -187,7 +239,12 @@ class SpeedLimitAssist:
|
||||
cst_high
|
||||
pcm_long_required_max_set_speed_conv = round(pcm_long_required_max * speed_conv)
|
||||
|
||||
self.target_set_speed_conv = pcm_long_required_max_set_speed_conv if self.pcm_op_long else self.speed_limit_final_last_conv
|
||||
if self.pcm_op_long and self.state not in CAP_ACTIVE_STATES:
|
||||
self.target_set_speed_conv = pcm_long_required_max_set_speed_conv
|
||||
elif not self.pcm_op_long:
|
||||
self.target_set_speed_conv = self.speed_limit_final_last_conv
|
||||
else:
|
||||
self.target_set_speed_conv = self.v_cruise_cluster_conv
|
||||
|
||||
@property
|
||||
def apply_confirm_speed_threshold(self) -> bool:
|
||||
@@ -232,74 +289,142 @@ class SpeedLimitAssist:
|
||||
|
||||
return self._get_button_release(req_plus, req_minus)
|
||||
|
||||
def update_state_machine_pcm_op_long(self):
|
||||
self.long_engaged_timer = max(0, self.long_engaged_timer - 1)
|
||||
self.pre_active_timer = max(0, self.pre_active_timer - 1)
|
||||
def _cap_limit_change_held(self) -> bool:
|
||||
"""Return True when limit-change hold period has elapsed."""
|
||||
return self._cap_change_timer >= int(SPEED_LIMIT_CHANGED_HOLD_PERIOD / DT_MDL)
|
||||
|
||||
# ACTIVE, ADAPTING, PENDING, PRE_ACTIVE, INACTIVE
|
||||
def _cap_upshift_release_edge(self) -> bool:
|
||||
"""Return True when limit-raise hold period elapsed after gas release edge."""
|
||||
if self._cap_upshift_pressed and not self._accel_pressed:
|
||||
self._cap_upshift_release_timer = int(CAP_RAISE_HOLD_PERIOD / DT_MDL)
|
||||
|
||||
self._cap_upshift_pressed = self._accel_pressed
|
||||
|
||||
if self._cap_upshift_release_timer > 0:
|
||||
self._cap_upshift_release_timer = max(0, self._cap_upshift_release_timer - 1)
|
||||
if self._cap_upshift_release_timer <= 0:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _go_disabled(self, reason: 'AssistDisableReason') -> None:
|
||||
"""Transition to disabled state with given reason."""
|
||||
self._cap_raise_accepted = False
|
||||
self.state = SpeedLimitAssistState.disabled
|
||||
self._disable_reason = reason
|
||||
|
||||
def _should_exit_temp_pause(self) -> bool:
|
||||
"""Check if conditions warrant exiting temp pause state."""
|
||||
limit_changed = self._speed_limit_final_last != self._speed_limit_final_last_at_pause
|
||||
timer_expired = self._user_paused_timer <= 0
|
||||
cluster_realigned = abs(self.v_cruise_cluster_conv - self.speed_limit_final_last_conv) <= RESUME_CLUSTER_DELTA_THRESHOLD
|
||||
return limit_changed or timer_expired or cluster_realigned
|
||||
|
||||
def update_state_machine_cap(self, events_sp: EventsSP) -> tuple[bool, bool]:
|
||||
"""Cap mode FSM for pcm_op_long cars. Returns (enabled, active)."""
|
||||
# Bookkeeping: timers, override flags (unchanged)
|
||||
self._cap_change_timer = min(self._cap_change_timer + 1,
|
||||
int((SPEED_LIMIT_CHANGED_HOLD_PERIOD + 1) / DT_MDL))
|
||||
self._cap_upshift_release_timer = max(0, self._cap_upshift_release_timer - 1)
|
||||
self._user_paused_timer = max(0, self._user_paused_timer - 1)
|
||||
|
||||
if self._override_active_last and not self.long_override and self._was_cap_suspended:
|
||||
self._cap_suspended_timer = int(CAP_SUSPEND_GUARD_PERIOD / DT_MDL)
|
||||
elif not self.long_override:
|
||||
self._cap_suspended_timer = max(0, self._cap_suspended_timer - 1)
|
||||
|
||||
self._override_active_last = self.long_override
|
||||
|
||||
self._cap_below_floor = self._has_speed_limit and self._speed_limit_final_last < self._min_cap_floor
|
||||
|
||||
# Gate checks FIRST: apply to all non-disabled states (including tempPaused)
|
||||
if self.state != SpeedLimitAssistState.disabled:
|
||||
if not self.long_enabled or not self.enabled:
|
||||
self.state = SpeedLimitAssistState.disabled
|
||||
if not self._gates_pass:
|
||||
self._go_disabled(AssistDisableReason.gateDisabled)
|
||||
self._was_cap_suspended = False
|
||||
self._cap_suspended_timer = 0
|
||||
elif self.long_override:
|
||||
self._go_disabled(AssistDisableReason.longOverride)
|
||||
self._was_cap_suspended = True
|
||||
|
||||
else:
|
||||
# ACTIVE
|
||||
if self.state == SpeedLimitAssistState.active:
|
||||
if self.v_cruise_cluster_changed:
|
||||
self.state = SpeedLimitAssistState.inactive
|
||||
elif self.speed_limit_changed and self.apply_confirm_speed_threshold:
|
||||
self.state = SpeedLimitAssistState.preActive
|
||||
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
|
||||
elif self._has_speed_limit and self.v_offset < LIMIT_SPEED_OFFSET_TH:
|
||||
self.state = SpeedLimitAssistState.adapting
|
||||
# Sub-state dispatch (only if gates passed)
|
||||
elif self.state == SpeedLimitAssistState.tempPaused:
|
||||
# Exit conditions: speed limit changed, timer expired, or cluster delta returns
|
||||
if self._should_exit_temp_pause():
|
||||
self._user_paused = False
|
||||
self._user_paused_timer = 0
|
||||
self._go_disabled(AssistDisableReason.autoResume)
|
||||
|
||||
# ADAPTING
|
||||
elif self.state == SpeedLimitAssistState.adapting:
|
||||
if self.v_cruise_cluster_changed:
|
||||
self.state = SpeedLimitAssistState.inactive
|
||||
elif self.speed_limit_changed and self.apply_confirm_speed_threshold:
|
||||
self.state = SpeedLimitAssistState.preActive
|
||||
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
|
||||
elif self.v_offset >= LIMIT_SPEED_OFFSET_TH:
|
||||
self.state = SpeedLimitAssistState.active
|
||||
|
||||
# PENDING
|
||||
elif self.state == SpeedLimitAssistState.pending:
|
||||
if self.target_set_speed_confirmed:
|
||||
self._update_confirmed_state()
|
||||
elif self.speed_limit_changed:
|
||||
self.state = SpeedLimitAssistState.preActive
|
||||
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
|
||||
|
||||
# PRE_ACTIVE
|
||||
elif self.state == SpeedLimitAssistState.preActive:
|
||||
if self.target_set_speed_confirmed:
|
||||
self._update_confirmed_state()
|
||||
elif self.pre_active_timer <= 0:
|
||||
# Timeout - session ended
|
||||
self.state = SpeedLimitAssistState.inactive
|
||||
|
||||
# INACTIVE
|
||||
elif self.state == SpeedLimitAssistState.inactive:
|
||||
pass
|
||||
|
||||
# DISABLED
|
||||
elif self.state == SpeedLimitAssistState.disabled:
|
||||
if self.long_enabled and self.enabled:
|
||||
# start or reset preActive timer if initially enabled or manual set speed change detected
|
||||
if not self.long_enabled_prev or self.v_cruise_cluster_changed:
|
||||
self.long_engaged_timer = int(DISABLED_GUARD_PERIOD / DT_MDL)
|
||||
|
||||
elif self.long_engaged_timer <= 0:
|
||||
if self.target_set_speed_confirmed:
|
||||
self._update_confirmed_state()
|
||||
elif self._has_speed_limit:
|
||||
self.state = SpeedLimitAssistState.preActive
|
||||
self.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.pcm_op_long] / DT_MDL)
|
||||
elif self.state == SpeedLimitAssistState.capping:
|
||||
# Cluster delta entry: user nudged cruise control away from limit
|
||||
if self.v_cruise_cluster_changed:
|
||||
self._user_paused = True
|
||||
self._user_paused_timer = USER_PAUSE_TIMEOUT_TICKS
|
||||
self._speed_limit_final_last_at_pause = self._speed_limit_final_last
|
||||
self.state = SpeedLimitAssistState.tempPaused
|
||||
self._disable_reason = AssistDisableReason.userTempPause
|
||||
elif not self._has_speed_limit:
|
||||
self._cap_raise_accepted = False
|
||||
self.state = SpeedLimitAssistState.pending
|
||||
self._disable_reason = AssistDisableReason.mapGap
|
||||
elif self._cap_below_floor:
|
||||
self._cap_raise_accepted = False
|
||||
self.state = SpeedLimitAssistState.pending
|
||||
self._disable_reason = AssistDisableReason.belowFloor
|
||||
elif self._speed_limit_final_last != self._target_cap and self._cap_limit_change_held():
|
||||
old_cap = self._target_cap
|
||||
self._target_cap = self._speed_limit_final_last
|
||||
self._cap_change_timer = 0
|
||||
self._cap_raise_accepted = False
|
||||
if self._target_cap > old_cap:
|
||||
if self._cap_upshift_accept == UpshiftAccept.NEVER_RAISE:
|
||||
self._target_cap = old_cap
|
||||
elif self._cap_upshift_accept == UpshiftAccept.ACCEL_PEDAL:
|
||||
if not self._accel_pressed:
|
||||
self._cap_raise_accepted = True
|
||||
else:
|
||||
self._target_cap = old_cap
|
||||
else:
|
||||
self._cap_upshift_release_edge()
|
||||
else:
|
||||
self.state = SpeedLimitAssistState.pending
|
||||
self._cap_upshift_release_edge()
|
||||
else:
|
||||
self._cap_upshift_release_edge()
|
||||
|
||||
enabled = self.state in ENABLED_STATES
|
||||
active = self.state in ACTIVE_STATES
|
||||
elif self.state == SpeedLimitAssistState.pending:
|
||||
if self._cap_entry_ready:
|
||||
if not self._was_cap_suspended:
|
||||
self._target_cap = self._speed_limit_final_last
|
||||
self._cap_change_timer = 0
|
||||
self._cap_audio_cue_fired = False
|
||||
self._cap_raise_accepted = False
|
||||
self._disable_reason = AssistDisableReason.none
|
||||
self.state = SpeedLimitAssistState.capping
|
||||
|
||||
else:
|
||||
# Disabled-entry logic: if gates pass + cap_suspended_timer clear, enter capping/pending
|
||||
if self._cap_gates_pass:
|
||||
if self._cap_entry_ready:
|
||||
if not self._was_cap_suspended:
|
||||
self._target_cap = self._speed_limit_final_last
|
||||
self._cap_change_timer = 0
|
||||
self._cap_audio_cue_fired = False
|
||||
self._disable_reason = AssistDisableReason.none
|
||||
self.state = SpeedLimitAssistState.capping
|
||||
else:
|
||||
self._disable_reason = AssistDisableReason.mapGap if not self._has_speed_limit else AssistDisableReason.belowFloor
|
||||
self.state = SpeedLimitAssistState.pending
|
||||
|
||||
# Audio cue on capping entry
|
||||
if self.state == SpeedLimitAssistState.capping and self._state_prev != SpeedLimitAssistState.capping:
|
||||
# suppress audio cue on override-release re-entry
|
||||
if self._cap_audio_cue_enabled and not self._was_cap_suspended:
|
||||
events_sp.add(EventNameSP.speedLimitCapActive)
|
||||
self._cap_audio_cue_fired = True
|
||||
self._was_cap_suspended = False
|
||||
|
||||
enabled = self.state in CAP_ENABLED_STATES
|
||||
active = self.state in CAP_ACTIVE_STATES
|
||||
|
||||
return enabled, active
|
||||
|
||||
@@ -360,13 +485,13 @@ class SpeedLimitAssist:
|
||||
return enabled, active
|
||||
|
||||
def update_events(self, events_sp: EventsSP) -> None:
|
||||
if self.state == SpeedLimitAssistState.preActive:
|
||||
if not self.pcm_op_long and self.state == SpeedLimitAssistState.preActive:
|
||||
events_sp.add(EventNameSP.speedLimitPreActive)
|
||||
|
||||
if self.state == SpeedLimitAssistState.pending and self._state_prev != SpeedLimitAssistState.pending:
|
||||
events_sp.add(EventNameSP.speedLimitPending)
|
||||
|
||||
if self.is_active:
|
||||
if not self.pcm_op_long and self.is_active:
|
||||
if self._state_prev not in ACTIVE_STATES:
|
||||
self.update_active_event(events_sp)
|
||||
|
||||
@@ -379,8 +504,9 @@ class SpeedLimitAssist:
|
||||
self.update_active_event(events_sp)
|
||||
|
||||
def update(self, long_enabled: bool, long_override: bool, v_ego: float, a_ego: float, v_cruise_cluster: float, speed_limit: float,
|
||||
speed_limit_final_last: float, has_speed_limit: bool, distance: float, events_sp: EventsSP) -> None:
|
||||
speed_limit_final_last: float, has_speed_limit: bool, distance: float, events_sp: EventsSP, accel_pressed: bool = False) -> None:
|
||||
self.long_enabled = long_enabled
|
||||
self.long_override = long_override
|
||||
self.v_ego = v_ego
|
||||
self.a_ego = a_ego
|
||||
|
||||
@@ -388,13 +514,14 @@ class SpeedLimitAssist:
|
||||
self._speed_limit = speed_limit
|
||||
self._speed_limit_final_last = speed_limit_final_last
|
||||
self._distance = distance
|
||||
self._accel_pressed = accel_pressed
|
||||
|
||||
self.update_params()
|
||||
self.update_calculations(v_cruise_cluster)
|
||||
|
||||
self._state_prev = self.state
|
||||
if self.pcm_op_long:
|
||||
self.is_enabled, self.is_active = self.update_state_machine_pcm_op_long()
|
||||
self.is_enabled, self.is_active = self.update_state_machine_cap(events_sp)
|
||||
else:
|
||||
self.is_enabled, self.is_active = self.update_state_machine_non_pcm_long()
|
||||
|
||||
@@ -411,4 +538,9 @@ class SpeedLimitAssist:
|
||||
self.output_v_target = self.get_v_target_from_control()
|
||||
self.output_a_target = self.get_a_target_from_control()
|
||||
|
||||
if self.pcm_op_long and self.state == SpeedLimitAssistState.capping:
|
||||
self.cap_delta = max(0.0, self.v_cruise_cluster - self._target_cap)
|
||||
else:
|
||||
self.cap_delta = 0.0
|
||||
|
||||
self.frame += 1
|
||||
|
||||
136
sunnypilot/selfdrive/controls/lib/speed_limit/tests/conftest.py
Normal file
136
sunnypilot/selfdrive/controls/lib/speed_limit/tests/conftest.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""
|
||||
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
|
||||
|
||||
This file is part of sunnypilot and is licensed under the MIT License.
|
||||
See the LICENSE.md file in the root directory for more details.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from cereal import custom, car
|
||||
from opendbc.car.car_helpers import interfaces
|
||||
from opendbc.car.toyota.values import CAR as TOYOTA
|
||||
from openpilot.common.constants import CV
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.sunnypilot.selfdrive.car import interfaces as sunnypilot_interfaces
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import get_min_cap_floor
|
||||
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
|
||||
|
||||
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
|
||||
|
||||
|
||||
class CarParamsFactory:
|
||||
@staticmethod
|
||||
def create_car_interface(car_name: str = TOYOTA.TOYOTA_RAV4_TSS2) -> tuple[car.CarParams, custom.CarParamsSP, object]:
|
||||
params = Params()
|
||||
CarInterface = interfaces[car_name]
|
||||
CP = CarInterface.get_non_essential_params(car_name)
|
||||
CP_SP = CarInterface.get_non_essential_params_sp(CP, car_name)
|
||||
CI = CarInterface(CP, CP_SP)
|
||||
CI.CP.openpilotLongitudinalControl = True
|
||||
sunnypilot_interfaces.setup_interfaces(CI, params)
|
||||
return CI.CP, CI.CP_SP, CI
|
||||
|
||||
|
||||
class SpeedLimitAssistScenario:
|
||||
def __init__(self, CP: car.CarParams, CP_SP: custom.CarParamsSP, params: Params = None):
|
||||
if params is None:
|
||||
params = Params()
|
||||
self.params = params
|
||||
self.CP = CP
|
||||
self.CP_SP = CP_SP
|
||||
self.params.put("SpeedLimitMode", int(Mode.assist))
|
||||
self.sla = SpeedLimitAssist(CP, CP_SP)
|
||||
self.sla.update_params()
|
||||
self.events_sp = EventsSP()
|
||||
self.speed_conv = CV.MS_TO_KPH if self.sla.is_metric else CV.MS_TO_MPH
|
||||
|
||||
def set_state(self, state: int) -> "SpeedLimitAssistScenario":
|
||||
self.sla.state = state
|
||||
return self
|
||||
|
||||
def set_speed_limits(self, speed_limit: float, distance: float = 0, speed_limit_final_last: float = 0,
|
||||
speed_limit_prev: float = 0) -> "SpeedLimitAssistScenario":
|
||||
self.sla._speed_limit = speed_limit
|
||||
self.sla._distance = distance
|
||||
self.sla.speed_limit_prev = speed_limit_prev
|
||||
return self
|
||||
|
||||
def set_cruise_speeds(self, v_cruise_cluster: float, v_cruise_cluster_prev: float = None) -> "SpeedLimitAssistScenario":
|
||||
self.sla.v_cruise_cluster = v_cruise_cluster
|
||||
if v_cruise_cluster_prev is None:
|
||||
v_cruise_cluster_prev = v_cruise_cluster
|
||||
self.sla.v_cruise_cluster_prev = v_cruise_cluster_prev
|
||||
self.sla.prev_v_cruise_cluster_conv = round(v_cruise_cluster_prev * self.speed_conv)
|
||||
return self
|
||||
|
||||
def set_engaged(self, op_engaged: bool) -> "SpeedLimitAssistScenario":
|
||||
self.sla.op_engaged = op_engaged
|
||||
return self
|
||||
|
||||
def set_param(self, key: str, value) -> "SpeedLimitAssistScenario":
|
||||
# IntEnum instances carry a .value the Params API does not accept directly
|
||||
if hasattr(value, 'value'):
|
||||
value = value.value
|
||||
|
||||
if isinstance(value, bool):
|
||||
self.params.put_bool(key, value)
|
||||
elif isinstance(value, int):
|
||||
self.params.put(key, value)
|
||||
else:
|
||||
self.params.put(key, str(value) if not isinstance(value, str) else value)
|
||||
|
||||
# Runtime caches these behind PARAMS_UPDATE_PERIOD; force-sync for tests
|
||||
if key == "SpeedLimitMinCapFloor":
|
||||
self.sla._min_cap_floor = get_min_cap_floor(self.sla.params, self.sla.is_metric)
|
||||
elif key == "SpeedLimitUpshiftAccept":
|
||||
self.sla._cap_upshift_accept = self.sla.params.get("SpeedLimitUpshiftAccept", return_default=True)
|
||||
elif key == "SpeedLimitCapAudioCue":
|
||||
self.sla._cap_audio_cue_enabled = bool(self.sla.params.get("SpeedLimitCapAudioCue", return_default=True))
|
||||
elif key == "SpeedLimitMode":
|
||||
self.sla.enabled = self.sla.params.get("SpeedLimitMode", return_default=True) == Mode.assist
|
||||
elif key == "IsMetric":
|
||||
self.sla.is_metric = self.sla.params.get_bool("IsMetric")
|
||||
return self
|
||||
|
||||
def clear_events(self) -> "SpeedLimitAssistScenario":
|
||||
self.events_sp.clear()
|
||||
return self
|
||||
|
||||
def reset_state(self) -> "SpeedLimitAssistScenario":
|
||||
self.sla.state = SpeedLimitAssistState.disabled
|
||||
self.sla.frame = -1
|
||||
self.sla.long_enabled = False
|
||||
self.sla.long_enabled_prev = False
|
||||
self.sla._speed_limit = 0.0
|
||||
self.sla.speed_limit_prev = 0.0
|
||||
self.sla._speed_limit_final_last = 0.0
|
||||
self.sla._distance = 0.0
|
||||
self.sla.long_engaged_timer = 0
|
||||
self.sla.pre_active_timer = 0
|
||||
self.events_sp.clear()
|
||||
return self
|
||||
|
||||
def build(self) -> "SpeedLimitAssistScenario":
|
||||
return self
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def params():
|
||||
p = Params()
|
||||
yield p
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def car_params_factory():
|
||||
return CarParamsFactory()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scenario_builder(params):
|
||||
def builder(car_name: str = TOYOTA.TOYOTA_RAV4_TSS2) -> SpeedLimitAssistScenario:
|
||||
CP, CP_SP, _ = CarParamsFactory.create_car_interface(car_name)
|
||||
return SpeedLimitAssistScenario(CP, CP_SP, params)
|
||||
return builder
|
||||
@@ -0,0 +1,362 @@
|
||||
"""
|
||||
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
|
||||
|
||||
This file is part of sunnypilot and is licensed under the MIT License.
|
||||
See the LICENSE.md file in the root directory for more details.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from cereal import car, custom
|
||||
from openpilot.common.realtime import DT_MDL
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist, \
|
||||
SpeedLimitAssistState
|
||||
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
|
||||
|
||||
|
||||
def make_mock_car_params(mocker, pcm_op_long: bool = True) -> car.CarParams:
|
||||
"""Create a minimal CarParams mock with pcm_op_long setting."""
|
||||
cp = mocker.MagicMock(spec=car.CarParams)
|
||||
cp.openpilotLongitudinalControl = pcm_op_long
|
||||
cp.pcmCruise = pcm_op_long
|
||||
cp.brand = "generic"
|
||||
return cp
|
||||
|
||||
|
||||
def make_mock_car_params_sp(mocker) -> custom.CarParamsSP:
|
||||
"""Create a minimal CarParamsSP mock."""
|
||||
cp_sp = mocker.MagicMock(spec=custom.CarParamsSP)
|
||||
return cp_sp
|
||||
|
||||
|
||||
def get_event_name_sp():
|
||||
"""Get EventNameSP enum."""
|
||||
return custom.OnroadEventSP.EventName
|
||||
|
||||
|
||||
def make_sla_factory(mocker, pcm_op_long: bool = True) -> SpeedLimitAssist:
|
||||
"""Factory: create a SpeedLimitAssist instance with mocked params."""
|
||||
cp = make_mock_car_params(mocker, pcm_op_long)
|
||||
cp_sp = make_mock_car_params_sp(mocker)
|
||||
|
||||
mock_params = mocker.MagicMock()
|
||||
mock_params.get_bool.return_value = True
|
||||
mock_params.get.return_value = True
|
||||
|
||||
mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist.Params",
|
||||
return_value=mock_params)
|
||||
mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers.set_speed_limit_assist_availability")
|
||||
mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers.get_min_cap_floor", return_value=5.0)
|
||||
sla = SpeedLimitAssist(cp, cp_sp)
|
||||
return sla
|
||||
|
||||
|
||||
class TestBug1OverrideNoOscillation:
|
||||
"""Override falling edge arms suspend timer, preventing oscillation."""
|
||||
|
||||
def test_sustained_override_stays_disabled(self, mocker):
|
||||
"""5s sustained override: state never enters capping."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.state = SpeedLimitAssistState.capping
|
||||
sla._state_prev = SpeedLimitAssistState.capping
|
||||
sla._target_cap = 25.0
|
||||
events_sp = EventsSP()
|
||||
|
||||
num_ticks = int(5.0 / DT_MDL)
|
||||
for _ in range(num_ticks):
|
||||
sla.long_override = True
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert (sla.state != SpeedLimitAssistState.capping), "State entered capping during sustained override"
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.disabled
|
||||
|
||||
def test_override_release_enters_capping_after_guard(self, mocker):
|
||||
"""Release override: state enters capping after guard period."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.state = SpeedLimitAssistState.disabled
|
||||
sla._state_prev = SpeedLimitAssistState.disabled
|
||||
sla._target_cap = 25.0
|
||||
sla._was_cap_suspended = True
|
||||
sla._override_active_last = True
|
||||
sla.long_override = True
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
sla.long_override = False
|
||||
|
||||
guard_ticks = int(1.0 / DT_MDL)
|
||||
for i in range(guard_ticks + 1):
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
if i < guard_ticks:
|
||||
assert sla.state == SpeedLimitAssistState.disabled
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
|
||||
class TestBug2TargetCapPreserved:
|
||||
"""Target cap preserved across override-release cycles."""
|
||||
|
||||
def test_target_cap_preserved_on_override_suspension(self, mocker):
|
||||
"""Override pulse: _target_cap preserved, not reset to new limit."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.state = SpeedLimitAssistState.capping
|
||||
sla._state_prev = SpeedLimitAssistState.capping
|
||||
sla._target_cap = 25.0
|
||||
sla._cap_change_timer = 0
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla._speed_limit_final_last = 30.0
|
||||
|
||||
sla.long_override = True
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
assert sla.state == SpeedLimitAssistState.disabled
|
||||
assert sla._was_cap_suspended is True
|
||||
|
||||
sla.long_override = False
|
||||
guard_ticks = int(1.0 / DT_MDL)
|
||||
for _ in range(guard_ticks + 1):
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.capping
|
||||
assert sla._target_cap == 25.0, f"Expected 25.0, got {sla._target_cap}"
|
||||
|
||||
|
||||
class TestBug3AccelPressedWired:
|
||||
"""Accel_pressed parameter wired to sla.update()."""
|
||||
|
||||
def test_accel_pressed_parameter_received(self, mocker):
|
||||
"""sla.update() receives accel_pressed=True."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla.v_ego = 20.0
|
||||
sla.a_ego = 0.0
|
||||
sla.v_cruise_cluster = 30.0
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla.update(
|
||||
long_enabled=True,
|
||||
long_override=False,
|
||||
v_ego=20.0,
|
||||
a_ego=0.0,
|
||||
v_cruise_cluster=30.0,
|
||||
speed_limit=30.0,
|
||||
speed_limit_final_last=30.0,
|
||||
has_speed_limit=True,
|
||||
distance=100.0,
|
||||
events_sp=events_sp,
|
||||
accel_pressed=True,
|
||||
)
|
||||
|
||||
assert sla._accel_pressed is True
|
||||
|
||||
def test_accel_pressed_false_by_default(self, mocker):
|
||||
"""sla.update() with accel_pressed=False (default)."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla.update(
|
||||
long_enabled=True,
|
||||
long_override=False,
|
||||
v_ego=20.0,
|
||||
a_ego=0.0,
|
||||
v_cruise_cluster=30.0,
|
||||
speed_limit=30.0,
|
||||
speed_limit_final_last=30.0,
|
||||
has_speed_limit=True,
|
||||
distance=100.0,
|
||||
events_sp=events_sp,
|
||||
)
|
||||
|
||||
assert sla._accel_pressed is False
|
||||
|
||||
|
||||
class TestBug4NoAudioCueOnOverrideReentry:
|
||||
"""Audio cue suppressed on override-release re-entry."""
|
||||
|
||||
def test_cue_fires_on_cold_entry(self, mocker):
|
||||
"""Fresh engagement: audio cue fires exactly once."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.state = SpeedLimitAssistState.disabled
|
||||
sla._state_prev = SpeedLimitAssistState.disabled
|
||||
sla._was_cap_suspended = False
|
||||
sla._cap_audio_cue_enabled = True
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.capping
|
||||
assert sla._cap_audio_cue_fired is True
|
||||
event_name_sp = get_event_name_sp()
|
||||
assert event_name_sp.speedLimitCapActive in events_sp.events, "Audio cue event not fired"
|
||||
|
||||
def test_no_cue_on_override_reentry(self, mocker):
|
||||
"""Override-release re-entry: audio cue NOT fired."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.state = SpeedLimitAssistState.disabled
|
||||
sla._state_prev = SpeedLimitAssistState.capping
|
||||
sla._target_cap = 25.0
|
||||
sla._was_cap_suspended = True
|
||||
sla._override_active_last = False
|
||||
sla._cap_audio_cue_enabled = True
|
||||
sla._cap_audio_cue_fired = True
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla._cap_suspended_timer = int(1.0 / DT_MDL)
|
||||
guard_ticks = int(1.0 / DT_MDL) + 1
|
||||
for _ in range(guard_ticks):
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
sla._state_prev = sla.state
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.capping
|
||||
event_name_sp = get_event_name_sp()
|
||||
assert (event_name_sp.speedLimitCapActive not in events_sp.events), "Audio cue should not fire on override re-entry"
|
||||
|
||||
|
||||
class TestEdgeACases:
|
||||
"""Edge A: No spurious timer on non-capping override."""
|
||||
|
||||
def test_edge_a_no_spurious_timer_on_disabled_override(self, mocker):
|
||||
"""Override during disabled state should not arm timer."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = False
|
||||
sla._was_cap_suspended = False
|
||||
sla.state = SpeedLimitAssistState.disabled
|
||||
sla._state_prev = SpeedLimitAssistState.disabled
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla.long_override = True
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
sla.long_override = False
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert sla._cap_suspended_timer == 0
|
||||
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
|
||||
class TestEdgeBCases:
|
||||
"""Edge B: _target_cap preserved via pending."""
|
||||
|
||||
def test_edge_b_target_cap_preserved_via_pending(self, mocker):
|
||||
"""Target cap preserved when transiting pending->capping after suspension."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla.state = SpeedLimitAssistState.capping
|
||||
sla._state_prev = SpeedLimitAssistState.capping
|
||||
sla._target_cap = 25.0
|
||||
sla._was_cap_suspended = True
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla._has_speed_limit = False
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
assert sla.state == SpeedLimitAssistState.pending
|
||||
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 30.0
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert sla.state == SpeedLimitAssistState.capping
|
||||
assert sla._target_cap == 25.0
|
||||
|
||||
|
||||
class TestEdgeCCases:
|
||||
"""Edge C: Timer cleared on disengage."""
|
||||
|
||||
def test_edge_c_timer_cleared_on_disengage(self, mocker):
|
||||
"""Timer cleared when long_enabled=False."""
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
|
||||
sla.long_enabled = True
|
||||
sla.enabled = True
|
||||
sla._has_speed_limit = True
|
||||
sla._speed_limit_final_last = 25.0
|
||||
sla._min_cap_floor = 5.0
|
||||
sla._cap_suspended_timer = 50
|
||||
sla.state = SpeedLimitAssistState.capping
|
||||
sla._state_prev = SpeedLimitAssistState.capping
|
||||
events_sp = EventsSP()
|
||||
|
||||
sla.long_enabled = False
|
||||
sla.update_state_machine_cap(events_sp)
|
||||
|
||||
assert sla._cap_suspended_timer == 0
|
||||
assert sla.state == SpeedLimitAssistState.disabled
|
||||
|
||||
|
||||
class TestTargetCapPublished:
|
||||
"""Target cap published in cereal message."""
|
||||
|
||||
def test_target_cap_written_to_assist(self, mocker):
|
||||
"""_write_assist_fields writes sla._target_cap to assist.targetCap."""
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlannerSP
|
||||
|
||||
sla = make_sla_factory(mocker, pcm_op_long=True)
|
||||
sla._target_cap = 22.35
|
||||
sla.output_v_target = 10.0
|
||||
sla.output_a_target = 0.0
|
||||
sla.cap_delta = 0.0
|
||||
sla.is_enabled = True
|
||||
sla.is_active = False
|
||||
sla.state = SpeedLimitAssistState.disabled
|
||||
|
||||
planner = mocker.MagicMock()
|
||||
planner.sla = sla
|
||||
|
||||
msg = custom.LongitudinalPlanSP.new_message()
|
||||
assist = msg.speedLimit.assist
|
||||
|
||||
LongitudinalPlannerSP._write_assist_fields(planner, assist)
|
||||
|
||||
assert assist.targetCap == pytest.approx(22.35)
|
||||
assert assist.enabled is True
|
||||
assert assist.active is False
|
||||
assert assist.vTarget == pytest.approx(10.0)
|
||||
assert assist.aTarget == pytest.approx(0.0)
|
||||
assert assist.capDelta == pytest.approx(0.0)
|
||||
@@ -7,7 +7,7 @@ See the LICENSE.md file in the root directory for more details.
|
||||
|
||||
import pytest
|
||||
|
||||
from cereal import custom
|
||||
from cereal import custom, car
|
||||
from opendbc.car.car_helpers import interfaces
|
||||
from opendbc.car.rivian.values import CAR as RIVIAN
|
||||
from opendbc.car.tesla.values import CAR as TESLA
|
||||
@@ -25,6 +25,7 @@ from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist
|
||||
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
|
||||
|
||||
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
|
||||
ButtonType = car.CarState.ButtonEvent.Type
|
||||
|
||||
ALL_STATES = tuple(SpeedLimitAssistState.schema.enumerants.values())
|
||||
|
||||
@@ -71,6 +72,7 @@ class TestSpeedLimitAssist:
|
||||
CP_SP = CarInterface.get_non_essential_params_sp(CP, car_name)
|
||||
CI = CarInterface(CP, CP_SP)
|
||||
CI.CP.openpilotLongitudinalControl = True # always assume it's openpilot longitudinal
|
||||
CI.CP.pcmCruise = False # test non-PCM FSM path (preActive, pending, adapting, active)
|
||||
sunnypilot_interfaces.setup_interfaces(CI, self.params)
|
||||
return CI
|
||||
|
||||
@@ -83,13 +85,17 @@ class TestSpeedLimitAssist:
|
||||
|
||||
def reset_state(self):
|
||||
self.sla.state = SpeedLimitAssistState.disabled
|
||||
self.sla._state_prev = SpeedLimitAssistState.disabled
|
||||
self.sla.frame = -1
|
||||
self.sla.last_op_engaged_frame = 0
|
||||
self.sla.op_engaged = False
|
||||
self.sla.op_engaged_prev = False
|
||||
self.sla.long_enabled = False
|
||||
self.sla.long_enabled_prev = False
|
||||
self.sla.long_engaged_timer = 0
|
||||
self.sla.pre_active_timer = 0
|
||||
self.sla._speed_limit = 0.
|
||||
self.sla._speed_limit_final_last = 0.
|
||||
self.sla.speed_limit_prev = 0.
|
||||
self.sla.last_valid_speed_limit_offsetted = 0.
|
||||
self.sla.v_cruise_cluster = 0.
|
||||
self.sla.v_cruise_cluster_prev = 0.
|
||||
self.sla._distance = 0.
|
||||
self.events_sp.clear()
|
||||
|
||||
@@ -133,20 +139,6 @@ class TestSpeedLimitAssist:
|
||||
assert self.sla.state == SpeedLimitAssistState.preActive
|
||||
assert self.sla.is_enabled and not self.sla.is_active
|
||||
|
||||
def test_transition_disabled_to_pending_no_speed_limit_not_max_initial_set_speed(self):
|
||||
for _ in range(int(3. / DT_MDL)):
|
||||
self.sla.update(True, False, SPEED_LIMITS['highway'], 0, SPEED_LIMITS['city'], 0, 0, False, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.pending
|
||||
assert self.sla.is_enabled and not self.sla.is_active
|
||||
|
||||
def test_preactive_to_active_with_max_speed_confirmation(self):
|
||||
self.sla.state = SpeedLimitAssistState.preActive
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed, SPEED_LIMITS['highway'],
|
||||
SPEED_LIMITS['highway'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.active
|
||||
assert self.sla.is_enabled and self.sla.is_active
|
||||
assert self.sla.output_v_target == SPEED_LIMITS['highway']
|
||||
|
||||
def test_preactive_timeout_to_inactive(self):
|
||||
self.sla.state = SpeedLimitAssistState.preActive
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, SPEED_LIMITS['highway'], SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
@@ -155,47 +147,6 @@ class TestSpeedLimitAssist:
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, SPEED_LIMITS['highway'], SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.inactive
|
||||
|
||||
def test_preactive_to_pending_no_speed_limit(self):
|
||||
self.sla.state = SpeedLimitAssistState.preActive
|
||||
self.sla.update(True, False, SPEED_LIMITS['highway'], 0, self.pcm_long_max_set_speed, 0, 0, False, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.pending
|
||||
assert self.sla.is_enabled and not self.sla.is_active
|
||||
|
||||
def test_pending_to_active_when_speed_limit_available(self):
|
||||
self.sla.state = SpeedLimitAssistState.pending
|
||||
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
|
||||
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
|
||||
|
||||
self.sla.update(True, False, SPEED_LIMITS['highway'], 0, self.pcm_long_max_set_speed,
|
||||
SPEED_LIMITS['highway'], SPEED_LIMITS['highway'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.active
|
||||
|
||||
def test_pending_to_adapting_when_below_speed_limit(self):
|
||||
self.sla.state = SpeedLimitAssistState.pending
|
||||
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
|
||||
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
|
||||
|
||||
self.sla.update(True, False, SPEED_LIMITS['highway'] + 5, 0, self.pcm_long_max_set_speed,
|
||||
SPEED_LIMITS['highway'], SPEED_LIMITS['highway'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.adapting
|
||||
assert self.sla.is_enabled and self.sla.is_active
|
||||
|
||||
def test_active_to_adapting_transition(self):
|
||||
self.initialize_active_state(self.pcm_long_max_set_speed)
|
||||
|
||||
self.sla.update(True, False, SPEED_LIMITS['highway'] + 2, 0, self.pcm_long_max_set_speed, SPEED_LIMITS['highway'],
|
||||
SPEED_LIMITS['highway'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.adapting
|
||||
|
||||
def test_adapting_to_active_transition(self):
|
||||
self.sla.state = SpeedLimitAssistState.adapting
|
||||
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
|
||||
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
|
||||
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed, SPEED_LIMITS['highway'],
|
||||
SPEED_LIMITS['highway'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.active
|
||||
|
||||
def test_manual_cruise_change_detection(self):
|
||||
self.sla.state = SpeedLimitAssistState.active
|
||||
expected_cruise = SPEED_LIMITS['highway']
|
||||
@@ -203,6 +154,7 @@ class TestSpeedLimitAssist:
|
||||
|
||||
different_cruise = SPEED_LIMITS['highway'] + 5
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, different_cruise, SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
# In non-pcm mode, manual cruise change transitions to inactive (not tempPaused)
|
||||
assert self.sla.state == SpeedLimitAssistState.inactive
|
||||
|
||||
# TODO-SP: test lower CST cases
|
||||
@@ -276,3 +228,236 @@ class TestSpeedLimitAssist:
|
||||
assert self.sla.state in [SpeedLimitAssistState.preActive, SpeedLimitAssistState.active]
|
||||
elif initial_state in ACTIVE_STATES:
|
||||
assert self.sla.state in ACTIVE_STATES
|
||||
|
||||
def test_non_pcm_regression_method_exists(self):
|
||||
"""Regression: update_state_machine_non_pcm_long() method exists unchanged."""
|
||||
assert hasattr(self.sla, "update_state_machine_non_pcm_long")
|
||||
# Verify it is callable
|
||||
assert callable(self.sla.update_state_machine_non_pcm_long)
|
||||
# Verify method is not affected by cap mode refactor (signature check)
|
||||
import inspect
|
||||
inspect.signature(self.sla.update_state_machine_non_pcm_long)
|
||||
# Non-PCM method should have expected parameters (varies by impl, just verify it's present)
|
||||
|
||||
|
||||
class TestSpeedLimitAssistTempPaused:
|
||||
"""Tests for tempPaused state functionality (cap mode)."""
|
||||
|
||||
def setup_method(self, method):
|
||||
self.params = Params()
|
||||
self.reset_custom_params()
|
||||
self.events_sp = EventsSP()
|
||||
CI = self._setup_platform(DEFAULT_CAR)
|
||||
self.sla = SpeedLimitAssist(CI.CP, CI.CP_SP)
|
||||
self.sla.pre_active_timer = int(PRE_ACTIVE_GUARD_PERIOD[self.sla.pcm_op_long] / DT_MDL)
|
||||
self.pcm_long_max_set_speed = PCM_LONG_REQUIRED_MAX_SET_SPEED[self.sla.is_metric][1] # use 80 MPH for now
|
||||
self.speed_conv = CV.MS_TO_KPH if self.sla.is_metric else CV.MS_TO_MPH
|
||||
# For temp paused tests, use pcm_op_long = True
|
||||
self.sla.pcm_op_long = True
|
||||
self.sla.enabled = True
|
||||
|
||||
def teardown_method(self, method):
|
||||
self.reset_state()
|
||||
|
||||
def _setup_platform(self, car_name):
|
||||
CarInterface = interfaces[car_name]
|
||||
CP = CarInterface.get_non_essential_params(car_name)
|
||||
CP_SP = CarInterface.get_non_essential_params_sp(CP, car_name)
|
||||
CI = CarInterface(CP, CP_SP)
|
||||
CI.CP.openpilotLongitudinalControl = True # always assume it's openpilot longitudinal
|
||||
CI.CP.pcmCruise = False # test non-PCM FSM path (preActive, pending, adapting, active)
|
||||
sunnypilot_interfaces.setup_interfaces(CI, self.params)
|
||||
return CI
|
||||
|
||||
def reset_custom_params(self):
|
||||
self.params.put("IsReleaseSpBranch", True)
|
||||
self.params.put("SpeedLimitMode", int(Mode.assist))
|
||||
self.params.put_bool("IsMetric", False)
|
||||
self.params.put("SpeedLimitOffsetType", 0)
|
||||
self.params.put("SpeedLimitValueOffset", 0)
|
||||
|
||||
def reset_state(self):
|
||||
self.sla.state = SpeedLimitAssistState.disabled
|
||||
self.sla._state_prev = SpeedLimitAssistState.disabled
|
||||
self.sla.frame = -1
|
||||
self.sla.long_enabled = False
|
||||
self.sla.long_enabled_prev = False
|
||||
self.sla.long_engaged_timer = 0
|
||||
self.sla.pre_active_timer = 0
|
||||
self.sla._speed_limit = 0.
|
||||
self.sla._speed_limit_final_last = 0.
|
||||
self.sla.speed_limit_prev = 0.
|
||||
self.sla.v_cruise_cluster = 0.
|
||||
self.sla.v_cruise_cluster_prev = 0.
|
||||
self.sla._distance = 0.
|
||||
self.events_sp.clear()
|
||||
|
||||
def test_temp_paused_entry_capping_state(self):
|
||||
"""Test that tempPaused entry occurs during state machine when user paused."""
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._has_speed_limit = True
|
||||
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
|
||||
self.sla._user_paused = True
|
||||
self.sla._user_paused_timer = 1000
|
||||
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.tempPaused
|
||||
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.userTempPause
|
||||
|
||||
def test_temp_paused_exit_on_speed_limit_change(self):
|
||||
"""Test exiting tempPaused when speed limit changes."""
|
||||
self.sla.state = SpeedLimitAssistState.tempPaused
|
||||
self.sla._user_paused = True
|
||||
self.sla._user_paused_timer = 1000
|
||||
self.sla._speed_limit_final_last_at_pause = SPEED_LIMITS['city']
|
||||
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
|
||||
self.sla.long_enabled = True
|
||||
self.sla.enabled = True
|
||||
|
||||
# Change speed limit
|
||||
new_limit = SPEED_LIMITS['highway']
|
||||
self.sla.update(True, False, new_limit, 0, self.pcm_long_max_set_speed,
|
||||
new_limit, new_limit, True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.disabled
|
||||
assert self.sla._user_paused == False
|
||||
|
||||
def test_temp_paused_exit_on_timer_expiry(self):
|
||||
"""Test exiting tempPaused when 5-minute timer expires."""
|
||||
self.sla.state = SpeedLimitAssistState.tempPaused
|
||||
self.sla._user_paused = True
|
||||
self.sla._user_paused_timer = 1
|
||||
self.sla._speed_limit_final_last_at_pause = SPEED_LIMITS['city']
|
||||
|
||||
# Trigger update with timer expiry
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.disabled
|
||||
assert self.sla._user_paused_timer <= 0
|
||||
|
||||
def test_disable_reason_user_cancel(self):
|
||||
"""Test disable_reason set to userCancel on pause."""
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._user_paused = True
|
||||
self.sla._user_paused_timer = 1000
|
||||
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.tempPaused
|
||||
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.userTempPause
|
||||
|
||||
def test_disable_reason_long_override(self):
|
||||
"""Test disable_reason set to longOverride."""
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._has_speed_limit = True
|
||||
self.sla._target_cap = SPEED_LIMITS['city']
|
||||
|
||||
self.sla.update(True, True, SPEED_LIMITS['city'], 0, self.pcm_long_max_set_speed,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.disabled
|
||||
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.longOverride
|
||||
|
||||
def test_disable_reason_below_floor(self):
|
||||
"""Test disable_reason set to belowFloor."""
|
||||
min_floor = self.sla._min_cap_floor
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._has_speed_limit = True
|
||||
self.sla._speed_limit_final_last = min_floor - 1
|
||||
self.sla.long_enabled = True
|
||||
self.sla.enabled = True
|
||||
self.sla.v_cruise_cluster = self.pcm_long_max_set_speed
|
||||
self.sla.v_cruise_cluster_prev = self.pcm_long_max_set_speed
|
||||
self.sla.prev_v_cruise_cluster_conv = round(self.pcm_long_max_set_speed * self.speed_conv)
|
||||
|
||||
self.sla.update(True, False, min_floor - 1, 0, self.pcm_long_max_set_speed,
|
||||
min_floor - 1, min_floor - 1, True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.pending
|
||||
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.belowFloor
|
||||
|
||||
def test_temp_paused_entry_cap_cluster_nudge_plus(self):
|
||||
"""Test tempPaused entry in capping state when cluster nudged above limit."""
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._has_speed_limit = True
|
||||
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
|
||||
self.sla._target_cap = SPEED_LIMITS['city']
|
||||
self.sla.long_enabled = True
|
||||
self.sla.enabled = True
|
||||
self.sla.v_cruise_cluster = SPEED_LIMITS['city']
|
||||
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city']
|
||||
self.sla.prev_v_cruise_cluster_conv = round(SPEED_LIMITS['city'] * self.speed_conv)
|
||||
|
||||
# Nudge cluster up (simulate user pressing accel)
|
||||
nudged_cruise = SPEED_LIMITS['city'] + 1.0
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.tempPaused
|
||||
assert self.sla._user_paused == True
|
||||
assert self.sla._user_paused_timer > 0
|
||||
assert self.sla._disable_reason == custom.LongitudinalPlanSP.SpeedLimit.AssistDisableReason.userTempPause
|
||||
|
||||
def test_temp_paused_entry_cap_cluster_nudge_minus(self):
|
||||
"""Test tempPaused entry in capping state when cluster nudged below limit."""
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._has_speed_limit = True
|
||||
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
|
||||
self.sla._target_cap = SPEED_LIMITS['city']
|
||||
self.sla.long_enabled = True
|
||||
self.sla.enabled = True
|
||||
self.sla.v_cruise_cluster = SPEED_LIMITS['city']
|
||||
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city']
|
||||
self.sla.prev_v_cruise_cluster_conv = round(SPEED_LIMITS['city'] * self.speed_conv)
|
||||
|
||||
# Nudge cluster down (simulate user pressing brake/decel)
|
||||
nudged_cruise = SPEED_LIMITS['city'] - 1.0
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.tempPaused
|
||||
assert self.sla._user_paused == True
|
||||
assert self.sla._user_paused_timer > 0
|
||||
|
||||
def test_temp_paused_exit_cluster_returns_to_limit(self):
|
||||
"""Test exiting tempPaused when cluster returns within ±1 of limit."""
|
||||
self.sla.state = SpeedLimitAssistState.tempPaused
|
||||
self.sla._user_paused = True
|
||||
self.sla._user_paused_timer = 1000
|
||||
self.sla._speed_limit_final_last_at_pause = SPEED_LIMITS['city']
|
||||
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
|
||||
self.sla.long_enabled = True
|
||||
self.sla.enabled = True
|
||||
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city'] + 5
|
||||
self.sla.prev_v_cruise_cluster_conv = round((SPEED_LIMITS['city'] + 5) * self.speed_conv)
|
||||
|
||||
# Return cluster to within ±1 of limit
|
||||
returned_cruise = SPEED_LIMITS['city'] + 0.5
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, returned_cruise,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.disabled
|
||||
assert self.sla._user_paused == False
|
||||
|
||||
def test_temp_paused_sticky_double_nudge(self):
|
||||
"""Test that multiple nudges keep state in tempPaused."""
|
||||
self.sla.state = SpeedLimitAssistState.capping
|
||||
self.sla._has_speed_limit = True
|
||||
self.sla._speed_limit_final_last = SPEED_LIMITS['city']
|
||||
self.sla._target_cap = SPEED_LIMITS['city']
|
||||
self.sla.long_enabled = True
|
||||
self.sla.enabled = True
|
||||
self.sla.v_cruise_cluster = SPEED_LIMITS['city']
|
||||
self.sla.v_cruise_cluster_prev = SPEED_LIMITS['city']
|
||||
self.sla.prev_v_cruise_cluster_conv = round(SPEED_LIMITS['city'] * self.speed_conv)
|
||||
|
||||
# First nudge
|
||||
nudged_cruise_1 = SPEED_LIMITS['city'] + 2.0
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise_1,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.tempPaused
|
||||
assert self.sla._user_paused_timer > 0
|
||||
saved_timer_1 = self.sla._user_paused_timer
|
||||
|
||||
# Second nudge while in tempPaused (shouldn't trigger another entry)
|
||||
nudged_cruise_2 = SPEED_LIMITS['city'] + 3.0
|
||||
self.sla.update(True, False, SPEED_LIMITS['city'], 0, nudged_cruise_2,
|
||||
SPEED_LIMITS['city'], SPEED_LIMITS['city'], True, 0, self.events_sp)
|
||||
assert self.sla.state == SpeedLimitAssistState.tempPaused
|
||||
# Timer should have been decremented by one update call
|
||||
assert self.sla._user_paused_timer == saved_timer_1 - 1
|
||||
|
||||
@@ -0,0 +1,585 @@
|
||||
"""
|
||||
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
|
||||
|
||||
This file is part of sunnypilot and is licensed under the MIT License.
|
||||
See the LICENSE.md file in the root directory for more details.
|
||||
|
||||
Test suite for Speed Limit Assist cap mode (pcm_op_long only).
|
||||
Covers 20 edge cases for FSM, debounce, upshift, pedal release, and audio cue.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from cereal import custom
|
||||
from opendbc.car.toyota.values import CAR as TOYOTA
|
||||
from openpilot.common.constants import CV
|
||||
from openpilot.common.realtime import DT_MDL
|
||||
from openpilot.selfdrive.car.cruise import V_CRUISE_UNSET
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import UpshiftAccept
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.speed_limit_assist import SpeedLimitAssist, \
|
||||
CAP_ACTIVE_STATES
|
||||
|
||||
SpeedLimitAssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
|
||||
|
||||
|
||||
class TestSpeedLimitCapMode:
|
||||
|
||||
def test_cap_mode_applies_to_pcm_op_long(self, scenario_builder):
|
||||
"""Cap mode applies to pcm_op_long cars (pcmCruise=True + openpilotLongitudinalControl=True)."""
|
||||
scenario = scenario_builder(TOYOTA.TOYOTA_RAV4_TSS2)
|
||||
scenario.set_state(SpeedLimitAssistState.disabled)
|
||||
assert scenario.sla.pcm_op_long is True
|
||||
assert scenario.sla.state == SpeedLimitAssistState.disabled
|
||||
|
||||
def test_disabled_to_capping_transition(self, scenario_builder):
|
||||
"""FSM: disabled -> capping when speed limit available and engaged."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.disabled)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Update to enter capping
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
assert scenario.sla.is_active
|
||||
|
||||
def test_capping_to_disabled_on_disengagement(self, scenario_builder):
|
||||
"""FSM: capping -> disabled when user disengages (manual override)."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Simulate manual override (long_override=True)
|
||||
scenario.sla.update(
|
||||
True, True, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
assert scenario.sla.state == SpeedLimitAssistState.disabled
|
||||
assert scenario.sla.output_v_target == V_CRUISE_UNSET
|
||||
|
||||
def test_below_floor_pause_transition(self, scenario_builder):
|
||||
"""FSM: capping exits to pending (no cap emitted) when posted limit is below min cap floor."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitMinCapFloor", 25) # 25 mph floor
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
|
||||
# Posted limit 20 mph (below 25 mph floor) -> cap must pause
|
||||
low_limit_ms = 20 * CV.MPH_TO_MS
|
||||
scenario.set_speed_limits(low_limit_ms, 0)
|
||||
scenario.set_cruise_speeds(30 * CV.MPH_TO_MS)
|
||||
|
||||
for _ in range(int(0.5 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 30 * CV.MPH_TO_MS, 0,
|
||||
30 * CV.MPH_TO_MS, low_limit_ms, low_limit_ms,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Below-floor condition pauses cap: state is not capping, v_target is unset (no cap)
|
||||
assert scenario.sla.state != SpeedLimitAssistState.capping
|
||||
assert scenario.sla.output_v_target == V_CRUISE_UNSET
|
||||
|
||||
def test_resume_from_pause_above_floor(self, scenario_builder):
|
||||
"""FSM: pending (below-floor pause) -> capping when vehicle speed rises above min cap floor."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitMinCapFloor", 25) # 25 mph
|
||||
scenario.set_state(SpeedLimitAssistState.pending)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Update with speed above floor
|
||||
scenario.sla.update(
|
||||
True, False, 30 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
# Should resume capping if conditions allow
|
||||
assert scenario.sla.state in CAP_ACTIVE_STATES or scenario.sla.state == SpeedLimitAssistState.pending
|
||||
|
||||
def test_change_debounce_hold_new_limit(self, scenario_builder):
|
||||
"""FSM: New speed limit held for 1s before accepting (debounce)."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.ACCEL_PEDAL)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Start in capping with initial cap at 25 mph
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
scenario.sla._has_speed_limit = True
|
||||
|
||||
# First: establish baseline with 25 mph and pressed pedal (to establish state for edge detection)
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=True
|
||||
)
|
||||
|
||||
# Then: change limit to 35 mph, press pedal briefly, then release and wait for debounce to expire
|
||||
# Press pedal for first 0.5s (speed_limit=25 but speed_limit_final_last=35 simulates detection)
|
||||
for _ in range(int(0.5 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=True
|
||||
)
|
||||
|
||||
# Release pedal and wait for cap debounce + accel debounce to complete (0.7s more = 1.2s total)
|
||||
for _ in range(int(0.7 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=False
|
||||
)
|
||||
|
||||
# Cap should now be 35 mph after debounce and pedal release edge
|
||||
assert abs(scenario.sla._target_cap - 35 * CV.MPH_TO_MS) < 0.1
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_upshift_never_raise_keeps_old_cap(self, scenario_builder):
|
||||
"""FSM: NEVER_RAISE mode keeps cap unchanged when limit increases."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.NEVER_RAISE)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Start with cap at 25 mph
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Increase limit to 35 mph, wait for debounce to expire
|
||||
for _ in range(int(1.2 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# In NEVER_RAISE mode, cap stays at 25 mph (old value)
|
||||
assert abs(scenario.sla._target_cap - 25 * CV.MPH_TO_MS) < 0.1
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_upshift_accel_pedal_requires_release(self, scenario_builder):
|
||||
"""FSM: ACCEL_PEDAL mode accepts new cap only on pedal release."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.ACCEL_PEDAL)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Start with cap at 25 mph
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Pedal pressed with new limit 35 mph, wait 1.1s (exceeds cap debounce) but pedal still pressed
|
||||
for _ in range(int(1.1 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=True
|
||||
)
|
||||
|
||||
# Cap should still be 25 mph (pedal pressed, upshift rejected despite debounce)
|
||||
assert abs(scenario.sla._target_cap - 25 * CV.MPH_TO_MS) < 0.1
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_pedal_release_debounce_200ms(self, scenario_builder):
|
||||
"""FSM: Accel pedal release edge requires 200ms debounce."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitUpshiftAccept", UpshiftAccept.ACCEL_PEDAL)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# First establish the limit change and wait for cap change debounce to start
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=True
|
||||
)
|
||||
|
||||
# Pedal pressed, wait 0.6s, then release and wait 0.5s (total 1.1s > 1.0s cap debounce + 0.2s accel debounce)
|
||||
for _ in range(int(0.6 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=True
|
||||
)
|
||||
|
||||
# Release and wait for both debounces to complete
|
||||
for _ in range(int(0.5 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 35 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 35 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp, accel_pressed=False
|
||||
)
|
||||
|
||||
# Now cap should be updated to 35 mph (cap debounce complete + accel release edge detected and debounced)
|
||||
assert abs(scenario.sla._target_cap - 35 * CV.MPH_TO_MS) < 0.1
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_suspended_timer_after_long_override(self, scenario_builder):
|
||||
"""FSM: 1s suspension window after manual override release."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
scenario.sla.update(
|
||||
True, True, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
assert scenario.sla.state == SpeedLimitAssistState.disabled
|
||||
assert scenario.sla._cap_suspended_timer > 0 # frame counter
|
||||
|
||||
# Attempt to re-engage within suspension window (0.5s)
|
||||
for _ in range(int(0.5 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should still be disabled (suspension active)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.disabled
|
||||
|
||||
# Wait for suspension to expire (1.1s total)
|
||||
for _ in range(int(0.7 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Now cap can re-engage and transition to capping
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_audio_cue_fires_once_on_capping_entry(self, scenario_builder):
|
||||
"""Event: speedLimitCapActive fires once on entry to capping state."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitCapAudioCue", True)
|
||||
scenario.set_state(SpeedLimitAssistState.disabled)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Enter capping state
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Verify we transitioned to capping
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
# Audio cue flag should be set
|
||||
assert scenario.sla._cap_audio_cue_fired is True
|
||||
|
||||
def test_audio_cue_disabled_no_fire(self, scenario_builder):
|
||||
"""Event: speedLimitCapActive suppressed when audio cue disabled."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitCapAudioCue", False)
|
||||
scenario.set_state(SpeedLimitAssistState.disabled)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
# Enter capping state
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Verify we transitioned to capping
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
# Audio cue flag should NOT be set (disabled)
|
||||
assert scenario.sla._cap_audio_cue_fired is False
|
||||
|
||||
def test_cap_delta_ui_feedback(self, scenario_builder):
|
||||
"""UI: target_cap tracks posted limit, delta = v_cruise_cluster - target_cap (positive when cap below driver intent)."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
limit_ms = 25 * CV.MPH_TO_MS
|
||||
cruise_ms = 35 * CV.MPH_TO_MS
|
||||
scenario.set_speed_limits(limit_ms, 0)
|
||||
scenario.set_cruise_speeds(cruise_ms)
|
||||
|
||||
scenario.sla._target_cap = limit_ms
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
cruise_ms, limit_ms, limit_ms,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
assert abs(scenario.sla.output_v_target - limit_ms) < 0.1
|
||||
assert abs(scenario.sla._target_cap - limit_ms) < 0.1
|
||||
cap_delta = max(0., cruise_ms - scenario.sla._target_cap)
|
||||
assert cap_delta > 0
|
||||
assert abs(cap_delta - 10 * CV.MPH_TO_MS) < 0.1
|
||||
assert abs(scenario.sla.cap_delta - cap_delta) < 0.1
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_min_cap_floor_zero_disables_pause(self, scenario_builder):
|
||||
"""FSM: min cap floor 0 disables pause-on-low-speed behavior."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitMinCapFloor", 0)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(5 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Very low speed (5 mph), but floor is 0
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
5 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should continue capping (floor 0 means no pause even at low speed)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
assert scenario.sla._cap_below_floor is False
|
||||
|
||||
def test_min_cap_floor_max_value_40_mph(self, scenario_builder):
|
||||
"""FSM: min cap floor clamped to 40 mph (reasonable max)."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("IsMetric", False)
|
||||
scenario.set_param("SpeedLimitMinCapFloor", 40)
|
||||
scenario.sla.is_metric = False
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(30 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Speed 25 mph (limit) is below floor 40 mph, should pause
|
||||
for _ in range(int(0.5 / DT_MDL)):
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
30 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should transition to pending (below floor)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.pending
|
||||
assert scenario.sla._cap_below_floor is True
|
||||
|
||||
def test_v_target_clamped_to_cruise_when_capping(self, scenario_builder):
|
||||
"""Output: v_target = min(v_cruise, cap) when actively capping."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# v_target should be min(20, 25) = 20 m/s (driver cruise is limiting factor)
|
||||
assert abs(scenario.sla.output_v_target - 20 * CV.MPH_TO_MS) < 0.1
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
# Missing edge cases from matrix (Issues #1, #2, #5, #7, #16, #18, #19)
|
||||
|
||||
def test_school_zone_false_positive_25_mph_floor(self, scenario_builder):
|
||||
"""Edge case #1: OSM school zone 25 mph at inactive hours blocked by floor."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitMinCapFloor", 25)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(11.18, 0) # 25 mph -> m/s
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 11.18
|
||||
|
||||
# Speed limit 25 mph = 11.18 m/s is at floor (not strictly above)
|
||||
scenario.sla.update(
|
||||
True, False, 11.18, 0,
|
||||
20 * CV.MPH_TO_MS, 11.18, 11.18,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should not pause at exactly floor (must be strictly above)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
|
||||
def test_construction_stale_25_mph_pauses(self, scenario_builder):
|
||||
"""Edge case #2: Construction temp speed limit 25 mph pauses on stale data."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_param("SpeedLimitMinCapFloor", 25)
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
# Stale construction limit (25 mph, below floor by resolver check)
|
||||
scenario.set_speed_limits(11.18, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 11.18
|
||||
|
||||
# Simulate old data with has_speed_limit=False (resolver cleared it)
|
||||
scenario.sla.update(
|
||||
True, False, 11.18, 0,
|
||||
20 * CV.MPH_TO_MS, 11.18, 11.18,
|
||||
False, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should transition to pending (lost limit)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.pending
|
||||
|
||||
@pytest.mark.parametrize("is_metric", [True, False])
|
||||
def test_kmh_mph_unit_conversion_border(self, scenario_builder, is_metric):
|
||||
"""Edge case #5: km/h ↔ mph border crossing unit conversion."""
|
||||
scenario = scenario_builder()
|
||||
scenario.params.put_bool("IsMetric", is_metric)
|
||||
# Reload SLA to pick up metric setting
|
||||
from opendbc.car.toyota.values import CAR as TOYOTA
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.tests.conftest import CarParamsFactory
|
||||
CP, CP_SP, _ = CarParamsFactory.create_car_interface(TOYOTA.TOYOTA_RAV4_TSS2)
|
||||
scenario.sla = SpeedLimitAssist(CP, CP_SP)
|
||||
scenario.sla.params.put_bool("IsMetric", is_metric)
|
||||
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
|
||||
if is_metric:
|
||||
# 65 km/h ≈ 40.4 mph
|
||||
scenario.set_speed_limits(65 * CV.KPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(60 * CV.KPH_TO_MS)
|
||||
scenario.sla._target_cap = 65 * CV.KPH_TO_MS
|
||||
else:
|
||||
# 40 mph ≈ 64.4 km/h
|
||||
scenario.set_speed_limits(40 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(35 * CV.MPH_TO_MS)
|
||||
scenario.sla._target_cap = 40 * CV.MPH_TO_MS
|
||||
|
||||
scenario.sla.update(
|
||||
True, False, scenario.sla._target_cap, 0,
|
||||
scenario.sla.v_cruise_cluster, scenario.sla._speed_limit,
|
||||
scenario.sla._speed_limit, True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should remain in capping with correct unit-aware cap
|
||||
assert scenario.sla.state == SpeedLimitAssistState.capping
|
||||
assert scenario.sla._has_speed_limit is True
|
||||
|
||||
def test_gps_tunnel_fade_10s_age_limit(self, scenario_builder):
|
||||
"""Edge case #7: GPS tunnel fade stale limit via LIMIT_MAX_MAP_DATA_AGE."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Simulate has_speed_limit=False (resolver expired data after 10s)
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
False, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should transition to pending (lost limit)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.pending
|
||||
assert scenario.sla.output_v_target == V_CRUISE_UNSET
|
||||
|
||||
def test_lost_speed_limit_mid_cap_to_pending(self, scenario_builder):
|
||||
"""Edge case #16: Lost speed limit mid-cap transitions to pending."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# Simulate limit suddenly unavailable
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
False, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should transition to pending, emit V_CRUISE_UNSET
|
||||
assert scenario.sla.state == SpeedLimitAssistState.pending
|
||||
assert scenario.sla.output_v_target == V_CRUISE_UNSET
|
||||
|
||||
def test_mode_param_off_disables_sla_on_pcm_op_long(self, scenario_builder):
|
||||
"""Edge case #18: SpeedLimitMode controls engagement on pcm_op_long cars."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.disabled)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(25 * CV.MPH_TO_MS, 0)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.set_param("SpeedLimitMode", 0)
|
||||
|
||||
scenario.sla.update(
|
||||
True, False, 25 * CV.MPH_TO_MS, 0,
|
||||
20 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS, 25 * CV.MPH_TO_MS,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
assert scenario.sla.state == SpeedLimitAssistState.disabled
|
||||
|
||||
def test_speed_limit_final_zero_with_has_limit_edge(self, scenario_builder):
|
||||
"""Edge case #19: has_speed_limit=true but speed_limit_final=0 -> pending."""
|
||||
scenario = scenario_builder()
|
||||
scenario.set_state(SpeedLimitAssistState.capping)
|
||||
scenario.set_engaged(True)
|
||||
scenario.set_speed_limits(0, 0) # Zero speed limit (bad data)
|
||||
scenario.set_cruise_speeds(20 * CV.MPH_TO_MS)
|
||||
|
||||
scenario.sla._target_cap = 25 * CV.MPH_TO_MS
|
||||
|
||||
# has_speed_limit=True but speed_limit_final_last=0
|
||||
scenario.sla.update(
|
||||
True, False, 0, 0,
|
||||
20 * CV.MPH_TO_MS, 0, 0,
|
||||
True, 0, scenario.events_sp
|
||||
)
|
||||
|
||||
# Should transition to pending (bad/zero limit treated as no limit)
|
||||
assert scenario.sla.state == SpeedLimitAssistState.pending
|
||||
|
||||
def test_non_pcm_path_unchanged(self, scenario_builder):
|
||||
"""Regression: Non-PCM update_state_machine_non_pcm_long() unchanged."""
|
||||
scenario = scenario_builder()
|
||||
# Verify method exists and has correct signature
|
||||
assert hasattr(scenario.sla, "update_state_machine_non_pcm_long")
|
||||
# Method signature check (internal implementation detail)
|
||||
@@ -216,6 +216,14 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
|
||||
Priority.LOW, VisualAlert.none, AudibleAlertSP.promptSingleHigh, 5.),
|
||||
},
|
||||
|
||||
EventNameSP.speedLimitCapActive: {
|
||||
ET.WARNING: Alert(
|
||||
"Speed Limit Capping",
|
||||
"",
|
||||
AlertStatus.normal, AlertSize.small,
|
||||
Priority.LOW, VisualAlert.none, AudibleAlertSP.promptSingleHigh, 5.),
|
||||
},
|
||||
|
||||
EventNameSP.speedLimitChanged: {
|
||||
ET.WARNING: Alert(
|
||||
"Set speed changed",
|
||||
|
||||
@@ -1227,6 +1227,41 @@
|
||||
"max": 30,
|
||||
"step": 1
|
||||
},
|
||||
"SpeedLimitUpshiftAccept": {
|
||||
"title": "Speed Limit Cap Upshift",
|
||||
"description": "Mode for accepting new speed limit changes in cap mode.",
|
||||
"options": [
|
||||
{
|
||||
"value": 0,
|
||||
"label": "Never Raise"
|
||||
},
|
||||
{
|
||||
"value": 1,
|
||||
"label": "Accel Pedal Confirm"
|
||||
}
|
||||
]
|
||||
},
|
||||
"SpeedLimitMinCapFloor": {
|
||||
"title": "Speed Limit Cap Floor",
|
||||
"description": "Minimum speed below which speed limit capping is paused.",
|
||||
"min": 0,
|
||||
"max": 40,
|
||||
"step": 1
|
||||
},
|
||||
"SpeedLimitCapAudioCue": {
|
||||
"title": "Speed Limit Cap Audio Cue",
|
||||
"description": "Enable audio cue when entering speed limit capping mode.",
|
||||
"options": [
|
||||
{
|
||||
"value": 0,
|
||||
"label": "Off"
|
||||
},
|
||||
{
|
||||
"value": 1,
|
||||
"label": "On"
|
||||
}
|
||||
]
|
||||
},
|
||||
"SshEnabled": {
|
||||
"title": "Enable SSH",
|
||||
"description": ""
|
||||
|
||||
Reference in New Issue
Block a user