feat(dec): rework dynamic experimental controller

This commit is contained in:
rav4kumar
2026-06-02 17:24:36 -07:00
parent 04224e8747
commit 83de89e253
4 changed files with 446 additions and 361 deletions

View File

@@ -1,17 +1,46 @@
from openpilot.common.realtime import DT_MDL
class WMACConstants:
# Lead detection parameters
LEAD_WINDOW_SIZE = 6 # Stable detection window
LEAD_PROB = 0.45 # Balanced threshold for lead detection
TRAJECTORY_SIZE = 33
PARAM_READ_FRAMES = max(1, int(round(1.0 / DT_MDL)))
# Slow down detection parameters
SLOW_DOWN_WINDOW_SIZE = 5 # Responsive but stable
SLOW_DOWN_PROB = 0.3 # Balanced threshold for slow down scenarios
EMERGENCY_HOLD_FRAMES = max(1, int(round(0.75 / DT_MDL)))
MIN_MODE_DURATION = {'acc': max(1, int(round(0.6 / DT_MDL))), 'blended': max(1, int(round(0.5 / DT_MDL)))}
ENTER_BLENDED_FRAMES = max(1, int(round(0.4 / DT_MDL)))
EXIT_BLENDED_FRAMES = max(1, int(round(0.35 / DT_MDL)))
STANDSTILL_FRAMES = max(1, int(round(0.2 / DT_MDL)))
# Optimized slow down distance curve - smooth and progressive
LEAD_PROB = 0.45
LEAD_EXIT_PROB = 0.25
LEAD_RISE_RATE = 1.0
LEAD_FALL_RATE = 0.35
RADAR_LEAD_ACC_PROB = 0.5
RADAR_LEAD_ACC_EXIT_PROB = 0.4
RADAR_LEAD_ACC_RISE_RATE = 1.0
RADAR_LEAD_ACC_FALL_RATE = 0.25
RADAR_LEAD_ACC_MAX_DREL = 80.0
RADAR_LEAD_ACC_MAX_TTC = 6.0
RADAR_LEAD_ACC_MIN_CLOSING_SPEED = -0.5
SLOW_DOWN_PROB = 0.5
SLOW_DOWN_EXIT_PROB = 0.4
SLOW_DOWN_RISE_RATE = 0.65
SLOW_DOWN_FALL_RATE = 0.15
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.]
SLOW_DOWN_DIST = [32., 46., 64., 86., 108., 130., 145., 165.]
URGENT_SLOW_DOWN_PROB = 0.85
# Slowness detection parameters
SLOWNESS_WINDOW_SIZE = 10 # Stable slowness detection
SLOWNESS_PROB = 0.55 # Clear threshold for slowness
SLOWNESS_CRUISE_OFFSET = 1.025 # Conservative cruise speed offset
MODEL_DECEL_START = -0.5
MODEL_DECEL_RANGE = 2.0
ENDPOINT_URGENCY_GAIN = 1.3
CRITICAL_ENDPOINT_FACTOR = 0.3
CRITICAL_URGENCY_GAIN = 1.5
SPEED_URGENCY_MIN = 25.0
SPEED_URGENCY_RANGE = 80.0
SLOWNESS_PROB = 0.55
SLOWNESS_EXIT_PROB = 0.45
SLOWNESS_RISE_RATE = 0.35
SLOWNESS_FALL_RATE = 0.5
SLOWNESS_CRUISE_OFFSET = 1.025

View File

