diff --git a/sunnypilot/selfdrive/controls/lib/dec/constants.py b/sunnypilot/selfdrive/controls/lib/dec/constants.py index 4586afbc9f..765bb23757 100644 --- a/sunnypilot/selfdrive/controls/lib/dec/constants.py +++ b/sunnypilot/selfdrive/controls/lib/dec/constants.py @@ -1,17 +1,46 @@ +from openpilot.common.realtime import DT_MDL + + class WMACConstants: - # Lead detection parameters - LEAD_WINDOW_SIZE = 6 # Stable detection window - LEAD_PROB = 0.45 # Balanced threshold for lead detection + TRAJECTORY_SIZE = 33 + PARAM_READ_FRAMES = max(1, int(round(1.0 / DT_MDL))) - # Slow down detection parameters - SLOW_DOWN_WINDOW_SIZE = 5 # Responsive but stable - SLOW_DOWN_PROB = 0.3 # Balanced threshold for slow down scenarios + EMERGENCY_HOLD_FRAMES = max(1, int(round(0.75 / DT_MDL))) + MIN_MODE_DURATION = {'acc': max(1, int(round(0.6 / DT_MDL))), 'blended': max(1, int(round(0.5 / DT_MDL)))} + ENTER_BLENDED_FRAMES = max(1, int(round(0.4 / DT_MDL))) + EXIT_BLENDED_FRAMES = max(1, int(round(0.35 / DT_MDL))) + STANDSTILL_FRAMES = max(1, int(round(0.2 / DT_MDL))) - # Optimized slow down distance curve - smooth and progressive + LEAD_PROB = 0.45 + LEAD_EXIT_PROB = 0.25 + LEAD_RISE_RATE = 1.0 + LEAD_FALL_RATE = 0.35 + RADAR_LEAD_ACC_PROB = 0.5 + RADAR_LEAD_ACC_EXIT_PROB = 0.4 + RADAR_LEAD_ACC_RISE_RATE = 1.0 + RADAR_LEAD_ACC_FALL_RATE = 0.25 + RADAR_LEAD_ACC_MAX_DREL = 80.0 + RADAR_LEAD_ACC_MAX_TTC = 6.0 + RADAR_LEAD_ACC_MIN_CLOSING_SPEED = -0.5 + + SLOW_DOWN_PROB = 0.5 + SLOW_DOWN_EXIT_PROB = 0.4 + SLOW_DOWN_RISE_RATE = 0.65 + SLOW_DOWN_FALL_RATE = 0.15 SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.] SLOW_DOWN_DIST = [32., 46., 64., 86., 108., 130., 145., 165.] + URGENT_SLOW_DOWN_PROB = 0.85 - # Slowness detection parameters - SLOWNESS_WINDOW_SIZE = 10 # Stable slowness detection - SLOWNESS_PROB = 0.55 # Clear threshold for slowness - SLOWNESS_CRUISE_OFFSET = 1.025 # Conservative cruise speed offset + MODEL_DECEL_START = -0.5 + MODEL_DECEL_RANGE = 2.0 + ENDPOINT_URGENCY_GAIN = 1.3 + CRITICAL_ENDPOINT_FACTOR = 0.3 + CRITICAL_URGENCY_GAIN = 1.5 + SPEED_URGENCY_MIN = 25.0 + SPEED_URGENCY_RANGE = 80.0 + + SLOWNESS_PROB = 0.55 + SLOWNESS_EXIT_PROB = 0.45 + SLOWNESS_RISE_RATE = 0.35 + SLOWNESS_FALL_RATE = 0.5 + SLOWNESS_CRUISE_OFFSET = 1.025 diff --git a/sunnypilot/selfdrive/controls/lib/dec/dec.py b/sunnypilot/selfdrive/controls/lib/dec/dec.py index 46cad1b048..a10a8fc916 100644 --- a/sunnypilot/selfdrive/controls/lib/dec/dec.py +++ b/sunnypilot/selfdrive/controls/lib/dec/dec.py @@ -6,129 +6,116 @@ See the LICENSE.md file in the root directory for more details. """ # Version = 2025-6-30 -from cereal import messaging -from opendbc.car import structs -from numpy import interp -from openpilot.common.params import Params -from openpilot.common.realtime import DT_MDL -from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants from typing import Literal -# d-e2e, from modeldata.h -TRAJECTORY_SIZE = 33 -SET_MODE_TIMEOUT = 15 +from cereal import messaging +from numpy import interp +from opendbc.car import structs +from openpilot.common.params import Params +from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants -# Define the valid mode types ModeType = Literal['acc', 'blended'] -class SmoothKalmanFilter: - """Enhanced Kalman filter with smoothing for stable decision making.""" +def clip01(value: float) -> float: + return max(0.0, min(1.0, float(value))) - def __init__(self, initial_value=0, measurement_noise=0.1, process_noise=0.01, - alpha=1.0, smoothing_factor=0.85): - self.x = initial_value - self.P = 1.0 - self.R = measurement_noise - self.Q = process_noise - self.alpha = alpha - self.smoothing_factor = smoothing_factor - self.initialized = False - self.history = [] - self.max_history = 10 - self.confidence = 0.0 - def add_data(self, measurement): - if len(self.history) >= self.max_history: - self.history.pop(0) - self.history.append(measurement) +class SmoothedSignal: + def __init__(self, rise_rate: float, fall_rate: float, initial_value: float = 0.0): + self.rise_rate = clip01(rise_rate) + self.fall_rate = clip01(fall_rate) + self.value = clip01(initial_value) - if not self.initialized: - self.x = measurement - self.initialized = True - self.confidence = 0.1 - return + def update(self, measurement: float) -> float: + measurement = clip01(measurement) + rate = self.rise_rate if measurement > self.value else self.fall_rate + self.value += (measurement - self.value) * rate + return self.value - self.P = self.alpha * self.P + self.Q + def reset(self, value: float = 0.0) -> None: + self.value = clip01(value) - K = self.P / (self.P + self.R) - effective_K = K * (1.0 - self.smoothing_factor) + self.smoothing_factor * 0.1 - innovation = measurement - self.x - self.x = self.x + effective_K * innovation - self.P = (1 - effective_K) * self.P +class HysteresisSignal: + def __init__(self, enter_threshold: float, exit_threshold: float, rise_rate: float, fall_rate: float): + self.enter_threshold = clip01(enter_threshold) + self.exit_threshold = clip01(exit_threshold) + self.filter = SmoothedSignal(rise_rate, fall_rate) + self.active = False - if abs(innovation) < 0.1: - self.confidence = min(1.0, self.confidence + 0.05) - else: - self.confidence = max(0.1, self.confidence - 0.02) + def update(self, measurement: float) -> bool: + value = self.filter.update(measurement) + threshold = self.exit_threshold if self.active else self.enter_threshold + self.active = value > threshold + return self.active - def get_value(self): - return self.x if self.initialized else None + def reset(self) -> None: + self.filter.reset() + self.active = False - def get_confidence(self): - return self.confidence - - def reset_data(self): - self.initialized = False - self.history = [] - self.confidence = 0.0 + @property + def value(self) -> float: + return self.filter.value class ModeTransitionManager: - """Manages smooth transitions between driving modes with hysteresis.""" - def __init__(self): self.current_mode: ModeType = 'acc' - self.mode_confidence = {'acc': 1.0, 'blended': 0.0} - self.transition_timeout = 0 - self.min_mode_duration = 10 self.mode_duration = 0 - self.emergency_override = False + self._pending_mode: ModeType = 'acc' + self._pending_count = 0 + self._blended_hold_frames = 0 - def request_mode(self, mode: ModeType, confidence: float = 1.0, emergency: bool = False): - # Emergency override for critical situations (stops, collisions) - if emergency: - self.emergency_override = True - self.current_mode = mode - self.transition_timeout = SET_MODE_TIMEOUT - self.mode_duration = 0 + def request_mode(self, mode: ModeType, immediate: bool = False, hold_frames: int = 0, cancel_hold: bool = False) -> None: + if immediate: + self._blended_hold_frames = max(self._blended_hold_frames, hold_frames) + self._pending_mode = mode + self._pending_count = 0 + self._switch_mode(mode) return - self.mode_confidence[mode] = min(1.0, self.mode_confidence[mode] + 0.1 * confidence) - for m in self.mode_confidence: - if m != mode: - self.mode_confidence[m] = max(0.0, self.mode_confidence[m] - 0.05) + if cancel_hold and mode == 'acc': + self._blended_hold_frames = 0 - # Require minimum duration in current mode (unless emergency) - if self.mode_duration < self.min_mode_duration and not self.emergency_override: + if self._blended_hold_frames > 0: + mode = 'blended' + + if mode == self.current_mode: + self._pending_mode = mode + self._pending_count = 0 return - # Hysteresis: higher threshold for mode changes - confidence_threshold = 0.6 if mode != self.current_mode else 0.3 # Lower threshold for faster response + if mode != self._pending_mode: + self._pending_mode = mode + self._pending_count = 1 + else: + self._pending_count += 1 - if self.mode_confidence[mode] > confidence_threshold: - if mode != self.current_mode and self.transition_timeout == 0: - self.transition_timeout = SET_MODE_TIMEOUT - self.current_mode = mode - self.mode_duration = 0 + if self.mode_duration < WMACConstants.MIN_MODE_DURATION[self.current_mode]: + return - def update(self): - if self.transition_timeout > 0: - self.transition_timeout -= 1 + required_count = WMACConstants.ENTER_BLENDED_FRAMES if mode == 'blended' else WMACConstants.EXIT_BLENDED_FRAMES + if self._pending_count >= required_count: + self._switch_mode(mode) + + def update(self) -> None: + if self._blended_hold_frames > 0: + self._blended_hold_frames -= 1 self.mode_duration += 1 - # Reset emergency override after some time - if self.emergency_override and self.mode_duration > 20: - self.emergency_override = False - - # Gradual confidence decay - for mode in self.mode_confidence: - self.mode_confidence[mode] *= 0.98 - def get_mode(self) -> ModeType: return self.current_mode + def _switch_mode(self, mode: ModeType) -> None: + if mode == self.current_mode: + return + + self.current_mode = mode + self.mode_duration = 0 + self._pending_mode = mode + self._pending_count = 0 + class DynamicExperimentalController: def __init__(self, CP: structs.CarParams, mpc, params=None): @@ -142,35 +129,33 @@ class DynamicExperimentalController: self._mode_manager = ModeTransitionManager() - # Smooth filters for stable decision making with faster response for critical scenarios - self._lead_filter = SmoothKalmanFilter( - measurement_noise=0.15, - process_noise=0.05, - alpha=1.02, - smoothing_factor=0.8 + self._lead_tracker = HysteresisSignal( + enter_threshold=WMACConstants.LEAD_PROB, + exit_threshold=WMACConstants.LEAD_EXIT_PROB, + rise_rate=WMACConstants.LEAD_RISE_RATE, + fall_rate=WMACConstants.LEAD_FALL_RATE, + ) + self._radar_acc_lead_tracker = HysteresisSignal( + enter_threshold=WMACConstants.RADAR_LEAD_ACC_PROB, + exit_threshold=WMACConstants.RADAR_LEAD_ACC_EXIT_PROB, + rise_rate=WMACConstants.RADAR_LEAD_ACC_RISE_RATE, + fall_rate=WMACConstants.RADAR_LEAD_ACC_FALL_RATE, + ) + self._slow_down_tracker = HysteresisSignal( + enter_threshold=WMACConstants.SLOW_DOWN_PROB, + exit_threshold=WMACConstants.SLOW_DOWN_EXIT_PROB, + rise_rate=WMACConstants.SLOW_DOWN_RISE_RATE, + fall_rate=WMACConstants.SLOW_DOWN_FALL_RATE, + ) + self._slowness_tracker = HysteresisSignal( + enter_threshold=WMACConstants.SLOWNESS_PROB, + exit_threshold=WMACConstants.SLOWNESS_EXIT_PROB, + rise_rate=WMACConstants.SLOWNESS_RISE_RATE, + fall_rate=WMACConstants.SLOWNESS_FALL_RATE, ) - self._slow_down_filter = SmoothKalmanFilter( - measurement_noise=0.1, - process_noise=0.1, - alpha=1.05, - smoothing_factor=0.7 - ) - - self._slowness_filter = SmoothKalmanFilter( - measurement_noise=0.1, - process_noise=0.06, - alpha=1.015, - smoothing_factor=0.92 - ) - - self._mpc_fcw_filter = SmoothKalmanFilter( - measurement_noise=0.2, - process_noise=0.1, - alpha=1.1, - smoothing_factor=0.5 - ) self._has_lead_filtered = False + self._has_radar_acc_lead = False self._has_slow_down = False self._has_slowness = False self._has_mpc_fcw = False @@ -179,13 +164,14 @@ class DynamicExperimentalController: self._has_standstill = False self._mpc_fcw_crash_cnt = 0 self._standstill_count = 0 - # debug + self._endpoint_x = float('inf') self._expected_distance = 0.0 self._trajectory_valid = False + self._raw_urgency = 0.0 def _read_params(self) -> None: - if self._frame % int(1. / DT_MDL) == 0: + if self._frame % WMACConstants.PARAM_READ_FRAMES == 0: self._enabled = self._params.get_bool("DynamicExperimentalControl") def mode(self) -> str: @@ -198,7 +184,6 @@ class DynamicExperimentalController: return self._active def set_mpc_fcw_crash_cnt(self) -> None: - """Set MPC FCW crash count""" self._mpc_fcw_crash_cnt = self._mpc.crash_cnt def _update_calculations(self, sm: messaging.SubMaster) -> None: @@ -210,179 +195,109 @@ class DynamicExperimentalController: self._v_cruise_kph = car_state.vCruise self._has_standstill = car_state.standstill - # standstill detection if self._has_standstill: - self._standstill_count = min(20, self._standstill_count + 1) + self._standstill_count = min(WMACConstants.STANDSTILL_FRAMES * 3, self._standstill_count + 1) else: self._standstill_count = max(0, self._standstill_count - 1) - # Lead detection - self._lead_filter.add_data(float(lead_one.status)) - lead_value = self._lead_filter.get_value() or 0.0 - self._has_lead_filtered = lead_value > WMACConstants.LEAD_PROB - - # MPC FCW detection - fcw_filtered_value = self._mpc_fcw_filter.get_value() or 0.0 - self._mpc_fcw_filter.add_data(float(self._mpc_fcw_crash_cnt > 0)) - self._has_mpc_fcw = fcw_filtered_value > 0.5 - - # Slow down detection + self._has_lead_filtered = self._lead_tracker.update(float(lead_one.status)) + self._has_radar_acc_lead = self._radar_acc_lead_tracker.update(self._radar_acc_lead_score(lead_one)) + self._has_mpc_fcw = self._mpc_fcw_crash_cnt > 0 self._calculate_slow_down(md) - # Slowness detection - if not (self._standstill_count > 5) and not self._has_slow_down: + if self._standstill_count > WMACConstants.STANDSTILL_FRAMES or self._has_slow_down: + self._slowness_tracker.reset() + self._has_slowness = False + else: current_slowness = float(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET)) - self._slowness_filter.add_data(current_slowness) - slowness_value = self._slowness_filter.get_value() or 0.0 + self._has_slowness = self._slowness_tracker.update(current_slowness) - # Hysteresis for slowness - threshold = WMACConstants.SLOWNESS_PROB * (0.8 if self._has_slowness else 1.1) - self._has_slowness = slowness_value > threshold - - def _calculate_slow_down(self, md): - """Calculate urgency based on trajectory endpoint vs expected distance.""" - - # Reset to safe defaults - urgency = 0.0 + def _calculate_slow_down(self, md) -> None: self._endpoint_x = float('inf') + self._expected_distance = 0.0 self._trajectory_valid = False - #Require exact trajectory size - position_valid = len(md.position.x) == TRAJECTORY_SIZE - orientation_valid = len(md.orientation.x) == TRAJECTORY_SIZE + urgency = self._model_action_urgency(md) + position_valid = len(md.position.x) == WMACConstants.TRAJECTORY_SIZE - if not (position_valid and orientation_valid): - # Invalid trajectory - this itself might indicate a stop scenario - # Apply moderate urgency for incomplete trajectories at speed - if self._v_ego_kph > 20.0: - urgency = 0.3 + if position_valid: + self._trajectory_valid = True + self._endpoint_x = md.position.x[WMACConstants.TRAJECTORY_SIZE - 1] + self._expected_distance = interp(self._v_ego_kph, WMACConstants.SLOW_DOWN_BP, WMACConstants.SLOW_DOWN_DIST) + urgency = max(urgency, self._endpoint_urgency(self._endpoint_x, self._expected_distance)) - self._slow_down_filter.add_data(urgency) - urgency_filtered = self._slow_down_filter.get_value() or 0.0 - self._has_slow_down = urgency_filtered > WMACConstants.SLOW_DOWN_PROB - self._urgency = urgency_filtered - return + self._raw_urgency = clip01(urgency) + self._has_slow_down = self._slow_down_tracker.update(self._raw_urgency) + self._urgency = self._slow_down_tracker.value - # We have a valid full trajectory - self._trajectory_valid = True + def _radar_acc_lead_score(self, lead_one) -> float: + if not lead_one.status: + return 0.0 - # Use the exact endpoint (33rd point, index 32) - endpoint_x = md.position.x[TRAJECTORY_SIZE - 1] - self._endpoint_x = endpoint_x + d_rel = float(getattr(lead_one, 'dRel', float('inf'))) + v_rel = float(getattr(lead_one, 'vRel', 0.0)) + if d_rel <= WMACConstants.RADAR_LEAD_ACC_MAX_DREL: + return 1.0 + if v_rel <= WMACConstants.RADAR_LEAD_ACC_MIN_CLOSING_SPEED and d_rel / max(-v_rel, 0.1) <= WMACConstants.RADAR_LEAD_ACC_MAX_TTC: + return 1.0 + return 0.0 - # Get expected distance based on current speed using tuned constants - expected_distance = interp(self._v_ego_kph, - WMACConstants.SLOW_DOWN_BP, - WMACConstants.SLOW_DOWN_DIST) - self._expected_distance = expected_distance + def _model_action_urgency(self, md) -> float: + action = getattr(md, 'action', None) + if action is None: + return 0.0 - # Calculate urgency based on trajectory shortage - if endpoint_x < expected_distance: - shortage = expected_distance - endpoint_x - shortage_ratio = shortage / expected_distance + urgency = 1.0 if getattr(action, 'shouldStop', False) else 0.0 + desired_accel = getattr(action, 'desiredAcceleration', 0.0) + if desired_accel < WMACConstants.MODEL_DECEL_START: + urgency = max(urgency, min(1.0, (WMACConstants.MODEL_DECEL_START - desired_accel) / WMACConstants.MODEL_DECEL_RANGE)) + return urgency - # Base urgency on shortage ratio - urgency = min(1.0, shortage_ratio * 2.0) + def _endpoint_urgency(self, endpoint_x: float, expected_distance: float) -> float: + if endpoint_x >= expected_distance: + return 0.0 - # Increase urgency for very short trajectories (imminent stops) - critical_distance = expected_distance * 0.3 - if endpoint_x < critical_distance: - urgency = min(1.0, urgency * 2.0) + shortage_ratio = (expected_distance - endpoint_x) / expected_distance + urgency = min(1.0, shortage_ratio * WMACConstants.ENDPOINT_URGENCY_GAIN) - # Speed-based urgency adjustment - if self._v_ego_kph > 25.0: - speed_factor = 1.0 + (self._v_ego_kph - 25.0) / 80.0 - urgency = min(1.0, urgency * speed_factor) + if endpoint_x < expected_distance * WMACConstants.CRITICAL_ENDPOINT_FACTOR: + urgency = min(1.0, urgency * WMACConstants.CRITICAL_URGENCY_GAIN) - # Apply filtering but with less smoothing for stops - self._slow_down_filter.add_data(urgency) - urgency_filtered = self._slow_down_filter.get_value() or 0.0 + if self._v_ego_kph > WMACConstants.SPEED_URGENCY_MIN: + speed_factor = 1.0 + (self._v_ego_kph - WMACConstants.SPEED_URGENCY_MIN) / WMACConstants.SPEED_URGENCY_RANGE + urgency = min(1.0, urgency * speed_factor) - # Update state with lower threshold for better stop detection - self._has_slow_down = urgency_filtered > (WMACConstants.SLOW_DOWN_PROB * 0.8) - self._urgency = urgency_filtered + return urgency - def _radarless_mode(self) -> None: - """Radarless mode decision logic with emergency handling.""" + def _desired_mode(self) -> tuple[ModeType, bool]: + if not self._CP.radarUnavailable and self._has_radar_acc_lead: + return 'acc', False - # EMERGENCY: MPC FCW - immediate blended mode if self._has_mpc_fcw: - self._mode_manager.request_mode('blended', confidence=1.0, emergency=True) - return + return 'blended', True - # Standstill: use blended - if self._standstill_count > 3: - self._mode_manager.request_mode('blended', confidence=0.9) - return + standstill = self._standstill_count > WMACConstants.STANDSTILL_FRAMES + urgent_slow_down = self._has_slow_down and self._raw_urgency > WMACConstants.URGENT_SLOW_DOWN_PROB - # Slow down scenarios: emergency for high urgency, normal for lower urgency - if self._has_slow_down: - if self._urgency > 0.7: - # Emergency: immediate blended mode for high urgency stops - self._mode_manager.request_mode('blended', confidence=1.0, emergency=True) - else: - # Normal: blended with urgency-based confidence - confidence = min(1.0, self._urgency * 1.5) - self._mode_manager.request_mode('blended', confidence=confidence) - return + if self._CP.radarUnavailable: + if standstill or self._has_slow_down: + return 'blended', urgent_slow_down + return 'acc', False - # Driving slow: use ACC (but not if actively slowing down) - if self._has_slowness and not self._has_slow_down: - self._mode_manager.request_mode('acc', confidence=0.8) - return + if standstill or self._has_slow_down: + return 'blended', urgent_slow_down - # Default: ACC - self._mode_manager.request_mode('acc', confidence=0.7) - - def _radar_mode(self) -> None: - """Radar mode with emergency handling.""" - - # EMERGENCY: MPC FCW - immediate blended mode - if self._has_mpc_fcw: - self._mode_manager.request_mode('blended', confidence=1.0, emergency=True) - return - - # If lead detected and not in standstill: always use ACC - if self._has_lead_filtered and not (self._standstill_count > 3): - self._mode_manager.request_mode('acc', confidence=1.0) - return - - # Slow down scenarios: emergency for high urgency, normal for lower urgency - if self._has_slow_down: - if self._urgency > 0.7: - # Emergency: immediate blended mode for high urgency stops - self._mode_manager.request_mode('blended', confidence=1.0, emergency=True) - else: - # Normal: blended with urgency-based confidence - confidence = min(1.0, self._urgency * 1.3) - self._mode_manager.request_mode('blended', confidence=confidence) - return - - # Standstill: use blended - if self._standstill_count > 3: - self._mode_manager.request_mode('blended', confidence=0.9) - return - - # Driving slow: use ACC (but not if actively slowing down) - if self._has_slowness and not self._has_slow_down: - self._mode_manager.request_mode('acc', confidence=0.8) - return - - # Default: ACC - self._mode_manager.request_mode('acc', confidence=0.7) + return 'acc', False def update(self, sm: messaging.SubMaster) -> None: self._read_params() - self.set_mpc_fcw_crash_cnt() - self._update_calculations(sm) - if self._CP.radarUnavailable: - self._radarless_mode() - else: - self._radar_mode() - + mode, immediate = self._desired_mode() + self._mode_manager.request_mode(mode, immediate=immediate, hold_frames=WMACConstants.EMERGENCY_HOLD_FRAMES, + cancel_hold=self._has_radar_acc_lead) self._mode_manager.update() + self._active = sm['selfdriveState'].experimentalMode and self._enabled self._frame += 1 diff --git a/sunnypilot/selfdrive/controls/lib/dec/tests/pytest_dynamic_controller.py b/sunnypilot/selfdrive/controls/lib/dec/tests/pytest_dynamic_controller.py deleted file mode 100644 index f9da39c03b..0000000000 --- a/sunnypilot/selfdrive/controls/lib/dec/tests/pytest_dynamic_controller.py +++ /dev/null @@ -1,94 +0,0 @@ -import pytest - -from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController - -class MockLeadOne: - def __init__(self, status=0.0): - self.status = status - -class MockRadarState: - def __init__(self, status=0.0): - self.leadOne = MockLeadOne(status=status) - -class MockCarState: - def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False): - self.vEgo = vEgo - self.vCruise = vCruise - self.standstill = standstill - -class MockModelData: - def __init__(self, valid=True): - size = 33 if valid else 10 # incomplete if invalid - self.position = type("Pos", (), {"x": [0.0] * size})() - self.orientation = type("Ori", (), {"x": [0.0] * size})() - -class MockSelfDriveState: - def __init__(self, experimentalMode=False): - self.experimentalMode = experimentalMode - -class MockParams: - def get_bool(self, name): - return True - -@pytest.fixture -def default_sm(): - sm = { - 'carState': MockCarState(vEgo=10.0, vCruise=20.0), - 'radarState': MockRadarState(status=1.0), - 'modelV2': MockModelData(valid=True), - 'selfdriveState': MockSelfDriveState(experimentalMode=True), - } - return sm - -@pytest.fixture -def mock_cp(): - class CP: - radarUnavailable = False - return CP() - -@pytest.fixture -def mock_mpc(): - class MPC: - crash_cnt = 0 - return MPC() - -# Fake Kalman Filter that always returns a given value -class FakeKalman: - def __init__(self, value=1.0): - self.value = value - def add_data(self, v): pass - def get_value(self): return self.value - def get_confidence(self): return 1.0 - def reset_data(self): pass - -def test_initial_mode_is_acc(mock_cp, mock_mpc): - controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) - assert controller.mode() == "acc" - -def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm): - controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) - default_sm['carState'].standstill = True - for _ in range(10): - controller.update(default_sm) - assert controller.mode() == "blended" - -def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm): - controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) - mock_mpc.crash_cnt = 1 # simulate FCW - for _ in range(2): - controller.update(default_sm) - assert controller.mode() == "blended" - -def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm): - mock_cp.radarUnavailable = True - controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) - - # Force conditions to simulate slowdown - controller._slow_down_filter = FakeKalman(value=1.0) # Ensure urgency triggers slowdown - controller._v_ego_kph = 35.0 - default_sm['modelV2'] = MockModelData(valid=False) # Incomplete trajectory - - for _ in range(3): - controller.update(default_sm) - - assert controller.mode() == "blended" diff --git a/sunnypilot/selfdrive/controls/lib/dec/tests/test_dynamic_controller.py b/sunnypilot/selfdrive/controls/lib/dec/tests/test_dynamic_controller.py new file mode 100644 index 0000000000..d4fa748f7a --- /dev/null +++ b/sunnypilot/selfdrive/controls/lib/dec/tests/test_dynamic_controller.py @@ -0,0 +1,235 @@ +import pytest + +from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController, HysteresisSignal + + +class MockLeadOne: + def __init__(self, status=0.0, dRel=30.0, vRel=0.0): + self.status = status + self.dRel = dRel + self.vRel = vRel + + +class MockRadarState: + def __init__(self, status=0.0, dRel=30.0, vRel=0.0): + self.leadOne = MockLeadOne(status=status, dRel=dRel, vRel=vRel) + + +class MockCarState: + def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False): + self.vEgo = vEgo + self.vCruise = vCruise + self.standstill = standstill + + +class MockAction: + def __init__(self, desiredAcceleration=0.0, shouldStop=False): + self.desiredAcceleration = desiredAcceleration + self.shouldStop = shouldStop + + +class MockModelData: + def __init__(self, valid=True, endpoint_x=200.0, orientation_valid=None, desired_acceleration=0.0, should_stop=False): + position_size = 33 if valid else 10 + orientation_size = position_size if orientation_valid is None else (33 if orientation_valid else 10) + position_x = [0.0] * position_size + if position_x: + position_x[-1] = endpoint_x + self.position = type("Pos", (), {"x": position_x})() + self.orientation = type("Ori", (), {"x": [0.0] * orientation_size})() + self.acceleration = type("Accel", (), {"x": [0.0] * position_size})() + self.action = MockAction(desired_acceleration, should_stop) + + +class MockSelfDriveState: + def __init__(self, experimentalMode=False): + self.experimentalMode = experimentalMode + + +class MockParams: + def get_bool(self, name): + return True + + +@pytest.fixture +def default_sm(): + sm = { + 'carState': MockCarState(vEgo=10.0, vCruise=20.0), + 'radarState': MockRadarState(status=1.0), + 'modelV2': MockModelData(valid=True), + 'selfdriveState': MockSelfDriveState(experimentalMode=True), + } + return sm + + +@pytest.fixture +def mock_cp(): + class CP: + radarUnavailable = False + return CP() + + +@pytest.fixture +def mock_mpc(): + class MPC: + crash_cnt = 0 + return MPC() + + +def test_initial_mode_is_acc(mock_cp, mock_mpc): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + assert controller.mode() == "acc" + + +def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=0.0) + default_sm['carState'].standstill = True + for _ in range(20): + controller.update(default_sm) + assert controller.mode() == "blended" + + +def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=0.0) + mock_mpc.crash_cnt = 1 + controller.update(default_sm) + assert controller.mode() == "blended" + + +def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm): + mock_cp.radarUnavailable = True + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=0.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0) + + controller.update(default_sm) + + assert controller.mode() == "blended" + + +def test_valid_position_with_missing_orientation_can_trigger_slowdown(mock_cp, mock_mpc, default_sm): + mock_cp.radarUnavailable = True + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=0.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, orientation_valid=False) + + controller.update(default_sm) + + assert controller._trajectory_valid + assert controller.mode() == "blended" + + +def test_incomplete_position_does_not_trigger_slowdown(mock_cp, mock_mpc, default_sm): + mock_cp.radarUnavailable = True + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=0.0) + default_sm['modelV2'] = MockModelData(valid=False, endpoint_x=0.0) + + for _ in range(3): + controller.update(default_sm) + + assert not controller._trajectory_valid + assert not controller._has_slow_down + assert controller.mode() == "acc" + + +def test_slowdown_hysteresis_prevents_threshold_chatter(): + signal = HysteresisSignal(enter_threshold=0.5, exit_threshold=0.4, rise_rate=1.0, fall_rate=1.0) + + assert signal.update(0.55) + assert signal.update(0.45) + assert not signal.update(0.35) + + +def test_model_should_stop_triggers_blended_without_valid_trajectory(mock_cp, mock_mpc, default_sm): + mock_cp.radarUnavailable = True + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=0.0) + default_sm['modelV2'] = MockModelData(valid=False, should_stop=True) + + controller.update(default_sm) + + assert not controller._trajectory_valid + assert controller.mode() == "blended" + + +def test_radar_lead_keeps_acc_over_model_slowdown(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=1.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0) + + for _ in range(3): + controller.update(default_sm) + + assert controller._has_slow_down + assert controller._has_radar_acc_lead + assert controller.mode() == "acc" + + +def test_far_radar_lead_allows_blended_until_acc_relevant(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0) + + controller.update(default_sm) + + assert controller._has_lead_filtered + assert not controller._has_radar_acc_lead + assert controller.mode() == "blended" + + +def test_relevant_radar_lead_smoothly_returns_to_acc(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0) + controller.update(default_sm) + assert controller.mode() == "blended" + + default_sm['radarState'] = MockRadarState(status=1.0, dRel=45.0, vRel=0.0) + for _ in range(20): + controller.update(default_sm) + + assert controller._has_radar_acc_lead + assert controller.mode() == "acc" + + +def test_closing_far_radar_lead_returns_to_acc(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=-25.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0) + + for _ in range(20): + controller.update(default_sm) + + assert controller._has_radar_acc_lead + assert controller.mode() == "acc" + + +def test_radar_lead_keeps_acc_over_fcw_and_standstill(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=1.0) + default_sm['carState'].standstill = True + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, should_stop=True) + mock_mpc.crash_cnt = 1 + + for _ in range(10): + controller.update(default_sm) + + assert controller._has_lead_filtered + assert controller._has_mpc_fcw + assert controller.mode() == "acc" + + +def test_lead_flicker_hold_prevents_one_frame_mode_flip(mock_cp, mock_mpc, default_sm): + controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams()) + default_sm['radarState'] = MockRadarState(status=1.0) + controller.update(default_sm) + + default_sm['radarState'] = MockRadarState(status=0.0) + default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0) + controller.update(default_sm) + + assert controller._has_lead_filtered + assert controller.mode() == "acc"