@@ -6,129 +6,116 @@ See the LICENSE.md file in the root directory for more details.
"""
# Version = 2025-6-30
from cereal import messaging
from opendbc.car import structs
from numpy import interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
from typing import Literal
# d-e2e, from modeldata.h
TRAJECTORY_SIZE = 33
SET_MODE_TIMEOUT = 15
from cereal import messaging
from numpy import interp
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
# Define the valid mode types
ModeType = Literal['acc', 'blended']
class SmoothKalmanFilter:
"""Enhanced Kalman filter with smoothing for stable decision making."""
def clip01(value: float) -> float:
return max(0.0, min(1.0, float(value)))
def __init__(self, initial_value=0, measurement_noise=0.1, process_noise=0.01,
alpha=1.0, smoothing_factor=0.85):
self.x = initial_value
self.P = 1.0
self.R = measurement_noise
self.Q = process_noise
self.alpha = alpha
self.smoothing_factor = smoothing_factor
self.initialized = False
self.history = []
self.max_history = 10
self.confidence = 0.0
def add_data(self, measurement):
if len(self.history) >= self.max_history:
self.history.pop(0)
self.history.append(measurement)
class SmoothedSignal:
def __init__(self, rise_rate: float, fall_rate: float, initial_value: float = 0.0):
self.rise_rate = clip01(rise_rate)
self.fall_rate = clip01(fall_rate)
self.value = clip01(initial_value)
if not self.initialized:
self.x = measurement
self.initialized = True
self.confidence = 0.1
return
def update(self, measurement: float) -> float:
measurement = clip01(measurement)
rate = self.rise_rate if measurement > self.value else self.fall_rate
self.value += (measurement - self.value) * rate
return self.value
self.P = self.alpha * self.P + self.Q
def reset(self, value: float = 0.0) -> None:
self.value = clip01(value)
K = self.P / (self.P + self.R)
effective_K = K * (1.0 - self.smoothing_factor) + self.smoothing_factor * 0.1
innovation = measurement - self.x
self.x = self.x + effective_K * innovation
self.P = (1 - effective_K) * self.P
class HysteresisSignal:
def __init__(self, enter_threshold: float, exit_threshold: float, rise_rate: float, fall_rate: float):
self.enter_threshold = clip01(enter_threshold)
self.exit_threshold = clip01(exit_threshold)
self.filter = SmoothedSignal(rise_rate, fall_rate)
self.active = False
if abs(innovation) < 0.1:
self.confidence = min(1.0, self.confidence + 0.05)
else:
self.confidence = max(0.1, self.confidence - 0.02)
def update(self, measurement: float) -> bool:
value = self.filter.update(measurement)
threshold = self.exit_threshold if self.active else self.enter_threshold
self.active = value > threshold
return self.active
def get_value(self):
return self.x if self.initialized else None
def reset(self) -> None:
self.filter.reset()
self.active = False
def get_confidence(self):
return self.confidence
def reset_data(self):
self.initialized = False
self.history = []
self.confidence = 0.0
@property
def value(self) -> float:
return self.filter.value
class ModeTransitionManager:
"""Manages smooth transitions between driving modes with hysteresis."""
def __init__(self):
self.current_mode: ModeType = 'acc'
self.mode_confidence = {'acc': 1.0, 'blended': 0.0}
self.transition_timeout = 0
self.min_mode_duration = 10
self.mode_duration = 0
self.emergency_override = False
self._pending_mode: ModeType = 'acc'
self._pending_count = 0
self._blended_hold_frames = 0
def request_mode(self, mode: ModeType, confidence: float = 1.0, emergency: bool = False):
# Emergency override for critical situations (stops, collisions)
if emergency:
self.emergency_override = True
self.current_mode = mode
self.transition_timeout = SET_MODE_TIMEOUT
self.mode_duration = 0
def request_mode(self, mode: ModeType, immediate: bool = False, hold_frames: int = 0, cancel_hold: bool = False) -> None:
if immediate:
self._blended_hold_frames = max(self._blended_hold_frames, hold_frames)
self._pending_mode = mode
self._pending_count = 0
self._switch_mode(mode)
return
self.mode_confidence[mode] = min(1.0, self.mode_confidence[mode] + 0.1 * confidence)
for m in self.mode_confidence:
if m != mode:
self.mode_confidence[m] = max(0.0, self.mode_confidence[m] - 0.05)
if cancel_hold and mode == 'acc':
self._blended_hold_frames = 0
# Require minimum duration in current mode (unless emergency)
if self.mode_duration < self.min_mode_duration and not self.emergency_override:
if self._blended_hold_frames > 0:
mode = 'blended'
if mode == self.current_mode:
self._pending_mode = mode
self._pending_count = 0
return
# Hysteresis: higher threshold for mode changes
confidence_threshold = 0.6 if mode != self.current_mode else 0.3 # Lower threshold for faster response
if mode != self._pending_mode:
self._pending_mode = mode
self._pending_count = 1
else:
self._pending_count += 1
if self.mode_confidence[mode] > confidence_threshold:
if mode != self.current_mode and self.transition_timeout == 0:
self.transition_timeout = SET_MODE_TIMEOUT
self.current_mode = mode
self.mode_duration = 0
if self.mode_duration < WMACConstants.MIN_MODE_DURATION[self.current_mode]:
return
def update(self):
if self.transition_timeout > 0:
self.transition_timeout -= 1
required_count = WMACConstants.ENTER_BLENDED_FRAMES if mode == 'blended' else WMACConstants.EXIT_BLENDED_FRAMES
if self._pending_count >= required_count:
self._switch_mode(mode)
def update(self) -> None:
if self._blended_hold_frames > 0:
self._blended_hold_frames -= 1
self.mode_duration += 1
# Reset emergency override after some time
if self.emergency_override and self.mode_duration > 20:
self.emergency_override = False
# Gradual confidence decay
for mode in self.mode_confidence:
self.mode_confidence[mode] *= 0.98
def get_mode(self) -> ModeType:
return self.current_mode
def _switch_mode(self, mode: ModeType) -> None:
if mode == self.current_mode:
return
self.current_mode = mode
self.mode_duration = 0
self._pending_mode = mode
self._pending_count = 0
class DynamicExperimentalController:
def __init__(self, CP: structs.CarParams, mpc, params=None):
@@ -142,35 +129,33 @@ class DynamicExperimentalController:
self._mode_manager = ModeTransitionManager()
# Smooth filters for stable decision making with faster response for critical scenarios
self._lead_filter = SmoothKalmanFilter(
measurement_noise=0.15,
process_noise=0.05,
alpha=1.02,
smoothing_factor=0.8
self._lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.LEAD_PROB,
exit_threshold=WMACConstants.LEAD_EXIT_PROB,
rise_rate=WMACConstants.LEAD_RISE_RATE,
fall_rate=WMACConstants.LEAD_FALL_RATE,
)
self._radar_acc_lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.RADAR_LEAD_ACC_PROB,
exit_threshold=WMACConstants.RADAR_LEAD_ACC_EXIT_PROB,
rise_rate=WMACConstants.RADAR_LEAD_ACC_RISE_RATE,
fall_rate=WMACConstants.RADAR_LEAD_ACC_FALL_RATE,
)
self._slow_down_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOW_DOWN_PROB,
exit_threshold=WMACConstants.SLOW_DOWN_EXIT_PROB,
rise_rate=WMACConstants.SLOW_DOWN_RISE_RATE,
fall_rate=WMACConstants.SLOW_DOWN_FALL_RATE,
)
self._slowness_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOWNESS_PROB,
exit_threshold=WMACConstants.SLOWNESS_EXIT_PROB,
rise_rate=WMACConstants.SLOWNESS_RISE_RATE,
fall_rate=WMACConstants.SLOWNESS_FALL_RATE,
)
self._slow_down_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.1,
alpha=1.05,
smoothing_factor=0.7
)
self._slowness_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.06,
alpha=1.015,
smoothing_factor=0.92
)
self._mpc_fcw_filter = SmoothKalmanFilter(
measurement_noise=0.2,
process_noise=0.1,
alpha=1.1,
smoothing_factor=0.5
)
self._has_lead_filtered = False
self._has_radar_acc_lead = False
self._has_slow_down = False
self._has_slowness = False
self._has_mpc_fcw = False
@@ -179,13 +164,14 @@ class DynamicExperimentalController:
self._has_standstill = False
self._mpc_fcw_crash_cnt = 0
self._standstill_count = 0
# debug
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
self._raw_urgency = 0.0
def _read_params(self) -> None:
if self._frame % int(1. / DT_MDL) == 0:
if self._frame % WMACConstants.PARAM_READ_FRAMES == 0:
self._enabled = self._params.get_bool("DynamicExperimentalControl")
def mode(self) -> str:
@@ -198,7 +184,6 @@ class DynamicExperimentalController:
return self._active
def set_mpc_fcw_crash_cnt(self) -> None:
"""Set MPC FCW crash count"""
self._mpc_fcw_crash_cnt = self._mpc.crash_cnt
def _update_calculations(self, sm: messaging.SubMaster) -> None:
@@ -210,179 +195,109 @@ class DynamicExperimentalController:
self._v_cruise_kph = car_state.vCruise
self._has_standstill = car_state.standstill
# standstill detection
if self._has_standstill:
self._standstill_count = min(20, self._standstill_count + 1)
self._standstill_count = min(WMACConstants.STANDSTILL_FRAMES * 3, self._standstill_count + 1)
else:
self._standstill_count = max(0, self._standstill_count - 1)
# Lead detection
self._lead_filter.add_data(float(lead_one.status))
lead_value = self._lead_filter.get_value() or 0.0
self._has_lead_filtered = lead_value > WMACConstants.LEAD_PROB
# MPC FCW detection
fcw_filtered_value = self._mpc_fcw_filter.get_value() or 0.0
self._mpc_fcw_filter.add_data(float(self._mpc_fcw_crash_cnt > 0))
self._has_mpc_fcw = fcw_filtered_value > 0.5
# Slow down detection
self._has_lead_filtered = self._lead_tracker.update(float(lead_one.status))
self._has_radar_acc_lead = self._radar_acc_lead_tracker.update(self._radar_acc_lead_score(lead_one))
self._has_mpc_fcw = self._mpc_fcw_crash_cnt > 0
self._calculate_slow_down(md)
# Slowness detection
if not (self._standstill_count > 5) and not self._has_slow_down:
if self._standstill_count > WMACConstants.STANDSTILL_FRAMES or self._has_slow_down:
self._slowness_tracker.reset()
self._has_slowness = False
else:
current_slowness = float(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET))
self._slowness_filter.add_data(current_slowness)
slowness_value = self._slowness_filter.get_value() or 0.0
self._has_slowness = self._slowness_tracker.update(current_slowness)
# Hysteresis for slowness
threshold = WMACConstants.SLOWNESS_PROB * (0.8 if self._has_slowness else 1.1)
self._has_slowness = slowness_value > threshold
def _calculate_slow_down(self, md):
"""Calculate urgency based on trajectory endpoint vs expected distance."""
# Reset to safe defaults
urgency = 0.0
def _calculate_slow_down(self, md) -> None:
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
#Require exact trajectory size
position_valid = len(md.position.x) == TRAJECTORY_SIZE
orientation_valid = len(md.orientation.x) == TRAJECTORY_SIZE
urgency = self._model_action_urgency(md)
position_valid = len(md.position.x) == WMACConstants.TRAJECTORY_SIZE
if not (position_valid and orientation_valid):
# Invalid trajectory - this itself might indicate a stop scenario
# Apply moderate urgency for incomplete trajectories at speed
if self._v_ego_kph > 20.0:
urgency = 0.3
if position_valid:
self._trajectory_valid = True
self._endpoint_x = md.position.x[WMACConstants.TRAJECTORY_SIZE - 1]
self._expected_distance = interp(self._v_ego_kph, WMACConstants.SLOW_DOWN_BP, WMACConstants.SLOW_DOWN_DIST)
urgency = max(urgency, self._endpoint_urgency(self._endpoint_x, self._expected_distance))
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
self._has_slow_down = urgency_filtered > WMACConstants.SLOW_DOWN_PROB
self._urgency = urgency_filtered
return
self._raw_urgency = clip01(urgency)
self._has_slow_down = self._slow_down_tracker.update(self._raw_urgency)
self._urgency = self._slow_down_tracker.value
# We have a valid full trajectory
self._trajectory_valid = True
def _radar_acc_lead_score(self, lead_one) -> float:
if not lead_one.status:
return 0.0
# Use the exact endpoint (33rd point, index 32)
endpoint_x = md.position.x[TRAJECTORY_SIZE - 1]
self._endpoint_x = endpoint_x
d_rel = float(getattr(lead_one, 'dRel', float('inf')))
v_rel = float(getattr(lead_one, 'vRel', 0.0))
if d_rel <= WMACConstants.RADAR_LEAD_ACC_MAX_DREL:
return 1.0
if v_rel <= WMACConstants.RADAR_LEAD_ACC_MIN_CLOSING_SPEED and d_rel / max(-v_rel, 0.1) <= WMACConstants.RADAR_LEAD_ACC_MAX_TTC:
return 1.0
return 0.0
# Get expected distance based on current speed using tuned constants
expected_distance = interp(self._v_ego_kph,
WMACConstants.SLOW_DOWN_BP,
WMACConstants.SLOW_DOWN_DIST)
self._expected_distance = expected_distance
def _model_action_urgency(self, md) -> float:
action = getattr(md, 'action', None)
if action is None:
return 0.0
# Calculate urgency based on trajectory shortage
if endpoint_x < expected_distance:
shortage = expected_distance - endpoint_x
shortage_ratio = shortage / expected_distance
urgency = 1.0 if getattr(action, 'shouldStop', False) else 0.0
desired_accel = getattr(action, 'desiredAcceleration', 0.0)
if desired_accel < WMACConstants.MODEL_DECEL_START:
urgency = max(urgency, min(1.0, (WMACConstants.MODEL_DECEL_START - desired_accel) / WMACConstants.MODEL_DECEL_RANGE))
return urgency
# Base urgency on shortage ratio
urgency = min(1.0, shortage_ratio * 2.0)
def _endpoint_urgency(self, endpoint_x: float, expected_distance: float) -> float:
if endpoint_x >= expected_distance:
return 0.0
# Increase urgency for very short trajectories (imminent stops)
critical_distance = expected_distance * 0.3
if endpoint_x < critical_distance:
urgency = min(1.0, urgency * 2.0)
shortage_ratio = (expected_distance - endpoint_x) / expected_distance
urgency = min(1.0, shortage_ratio * WMACConstants.ENDPOINT_URGENCY_GAIN)
# Speed-based urgency adjustment
if self._v_ego_kph > 25.0:
speed_factor = 1.0 + (self._v_ego_kph - 25.0) / 80.0
urgency = min(1.0, urgency * speed_factor)
if endpoint_x < expected_distance * WMACConstants.CRITICAL_ENDPOINT_FACTOR:
urgency = min(1.0, urgency * WMACConstants.CRITICAL_URGENCY_GAIN)
# Apply filtering but with less smoothing for stops
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
if self._v_ego_kph > WMACConstants.SPEED_URGENCY_MIN:
speed_factor = 1.0 + (self._v_ego_kph - WMACConstants.SPEED_URGENCY_MIN) / WMACConstants.SPEED_URGENCY_RANGE
urgency = min(1.0, urgency * speed_factor)
# Update state with lower threshold for better stop detection
self._has_slow_down = urgency_filtered > (WMACConstants.SLOW_DOWN_PROB * 0.8)
self._urgency = urgency_filtered
return urgency
def _radarless_mode(self) -> None:
"""Radarless mode decision logic with emergency handling."""
def _desired_mode(self) -> tuple[ModeType, bool]:
if not self._CP.radarUnavailable and self._has_radar_acc_lead:
return 'acc', False
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
return 'blended', True
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
standstill = self._standstill_count > WMACConstants.STANDSTILL_FRAMES
urgent_slow_down = self._has_slow_down and self._raw_urgency > WMACConstants.URGENT_SLOW_DOWN_PROB
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.5)
self._mode_manager.request_mode('blended', confidence=confidence)
return
if self._CP.radarUnavailable:
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
return 'acc', False
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
def _radar_mode(self) -> None:
"""Radar mode with emergency handling."""
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
# If lead detected and not in standstill: always use ACC
if self._has_lead_filtered and not (self._standstill_count > 3):
self._mode_manager.request_mode('acc', confidence=1.0)
return
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.3)
self._mode_manager.request_mode('blended', confidence=confidence)
return
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
return 'acc', False
def update(self, sm: messaging.SubMaster) -> None:
self._read_params()
self.set_mpc_fcw_crash_cnt()
self._update_calculations(sm)
if self._CP.radarUnavailable:
self._radarless_mode()
else:
self._radar_mode()
mode, immediate = self._desired_mode()
self._mode_manager.request_mode(mode, immediate=immediate, hold_frames=WMACConstants.EMERGENCY_HOLD_FRAMES,
cancel_hold=self._has_radar_acc_lead)
self._mode_manager.update()
self._active = sm['selfdriveState'].experimentalMode and self._enabled
self._frame += 1

View File

@@ -1,94 +0,0 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
class MockLeadOne:
def __init__(self, status=0.0):
self.status = status
class MockRadarState:
def __init__(self, status=0.0):
self.leadOne = MockLeadOne(status=status)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockModelData:
def __init__(self, valid=True):
size = 33 if valid else 10 # incomplete if invalid
self.position = type("Pos", (), {"x": [0.0] * size})()
self.orientation = type("Ori", (), {"x": [0.0] * size})()
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
# Fake Kalman Filter that always returns a given value
class FakeKalman:
def __init__(self, value=1.0):
self.value = value
def add_data(self, v): pass
def get_value(self): return self.value
def get_confidence(self): return 1.0
def reset_data(self): pass
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['carState'].standstill = True
for _ in range(10):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
mock_mpc.crash_cnt = 1 # simulate FCW
for _ in range(2):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
# Force conditions to simulate slowdown
controller._slow_down_filter = FakeKalman(value=1.0) # Ensure urgency triggers slowdown
controller._v_ego_kph = 35.0
default_sm['modelV2'] = MockModelData(valid=False) # Incomplete trajectory
for _ in range(3):
controller.update(default_sm)
assert controller.mode() == "blended"

View File

@@ -0,0 +1,235 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController, HysteresisSignal
class MockLeadOne:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.status = status
self.dRel = dRel
self.vRel = vRel
class MockRadarState:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.leadOne = MockLeadOne(status=status, dRel=dRel, vRel=vRel)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockAction:
def __init__(self, desiredAcceleration=0.0, shouldStop=False):
self.desiredAcceleration = desiredAcceleration
self.shouldStop = shouldStop
class MockModelData:
def __init__(self, valid=True, endpoint_x=200.0, orientation_valid=None, desired_acceleration=0.0, should_stop=False):
position_size = 33 if valid else 10
orientation_size = position_size if orientation_valid is None else (33 if orientation_valid else 10)
position_x = [0.0] * position_size
if position_x:
position_x[-1] = endpoint_x
self.position = type("Pos", (), {"x": position_x})()
self.orientation = type("Ori", (), {"x": [0.0] * orientation_size})()
self.acceleration = type("Accel", (), {"x": [0.0] * position_size})()
self.action = MockAction(desired_acceleration, should_stop)
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['carState'].standstill = True
for _ in range(20):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
mock_mpc.crash_cnt = 1
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
def test_valid_position_with_missing_orientation_can_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, orientation_valid=False)
controller.update(default_sm)
assert controller._trajectory_valid
assert controller.mode() == "blended"
def test_incomplete_position_does_not_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert not controller._trajectory_valid
assert not controller._has_slow_down
assert controller.mode() == "acc"
def test_slowdown_hysteresis_prevents_threshold_chatter():
signal = HysteresisSignal(enter_threshold=0.5, exit_threshold=0.4, rise_rate=1.0, fall_rate=1.0)
assert signal.update(0.55)
assert signal.update(0.45)
assert not signal.update(0.35)
def test_model_should_stop_triggers_blended_without_valid_trajectory(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, should_stop=True)
controller.update(default_sm)
assert not controller._trajectory_valid
assert controller.mode() == "blended"
def test_radar_lead_keeps_acc_over_model_slowdown(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert controller._has_slow_down
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_far_radar_lead_allows_blended_until_acc_relevant(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert not controller._has_radar_acc_lead
assert controller.mode() == "blended"
def test_relevant_radar_lead_smoothly_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
default_sm['radarState'] = MockRadarState(status=1.0, dRel=45.0, vRel=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_closing_far_radar_lead_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=-25.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_radar_lead_keeps_acc_over_fcw_and_standstill(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['carState'].standstill = True
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, should_stop=True)
mock_mpc.crash_cnt = 1
for _ in range(10):
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller._has_mpc_fcw
assert controller.mode() == "acc"
def test_lead_flicker_hold_prevents_one_frame_mode_flip(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
controller.update(default_sm)
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller.mode() == "acc"