Compare commits

...

52 Commits

Author SHA1 Message Date
rav4kumar cb0bc5746d fix(long): bugs 2026-07-02 08:29:28 -07:00
rav4kumar 15ddcc2026 refactor(long): tfollow + lead denoiser and output shaping 2026-07-01 12:39:22 -07:00
rav4kumar 5cfa627b1e feat(long): hard brake 2026-07-01 12:38:34 -07:00
rav4kumar c241b7f9e5 refactor(long): 2026-06-30 21:10:24 -07:00
rav4kumar 1cae2e14b9 feat(long): tfllow 2026-06-30 13:31:24 -07:00
rav4kumar bc498ea6af refactor(long): cleam up 2026-06-30 10:45:17 -07:00
rav4kumar c0e08181df feat(long): physics decel cap 2026-06-29 13:44:00 -07:00
rav4kumar bbe7b01adc Reapply "feat(dec): rework dynamic experimental controller"
This reverts commit cc8115141f.
2026-06-28 12:19:11 -07:00
rav4kumar 9f3fa8ceb7 feat(long): lead jitter smoother for trackId 2026-06-28 12:18:02 -07:00
rav4kumar df54f8e083 feat(long): gas suppression near a lead 2026-06-27 11:23:26 -07:00
rav4kumar 4d351bdcad feat(long): RadarDistance lead instability 2026-06-27 11:08:17 -07:00
rav4kumar bca4be26cd fix(long): comfort_stop and vLead 2026-06-26 12:22:30 -07:00
rav4kumar cc8115141f Revert "feat(dec): rework dynamic experimental controller"
This reverts commit 83de89e253.
2026-06-26 11:54:24 -07:00
rav4kumar d7af8bfc4d fix(long): RadarDistance drop out 2026-06-26 11:53:44 -07:00
rav4kumar b30e52261e fix(long): remove RadarDistance vLead 2026-06-24 23:56:41 -07:00
rav4kumar e09bc59ea3 fix(long): comfort stop 2026-06-24 12:44:55 -07:00
rav4kumar f9e7974e02 feat(long): comfort stop 2026-06-24 10:51:55 -07:00
rav4kumar 8c902576dc long stopping 2026-06-23 11:24:06 -07:00
rav4kumar bc96b6a6ce fix(long): remove onset jerk-cap 2026-06-22 13:21:30 -07:00
rav4kumar 0a68face78 long refactor 2026-06-21 13:35:31 -07:00
rav4kumar 78307a31f1 feat(long): jerk-limit 2026-06-20 12:19:48 -07:00
rav4kumar d10577dc2c ref 2026-06-20 11:40:52 -07:00
rav4kumar 37f19a35b6 tune 2026-06-19 23:12:18 -07:00
rav4kumar 2276e9d47d tune long 2026-06-19 13:22:51 -07:00
rav4kumar 0a167d5024 refactor(long): clean up 2026-06-18 11:43:36 -07:00
rav4kumar 21e0583752 Fix RELC road edge detection 2026-06-18 11:14:06 -07:00
rav4kumar 88dad50ee1 Enable relc for stock modeld 2026-06-17 13:48:07 -07:00
rav4kumar 9f20c56b2f feat(long): roomier stops + smoother decel 2026-06-17 11:13:05 -07:00
rav4kumar a2b50e5a8b feat(long): prompt ECO launch from stop 2026-06-16 12:05:57 -07:00
Kumar bebe8f0b04 Update constants.py 2026-06-16 06:27:59 -07:00
rav4kumar aa23fd0e4a tuneref 2026-06-14 12:15:24 -07:00
rav4kumar 8f942eefd7 ref 2026-06-12 11:56:04 -07:00
rav4kumar 9487f880e2 feat(long): convex brakeonset shaper 2026-06-12 11:55:36 -07:00
rav4kumar 17b097b434 tune 2026-06-12 07:14:01 -07:00
rav4kumar 17eca0ab43 gental 2026-06-10 21:34:18 -07:00
rav4kumar 634ba8f6b2 sl params 2026-06-10 11:38:31 -07:00
rav4kumar a45487f829 feat(long): radar distance controller 2026-06-10 11:38:31 -07:00
rav4kumar e790ce047b refactor(long): strip accel personality to lean core 2026-06-09 22:07:13 -07:00
rav4kumar 96d7850888 feat(long): fix accelersonality rubbernband 2026-06-09 22:02:56 -07:00
Kumar 6bf721a8c9 Update longitudinal_planner.py 2026-06-09 17:25:14 -07:00
rav4kumar bc6dbf8ca1 feat(long): acceleration controller 2026-06-09 15:03:02 -07:00
rav4kumar ffb7bbbbc4 ref 2026-06-09 12:41:01 -07:00
rav4kumar 83de89e253 feat(dec): rework dynamic experimental controller 2026-06-06 11:45:24 -07:00
rav4kumar 04224e8747 Add custom params to sunnylink settings 2026-06-06 10:31:17 -07:00
rav4kumar 6012ebb7c7 ref 2026-06-06 10:30:46 -07:00
rav4kumar e91dbe351e ref 2026-06-05 11:29:37 -07:00
rav4kumar e25061fd08 fix mapd scorll 2026-06-05 11:28:47 -07:00
rav4kumar baf56ae324 feat/relc 2026-06-05 11:06:06 -07:00
rav4kumar 9badd3fa40 mici-sla-ui 2026-06-05 11:05:29 -07:00
rav4kumar 2220e7fc11 point the submodule 2026-06-05 11:04:53 -07:00
rav4kumar e862935209 toyota sp link and drive mode btn support 2026-06-05 11:04:49 -07:00
rav4kumar 9bd504a5cb abh, bsm 2026-06-05 11:04:43 -07:00
48 changed files with 2487 additions and 383 deletions
+1
View File
@@ -4,6 +4,7 @@
[submodule "opendbc"]
path = opendbc_repo
url = https://github.com/sunnypilot/opendbc.git
branch = tn
[submodule "msgq"]
path = msgq_repo
url = https://github.com/commaai/msgq.git
+24
View File
@@ -194,6 +194,7 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
aTarget @5 :Float32;
events @6 :List(OnroadEventSP.Event);
e2eAlerts @7 :E2eAlerts;
acceleration @8 :Acceleration;
struct DynamicExperimentalControl {
state @0 :DynamicExperimentalControlState;
@@ -296,6 +297,26 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
greenLightAlert @0 :Bool;
leadDepartAlert @1 :Bool;
}
# Acceleration Personality (Eco / Normal / Sport)
struct Acceleration {
personality @0 :AccelerationPersonality;
enabled @1 :Bool;
maxAccel @2 :Float32; # current speed-indexed accel ceiling
brakeNeed @3 :Float32; # repurposed: follow-gap widen added on top of the stock t_follow (s)
decelTarget @4 :Float32; # repurposed: t_follow handed to the MPC (s)
smoothActive @5 :Bool; # repurposed: follow-gap widen currently active
bypassed @6 :Bool; # unused (input-shaping design has no output post-shaping / bypass)
comfortStopActive @7 :Bool; # low-speed comfort decel-to-stop floor currently governing (behind a near-stopped lead)
comfortStopFloor @8 :Float32; # comfort-stop floor commanded (m/s^2, negative; 0 when not engaged)
leadUnstable @9 :Bool; # RadarDistance lead-instability telemetry (bimodal/bouncing radar lead; informational, no control effect yet)
}
enum AccelerationPersonality {
eco @0;
normal @1;
sport @2;
}
}
struct OnroadEventSP @0xda96579883444c35 {
@@ -342,6 +363,7 @@ struct OnroadEventSP @0xda96579883444c35 {
speedLimitChanged @21;
speedLimitPending @22;
e2eChime @23;
laneChangeRoadEdge @24;
}
}
@@ -448,6 +470,8 @@ struct LiveMapDataSP @0xf416ec09499d9d19 {
struct ModelDataV2SP @0xa1680744031fdb2d {
laneTurnDirection @0 :TurnDirection;
leftLaneChangeEdgeBlock @1 :Bool;
rightLaneChangeEdgeBlock @2 :Bool;
enum TurnDirection {
none @0;
+16
View File
@@ -4,6 +4,7 @@
#include <unordered_map>
#include "cereal/gen/cpp/log.capnp.h"
#include "cereal/gen/cpp/custom.capnp.h"
inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"AccessToken", {CLEAR_ON_MANAGER_START | DONT_LOG, STRING}},
@@ -179,12 +180,19 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"QuickBootToggle", {PERSISTENT | BACKUP, BOOL, "0"}},
{"QuietMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RainbowMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RoadEdgeLaneChangeEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RocketFuel", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowAdvancedControls", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowTurnSignals", {PERSISTENT | BACKUP, BOOL, "0"}},
{"StandstillTimer", {PERSISTENT | BACKUP, BOOL, "0"}},
{"TrueVEgoUI", {PERSISTENT | BACKUP, BOOL, "0"}},
// toyota specific params
{"ToyotaAutoHold", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaEnhancedBsm", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaTSS2Long", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ToyotaDriveMode", {PERSISTENT | BACKUP, BOOL, "0"}},
// MADS params
{"Mads", {PERSISTENT | BACKUP, BOOL, "1"}},
{"MadsMainCruiseAllowed", {PERSISTENT | BACKUP, BOOL, "1"}},
@@ -228,6 +236,14 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"DynamicExperimentalControl", {PERSISTENT | BACKUP, BOOL, "0"}},
{"BlindSpot", {PERSISTENT | BACKUP, BOOL, "0"}},
// Acceleration Personality (Eco / Normal / Sport)
{"AccelPersonalityEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"AccelPersonality", {PERSISTENT | BACKUP, INT, std::to_string(static_cast<int>(cereal::LongitudinalPlanSP::AccelerationPersonality::NORMAL))}},
// Radar Distance: de-noise the lead the MPC follows (flicker-hold + churn smoother); never reports a
// farther/faster lead than reality, so braking stays >= stock
{"RadarDistance", {PERSISTENT | BACKUP, BOOL, "0"}},
// sunnypilot model params
{"CameraOffset", {PERSISTENT | BACKUP, FLOAT, "0.0"}},
{"LagdToggle", {PERSISTENT | BACKUP, BOOL, "1"}},
+7 -1
View File
@@ -10,7 +10,7 @@ from cereal import car, log, custom
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
from openpilot.common.swaglog import cloudlog, ForwardingHandler
from opendbc.safety import ALTERNATIVE_EXPERIENCE
from opendbc.car import DT_CTRL, structs
from opendbc.car.can_definitions import CanData, CanRecvCallable, CanSendCallable
from opendbc.car.carlog import carlog
@@ -121,7 +121,13 @@ class Car:
self.CI, self.CP, self.CP_SP = CI, CI.CP, CI.CP_SP
self.RI = RI
# set alternative experiences from parameters
sp_toyota_auto_brake_hold = self.params.get_bool("ToyotaAutoHold")
self.CP.alternativeExperience = 0
if sp_toyota_auto_brake_hold:
self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.ALLOW_AEB
# mads
set_alternative_experience(self.CP, self.CP_SP, self.params)
set_car_specific_params(self.CP, self.CP_SP, self.params)
+3 -3
View File
@@ -56,7 +56,7 @@ class DesireHelper:
def get_lane_change_direction(CS):
return LaneChangeDirection.left if CS.leftBlinker else LaneChangeDirection.right
def update(self, carstate, lateral_active, lane_change_prob):
def update(self, carstate, lateral_active, lane_change_prob, left_edge_detected=False, right_edge_detected=False):
self.alc.update_params()
self.lane_turn_controller.update_params()
v_ego = carstate.vEgo
@@ -88,8 +88,8 @@ class DesireHelper:
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = (((carstate.leftBlindspot or left_edge_detected) and self.lane_change_direction == LaneChangeDirection.left) or
((carstate.rightBlindspot or right_edge_detected) and self.lane_change_direction == LaneChangeDirection.right))
self.alc.update_lane_change(blindspot_detected, carstate.brakePressed)
@@ -219,6 +219,7 @@ class LongitudinalMpc:
self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.reset()
self.source = LongitudinalPlanSource.cruise
self.t_follow_fn = None # sunnypilot: optional (t_follow, v_ego)->t_follow override; None == byte-stock
def reset(self):
self.solver.reset()
@@ -316,6 +317,8 @@ class LongitudinalMpc:
def update(self, radarstate, v_cruise, personality=log.LongitudinalPersonality.standard):
t_follow = get_T_FOLLOW(personality)
v_ego = self.x0[1]
if self.t_follow_fn is not None:
t_follow = self.t_follow_fn(t_follow, v_ego)
self.status = radarstate.leadOne.status or radarstate.leadTwo.status
lead_xv_0 = self.process_lead(radarstate.leadOne)
+13 -5
View File
@@ -52,6 +52,7 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
self.CP = CP
self.mpc = LongitudinalMpc(dt=dt)
LongitudinalPlannerSP.__init__(self, self.CP, CP_SP, self.mpc)
self.mpc.t_follow_fn = self.accel.get_t_follow # Acceleration Personality: add-only follow-gap widen
self.fcw = False
self.dt = dt
self.allow_throttle = True
@@ -110,7 +111,7 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
accel_clip = [ACCEL_MIN, get_max_accel(v_ego)]
accel_clip = [ACCEL_MIN, self.accel.get_max_accel(v_ego)]
steer_angle_without_offset = sm['carState'].steeringAngleDeg - sm['liveParameters'].angleOffsetDeg
accel_clip = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_clip, self.CP)
@@ -118,6 +119,7 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
self.v_desired_filter.x = v_ego
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
self.a_desired = np.clip(sm['carState'].aEgo, accel_clip[0], accel_clip[1])
self.accel.reset() # drop any accumulated follow-gap widen so it re-ramps cleanly on re-engage
# Prevent divergence, smooth in current v_ego
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
@@ -138,7 +140,7 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
self.mpc.set_weights(prev_accel_constraint, personality=sm['selfdriveState'].personality)
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
self.mpc.update(sm['radarState'], v_cruise, personality=sm['selfdriveState'].personality)
self.mpc.update(self.smooth_radarstate(sm['radarState']), v_cruise, personality=sm['selfdriveState'].personality)
self.v_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
@@ -160,7 +162,8 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
output_a_target_e2e = sm['modelV2'].action.desiredAcceleration
output_should_stop_e2e = sm['modelV2'].action.shouldStop
if self.is_e2e(sm):
is_e2e = self.is_e2e(sm)
if is_e2e:
output_a_target = min(output_a_target_e2e, output_a_target_mpc)
self.output_should_stop = output_should_stop_e2e or output_should_stop_mpc
if output_a_target < output_a_target_mpc:
@@ -169,8 +172,13 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
output_a_target = output_a_target_mpc
self.output_should_stop = output_should_stop_mpc
for idx in range(2):
accel_clip[idx] = np.clip(accel_clip[idx], self.prev_accel_clip[idx] - 0.05, self.prev_accel_clip[idx] + 0.05)
# Acceleration Personality shapes only MPC INPUTS (accel ceiling above + t_follow via mpc.t_follow_fn),
# never the output accel -- output_a_target passes through byte-stock so the MPC owns the trajectory.
# Lower (braking) bound and the ceiling's downward slew stay at the stock rate; only the ceiling's
# upward slew is tier-dependent (Acceleration Personality).
accel_clip[0] = np.clip(accel_clip[0], self.prev_accel_clip[0] - 0.05, self.prev_accel_clip[0] + 0.05)
accel_clip[1] = np.clip(accel_clip[1], self.prev_accel_clip[1] - 0.05, self.prev_accel_clip[1] + self.accel.get_rise_rate())
self.output_a_target = np.clip(output_a_target, accel_clip[0], accel_clip[1])
self.prev_accel_clip = accel_clip
@@ -0,0 +1,57 @@
import inspect
import re
from pathlib import Path
from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
REPO_ROOT = Path(__file__).resolve().parents[3]
def test_smoothing_params_default_off():
params_keys = (REPO_ROOT / "common/params_keys.h").read_text()
assert re.search(r'"AccelPersonalityEnabled", \{PERSISTENT \| BACKUP, BOOL, "0"\}', params_keys)
assert re.search(r'"RadarDistance", \{PERSISTENT \| BACKUP, BOOL, "0"\}', params_keys)
# the retired dRel-bias sub-toggles must not return (their features were deleted in the input-shaping rewrite)
assert '"StopGapBias"' not in params_keys
assert '"LeadDecelAnticipate"' not in params_keys
def test_output_is_byte_stock_and_inputs_are_shaped():
update_src = inspect.getsource(LongitudinalPlanner.update)
# INPUT shaping only: the accel ceiling and the radar-conditioning seam are present...
assert "self.accel.get_max_accel(v_ego)" in update_src
assert "self.mpc.update(self.smooth_radarstate(sm['radarState'])" in update_src
# ...and the OUTPUT is never post-shaped (byte-stock output; no accel shaping, no should_stop override).
assert "smooth_target_accel" not in update_src
assert "sng_should_stop" not in update_src # reverted: the should_stop hysteresis caused a high-speed under-brake
def test_t_follow_hook_wired_and_identity_default():
init_src = inspect.getsource(LongitudinalPlanner.__init__)
assert "self.mpc.t_follow_fn = self.accel.get_t_follow" in init_src # planner wires the add-only widen
mpc_init = inspect.getsource(LongitudinalMpc.__init__)
assert "self.t_follow_fn = None" in mpc_init # default None == byte-stock identity
mpc_update = inspect.getsource(LongitudinalMpc.update)
assert "if self.t_follow_fn is not None:" in mpc_update # guarded hook, only fires when set
# Tokens for the reverted input-side DEC model-stop-target (capped v_target into the MPC pre-solve). It was
# superseded by DEC blended-mode and chased a source-fixed radar gate; it must not silently return.
_DEC_MODEL_STOP_TOKENS = ("apply_model_stop_target", "force_stop_requested", "_update_model_stop", "MODEL_STOP_TARGET_TIME")
def test_dec_model_stop_target_not_reintroduced():
this_file = Path(__file__).resolve()
for sub in ("selfdrive/controls", "sunnypilot/selfdrive/controls"):
for path in (REPO_ROOT / sub).rglob("*.py"):
if path.resolve() == this_file:
continue # this guard names the tokens as strings
src = path.read_text()
for token in _DEC_MODEL_STOP_TOKENS:
assert token not in src, f"reverted DEC model-stop-target ({token}) re-introduced in {path}"
+6 -1
View File
@@ -28,6 +28,7 @@ from openpilot.selfdrive.modeld.helpers import usbgpu_present, modeld_pkl_path,
from openpilot.sunnypilot.livedelay.helpers import get_lat_delay
from openpilot.sunnypilot.modeld_v2.modeld_base import ModelStateBase
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
PROCESS_NAME = "selfdrive.modeld.modeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
@@ -223,6 +224,7 @@ def main(demo=False):
prev_action = log.ModelDataV2.Action()
DH = DesireHelper()
RELC = RoadEdgeLaneChangeController(DH)
while True:
# Keep receiving frames until we are at least 1 frame ahead of previous extra frame
@@ -325,7 +327,10 @@ def main(demo=False):
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
RELC.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs, v_ego, modelv2_send.modelV2.roadEdges)
mdv2sp_send.modelDataV2SP.leftLaneChangeEdgeBlock = RELC.left_edge_detected
mdv2sp_send.modelDataV2SP.rightLaneChangeEdgeBlock = RELC.right_edge_detected
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RELC.left_edge_detected, RELC.right_edge_detected)
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
mdv2sp_send.modelDataV2SP.laneTurnDirection = DH.lane_turn_direction
+8 -1
View File
@@ -321,9 +321,16 @@ class SelfdriveD(CruiseHelper):
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
mdv2sp = self.sm['modelDataV2SP']
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
(CS.rightBlindspot and direction == LaneChangeDirection.right):
self.events.add(EventName.laneChangeBlocked)
elif (mdv2sp.leftLaneChangeEdgeBlock and direction == LaneChangeDirection.left) or \
(mdv2sp.rightLaneChangeEdgeBlock and direction == LaneChangeDirection.right):
self.events_sp.add(custom.OnroadEventSP.EventName.laneChangeRoadEdge)
else:
if direction == LaneChangeDirection.left:
self.events.add(EventName.preLaneChangeLeft)
+46 -1
View File
@@ -31,6 +31,15 @@ DESCRIPTIONS = {
"Receive alerts to steer back into the lane when your vehicle drifts over a detected lane line " +
"without a turn signal activated while driving over 31 mph (50 km/h)."
),
"AccelPersonalityEnabled": tr_noop("Enable Eco/Normal/Sport acceleration profiles, including early soft braking."),
"AccelPersonality": tr_noop(
"Eco accelerates gently and brakes early and soft; Sport accelerates briskly. " +
"Hard-braking authority is always preserved."
),
"RadarDistance": tr_noop(
"Hold a lead through brief radar flicker/dropout so sunnypilot does not lose and re-grab it, " +
"smoothing the hard/late brakes that radar drop-outs cause. Braking is never reduced below stock."
),
"AlwaysOnDM": tr_noop("Enable driver monitoring even when sunnypilot is not engaged."),
'RecordFront': tr_noop("Upload data from the driver facing camera and help improve the driver monitoring algorithm."),
"IsMetric": tr_noop("Display speed in km/h instead of mph."),
@@ -64,6 +73,12 @@ class TogglesLayout(Widget):
"disengage_on_accelerator.png",
False,
),
"RadarDistance": (
lambda: tr("Radar Distance"),
DESCRIPTIONS["RadarDistance"],
"speed_limit.png",
False,
),
"IsLdwEnabled": (
lambda: tr("Enable Lane Departure Warnings"),
DESCRIPTIONS["IsLdwEnabled"],
@@ -106,6 +121,24 @@ class TogglesLayout(Widget):
icon="speed_limit.png"
)
self._accel_personality_enabled = toggle_item(
lambda: tr("Enable Acceleration Profiles"),
lambda: tr(DESCRIPTIONS["AccelPersonalityEnabled"]),
self._params.get_bool("AccelPersonalityEnabled"),
callback=self._set_accel_personality_enabled,
icon="speed_limit.png",
)
self._accel_personality_setting = multiple_button_item(
lambda: tr("Acceleration Profile"),
lambda: tr(DESCRIPTIONS["AccelPersonality"]),
buttons=[lambda: tr("Eco"), lambda: tr("Normal"), lambda: tr("Sport")],
button_width=300,
callback=self._set_accel_personality,
selected_index=self._params.get("AccelPersonality", return_default=True),
icon="speed_limit.png"
)
self._toggles = {}
self._locked_toggles = set()
for param, (title, desc, icon, needs_restart) in self._toggle_defs.items():
@@ -135,9 +168,11 @@ class TogglesLayout(Widget):
self._toggles[param] = toggle
# insert longitudinal personality after NDOG toggle
# insert longitudinal + acceleration personality after NDOG toggle
if param == "DisengageOnAccelerator":
self._toggles["LongitudinalPersonality"] = self._long_personality_setting
self._toggles["AccelPersonalityEnabled"] = self._accel_personality_enabled
self._toggles["AccelPersonality"] = self._accel_personality_setting
self._update_experimental_mode_icon()
self._scroller = Scroller(list(self._toggles.values()), line_separator=True, spacing=0)
@@ -176,11 +211,15 @@ class TogglesLayout(Widget):
self._toggles["ExperimentalMode"].action_item.set_enabled(True)
self._toggles["ExperimentalMode"].set_description(e2e_description)
self._long_personality_setting.action_item.set_enabled(True)
self._accel_personality_enabled.action_item.set_enabled(True)
self._accel_personality_setting.action_item.set_enabled(True)
else:
# no long for now
self._toggles["ExperimentalMode"].action_item.set_enabled(False)
self._toggles["ExperimentalMode"].action_item.set_state(False)
self._long_personality_setting.action_item.set_enabled(False)
self._accel_personality_enabled.action_item.set_enabled(False)
self._accel_personality_setting.action_item.set_enabled(False)
self._params.remove("ExperimentalMode")
unavailable = tr("Experimental mode is currently unavailable on this car since the car's stock ACC is used for longitudinal control.")
@@ -247,3 +286,9 @@ class TogglesLayout(Widget):
def _set_longitudinal_personality(self, button_index: int):
self._params.put("LongitudinalPersonality", button_index, block=True)
def _set_accel_personality(self, button_index: int):
self._params.put("AccelPersonality", button_index, block=True)
def _set_accel_personality_enabled(self, state: bool):
self._params.put_bool("AccelPersonalityEnabled", state, block=True)
+5 -2
View File
@@ -13,6 +13,7 @@ from openpilot.system.ui.lib.application import gui_app
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.settings import SettingsLayoutSP as SettingsLayout
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.onroad import OnroadViewContainerSP as AugmentedRoadView
ONROAD_DELAY = 2.5 # seconds
@@ -118,13 +119,15 @@ class MiciMainLayout(Scroller):
# FIXME: these two pops can interrupt user interacting in the settings
if self._onroad_time_delay is not None and rl.get_time() - self._onroad_time_delay >= ONROAD_DELAY:
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
if not gui_app.sunnypilot_ui() or self._should_auto_scroll_to_onroad():
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
self._onroad_time_delay = None
# When car leaves standstill, pop nav stack and scroll to onroad
CS = ui_state.sm["carState"]
if not CS.standstill and self._prev_standstill:
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
if not gui_app.sunnypilot_ui() or self._should_auto_scroll_to_onroad():
gui_app.pop_widgets_to(self, lambda: self._scroll_to(self._onroad_layout))
self._prev_standstill = CS.standstill
def _on_interactive_timeout(self):
@@ -51,11 +51,17 @@ class LaneChangeSettingsLayout(Widget):
description=lambda: tr("Toggle to enable a delay timer for seamless lane changes when blind spot monitoring " +
"(BSM) detects a obstructing vehicle, ensuring safe maneuvering."),
)
self._road_edge_block = toggle_item_sp(
param="RoadEdgeLaneChangeEnabled",
title=lambda: tr("Block Lane Change: Road Edge Detection"),
description=lambda: tr("Blocks the lane change if the model sees a road edge on your signaled side."),
)
items = [
self._lane_change_timer,
LineSeparatorSP(40),
self._bsm_delay,
self._road_edge_block,
]
return items
@@ -0,0 +1,13 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout
class MiciMainLayoutSP(MiciMainLayout):
def _should_auto_scroll_to_onroad(self) -> bool:
return not self._onroad_layout.is_on_info_panel()
@@ -0,0 +1,63 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.system.ui.lib.application import gui_app
from openpilot.selfdrive.ui.sunnypilot.mici.widgets.scroller_sp import ScrollerSP
from openpilot.selfdrive.ui.sunnypilot.mici.onroad.augmented_road_view import AugmentedRoadViewSP
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.onroad_info_panel import OnroadInfoPanel
CONFIDENCE_BALL_VISIBLE_RATIO = 0.4
HORIZONTAL_SETTLE_PX = 5
HORIZONTAL_RESET_RATIO = 0.5
class OnroadViewContainerSP(ScrollerSP):
def __init__(self, bookmark_callback=None):
super().__init__(horizontal=False, snap_items=True, spacing=0, pad=0, scroll_indicator=False, edge_shadows=False)
self.road_view = AugmentedRoadViewSP(bookmark_callback=bookmark_callback)
self.onroad_info_panel = OnroadInfoPanel(bookmark_callback=bookmark_callback)
self._scroller.add_widgets([
self.road_view,
self.onroad_info_panel,
])
self._scroller.set_reset_scroll_at_show(False)
self._scroller.set_scrolling_enabled(lambda: abs(self.rect.x) < HORIZONTAL_SETTLE_PX)
for child in (self.road_view, self.onroad_info_panel):
inner_touch_valid = child._touch_valid_callback
child.set_touch_valid_callback(
lambda inner=inner_touch_valid: self._touch_valid() and (inner() if inner else True)
)
def set_rect(self, rect: rl.Rectangle):
super().set_rect(rect)
self.road_view.set_rect(rect)
self.onroad_info_panel.set_rect(rect)
return self
def is_swiping_left(self) -> bool:
return self.road_view.is_swiping_left() or self.onroad_info_panel.is_swiping_left()
def set_click_callback(self, callback) -> None:
self.road_view.set_click_callback(callback)
self.onroad_info_panel.set_click_callback(callback)
def is_on_info_panel(self) -> bool:
"""True when scrolled past halfway toward onroad_info_panel (used by main layout
to skip auto-pop-back-to-camera while user is reading the info panel)."""
return abs(self._scroller.scroll_panel.get_offset()) > self._rect.height / 2
def _render(self, rect: rl.Rectangle):
if abs(self.rect.x) > gui_app.width * HORIZONTAL_RESET_RATIO:
self._scroller.scroll_panel.set_offset(0)
vertical_offset = self._scroller.scroll_panel.get_offset()
show_ball = abs(vertical_offset) < rect.height * CONFIDENCE_BALL_VISIBLE_RATIO
self.road_view.set_show_confidence_ball(show_ball)
super()._render(rect)
@@ -0,0 +1,324 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from dataclasses import dataclass
from openpilot.common.constants import CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.application import MousePos
from openpilot.system.ui.widgets import Widget
from openpilot.selfdrive.ui.mici.onroad.alert_renderer import AlertRenderer
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import BookmarkIcon
METER_TO_KM = 0.001
METER_TO_MILE = 0.000621371
@dataclass(frozen=True)
class OnroadInfoPanelColors:
white: rl.Color = rl.WHITE
black: rl.Color = rl.BLACK
red: rl.Color = rl.Color(255, 0, 0, 255)
green: rl.Color = rl.Color(0, 255, 0, 255)
grey: rl.Color = rl.Color(190, 195, 190, 255)
light_grey: rl.Color = rl.Color(200, 200, 200, 255)
dark_grey: rl.Color = rl.Color(100, 100, 100, 255)
bg_dark: rl.Color = rl.Color(0, 0, 0, 255)
card_bg: rl.Color = rl.Color(50, 50, 50, 200)
badge_bg: rl.Color = rl.Color(60, 60, 60, 255)
COLORS = OnroadInfoPanelColors()
class OnroadInfoPanel(Widget):
def __init__(self, bookmark_callback=None):
super().__init__()
self.speed_limit: float = 0.0
self.speed_limit_valid: bool = False
self.speed_limit_offset: float = 0.0
self.next_speed_limit: float = 0.0
self.next_speed_limit_distance: float = 0.0
self.road_name: str = ""
self.current_speed: float = 0.0
self.set_speed: float = 0.0
self.cruise_enabled: bool = False
self._sign_slide: float = 0.0
self._font_bold: rl.Font = gui_app.font(FontWeight.BOLD)
self._font_semi_bold: rl.Font = gui_app.font(FontWeight.SEMI_BOLD)
self._font_medium: rl.Font = gui_app.font(FontWeight.MEDIUM)
self._marquee_offset: float = 0.0
self._marquee_direction: int = 1
self._marquee_pause_timer: float = 0.0
self._marquee_speed: float = 40.0
self._marquee_pause_duration: float = 1.5
self._alert_renderer = AlertRenderer()
self._alert_alpha_filter = FirstOrderFilter(0, 0.05, 1 / gui_app.target_fps)
self._bookmark_icon = BookmarkIcon(bookmark_callback)
def is_swiping_left(self) -> bool:
return self._bookmark_icon.is_swiping_left()
def _handle_mouse_release(self, mouse_pos: MousePos) -> None:
# Mirror stock AugmentedRoadView: suppress click while bookmark gesture active
if not self._bookmark_icon.interacting():
super()._handle_mouse_release(mouse_pos)
def _update_state(self) -> None:
sm = ui_state.sm
speed_conv = CV.MS_TO_KPH if ui_state.is_metric else CV.MS_TO_MPH
if sm.valid["longitudinalPlanSP"]:
lp_sp = sm["longitudinalPlanSP"]
resolver = lp_sp.speedLimit.resolver
self.speed_limit = resolver.speedLimit * speed_conv
self.speed_limit_valid = resolver.speedLimitValid
self.speed_limit_offset = resolver.speedLimitOffset * speed_conv
if sm.valid["liveMapDataSP"]:
lmd = sm["liveMapDataSP"]
self.next_speed_limit = lmd.speedLimitAhead * speed_conv
self.next_speed_limit_distance = lmd.speedLimitAheadDistance
self.road_name = lmd.roadName
if sm.updated["carState"]:
self.current_speed = sm["carState"].vEgo * speed_conv
if sm.valid["carState"] and sm.valid["controlsState"]:
self.cruise_enabled = sm["carState"].cruiseState.enabled
v_cruise_cluster = sm["carState"].vCruiseCluster
set_speed_kph = sm["controlsState"].vCruiseDEPRECATED if v_cruise_cluster == 0.0 else v_cruise_cluster
self.set_speed = set_speed_kph * (METER_TO_MILE / METER_TO_KM) if not ui_state.is_metric else set_speed_kph
def _render(self, rect: rl.Rectangle) -> None:
self._update_state()
rl.draw_rectangle(int(rect.x), int(rect.y), int(rect.width), int(rect.height), COLORS.bg_dark)
margin = 20
mid_y = rect.y + rect.height / 2
left_x = rect.x + margin
if self.cruise_enabled:
unit = tr("MAX")
display_speed = self.set_speed
else:
unit = tr("km/h") if ui_state.is_metric else tr("MPH")
display_speed = self.current_speed
speed_val = str(round(display_speed))
if self.speed_limit_valid and display_speed > self.speed_limit:
speed_color = COLORS.red
else:
speed_color = COLORS.white
rl.draw_text_ex(self._font_semi_bold, unit, rl.Vector2(left_x, mid_y - 95), 38, 0, COLORS.grey)
rl.draw_text_ex(self._font_bold, speed_val, rl.Vector2(left_x, mid_y - 60), 110, 0, speed_color)
sign_width = 135
sign_height = 135 if ui_state.is_metric else 175
has_next = self.next_speed_limit > 0 and self.next_speed_limit != self.speed_limit
target_slide = 1.0 if has_next else 0.0
slide_speed = 3.0 * rl.get_frame_time()
if self._sign_slide < target_slide:
self._sign_slide = min(self._sign_slide + slide_speed, target_slide)
elif self._sign_slide > target_slide:
self._sign_slide = max(self._sign_slide - slide_speed, target_slide)
next_w = int(sign_width * 0.7)
next_h = int(sign_height * 0.7)
next_peek = int(next_w * 0.85) + 5
centered_x = rect.x + rect.width - sign_width - margin
shifted_x = rect.x + rect.width - sign_width - margin - next_peek
sign_x = centered_x + (shifted_x - centered_x) * self._sign_slide
sign_y = rect.y + (rect.height - sign_height) / 2
road_y = mid_y + 55
road_width = sign_x - left_x - margin
self._draw_road_name(left_x, road_y, road_width)
if has_next and self._sign_slide > 0.01:
next_val = str(round(self.next_speed_limit))
dist_str = self._format_distance(self.next_speed_limit_distance)
next_x = sign_x + sign_width - int(next_w * 0.15)
next_y = sign_y + (sign_height - next_h) / 2
next_speed_color = COLORS.black
if ui_state.is_metric:
self._draw_vienna_sign(next_x, next_y, next_w, next_h, next_val, next_speed_color, is_upcoming=True)
else:
self._draw_mutcd_sign(next_x, next_y, next_w, next_h, next_val, next_speed_color, is_upcoming=True)
dist_size = measure_text_cached(self._font_medium, dist_str, 24)
rl.draw_text_ex(self._font_medium, dist_str, rl.Vector2(next_x + next_w / 2 - dist_size.x / 2, next_y + next_h + 4), 24, 0, COLORS.grey)
self._draw_speed_limit_sign(sign_x, sign_y, sign_width, sign_height)
if self.speed_limit_offset != 0 and self.speed_limit_valid:
offset_val = str(abs(round(self.speed_limit_offset)))
badge_sz = 42
badge_x = sign_x + sign_width - badge_sz * 0.85
badge_y = sign_y - badge_sz * 0.25
if ui_state.is_metric:
badge_r = badge_sz / 2
badge_cx = badge_x + badge_r
badge_cy = badge_y + badge_r
rl.draw_circle(int(badge_cx), int(badge_cy), badge_r + 2, COLORS.dark_grey)
rl.draw_circle(int(badge_cx), int(badge_cy), badge_r, COLORS.badge_bg)
self._draw_text_centered(self._font_bold, offset_val, 24, rl.Vector2(badge_cx, badge_cy), COLORS.white)
else:
mutcd_badge_x = sign_x + sign_width - badge_sz * 0.65
mutcd_badge_y = sign_y - badge_sz * 0.50
badge_rect = rl.Rectangle(mutcd_badge_x, mutcd_badge_y, badge_sz, badge_sz)
rl.draw_rectangle_rounded(badge_rect, 0.25, 10, COLORS.badge_bg)
rl.draw_rectangle_rounded_lines_ex(badge_rect, 0.25, 10, 2, COLORS.dark_grey)
self._draw_text_centered(self._font_bold, offset_val, 24, rl.Vector2(mutcd_badge_x + badge_sz / 2, mutcd_badge_y + badge_sz / 2), COLORS.white)
# SCC
speed_size = measure_text_cached(self._font_bold, speed_val, 110)
scc_x = left_x + speed_size.x + 30
scc_y = mid_y - 50
self._draw_scc_icons(scc_x, scc_y)
self._bookmark_icon.render(rect)
if ui_state.started:
alert_obj, no_alert = self._alert_renderer.will_render()
self._alert_alpha_filter.update(0 if no_alert else 1)
alpha = self._alert_alpha_filter.x
if alpha > 0.01:
rl.draw_rectangle(int(rect.x), int(rect.y), int(rect.width), int(rect.height), rl.Color(0, 0, 0, int(150 * alpha)))
self._alert_renderer.render(rect)
def _draw_scc_icons(self, x: float, y: float) -> None:
sm = ui_state.sm
if not sm.valid["longitudinalPlanSP"]:
return
scc = sm["longitudinalPlanSP"].smartCruiseControl
box_w, box_h = 100, 36
gap = 6
drawn = 0
for label, active in [("SCC-V", scc.vision.active), ("SCC-M", scc.map.active)]:
if not active:
continue
bx = x
by = y + drawn * (box_h + gap)
rl.draw_rectangle_rounded(rl.Rectangle(bx, by, box_w, box_h), 0.3, 10, COLORS.green)
self._draw_text_centered(self._font_bold, label, 20, rl.Vector2(bx + box_w / 2, by + box_h / 2), COLORS.black)
drawn += 1
def _draw_speed_limit_sign(self, x: float, y: float, sign_width: float, sign_height: float) -> None:
speed_str = str(round(self.speed_limit)) if self.speed_limit_valid and self.speed_limit > 0 else "--"
speed_color = COLORS.black if not self.speed_limit_valid or self.current_speed <= self.speed_limit else COLORS.red
if ui_state.is_metric:
self._draw_vienna_sign(x, y, sign_width, sign_height, speed_str, speed_color, is_upcoming=False)
else:
self._draw_mutcd_sign(x, y, sign_width, sign_height, speed_str, speed_color, is_upcoming=False)
def _draw_road_name(self, x: float, y: float, width: float) -> None:
road_display = self.road_name if self.road_name else "--"
font_size = 30
road_size = measure_text_cached(self._font_semi_bold, road_display, font_size)
text_width = road_size.x
if text_width <= width:
self._marquee_offset = 0.0
self._marquee_direction = 1
self._marquee_pause_timer = 0.0
rl.draw_text_ex(self._font_semi_bold, road_display, rl.Vector2(x, y), font_size, 0, COLORS.white)
else:
overflow = text_width - width
dt = rl.get_frame_time()
if self._marquee_pause_timer > 0:
self._marquee_pause_timer -= dt
else:
self._marquee_offset += self._marquee_direction * self._marquee_speed * dt
if self._marquee_offset >= overflow:
self._marquee_offset = overflow
self._marquee_direction = -1
self._marquee_pause_timer = self._marquee_pause_duration
elif self._marquee_offset <= 0:
self._marquee_offset = 0
self._marquee_direction = 1
self._marquee_pause_timer = self._marquee_pause_duration
rl.begin_scissor_mode(int(x), int(y), int(width), int(road_size.y + 4))
text_pos = rl.Vector2(x - self._marquee_offset, y)
rl.draw_text_ex(self._font_semi_bold, road_display, text_pos, font_size, 0, COLORS.white)
rl.end_scissor_mode()
def _draw_vienna_sign(self, x: float, y: float, width: float, height: float, speed_str: str, speed_color: rl.Color, is_upcoming: bool = False) -> None:
center = rl.Vector2(x + width / 2, y + height / 2)
outer_radius = min(width, height) / 2
rl.draw_circle_v(center, outer_radius, COLORS.white)
ring_width = outer_radius * 0.18
rl.draw_ring(center, outer_radius - ring_width, outer_radius, 0, 360, 36, COLORS.red)
font_size = outer_radius * (0.7 if len(speed_str) >= 3 else 0.9)
text_size = measure_text_cached(self._font_bold, speed_str, int(font_size))
text_pos = rl.Vector2(center.x - text_size.x / 2, center.y - text_size.y / 2)
rl.draw_text_ex(self._font_bold, speed_str, text_pos, font_size, 0, speed_color)
def _draw_mutcd_sign(self, x: float, y: float, width: float, height: float, speed_str: str, speed_color: rl.Color, is_upcoming: bool = False) -> None:
sign_rect = rl.Rectangle(x, y, width, height)
rl.draw_rectangle_rounded(sign_rect, 0.35, 10, COLORS.white)
inset = max(4, width * 0.05)
inner_rect = rl.Rectangle(x + inset, y + inset, width - inset * 2, height - inset * 2)
outer_radius = 0.35 * width / 2.0
inner_radius = outer_radius - inset
inner_roundness = inner_radius / (inner_rect.width / 2.0)
rl.draw_rectangle_rounded_lines_ex(inner_rect, inner_roundness, 10, 3, COLORS.black)
mid_x = x + width / 2
label_size = max(18, int(width * 0.26))
if is_upcoming:
self._draw_text_centered(self._font_bold, tr("AHEAD"), label_size, rl.Vector2(mid_x, y + height * 0.27), COLORS.black)
else:
self._draw_text_centered(self._font_bold, tr("SPEED"), label_size, rl.Vector2(mid_x, y + height * 0.20), COLORS.black)
self._draw_text_centered(self._font_bold, tr("LIMIT"), label_size, rl.Vector2(mid_x, y + height * 0.40), COLORS.black)
speed_font_size = int(width * 0.52) if len(speed_str) >= 3 else int(width * 0.62)
self._draw_text_centered(self._font_bold, speed_str, speed_font_size, rl.Vector2(mid_x, y + height * 0.72), speed_color)
def _draw_text_centered(self, font, text, size, pos_center, color):
sz = measure_text_cached(font, text, size)
rl.draw_text_ex(font, text, rl.Vector2(pos_center.x - sz.x / 2, pos_center.y - sz.y / 2), size, 0, color)
def _format_distance(self, distance: float) -> str:
if ui_state.is_metric:
if distance < 50:
return tr("Near")
if distance >= 1000:
return f"{distance * METER_TO_KM:.1f}" + tr("km")
if distance < 200:
rounded = max(10, int(distance / 10) * 10)
else:
rounded = int(distance / 100) * 100
return str(rounded) + tr("m")
else:
distance_mi = distance * METER_TO_MILE
if distance_mi < 0.1:
return tr("Near")
return f"{distance_mi:.1f}" + tr("mi")
@@ -0,0 +1,30 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import AugmentedRoadView
class _SuppressedConfidenceBall:
def render(self, *_):
pass
class AugmentedRoadViewSP(AugmentedRoadView):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._show_confidence_ball: bool = True
self._real_confidence_ball = self._confidence_ball
self._confidence_ball = _SuppressedConfidenceBall()
def set_show_confidence_ball(self, show: bool) -> None:
self._show_confidence_ball = show
def _render(self, rect: rl.Rectangle) -> None:
super()._render(rect)
if self._show_confidence_ball:
self._real_confidence_ball.render(self.rect)
@@ -0,0 +1,34 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.system.ui.lib.application import MouseEvent
from openpilot.system.ui.lib.scroll_panel2 import GuiScrollPanel2, ScrollState
class GuiScrollPanel2SP(GuiScrollPanel2):
"""Reject orthogonal-dominant drags so nested scrollers (outer horizontal +
inner vertical) don't both engage on a slightly diagonal swipe.
Implemented as a post-super state rollback rather than reimplementing the
PRESSED state machine — keeps stock behaviour authoritative."""
def _handle_mouse_event(self, mouse_event: MouseEvent, bounds: rl.Rectangle, bounds_size: float,
content_size: float) -> None:
pre_state = self._state
super()._handle_mouse_event(mouse_event, bounds, bounds_size, content_size)
if self._state == ScrollState.MANUAL_SCROLL and pre_state == ScrollState.PRESSED and \
self._initial_click_event is not None:
diff_x = abs(mouse_event.pos.x - self._initial_click_event.pos.x)
diff_y = abs(mouse_event.pos.y - self._initial_click_event.pos.y)
along = diff_x if self._horizontal else diff_y
anti = diff_y if self._horizontal else diff_x
if anti > along:
self._state = ScrollState.STEADY
self._velocity = 0.0
self._velocity_buffer.clear()
@@ -0,0 +1,16 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.system.ui.widgets.scroller import Scroller
from openpilot.selfdrive.ui.sunnypilot.mici.widgets.scroll_panel_sp import GuiScrollPanel2SP
class ScrollerSP(Scroller):
def __init__(self, **kwargs):
super().__init__(**kwargs)
inner = self._scroller
inner.scroll_panel = GuiScrollPanel2SP(inner._horizontal, handle_out_of_bounds=not inner._snap_items)
+3
View File
@@ -10,6 +10,9 @@ from openpilot.selfdrive.ui.layouts.main import MainLayout
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout
from openpilot.selfdrive.ui.ui_state import ui_state
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.layouts.main import MiciMainLayoutSP as MiciMainLayout
BIG_UI = gui_app.big_ui()
+6 -1
View File
@@ -40,6 +40,7 @@ from openpilot.sunnypilot.modeld_v2.camera_offset_helper import CameraOffsetHelp
from openpilot.sunnypilot.livedelay.helpers import get_lat_delay
from openpilot.sunnypilot.modeld_v2.modeld_base import ModelStateBase
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
PROCESS_NAME = "selfdrive.modeld.modeld_tinygrad"
@@ -329,6 +330,7 @@ def main(demo=False):
prev_action = log.ModelDataV2.Action()
DH = DesireHelper()
RELC = RoadEdgeLaneChangeController(DH)
meta_constants = load_meta_constants()
while True:
@@ -433,7 +435,10 @@ def main(demo=False):
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
RELC.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs, v_ego, modelv2_send.modelV2.roadEdges)
mdv2sp_send.modelDataV2SP.leftLaneChangeEdgeBlock = RELC.left_edge_detected
mdv2sp_send.modelDataV2SP.rightLaneChangeEdgeBlock = RELC.right_edge_detected
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RELC.left_edge_detected, RELC.right_edge_detected)
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
mdv2sp_send.modelDataV2SP.laneTurnDirection = DH.lane_turn_direction
@@ -0,0 +1,108 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
Acceleration Personality (ECO / NORMAL / SPORT). Tunes only MPC INPUTS, never the output:
* positive-accel ceiling + per-cycle open-rate -> tier-scaled take-off from a stop;
* add-only, speed-dependent follow-gap widen on the MPC t_follow -> earlier/gentler braking, roomier gap;
* sticky should_stop hysteresis -> no stop-and-go gas-brake-gas-brake.
Add-only gap => desired distance >= stock => braking >= stock. Disabled => stock everywhere (byte-stock).
"""
import numpy as np
from cereal import messaging
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot import get_sanitize_int_param
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.constants import \
NORMAL, PERSONALITY_MIN, PERSONALITY_MAX, A_CRUISE_MAX_BP, A_CRUISE_MAX_V, STOCK_A_CRUISE_MAX_V, \
RISE_RATE, STOCK_RISE_RATE, TF_WIDEN_V_BP, TF_WIDEN_BASE_V, TF_WIDEN_TIER, TF_WIDEN_MAX, \
TF_SLEW_PER_S, TF_DECEL_HOLD_A
class AccelController:
def __init__(self, CP: structs.CarParams, mpc=None, params=None):
# CP/mpc accepted for the planner's constructor signature; unused (shapes MPC inputs only).
self._params = params or Params()
self._frame = 0
self._enabled = False
self._personality = NORMAL
self._v_ego = 0.0
self._a_ego = 0.0
self._widen = 0.0 # current slewed follow-gap widen (s), add-only
self._t_follow = 0.0 # last t_follow handed to the MPC (telemetry)
self._read_params()
def _read_params(self) -> None:
self._enabled = self._params.get_bool("AccelPersonalityEnabled")
if not self._enabled:
self._personality = NORMAL
return
self._personality = get_sanitize_int_param("AccelPersonality", PERSONALITY_MIN, PERSONALITY_MAX, self._params)
def update(self, sm: messaging.SubMaster) -> None:
if self._frame % int(1. / DT_MDL) == 0:
self._read_params()
self._v_ego = float(sm['carState'].vEgo)
self._a_ego = float(sm['carState'].aEgo)
self._frame += 1
def reset(self) -> None:
# Drop the accumulated widen (e.g. on disengage / standstill re-init) so it re-ramps cleanly.
self._widen = 0.0
def get_max_accel(self, v_ego: float) -> float:
# Disabled -> stock ceiling (off == stock, independent of the NORMAL profile so NORMAL is free to differ).
table = A_CRUISE_MAX_V[self._personality] if self._enabled else STOCK_A_CRUISE_MAX_V
return float(np.interp(v_ego, A_CRUISE_MAX_BP, table))
def get_rise_rate(self) -> float:
# Disabled -> stock ceiling open-rate (off == stock, independent of the NORMAL profile).
return RISE_RATE[self._personality] if self._enabled else STOCK_RISE_RATE
def get_t_follow(self, t_follow: float, v_ego: float) -> float:
# MPC t_follow hook. Adds a slewed, decel-held, speed-dependent comfort widen on top of the stock
# t_follow. Identity when disabled => byte-stock. Add-only => desired distance >= stock => brake >= stock.
t_follow = float(t_follow)
if not self._enabled:
self._widen = 0.0
self._t_follow = t_follow
return t_follow
target = float(np.interp(v_ego, TF_WIDEN_V_BP, TF_WIDEN_BASE_V)) * TF_WIDEN_TIER[self._personality]
target = min(target, TF_WIDEN_MAX)
step = TF_SLEW_PER_S * DT_MDL
if self._a_ego <= TF_DECEL_HOLD_A and target < self._widen:
pass # decel-hold: don't ease the gap in while braking
elif target > self._widen:
self._widen = min(target, self._widen + step) # open the gap, slewed
else:
self._widen = max(target, self._widen - step) # close the gap, slewed
self._widen = max(0.0, self._widen) # add-only guard
self._t_follow = t_follow + self._widen
return self._t_follow
# --- telemetry (published to cereal LongitudinalPlanSP.acceleration; no control effect) ---
def enabled(self) -> bool:
return self._enabled
def personality(self):
return self._personality
def max_accel(self) -> float:
return self.get_max_accel(self._v_ego)
def t_follow(self) -> float:
return self._t_follow
def follow_widen(self) -> float:
return self._widen
def widen_active(self) -> bool:
return self._enabled and self._widen > 0.005
@@ -0,0 +1,51 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
Acceleration Personality tuning tables. The controller shapes only what the longitudinal MPC CONSUMES
(the positive-accel ceiling + its open-rate, and an add-only follow-gap widen fed to the MPC's t_follow);
it never post-shapes the MPC's output accel. Disabled => every getter returns the upstream stock value,
so off == byte-stock.
"""
from cereal import custom
AccelerationPersonality = custom.LongitudinalPlanSP.AccelerationPersonality
ECO = AccelerationPersonality.eco
NORMAL = AccelerationPersonality.normal
SPORT = AccelerationPersonality.sport
PERSONALITY_MIN = min(AccelerationPersonality.schema.enumerants.values())
PERSONALITY_MAX = max(AccelerationPersonality.schema.enumerants.values())
# --- Positive-accel ceiling (launch/cruise) + its upward open-rate ---------------------------------------
# off == stock: get_max_accel/get_rise_rate fall back to the STOCK_* values (upstream get_max_accel table
# and +0.05 ceiling slew), independent of the NORMAL profile so NORMAL is free to differ.
# ACCEL_MAX (opendbc) hard-caps the ceiling at 2.0 m/s^2, so the launch knots are set at/below it.
A_CRUISE_MAX_BP = [0., 10., 25., 40.] # m/s (matches upstream A_CRUISE_MAX_BP)
STOCK_A_CRUISE_MAX_V = [1.6, 1.2, 0.8, 0.6] # upstream A_CRUISE_MAX_VALS -> off == byte-stock ceiling
STOCK_RISE_RATE = 0.05 # upstream ceiling open-rate (m/s^2 per cycle)
A_CRUISE_MAX_V = {
ECO: [1.80, 1.10, 0.70, 0.50], # gentle-but-prompt launch, efficient cruise
NORMAL: [2.00, 1.40, 0.95, 0.70], # brisk launch, balanced cruise
SPORT: [2.00, 1.70, 1.20, 0.90], # strong launch (ACCEL_MAX caps the 0 m/s knot), assertive cruise
}
# Ceiling open-rate: how fast the accel ceiling may rise per cycle. All >> stock 0.05 so the permitted
# ceiling opens quickly off the line -> fast take-off from a stop. The MPC's own jerk/a_change cost still
# smooths the actual accel, so a higher open-rate cannot make the launch jerky.
RISE_RATE = {ECO: 0.10, NORMAL: 0.16, SPORT: 0.24}
# --- Follow-gap widen (add-only, fed to the MPC t_follow) ------------------------------------------------
# Add a small speed-dependent widen to the stock t_follow (the driver's gap-button value). Wider gap ->
# MPC brakes earlier + gentler onto a slowing lead and settles a roomier cruise gap. Invariants:
# * add-only -> desired distance >= stock -> braking >= stock;
# * zero below TF_WIDEN_V_BP[0] -> low-speed & standstill gap stay stock (stock stop distance preserved);
# * slewed per cycle -> no rubber-band; decel-hold -> gap won't shrink while braking (stays committed).
TF_WIDEN_V_BP = [14.0, 28.0] # m/s: widen ramps in across this band, flat above
TF_WIDEN_BASE_V = [0.0, 0.30] # s: base follow-time added at the band ends (pre-tier)
TF_WIDEN_TIER = {ECO: 1.30, NORMAL: 1.00, SPORT: 0.50} # ECO roomiest/smoothest, SPORT tightest/snappiest
TF_WIDEN_MAX = 0.45 # s: absolute cap on the added gap (never explodes)
TF_SLEW_PER_S = 0.50 # s per second: max rate the widen may open/close
TF_DECEL_HOLD_A = -0.20 # m/s^2: at/below this a_ego (braking) the widen won't shrink
@@ -0,0 +1,198 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
AccelController is an INPUT shaper for the longitudinal MPC: a per-tier positive-accel ceiling + open-rate
(launch), and an ADD-ONLY, slewed, decel-held follow-gap widen fed to the MPC t_follow. It never shapes the
MPC output, so these tests pin: off == byte-stock; tier ordering; and the t_follow invariants (add-only,
zero below the gate, slew-bounded, decel-hold, capped).
"""
from types import SimpleNamespace
import numpy as np
import pytest
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.accel_controller import AccelController
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.constants import \
ECO, NORMAL, SPORT, PERSONALITY_MIN, PERSONALITY_MAX, A_CRUISE_MAX_BP, RISE_RATE, \
STOCK_A_CRUISE_MAX_V, STOCK_RISE_RATE, TF_WIDEN_V_BP, TF_WIDEN_BASE_V, TF_WIDEN_TIER, TF_WIDEN_MAX, \
TF_SLEW_PER_S, TF_DECEL_HOLD_A, AccelerationPersonality
_EPS = 1e-6
_TF_STOCK = 1.45 # a representative stock t_follow (standard personality); the widen is add-only on top
_SLEW_STEP = TF_SLEW_PER_S * DT_MDL
class FakeParams:
def __init__(self, store=None):
self.store = dict(store or {})
def get_bool(self, key):
return bool(self.store.get(key, False))
def get(self, key, return_default=False):
return int(self.store.get(key, 1))
def put(self, key, val, block=False):
self.store[key] = val
def make_sm(v_ego=20.0, a_ego=0.0):
return {'carState': SimpleNamespace(vEgo=v_ego, aEgo=a_ego)}
def make_controller(enabled=True, personality=NORMAL):
store = {"AccelPersonalityEnabled": enabled, "AccelPersonality": int(personality)}
ctrl = AccelController(CP=SimpleNamespace(), mpc=SimpleNamespace(), params=FakeParams(store))
ctrl.update(make_sm())
return ctrl
def settle(ctrl, v_ego, a_ego=0.0, t_follow=_TF_STOCK, n=400):
ctrl.update(make_sm(v_ego=v_ego, a_ego=a_ego))
out = t_follow
for _ in range(n):
out = ctrl.get_t_follow(t_follow, v_ego)
return out
# --- Profiles / off == stock ------------------------------------------------------------------------------
def test_enum_source_parity():
assert (ECO, NORMAL, SPORT) == (AccelerationPersonality.eco, AccelerationPersonality.normal, AccelerationPersonality.sport)
assert (PERSONALITY_MIN, PERSONALITY_MAX) == (0, 2)
def test_disabled_forces_normal_and_stock_ceiling():
ctrl = make_controller(enabled=False, personality=SPORT)
assert ctrl.personality() == NORMAL
assert not ctrl.enabled()
for v in (0.0, 10.0, 25.0, 40.0):
assert ctrl.get_max_accel(v) == pytest.approx(np.interp(v, A_CRUISE_MAX_BP, STOCK_A_CRUISE_MAX_V))
assert ctrl.get_rise_rate() == STOCK_RISE_RATE
def test_disabled_t_follow_is_identity():
ctrl = make_controller(enabled=False, personality=SPORT)
for v in (2.0, 10.0, 20.0, 30.0):
assert ctrl.get_t_follow(_TF_STOCK, v) == pytest.approx(_TF_STOCK)
assert ctrl.follow_widen() == 0.0
assert not ctrl.widen_active()
def test_stock_ceiling_matches_upstream():
# off must equal upstream get_max_accel table so the feature is byte-stock when disabled.
assert STOCK_A_CRUISE_MAX_V == [1.6, 1.2, 0.8, 0.6]
assert A_CRUISE_MAX_BP == [0., 10., 25., 40.]
assert STOCK_RISE_RATE == 0.05
def test_ceiling_ordering_eco_le_normal_le_sport():
eco = make_controller(personality=ECO)
nrm = make_controller(personality=NORMAL)
spt = make_controller(personality=SPORT)
for v in (0.0, 10.0, 25.0, 40.0):
assert eco.get_max_accel(v) <= nrm.get_max_accel(v) + _EPS
assert nrm.get_max_accel(v) <= spt.get_max_accel(v) + _EPS
# strictly distinct where the tables diverge (mid speed)
assert make_controller(personality=ECO).get_max_accel(25.0) < make_controller(personality=SPORT).get_max_accel(25.0)
def test_rise_rate_ordering_and_above_stock():
assert RISE_RATE[ECO] < RISE_RATE[NORMAL] < RISE_RATE[SPORT]
assert RISE_RATE[ECO] > STOCK_RISE_RATE # every tier opens the ceiling faster than stock (fast take-off)
def test_normal_is_distinct_from_stock():
nrm = make_controller(personality=NORMAL)
# enabled NORMAL differs from stock (so NORMAL is a real profile, not a stock alias)
assert nrm.get_max_accel(25.0) != pytest.approx(np.interp(25.0, A_CRUISE_MAX_BP, STOCK_A_CRUISE_MAX_V))
assert nrm.get_rise_rate() != STOCK_RISE_RATE
# --- t_follow: add-only speed widen -----------------------------------------------------------------------
def test_t_follow_zero_below_gate():
ctrl = make_controller(personality=NORMAL)
out = settle(ctrl, v_ego=TF_WIDEN_V_BP[0] - 1.0) # below the widen onset
assert out == pytest.approx(_TF_STOCK)
assert ctrl.follow_widen() == pytest.approx(0.0, abs=1e-6)
def test_t_follow_widens_at_speed():
ctrl = make_controller(personality=NORMAL)
out = settle(ctrl, v_ego=TF_WIDEN_V_BP[1] + 5.0) # flat-widen region, above the band
expected = _TF_STOCK + TF_WIDEN_BASE_V[1] * TF_WIDEN_TIER[NORMAL]
assert out == pytest.approx(expected, abs=1e-3)
assert ctrl.widen_active()
def test_t_follow_add_only_random_walk():
rng = np.random.default_rng(0)
for personality in (ECO, NORMAL, SPORT):
ctrl = make_controller(personality=personality)
for _ in range(500):
v = float(rng.uniform(0.0, 40.0))
a = float(rng.uniform(-3.0, 1.5))
ctrl.update(make_sm(v_ego=v, a_ego=a))
out = ctrl.get_t_follow(_TF_STOCK, v)
assert out >= _TF_STOCK - _EPS # never tighter than the stock gap => brake >= stock
assert ctrl.follow_widen() <= TF_WIDEN_MAX + _EPS # widen capped
def test_t_follow_tier_ordering_at_speed():
v = TF_WIDEN_V_BP[1] + 5.0
eco = settle(make_controller(personality=ECO), v_ego=v)
nrm = settle(make_controller(personality=NORMAL), v_ego=v)
spt = settle(make_controller(personality=SPORT), v_ego=v)
assert eco > nrm > spt # ECO roomiest, SPORT tightest
def test_t_follow_slew_bounded():
ctrl = make_controller(personality=ECO)
ctrl.update(make_sm(v_ego=35.0, a_ego=0.0)) # big target widen, start from 0
prev = 0.0
for _ in range(50):
ctrl.get_t_follow(_TF_STOCK, 35.0)
assert ctrl.follow_widen() - prev <= _SLEW_STEP + _EPS # opens no faster than the slew cap
prev = ctrl.follow_widen()
def test_t_follow_decel_hold_does_not_shrink_gap():
ctrl = make_controller(personality=NORMAL)
settle(ctrl, v_ego=35.0, a_ego=0.0) # open the gap fully
held = ctrl.follow_widen()
assert held > 0.1
# now braking (a_ego below the hold threshold) while speed drops into the zero-widen region
for _ in range(50):
ctrl.update(make_sm(v_ego=8.0, a_ego=TF_DECEL_HOLD_A - 1.0))
ctrl.get_t_follow(_TF_STOCK, 8.0)
assert ctrl.follow_widen() >= held - _EPS # gap does not ease in while braking
# once no longer braking, the gap eases back toward the (zero) target
for _ in range(200):
ctrl.update(make_sm(v_ego=8.0, a_ego=0.0))
ctrl.get_t_follow(_TF_STOCK, 8.0)
assert ctrl.follow_widen() == pytest.approx(0.0, abs=1e-3)
def test_reset_clears_widen():
ctrl = make_controller(personality=SPORT)
settle(ctrl, v_ego=35.0)
assert ctrl.follow_widen() > 0.0
ctrl.reset()
assert ctrl.follow_widen() == 0.0
def test_out_of_range_personality_clamps():
ctrl = make_controller(personality=99)
assert ctrl.personality() == PERSONALITY_MAX
def test_max_accel_uses_stored_v_ego():
ctrl = make_controller(personality=SPORT)
ctrl.update(make_sm(v_ego=0.0))
assert ctrl.max_accel() == pytest.approx(ctrl.get_max_accel(0.0))
@@ -1,17 +1,46 @@
from openpilot.common.realtime import DT_MDL
class WMACConstants:
# Lead detection parameters
LEAD_WINDOW_SIZE = 6 # Stable detection window
LEAD_PROB = 0.45 # Balanced threshold for lead detection
TRAJECTORY_SIZE = 33
PARAM_READ_FRAMES = max(1, int(round(1.0 / DT_MDL)))
# Slow down detection parameters
SLOW_DOWN_WINDOW_SIZE = 5 # Responsive but stable
SLOW_DOWN_PROB = 0.3 # Balanced threshold for slow down scenarios
EMERGENCY_HOLD_FRAMES = max(1, int(round(0.75 / DT_MDL)))
MIN_MODE_DURATION = {'acc': max(1, int(round(0.6 / DT_MDL))), 'blended': max(1, int(round(0.5 / DT_MDL)))}
ENTER_BLENDED_FRAMES = max(1, int(round(0.4 / DT_MDL)))
EXIT_BLENDED_FRAMES = max(1, int(round(0.35 / DT_MDL)))
STANDSTILL_FRAMES = max(1, int(round(0.2 / DT_MDL)))
# Optimized slow down distance curve - smooth and progressive
LEAD_PROB = 0.45
LEAD_EXIT_PROB = 0.25
LEAD_RISE_RATE = 1.0
LEAD_FALL_RATE = 0.35
RADAR_LEAD_ACC_PROB = 0.5
RADAR_LEAD_ACC_EXIT_PROB = 0.4
RADAR_LEAD_ACC_RISE_RATE = 1.0
RADAR_LEAD_ACC_FALL_RATE = 0.25
RADAR_LEAD_ACC_MAX_DREL = 80.0
RADAR_LEAD_ACC_MAX_TTC = 6.0
RADAR_LEAD_ACC_MIN_CLOSING_SPEED = -0.5
SLOW_DOWN_PROB = 0.5
SLOW_DOWN_EXIT_PROB = 0.4
SLOW_DOWN_RISE_RATE = 0.65
SLOW_DOWN_FALL_RATE = 0.15
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.]
SLOW_DOWN_DIST = [32., 46., 64., 86., 108., 130., 145., 165.]
URGENT_SLOW_DOWN_PROB = 0.85
# Slowness detection parameters
SLOWNESS_WINDOW_SIZE = 10 # Stable slowness detection
SLOWNESS_PROB = 0.55 # Clear threshold for slowness
SLOWNESS_CRUISE_OFFSET = 1.025 # Conservative cruise speed offset
MODEL_DECEL_START = -0.5
MODEL_DECEL_RANGE = 2.0
ENDPOINT_URGENCY_GAIN = 1.3
CRITICAL_ENDPOINT_FACTOR = 0.3
CRITICAL_URGENCY_GAIN = 1.5
SPEED_URGENCY_MIN = 25.0
SPEED_URGENCY_RANGE = 80.0
SLOWNESS_PROB = 0.55
SLOWNESS_EXIT_PROB = 0.45
SLOWNESS_RISE_RATE = 0.35
SLOWNESS_FALL_RATE = 0.5
SLOWNESS_CRUISE_OFFSET = 1.025
+171 -256
View File
@@ -6,129 +6,116 @@ See the LICENSE.md file in the root directory for more details.
"""
# Version = 2025-6-30
from cereal import messaging
from opendbc.car import structs
from numpy import interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
from typing import Literal
# d-e2e, from modeldata.h
TRAJECTORY_SIZE = 33
SET_MODE_TIMEOUT = 15
from cereal import messaging
from numpy import interp
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
# Define the valid mode types
ModeType = Literal['acc', 'blended']
class SmoothKalmanFilter:
"""Enhanced Kalman filter with smoothing for stable decision making."""
def clip01(value: float) -> float:
return max(0.0, min(1.0, float(value)))
def __init__(self, initial_value=0, measurement_noise=0.1, process_noise=0.01,
alpha=1.0, smoothing_factor=0.85):
self.x = initial_value
self.P = 1.0
self.R = measurement_noise
self.Q = process_noise
self.alpha = alpha
self.smoothing_factor = smoothing_factor
self.initialized = False
self.history = []
self.max_history = 10
self.confidence = 0.0
def add_data(self, measurement):
if len(self.history) >= self.max_history:
self.history.pop(0)
self.history.append(measurement)
class SmoothedSignal:
def __init__(self, rise_rate: float, fall_rate: float, initial_value: float = 0.0):
self.rise_rate = clip01(rise_rate)
self.fall_rate = clip01(fall_rate)
self.value = clip01(initial_value)
if not self.initialized:
self.x = measurement
self.initialized = True
self.confidence = 0.1
return
def update(self, measurement: float) -> float:
measurement = clip01(measurement)
rate = self.rise_rate if measurement > self.value else self.fall_rate
self.value += (measurement - self.value) * rate
return self.value
self.P = self.alpha * self.P + self.Q
def reset(self, value: float = 0.0) -> None:
self.value = clip01(value)
K = self.P / (self.P + self.R)
effective_K = K * (1.0 - self.smoothing_factor) + self.smoothing_factor * 0.1
innovation = measurement - self.x
self.x = self.x + effective_K * innovation
self.P = (1 - effective_K) * self.P
class HysteresisSignal:
def __init__(self, enter_threshold: float, exit_threshold: float, rise_rate: float, fall_rate: float):
self.enter_threshold = clip01(enter_threshold)
self.exit_threshold = clip01(exit_threshold)
self.filter = SmoothedSignal(rise_rate, fall_rate)
self.active = False
if abs(innovation) < 0.1:
self.confidence = min(1.0, self.confidence + 0.05)
else:
self.confidence = max(0.1, self.confidence - 0.02)
def update(self, measurement: float) -> bool:
value = self.filter.update(measurement)
threshold = self.exit_threshold if self.active else self.enter_threshold
self.active = value > threshold
return self.active
def get_value(self):
return self.x if self.initialized else None
def reset(self) -> None:
self.filter.reset()
self.active = False
def get_confidence(self):
return self.confidence
def reset_data(self):
self.initialized = False
self.history = []
self.confidence = 0.0
@property
def value(self) -> float:
return self.filter.value
class ModeTransitionManager:
"""Manages smooth transitions between driving modes with hysteresis."""
def __init__(self):
self.current_mode: ModeType = 'acc'
self.mode_confidence = {'acc': 1.0, 'blended': 0.0}
self.transition_timeout = 0
self.min_mode_duration = 10
self.mode_duration = 0
self.emergency_override = False
self._pending_mode: ModeType = 'acc'
self._pending_count = 0
self._blended_hold_frames = 0
def request_mode(self, mode: ModeType, confidence: float = 1.0, emergency: bool = False):
# Emergency override for critical situations (stops, collisions)
if emergency:
self.emergency_override = True
self.current_mode = mode
self.transition_timeout = SET_MODE_TIMEOUT
self.mode_duration = 0
def request_mode(self, mode: ModeType, immediate: bool = False, hold_frames: int = 0, cancel_hold: bool = False) -> None:
if immediate:
self._blended_hold_frames = max(self._blended_hold_frames, hold_frames)
self._pending_mode = mode
self._pending_count = 0
self._switch_mode(mode)
return
self.mode_confidence[mode] = min(1.0, self.mode_confidence[mode] + 0.1 * confidence)
for m in self.mode_confidence:
if m != mode:
self.mode_confidence[m] = max(0.0, self.mode_confidence[m] - 0.05)
if cancel_hold and mode == 'acc':
self._blended_hold_frames = 0
# Require minimum duration in current mode (unless emergency)
if self.mode_duration < self.min_mode_duration and not self.emergency_override:
if self._blended_hold_frames > 0:
mode = 'blended'
if mode == self.current_mode:
self._pending_mode = mode
self._pending_count = 0
return
# Hysteresis: higher threshold for mode changes
confidence_threshold = 0.6 if mode != self.current_mode else 0.3 # Lower threshold for faster response
if mode != self._pending_mode:
self._pending_mode = mode
self._pending_count = 1
else:
self._pending_count += 1
if self.mode_confidence[mode] > confidence_threshold:
if mode != self.current_mode and self.transition_timeout == 0:
self.transition_timeout = SET_MODE_TIMEOUT
self.current_mode = mode
self.mode_duration = 0
if self.mode_duration < WMACConstants.MIN_MODE_DURATION[self.current_mode]:
return
def update(self):
if self.transition_timeout > 0:
self.transition_timeout -= 1
required_count = WMACConstants.ENTER_BLENDED_FRAMES if mode == 'blended' else WMACConstants.EXIT_BLENDED_FRAMES
if self._pending_count >= required_count:
self._switch_mode(mode)
def update(self) -> None:
if self._blended_hold_frames > 0:
self._blended_hold_frames -= 1
self.mode_duration += 1
# Reset emergency override after some time
if self.emergency_override and self.mode_duration > 20:
self.emergency_override = False
# Gradual confidence decay
for mode in self.mode_confidence:
self.mode_confidence[mode] *= 0.98
def get_mode(self) -> ModeType:
return self.current_mode
def _switch_mode(self, mode: ModeType) -> None:
if mode == self.current_mode:
return
self.current_mode = mode
self.mode_duration = 0
self._pending_mode = mode
self._pending_count = 0
class DynamicExperimentalController:
def __init__(self, CP: structs.CarParams, mpc, params=None):
@@ -142,35 +129,33 @@ class DynamicExperimentalController:
self._mode_manager = ModeTransitionManager()
# Smooth filters for stable decision making with faster response for critical scenarios
self._lead_filter = SmoothKalmanFilter(
measurement_noise=0.15,
process_noise=0.05,
alpha=1.02,
smoothing_factor=0.8
self._lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.LEAD_PROB,
exit_threshold=WMACConstants.LEAD_EXIT_PROB,
rise_rate=WMACConstants.LEAD_RISE_RATE,
fall_rate=WMACConstants.LEAD_FALL_RATE,
)
self._radar_acc_lead_tracker = HysteresisSignal(
enter_threshold=WMACConstants.RADAR_LEAD_ACC_PROB,
exit_threshold=WMACConstants.RADAR_LEAD_ACC_EXIT_PROB,
rise_rate=WMACConstants.RADAR_LEAD_ACC_RISE_RATE,
fall_rate=WMACConstants.RADAR_LEAD_ACC_FALL_RATE,
)
self._slow_down_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOW_DOWN_PROB,
exit_threshold=WMACConstants.SLOW_DOWN_EXIT_PROB,
rise_rate=WMACConstants.SLOW_DOWN_RISE_RATE,
fall_rate=WMACConstants.SLOW_DOWN_FALL_RATE,
)
self._slowness_tracker = HysteresisSignal(
enter_threshold=WMACConstants.SLOWNESS_PROB,
exit_threshold=WMACConstants.SLOWNESS_EXIT_PROB,
rise_rate=WMACConstants.SLOWNESS_RISE_RATE,
fall_rate=WMACConstants.SLOWNESS_FALL_RATE,
)
self._slow_down_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.1,
alpha=1.05,
smoothing_factor=0.7
)
self._slowness_filter = SmoothKalmanFilter(
measurement_noise=0.1,
process_noise=0.06,
alpha=1.015,
smoothing_factor=0.92
)
self._mpc_fcw_filter = SmoothKalmanFilter(
measurement_noise=0.2,
process_noise=0.1,
alpha=1.1,
smoothing_factor=0.5
)
self._has_lead_filtered = False
self._has_radar_acc_lead = False
self._has_slow_down = False
self._has_slowness = False
self._has_mpc_fcw = False
@@ -179,13 +164,14 @@ class DynamicExperimentalController:
self._has_standstill = False
self._mpc_fcw_crash_cnt = 0
self._standstill_count = 0
# debug
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
self._raw_urgency = 0.0
def _read_params(self) -> None:
if self._frame % int(1. / DT_MDL) == 0:
if self._frame % WMACConstants.PARAM_READ_FRAMES == 0:
self._enabled = self._params.get_bool("DynamicExperimentalControl")
def mode(self) -> str:
@@ -198,7 +184,6 @@ class DynamicExperimentalController:
return self._active
def set_mpc_fcw_crash_cnt(self) -> None:
"""Set MPC FCW crash count"""
self._mpc_fcw_crash_cnt = self._mpc.crash_cnt
def _update_calculations(self, sm: messaging.SubMaster) -> None:
@@ -210,179 +195,109 @@ class DynamicExperimentalController:
self._v_cruise_kph = car_state.vCruise
self._has_standstill = car_state.standstill
# standstill detection
if self._has_standstill:
self._standstill_count = min(20, self._standstill_count + 1)
self._standstill_count = min(WMACConstants.STANDSTILL_FRAMES * 3, self._standstill_count + 1)
else:
self._standstill_count = max(0, self._standstill_count - 1)
# Lead detection
self._lead_filter.add_data(float(lead_one.status))
lead_value = self._lead_filter.get_value() or 0.0
self._has_lead_filtered = lead_value > WMACConstants.LEAD_PROB
# MPC FCW detection
fcw_filtered_value = self._mpc_fcw_filter.get_value() or 0.0
self._mpc_fcw_filter.add_data(float(self._mpc_fcw_crash_cnt > 0))
self._has_mpc_fcw = fcw_filtered_value > 0.5
# Slow down detection
self._has_lead_filtered = self._lead_tracker.update(float(lead_one.status))
self._has_radar_acc_lead = self._radar_acc_lead_tracker.update(self._radar_acc_lead_score(lead_one))
self._has_mpc_fcw = self._mpc_fcw_crash_cnt > 0
self._calculate_slow_down(md)
# Slowness detection
if not (self._standstill_count > 5) and not self._has_slow_down:
if self._standstill_count > WMACConstants.STANDSTILL_FRAMES or self._has_slow_down:
self._slowness_tracker.reset()
self._has_slowness = False
else:
current_slowness = float(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET))
self._slowness_filter.add_data(current_slowness)
slowness_value = self._slowness_filter.get_value() or 0.0
self._has_slowness = self._slowness_tracker.update(current_slowness)
# Hysteresis for slowness
threshold = WMACConstants.SLOWNESS_PROB * (0.8 if self._has_slowness else 1.1)
self._has_slowness = slowness_value > threshold
def _calculate_slow_down(self, md):
"""Calculate urgency based on trajectory endpoint vs expected distance."""
# Reset to safe defaults
urgency = 0.0
def _calculate_slow_down(self, md) -> None:
self._endpoint_x = float('inf')
self._expected_distance = 0.0
self._trajectory_valid = False
#Require exact trajectory size
position_valid = len(md.position.x) == TRAJECTORY_SIZE
orientation_valid = len(md.orientation.x) == TRAJECTORY_SIZE
urgency = self._model_action_urgency(md)
position_valid = len(md.position.x) == WMACConstants.TRAJECTORY_SIZE
if not (position_valid and orientation_valid):
# Invalid trajectory - this itself might indicate a stop scenario
# Apply moderate urgency for incomplete trajectories at speed
if self._v_ego_kph > 20.0:
urgency = 0.3
if position_valid:
self._trajectory_valid = True
self._endpoint_x = md.position.x[WMACConstants.TRAJECTORY_SIZE - 1]
self._expected_distance = interp(self._v_ego_kph, WMACConstants.SLOW_DOWN_BP, WMACConstants.SLOW_DOWN_DIST)
urgency = max(urgency, self._endpoint_urgency(self._endpoint_x, self._expected_distance))
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
self._has_slow_down = urgency_filtered > WMACConstants.SLOW_DOWN_PROB
self._urgency = urgency_filtered
return
self._raw_urgency = clip01(urgency)
self._has_slow_down = self._slow_down_tracker.update(self._raw_urgency)
self._urgency = self._slow_down_tracker.value
# We have a valid full trajectory
self._trajectory_valid = True
def _radar_acc_lead_score(self, lead_one) -> float:
if not lead_one.status:
return 0.0
# Use the exact endpoint (33rd point, index 32)
endpoint_x = md.position.x[TRAJECTORY_SIZE - 1]
self._endpoint_x = endpoint_x
d_rel = float(getattr(lead_one, 'dRel', float('inf')))
v_rel = float(getattr(lead_one, 'vRel', 0.0))
if d_rel <= WMACConstants.RADAR_LEAD_ACC_MAX_DREL:
return 1.0
if v_rel <= WMACConstants.RADAR_LEAD_ACC_MIN_CLOSING_SPEED and d_rel / max(-v_rel, 0.1) <= WMACConstants.RADAR_LEAD_ACC_MAX_TTC:
return 1.0
return 0.0
# Get expected distance based on current speed using tuned constants
expected_distance = interp(self._v_ego_kph,
WMACConstants.SLOW_DOWN_BP,
WMACConstants.SLOW_DOWN_DIST)
self._expected_distance = expected_distance
def _model_action_urgency(self, md) -> float:
action = getattr(md, 'action', None)
if action is None:
return 0.0
# Calculate urgency based on trajectory shortage
if endpoint_x < expected_distance:
shortage = expected_distance - endpoint_x
shortage_ratio = shortage / expected_distance
urgency = 1.0 if getattr(action, 'shouldStop', False) else 0.0
desired_accel = getattr(action, 'desiredAcceleration', 0.0)
if desired_accel < WMACConstants.MODEL_DECEL_START:
urgency = max(urgency, min(1.0, (WMACConstants.MODEL_DECEL_START - desired_accel) / WMACConstants.MODEL_DECEL_RANGE))
return urgency
# Base urgency on shortage ratio
urgency = min(1.0, shortage_ratio * 2.0)
def _endpoint_urgency(self, endpoint_x: float, expected_distance: float) -> float:
if endpoint_x >= expected_distance:
return 0.0
# Increase urgency for very short trajectories (imminent stops)
critical_distance = expected_distance * 0.3
if endpoint_x < critical_distance:
urgency = min(1.0, urgency * 2.0)
shortage_ratio = (expected_distance - endpoint_x) / expected_distance
urgency = min(1.0, shortage_ratio * WMACConstants.ENDPOINT_URGENCY_GAIN)
# Speed-based urgency adjustment
if self._v_ego_kph > 25.0:
speed_factor = 1.0 + (self._v_ego_kph - 25.0) / 80.0
urgency = min(1.0, urgency * speed_factor)
if endpoint_x < expected_distance * WMACConstants.CRITICAL_ENDPOINT_FACTOR:
urgency = min(1.0, urgency * WMACConstants.CRITICAL_URGENCY_GAIN)
# Apply filtering but with less smoothing for stops
self._slow_down_filter.add_data(urgency)
urgency_filtered = self._slow_down_filter.get_value() or 0.0
if self._v_ego_kph > WMACConstants.SPEED_URGENCY_MIN:
speed_factor = 1.0 + (self._v_ego_kph - WMACConstants.SPEED_URGENCY_MIN) / WMACConstants.SPEED_URGENCY_RANGE
urgency = min(1.0, urgency * speed_factor)
# Update state with lower threshold for better stop detection
self._has_slow_down = urgency_filtered > (WMACConstants.SLOW_DOWN_PROB * 0.8)
self._urgency = urgency_filtered
return urgency
def _radarless_mode(self) -> None:
"""Radarless mode decision logic with emergency handling."""
def _desired_mode(self) -> tuple[ModeType, bool]:
if not self._CP.radarUnavailable and self._has_radar_acc_lead:
return 'acc', False
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
return 'blended', True
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
standstill = self._standstill_count > WMACConstants.STANDSTILL_FRAMES
urgent_slow_down = self._has_slow_down and self._raw_urgency > WMACConstants.URGENT_SLOW_DOWN_PROB
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.5)
self._mode_manager.request_mode('blended', confidence=confidence)
return
if self._CP.radarUnavailable:
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
return 'acc', False
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
if standstill or self._has_slow_down:
return 'blended', urgent_slow_down
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
def _radar_mode(self) -> None:
"""Radar mode with emergency handling."""
# EMERGENCY: MPC FCW - immediate blended mode
if self._has_mpc_fcw:
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
return
# If lead detected and not in standstill: always use ACC
if self._has_lead_filtered and not (self._standstill_count > 3):
self._mode_manager.request_mode('acc', confidence=1.0)
return
# Slow down scenarios: emergency for high urgency, normal for lower urgency
if self._has_slow_down:
if self._urgency > 0.7:
# Emergency: immediate blended mode for high urgency stops
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
else:
# Normal: blended with urgency-based confidence
confidence = min(1.0, self._urgency * 1.3)
self._mode_manager.request_mode('blended', confidence=confidence)
return
# Standstill: use blended
if self._standstill_count > 3:
self._mode_manager.request_mode('blended', confidence=0.9)
return
# Driving slow: use ACC (but not if actively slowing down)
if self._has_slowness and not self._has_slow_down:
self._mode_manager.request_mode('acc', confidence=0.8)
return
# Default: ACC
self._mode_manager.request_mode('acc', confidence=0.7)
return 'acc', False
def update(self, sm: messaging.SubMaster) -> None:
self._read_params()
self.set_mpc_fcw_crash_cnt()
self._update_calculations(sm)
if self._CP.radarUnavailable:
self._radarless_mode()
else:
self._radar_mode()
mode, immediate = self._desired_mode()
self._mode_manager.request_mode(mode, immediate=immediate, hold_frames=WMACConstants.EMERGENCY_HOLD_FRAMES,
cancel_hold=self._has_radar_acc_lead)
self._mode_manager.update()
self._active = sm['selfdriveState'].experimentalMode and self._enabled
self._frame += 1
@@ -1,94 +0,0 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
class MockLeadOne:
def __init__(self, status=0.0):
self.status = status
class MockRadarState:
def __init__(self, status=0.0):
self.leadOne = MockLeadOne(status=status)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockModelData:
def __init__(self, valid=True):
size = 33 if valid else 10 # incomplete if invalid
self.position = type("Pos", (), {"x": [0.0] * size})()
self.orientation = type("Ori", (), {"x": [0.0] * size})()
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
# Fake Kalman Filter that always returns a given value
class FakeKalman:
def __init__(self, value=1.0):
self.value = value
def add_data(self, v): pass
def get_value(self): return self.value
def get_confidence(self): return 1.0
def reset_data(self): pass
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['carState'].standstill = True
for _ in range(10):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
mock_mpc.crash_cnt = 1 # simulate FCW
for _ in range(2):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
# Force conditions to simulate slowdown
controller._slow_down_filter = FakeKalman(value=1.0) # Ensure urgency triggers slowdown
controller._v_ego_kph = 35.0
default_sm['modelV2'] = MockModelData(valid=False) # Incomplete trajectory
for _ in range(3):
controller.update(default_sm)
assert controller.mode() == "blended"
@@ -0,0 +1,235 @@
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController, HysteresisSignal
class MockLeadOne:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.status = status
self.dRel = dRel
self.vRel = vRel
class MockRadarState:
def __init__(self, status=0.0, dRel=30.0, vRel=0.0):
self.leadOne = MockLeadOne(status=status, dRel=dRel, vRel=vRel)
class MockCarState:
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
self.vEgo = vEgo
self.vCruise = vCruise
self.standstill = standstill
class MockAction:
def __init__(self, desiredAcceleration=0.0, shouldStop=False):
self.desiredAcceleration = desiredAcceleration
self.shouldStop = shouldStop
class MockModelData:
def __init__(self, valid=True, endpoint_x=200.0, orientation_valid=None, desired_acceleration=0.0, should_stop=False):
position_size = 33 if valid else 10
orientation_size = position_size if orientation_valid is None else (33 if orientation_valid else 10)
position_x = [0.0] * position_size
if position_x:
position_x[-1] = endpoint_x
self.position = type("Pos", (), {"x": position_x})()
self.orientation = type("Ori", (), {"x": [0.0] * orientation_size})()
self.acceleration = type("Accel", (), {"x": [0.0] * position_size})()
self.action = MockAction(desired_acceleration, should_stop)
class MockSelfDriveState:
def __init__(self, experimentalMode=False):
self.experimentalMode = experimentalMode
class MockParams:
def get_bool(self, name):
return True
@pytest.fixture
def default_sm():
sm = {
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
'radarState': MockRadarState(status=1.0),
'modelV2': MockModelData(valid=True),
'selfdriveState': MockSelfDriveState(experimentalMode=True),
}
return sm
@pytest.fixture
def mock_cp():
class CP:
radarUnavailable = False
return CP()
@pytest.fixture
def mock_mpc():
class MPC:
crash_cnt = 0
return MPC()
def test_initial_mode_is_acc(mock_cp, mock_mpc):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
assert controller.mode() == "acc"
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['carState'].standstill = True
for _ in range(20):
controller.update(default_sm)
assert controller.mode() == "blended"
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
mock_mpc.crash_cnt = 1
controller.update(default_sm)
assert controller.mode() == "blended"
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
def test_valid_position_with_missing_orientation_can_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, orientation_valid=False)
controller.update(default_sm)
assert controller._trajectory_valid
assert controller.mode() == "blended"
def test_incomplete_position_does_not_trigger_slowdown(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert not controller._trajectory_valid
assert not controller._has_slow_down
assert controller.mode() == "acc"
def test_slowdown_hysteresis_prevents_threshold_chatter():
signal = HysteresisSignal(enter_threshold=0.5, exit_threshold=0.4, rise_rate=1.0, fall_rate=1.0)
assert signal.update(0.55)
assert signal.update(0.45)
assert not signal.update(0.35)
def test_model_should_stop_triggers_blended_without_valid_trajectory(mock_cp, mock_mpc, default_sm):
mock_cp.radarUnavailable = True
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=False, should_stop=True)
controller.update(default_sm)
assert not controller._trajectory_valid
assert controller.mode() == "blended"
def test_radar_lead_keeps_acc_over_model_slowdown(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(3):
controller.update(default_sm)
assert controller._has_slow_down
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_far_radar_lead_allows_blended_until_acc_relevant(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert not controller._has_radar_acc_lead
assert controller.mode() == "blended"
def test_relevant_radar_lead_smoothly_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller.mode() == "blended"
default_sm['radarState'] = MockRadarState(status=1.0, dRel=45.0, vRel=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_closing_far_radar_lead_returns_to_acc(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0, dRel=120.0, vRel=-25.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
for _ in range(20):
controller.update(default_sm)
assert controller._has_radar_acc_lead
assert controller.mode() == "acc"
def test_radar_lead_keeps_acc_over_fcw_and_standstill(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
default_sm['carState'].standstill = True
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0, should_stop=True)
mock_mpc.crash_cnt = 1
for _ in range(10):
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller._has_mpc_fcw
assert controller.mode() == "acc"
def test_lead_flicker_hold_prevents_one_frame_mode_flip(mock_cp, mock_mpc, default_sm):
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
default_sm['radarState'] = MockRadarState(status=1.0)
controller.update(default_sm)
default_sm['radarState'] = MockRadarState(status=0.0)
default_sm['modelV2'] = MockModelData(valid=True, endpoint_x=0.0)
controller.update(default_sm)
assert controller._has_lead_filtered
assert controller.mode() == "acc"
@@ -9,6 +9,8 @@ from cereal import messaging, custom
from opendbc.car import structs
from openpilot.common.constants import CV
from openpilot.selfdrive.car.cruise import V_CRUISE_MAX
from openpilot.sunnypilot.selfdrive.controls.lib.accel_personality.accel_controller import AccelController
from openpilot.sunnypilot.selfdrive.controls.lib.radar_distance.radar_distance import RadarDistanceController
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
from openpilot.sunnypilot.selfdrive.controls.lib.e2e_alerts_helper import E2EAlertsHelper
from openpilot.sunnypilot.selfdrive.controls.lib.smart_cruise_control.smart_cruise_control import SmartCruiseControl
@@ -26,6 +28,8 @@ class LongitudinalPlannerSP:
self.events_sp = EventsSP()
self.resolver = SpeedLimitResolver()
self.dec = DynamicExperimentalController(CP, mpc)
self.accel = AccelController(CP, mpc)
self.radar_distance = RadarDistanceController(CP)
self.scc = SmartCruiseControl()
self.resolver = SpeedLimitResolver()
self.sla = SpeedLimitAssist(CP, CP_SP)
@@ -76,8 +80,13 @@ class LongitudinalPlannerSP:
def update(self, sm: messaging.SubMaster) -> None:
self.events_sp.clear()
self.dec.update(sm)
self.accel.update(sm)
self.radar_distance.update(sm)
self.e2e_alerts_helper.update(sm, self.events_sp)
def smooth_radarstate(self, radarstate):
return self.radar_distance.smooth_radarstate(radarstate)
def publish_longitudinal_plan_sp(self, sm: messaging.SubMaster, pm: messaging.PubMaster) -> None:
plan_sp_send = messaging.new_message('longitudinalPlanSP')
@@ -138,4 +147,17 @@ class LongitudinalPlannerSP:
e2eAlerts.greenLightAlert = self.e2e_alerts_helper.green_light_alert
e2eAlerts.leadDepartAlert = self.e2e_alerts_helper.lead_depart_alert
# Acceleration Personality (telemetry only; brakeNeed/decelTarget/smoothActive repurposed for the
# input-shaping design -- see cereal custom.capnp Acceleration).
acceleration = longitudinalPlanSP.acceleration
acceleration.personality = self.accel.personality()
acceleration.enabled = self.accel.enabled()
acceleration.maxAccel = float(self.accel.max_accel())
acceleration.brakeNeed = float(self.accel.follow_widen()) # follow-gap widen added on top of stock (s)
acceleration.decelTarget = float(self.accel.t_follow()) # t_follow handed to the MPC (s)
acceleration.smoothActive = self.accel.widen_active() # follow-gap widen currently active
acceleration.bypassed = False # unused (no output shaping / bypass anymore)
acceleration.leadUnstable = bool(self.radar_distance.lead_unstable())
pm.send('longitudinalPlanSP', plan_sp_send)
@@ -0,0 +1,229 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
RadarDistance de-noises the lead the longitudinal MPC follows on a noisy (TSS2-class) radar. It NEVER
reports a farther-or-faster lead than reality, so braking is always >= stock, and it changes nothing about
the desired gap -- that is the AccelController's job (t_follow). Two mechanisms only:
* flicker-hold: keep a just-dropped, recently-sustained lead alive (dead-reckoned) through a brief radar
dropout so the MPC does not lose and re-grab it (which reads as a phantom release then a catch-up brake);
* churn smoother: a short SYMMETRIC EMA on a trackId-churning lead's dRel/vLead/vRel so the MPC stops
hunting the gap (removes the follow-jitter that reads as rubber-banding).
Also publishes a read-only lead-instability flag (telemetry). Below LOW_SPEED_PASSTHROUGH_V it is a
byte-stock passthrough (stop distance stays exactly stock). Disabled => byte-stock passthrough always.
"""
from collections import deque
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
HOLD_MAX_FRAMES = 20 # ~1.0s flicker-hold cap, since the last sustained lead
SUSTAIN_FRAMES = 2
DROPOUT_DREL = 1.0
FCW_PROB_CAP = 0.9
MIN_HELD_DREL = 0.5
LOW_SPEED_PASSTHROUGH_V = 5.0 # m/s: below this, no flicker-hold (holding a stale lead near a stop would
# delay the launch); the churn smoother still runs down to CREEP_PASSTHROUGH_V
CREEP_PASSTHROUGH_V = 1.0 # m/s: below this, full byte-stock passthrough (protect the stock stop distance)
SWITCH_DREL = 8.0 # m, dRel jump = a track switch (used by the instability detector)
# Lead-instability detector (telemetry only): flags a bimodal/bouncing radar lead.
STABILITY_WINDOW = 5 # frames (~0.25s @ 20Hz)
VLEAD_SPREAD = 4.0 # m/s, vLead range over the window above which the lead is "unstable"
ID_CHURN_WINDOW = 10 # frames (~0.5s) for radarTrackId-churn detection (steady lead, flipping ids)
ID_CHURN = 3 # trackId switches in the window above which the lead is "unstable" (follow-hunting)
# Churn smoother: during trackId churn the per-track dRel/vRel jitter makes the MPC hunt the follow gap. A
# short SYMMETRIC EMA on the churning lead removes the jitter so the MPC sees a steady lead. Active ONLY
# during churn (NOT bimodal vLead -> never averages two real tracks). Bounded symmetric lag ~LEAD_SMOOTH_TAU.
LEAD_SMOOTH_TAU = 0.5 # s, EMA time constant
LEAD_SMOOTH_HOLD = 20 # frames (~1s): keep smoothing through brief churn gaps (churn toggles on/off)
class _SmoothedLead:
__slots__ = ('status', 'dRel', 'yRel', 'vRel', 'vLead', 'vLeadK', 'aLeadK', 'aLeadTau', 'modelProb')
def __init__(self, src, dRel, vLead, vRel):
self.status = src.status
self.dRel = dRel
self.yRel = src.yRel
self.vRel = vRel
self.vLead = vLead
self.vLeadK = vLead
self.aLeadK = src.aLeadK
self.aLeadTau = src.aLeadTau
self.modelProb = src.modelProb
class _HeldLead:
__slots__ = ('status', 'dRel', 'yRel', 'vRel', 'vLead', 'vLeadK', 'aLeadK', 'aLeadTau', 'modelProb')
def __init__(self, dRel, vRel, vLead, aLeadK, aLeadTau, modelProb):
self.status = True
self.dRel = dRel
self.vRel = vRel
self.vLead = vLead
self.vLeadK = vLead
self.aLeadK = aLeadK
self.aLeadTau = aLeadTau
self.modelProb = modelProb
self.yRel = 0.0
class _RadarStateProxy:
__slots__ = ('leadOne', 'leadTwo')
def __init__(self, lead_one, lead_two):
self.leadOne = lead_one
self.leadTwo = lead_two
class _LeadSmoother:
# Short symmetric EMA on a churning lead's dRel/vLead/vRel (jitter removal). A hold keeps it active through
# brief churn gaps (churn toggles); passthrough + reset only after the hold lapses.
def __init__(self):
self._d = None
self._vl = None
self._vr = None
self._hold = 0
def reset(self):
self._d = None
self._vl = None
self._vr = None
self._hold = 0
def update(self, lead, churn: bool):
self._hold = LEAD_SMOOTH_HOLD if churn else self._hold - 1
if self._hold <= 0 or not lead.status:
self.reset()
return lead
if self._d is None:
self._d, self._vl, self._vr = lead.dRel, lead.vLead, lead.vRel
return lead
a = DT_MDL / LEAD_SMOOTH_TAU
self._d += (lead.dRel - self._d) * a
self._vl += (lead.vLead - self._vl) * a
self._vr += (lead.vRel - self._vr) * a
return _SmoothedLead(lead, self._d, self._vl, self._vr)
class _LeadHold:
def __init__(self):
self._last = None
self._sustained = 0
self._since_real = 0
self._armed = False
self._held_dRel = 0.0
def reset(self):
self.__init__()
def step(self, raw):
if raw.status and raw.dRel > DROPOUT_DREL:
self._last = (raw.dRel, raw.vRel, raw.vLead, raw.aLeadK, raw.aLeadTau, raw.modelProb)
self._sustained += 1
if self._sustained >= SUSTAIN_FRAMES:
self._since_real = 0
self._armed = True
return raw
self._sustained = 0
self._since_real += 1
if self._armed and self._last is not None and self._since_real <= HOLD_MAX_FRAMES:
dRel0, vRel0, vLead0, aLeadK0, aLeadTau0, prob0 = self._last
if self._since_real == 1:
self._held_dRel = dRel0
self._held_dRel = max(MIN_HELD_DREL, self._held_dRel - max(-vRel0, 0.0) * DT_MDL)
return _HeldLead(self._held_dRel, vRel0, vLead0, min(aLeadK0, 0.0), aLeadTau0, min(prob0, FCW_PROB_CAP))
self._armed = False
return raw
class _LeadStability:
# Read-only monitor: flags an unstable leadOne -- bimodal/bouncing vLead, dRel track-switch jumps, or
# radarTrackId churn (a steady lead flipping track ids -> vRel jitter -> follow-hunting). Telemetry only.
def __init__(self):
self._v = deque(maxlen=STABILITY_WINDOW)
self._d = deque(maxlen=STABILITY_WINDOW)
self._id = deque(maxlen=ID_CHURN_WINDOW)
self.unstable = False
self.churn = False
def reset(self):
self._v.clear()
self._d.clear()
self._id.clear()
self.unstable = False
self.churn = False
def update(self, lead, v_ego: float) -> None:
if not lead.status or v_ego < CREEP_PASSTHROUGH_V:
self.reset()
return
self._v.append(float(lead.vLead))
self._d.append(float(lead.dRel))
self._id.append(int(getattr(lead, 'radarTrackId', -1)))
if len(self._v) < STABILITY_WINDOW:
self.unstable = False
return
v_spread = max(self._v) - min(self._v)
d_jumps = sum(abs(b - a) > SWITCH_DREL for a, b in zip(self._d, list(self._d)[1:], strict=False))
ids = list(self._id)
id_churn = sum(1 for a, b in zip(ids, ids[1:], strict=False) if a != b and a > 0 and b > 0)
self.churn = id_churn >= ID_CHURN and v_spread <= VLEAD_SPREAD # steady lead, flipping ids (not bimodal)
self.unstable = v_spread > VLEAD_SPREAD or d_jumps >= 2 or self.churn
class RadarDistanceController:
def __init__(self, CP: structs.CarParams, params=None):
# CP accepted for the planner's constructor signature; unused.
self._params = params or Params()
self._frame = 0
self._v_ego = 0.0
self._enabled = self._params.get_bool("RadarDistance")
self._one = _LeadHold()
self._two = _LeadHold()
self._stability = _LeadStability()
self._smoother = _LeadSmoother()
def _read_params(self) -> None:
enabled = self._params.get_bool("RadarDistance")
if not enabled and self._enabled:
self._one.reset()
self._two.reset()
self._smoother.reset()
self._enabled = enabled
def update(self, sm) -> None:
if self._frame % int(1. / DT_MDL) == 0:
self._read_params()
self._v_ego = float(sm['carState'].vEgo)
self._frame += 1
def enabled(self) -> bool:
return self._enabled
def lead_unstable(self) -> bool:
return self._stability.unstable
def smooth_radarstate(self, radarstate):
self._stability.update(radarstate.leadOne, self._v_ego) # telemetry, runs every cycle
if not self._enabled or self._v_ego < CREEP_PASSTHROUGH_V:
return radarstate # off / full standstill: byte-stock (stock stop)
if self._v_ego < LOW_SPEED_PASSTHROUGH_V:
# creep band: churn de-jitter ONLY (symmetric EMA, mean-preserving), no flicker-hold. Smooths the
# radar jitter that makes stop-and-go feel like gas-brake-gas-brake, without holding a stale lead.
one = self._smoother.update(radarstate.leadOne, self._stability.churn)
return radarstate if one is radarstate.leadOne else _RadarStateProxy(one, radarstate.leadTwo)
one = self._one.step(radarstate.leadOne) # >= LOW_SPEED: flicker-hold ...
two = self._two.step(radarstate.leadTwo)
one = self._smoother.update(one, self._stability.churn) # ... + churn de-jitter (anti follow-hunt)
return _RadarStateProxy(one, two)
@@ -0,0 +1,241 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
RadarDistance is a pure lead DE-NOISER: flicker-hold + churn smoother + instability telemetry, and nothing
else (no dRel biasing). These tests pin: off / low-speed == byte-stock (stock stop distance); the hold is
obstacle-monotone (brake >= stock) and bounded; the churn smoother de-jitters only a track-flipping lead;
and the instability flag is telemetry that runs regardless of the gate.
"""
from types import SimpleNamespace
import pytest
from openpilot.sunnypilot.selfdrive.controls.lib.radar_distance.radar_distance import \
RadarDistanceController, HOLD_MAX_FRAMES, FCW_PROB_CAP, LOW_SPEED_PASSTHROUGH_V, CREEP_PASSTHROUGH_V, DROPOUT_DREL
COMFORT_BRAKE = 2.5
class FakeParams:
def __init__(self, store=None):
self.store = dict(store or {})
def get_bool(self, key):
return bool(self.store.get(key, False))
def lead(status=True, dRel=40.0, vRel=-2.0, vLead=18.0, aLeadK=0.0, aLeadTau=1.5, modelProb=0.95, radarTrackId=-1):
return SimpleNamespace(status=status, dRel=dRel, yRel=0.0, vRel=vRel, vLead=vLead, vLeadK=vLead,
aLeadK=aLeadK, aLeadTau=aLeadTau, modelProb=modelProb, radarTrackId=radarTrackId)
def rs(one, two=None):
return SimpleNamespace(leadOne=one, leadTwo=two or lead(status=False, dRel=0.0, modelProb=0.0))
def obstacle(ld):
return ld.dRel + ld.vLead ** 2 / (2 * COMFORT_BRAKE)
def ctrl(enabled=True, v_ego=10.0):
c = RadarDistanceController(CP=SimpleNamespace(), params=FakeParams({'RadarDistance': enabled}))
c._v_ego = v_ego # above the low-speed gate so the hold + smoother run
return c
def churn_frames(n, d_a=40.0, d_b=42.0, vLead=18.0):
# a steady lead whose radarTrackId flips every frame (dRel jitters with it) -> the churn detector fires and
# the smoother should de-jitter dRel. vLead is steady so it is NOT flagged bimodal (never averages 2 tracks).
for i in range(n):
even = i % 2 == 0
yield lead(dRel=d_a if even else d_b, vLead=vLead, vRel=-1.0, radarTrackId=1 if even else 2)
# --- off / low-speed == byte-stock ------------------------------------------------------------------------
def test_disabled_is_identity():
c = ctrl(enabled=False)
r = rs(lead())
assert c.smooth_radarstate(r) is r # byte-stock passthrough
def test_valid_lead_passthrough():
c = ctrl()
one = lead(dRel=40.0)
out = c.smooth_radarstate(rs(one))
assert out.leadOne is one # clean lead, no churn -> unchanged
def test_full_standstill_returns_raw_object():
# Full standstill (< CREEP_PASSTHROUGH_V): ENABLED returns the EXACT raw radarstate object (byte-identical
# to OFF) so the stock stop distance is preserved and no stale lead is held near a stop.
c = ctrl(v_ego=CREEP_PASSTHROUGH_V - 0.5)
r = rs(lead(dRel=3.0, vLead=0.5))
assert c.smooth_radarstate(r) is r
def test_creep_dejitters_churn_but_no_hold():
# Creep band [CREEP, LOW_SPEED): the churn smoother runs (de-jitter -> smooth stop-and-go), but the
# flicker-hold does NOT (a dropped/departed lead must not be held, or launch would be delayed).
c = ctrl(v_ego=(CREEP_PASSTHROUGH_V + LOW_SPEED_PASSTHROUGH_V) / 2)
out = None
for f in churn_frames(30, d_a=6.0, d_b=8.0, vLead=1.0):
out = c.smooth_radarstate(rs(f))
assert 6.0 < out.leadOne.dRel < 8.0 # jitter smoothed
# a dropout in the creep band is NOT held -> raw passes through (no stale lead)
drop = rs(lead(status=False, dRel=0.0, modelProb=0.0))
assert c.smooth_radarstate(drop) is drop
def test_creep_clean_lead_passthrough():
# creep band, steady single lead (no churn) -> smoother inert -> exact raw object (stop distance unbiased)
c = ctrl(v_ego=(CREEP_PASSTHROUGH_V + LOW_SPEED_PASSTHROUGH_V) / 2)
r = rs(lead(dRel=4.0, vLead=1.5, radarTrackId=3))
assert c.smooth_radarstate(r) is r
def test_low_speed_override_lead_passthrough():
# radard low_speed_override emits a real closest-track lead with modelProb=0.0. It must be honored, not
# rejected in favor of a stale farther held lead (which would under-brake / stop too close).
c = ctrl()
one = lead(status=True, dRel=2.5, vRel=0.0, vLead=0.0, modelProb=0.0)
out = c.smooth_radarstate(rs(one))
assert out.leadOne is one
# --- flicker-hold -----------------------------------------------------------------------------------------
def test_holds_after_sustained_dropout():
c = ctrl()
for _ in range(3):
c.smooth_radarstate(rs(lead(dRel=30.0, vRel=-4.0, vLead=16.0)))
held = c.smooth_radarstate(rs(lead(status=False, dRel=0.0, modelProb=0.0))).leadOne
assert held.status is True
assert held.dRel < 30.0 # dead-reckoned closer
assert held.dRel == pytest.approx(30.0 - 4.0 * 0.05, abs=1e-6)
def test_no_hold_without_sustained_lead():
c = ctrl()
c.smooth_radarstate(rs(lead(dRel=30.0))) # single frame < SUSTAIN_FRAMES
out = c.smooth_radarstate(rs(lead(status=False, dRel=0.0, modelProb=0.0)))
assert out.leadOne.status is False # no hold armed
def test_releases_after_hold_cap():
c = ctrl()
for _ in range(3):
c.smooth_radarstate(rs(lead(dRel=30.0, vRel=-2.0)))
drop = rs(lead(status=False, dRel=0.0, modelProb=0.0))
for _ in range(HOLD_MAX_FRAMES):
assert c.smooth_radarstate(drop).leadOne.status is True
assert c.smooth_radarstate(drop).leadOne.status is False # released after the cap
def test_obstacle_monotone_during_hold():
c = ctrl()
for _ in range(3):
real = lead(dRel=30.0, vRel=-3.0, vLead=15.0)
c.smooth_radarstate(rs(real))
base = obstacle(real)
drop = rs(lead(status=False, dRel=0.0, modelProb=0.0))
prev = base
for _ in range(HOLD_MAX_FRAMES):
held = c.smooth_radarstate(drop).leadOne
assert obstacle(held) <= prev + 1e-6 # never reports a farther obstacle -> brake >= stock
prev = obstacle(held)
def test_fcw_prob_capped_and_aleadk_not_positive():
c = ctrl()
for _ in range(3):
c.smooth_radarstate(rs(lead(dRel=30.0, aLeadK=1.5, modelProb=0.99)))
held = c.smooth_radarstate(rs(lead(status=False, dRel=0.0, modelProb=0.0))).leadOne
assert held.modelProb <= FCW_PROB_CAP
assert held.aLeadK <= 0.0
def test_flicker_does_not_reset_wall_clock():
c = ctrl()
for _ in range(3):
c.smooth_radarstate(rs(lead(dRel=30.0, vRel=-2.0)))
# alternating drop/reacquire must not refill the hold budget: after > HOLD_MAX_FRAMES wall time it releases
for i in range(HOLD_MAX_FRAMES + 4):
frame = rs(lead(status=False, dRel=0.0, modelProb=0.0)) if i % 2 else rs(lead(dRel=0.5)) # dRel<=DROPOUT: not real
c.smooth_radarstate(frame)
out = c.smooth_radarstate(rs(lead(status=False, dRel=0.0, modelProb=0.0)))
assert out.leadOne.status is False
assert DROPOUT_DREL == 1.0
# --- churn smoother ---------------------------------------------------------------------------------------
def test_churn_smoother_removes_jitter():
c = ctrl()
out = None
for f in churn_frames(30):
out = c.smooth_radarstate(rs(f))
assert c.lead_unstable() # churn detected
assert 40.0 < out.leadOne.dRel < 42.0 # EMA settled between the two jittering tracks
assert out.leadOne.dRel not in (40.0, 42.0) # not the raw alternating value
def test_churn_smoother_off_when_disabled():
c = ctrl(enabled=False)
out = None
for f in churn_frames(30):
r = rs(f)
out = c.smooth_radarstate(r)
assert out is r # disabled -> raw passthrough, no smoothing
def test_smoother_inactive_without_churn():
c = ctrl()
one = lead(dRel=40.0, radarTrackId=7)
for _ in range(10):
out = c.smooth_radarstate(rs(lead(dRel=40.0, radarTrackId=7)))
out = c.smooth_radarstate(rs(one))
assert out.leadOne is one # steady id -> no churn -> exact passthrough
# --- instability telemetry --------------------------------------------------------------------------------
def test_stability_quiet_on_clean_lead():
c = ctrl()
for _ in range(10):
c.smooth_radarstate(rs(lead(dRel=40.0, vLead=18.0, radarTrackId=5)))
assert not c.lead_unstable()
def test_stability_flags_bimodal_lead():
c = ctrl()
for i in range(10):
c.smooth_radarstate(rs(lead(dRel=40.0, vLead=18.0 if i % 2 else 10.0, radarTrackId=5)))
assert c.lead_unstable()
def test_stability_flags_trackid_churn():
c = ctrl()
for f in churn_frames(20):
c.smooth_radarstate(rs(f))
assert c.lead_unstable()
def test_stability_resets_on_dropout():
c = ctrl()
for i in range(10):
c.smooth_radarstate(rs(lead(dRel=40.0, vLead=18.0 if i % 2 else 10.0)))
assert c.lead_unstable()
c.smooth_radarstate(rs(lead(status=False, dRel=0.0, modelProb=0.0)))
assert not c.lead_unstable()
def test_stability_runs_even_when_disabled():
c = ctrl(enabled=False)
for i in range(10):
c.smooth_radarstate(rs(lead(dRel=40.0, vLead=18.0 if i % 2 else 10.0)))
assert c.lead_unstable() # telemetry not gated by the RadarDistance param
+106
View File
@@ -0,0 +1,106 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import numpy as np
from openpilot.common.constants import CV
from openpilot.common.realtime import DT_MDL
from openpilot.common.params import Params
NEARSIDE_PROB = 0.25
EDGE_PROB = 0.35
EDGE_REACTION_TIME = 1.0
EDGE_CLEAR_TIME = 0.3
MIN_SPEED = 20 * CV.MPH_TO_MS
NEAR_EDGE_DISTANCE = 4.5
LEFT_NEARSIDE_LANE_IDX = 1
RIGHT_NEARSIDE_LANE_IDX = 2
class RoadEdgeLaneChangeController:
def __init__(self, desire_helper):
self.DH = desire_helper
self.params = Params()
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
self.param_read_counter = 0
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def read_params(self) -> None:
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
def update_params(self) -> None:
if self.param_read_counter % 50 == 0:
self.read_params()
self.param_read_counter += 1
def reset(self) -> None:
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
@staticmethod
def _road_edge_y(road_edges, idx: int) -> float | None:
if road_edges is None or len(road_edges) <= idx or len(road_edges[idx].y) == 0:
return None
return road_edges[idx].y[0]
@staticmethod
def _edge_is_near(edge_y: float | None, left: bool) -> bool:
if edge_y is None:
return False
if left:
return bool(-NEAR_EDGE_DISTANCE < edge_y < 0.0)
return bool(0.0 < edge_y < NEAR_EDGE_DISTANCE)
def update(self, road_edge_stds, lane_line_probs, v_ego: float, road_edges=None) -> None:
self.update_params()
if not self.enabled or v_ego < MIN_SPEED:
self.reset()
return
left_edge_prob = np.clip(1.0 - road_edge_stds[0], 0.0, 1.0)
right_edge_prob = np.clip(1.0 - road_edge_stds[1], 0.0, 1.0)
left_lane_prob = lane_line_probs[LEFT_NEARSIDE_LANE_IDX]
right_lane_prob = lane_line_probs[RIGHT_NEARSIDE_LANE_IDX]
left_edge_y = self._road_edge_y(road_edges, 0)
right_edge_y = self._road_edge_y(road_edges, 1)
left_edge_near = self._edge_is_near(left_edge_y, True)
right_edge_near = self._edge_is_near(right_edge_y, False)
left_cond = left_edge_prob > EDGE_PROB and (left_edge_near or (left_edge_y is None and left_lane_prob < NEARSIDE_PROB))
right_cond = right_edge_prob > EDGE_PROB and (right_edge_near or (right_edge_y is None and right_lane_prob < NEARSIDE_PROB))
if left_cond:
self.left_edge_timer = min(self.left_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.left_clear_timer = 0.0
if self.left_edge_timer > EDGE_REACTION_TIME:
self.left_edge_detected = True
else:
self.left_clear_timer += DT_MDL
if self.left_clear_timer > EDGE_CLEAR_TIME:
self.left_edge_timer = 0.0
self.left_edge_detected = False
if right_cond:
self.right_edge_timer = min(self.right_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.right_clear_timer = 0.0
if self.right_edge_timer > EDGE_REACTION_TIME:
self.right_edge_detected = True
else:
self.right_clear_timer += DT_MDL
if self.right_clear_timer > EDGE_CLEAR_TIME:
self.right_edge_timer = 0.0
self.right_edge_detected = False
@@ -5,6 +5,8 @@ from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.lane_turn_desire import LaneTurnController, LANE_CHANGE_SPEED_MIN
from openpilot.sunnypilot.selfdrive.controls.lib.auto_lane_change import AutoLaneChangeMode
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
TurnDirection = custom.ModelDataV2SP.TurnDirection
@@ -107,7 +109,11 @@ def set_lane_turn_params():
])
def test_desire_helper_integration(carstate, lateral_active, lane_change_prob, expected_desire, set_lane_turn_params):
dh = DesireHelper()
relc = RoadEdgeLaneChangeController(dh)
relc.enabled = True
dh.alc.lane_change_set_timer = AutoLaneChangeMode.NUDGE
for _ in range(10):
dh.update(carstate, lateral_active, lane_change_prob)
dh.update(carstate, lateral_active, lane_change_prob,
left_edge_detected=relc.left_edge_detected, right_edge_detected=relc.right_edge_detected)
assert dh.desire == expected_desire # The first four tests were unit tests to test the controller, where this tests the integration in desire helpers
@@ -0,0 +1,133 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.relc import (
RoadEdgeLaneChangeController, EDGE_REACTION_TIME, EDGE_CLEAR_TIME, MIN_SPEED,
)
V_HIGH = MIN_SPEED + 2.0
V_LOW = MIN_SPEED - 1.0
class DummyRoadEdge:
def __init__(self, y):
self.y = [y]
@pytest.fixture
def relc(mocker):
mock_params = mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.relc.Params")
mock_params.return_value.get_bool.return_value = True
controller = RoadEdgeLaneChangeController(DesireHelper())
controller.enabled = True
return controller
def make_road_edges(left_y=-3.0, right_y=3.0):
return [DummyRoadEdge(left_y), DummyRoadEdge(right_y)]
def drive(controller, road_edge_stds, lane_line_probs, seconds, v_ego=V_HIGH, road_edges=None):
for _ in range(int(seconds / DT_MDL) + 1):
controller.update(road_edge_stds, lane_line_probs, v_ego, road_edges)
@pytest.mark.parametrize("road_edge_stds,lane_line_probs,attr", [
([0.0, 0.9], [0.8, 0.0, 0.8, 0.8], "left_edge_detected"),
([0.9, 0.0], [0.8, 0.8, 0.0, 0.8], "right_edge_detected"),
])
def test_edge_detection(relc, road_edge_stds, lane_line_probs, attr):
drive(relc, road_edge_stds, lane_line_probs, EDGE_REACTION_TIME + 0.1)
assert getattr(relc, attr)
def test_edge_detection_requires_time(relc):
drive(relc, [0.0, 0.9], [0.8, 0.0, 0.8, 0.8], EDGE_REACTION_TIME - 0.05)
assert not relc.left_edge_detected
def test_both_edges_detected(relc):
drive(relc, [0.0, 0.0], [0.8, 0.0, 0.0, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
assert relc.right_edge_detected
def test_noise_doesnt_clear(relc):
edge = ([0.0, 0.9], [0.8, 0.0, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update(*clear, V_HIGH)
relc.update(*edge, V_HIGH)
assert relc.left_edge_detected
def test_clears_after_window(relc):
edge = ([0.0, 0.9], [0.8, 0.0, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
drive(relc, *clear, EDGE_CLEAR_TIME + 0.05)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_low_speed_skips(relc):
drive(relc, [0.0, 0.9], [0.8, 0.0, 0.8, 0.8], EDGE_REACTION_TIME + 0.1, v_ego=V_LOW)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_speed_drop_resets(relc):
drive(relc, [0.0, 0.9], [0.8, 0.0, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update([0.0, 0.9], [0.8, 0.0, 0.8, 0.8], V_LOW)
assert not relc.left_edge_detected
def test_param_off_resets(relc):
drive(relc, [0.0, 0.9], [0.8, 0.0, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.params.get_bool.return_value = False
relc.read_params()
relc.update([0.0, 0.9], [0.8, 0.0, 0.8, 0.8], V_HIGH)
assert not relc.left_edge_detected
assert not relc.right_edge_detected
@pytest.mark.parametrize("lane_line_probs", [
[0.0, 0.8, 0.8, 0.8],
[0.8, 0.8, 0.8, 0.0],
])
def test_outer_lane_lines_do_not_drive_edge_detection(relc, lane_line_probs):
drive(relc, [0.0, 0.0], lane_line_probs, EDGE_REACTION_TIME + 0.1)
assert not relc.left_edge_detected
assert not relc.right_edge_detected
@pytest.mark.parametrize("road_edge_stds,road_edges,attr", [
([0.0, 0.9], make_road_edges(left_y=-3.0, right_y=8.0), "left_edge_detected"),
([0.9, 0.0], make_road_edges(left_y=-8.0, right_y=3.0), "right_edge_detected"),
])
def test_near_road_edge_geometry_blocks_with_visible_lane_lines(relc, road_edge_stds, road_edges, attr):
drive(relc, road_edge_stds, [0.8, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1, road_edges=road_edges)
assert getattr(relc, attr)
def test_far_road_edge_geometry_does_not_block(relc):
drive(relc, [0.0, 0.0], [0.8, 0.0, 0.0, 0.8], EDGE_REACTION_TIME + 0.1, road_edges=make_road_edges(left_y=-8.0, right_y=8.0))
assert not relc.left_edge_detected
assert not relc.right_edge_detected
@@ -243,4 +243,12 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none, AudibleAlert.prompt, 3.),
},
EventNameSP.laneChangeRoadEdge: {
ET.WARNING: Alert(
"Lane Change Unavailable: Road Edge",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 0.1),
},
}
+32
View File
@@ -1,4 +1,12 @@
{
"AccelPersonality": {
"title": "Acceleration Profile",
"description": "Eco takes off gently and keeps the roomiest, smoothest following gap; Sport takes off briskly and follows a touch tighter. Hard-braking authority is always preserved."
},
"AccelPersonalityEnabled": {
"title": "Enable Acceleration Profiles",
"description": "Enables Eco/Normal/Sport profiles -- a tier-scaled take-off from a stop and a slightly wider, smoother following gap. Braking is never reduced below stock."
},
"AccessToken": {
"title": "AccessTokenIsNice",
"description": ""
@@ -1098,6 +1106,10 @@
"title": "Quiet Mode",
"description": ""
},
"RadarDistance": {
"title": "Radar Distance",
"description": "De-noises the lead sunnypilot follows -- holds it through brief radar flicker/dropout and smooths a jittery (track-flipping) lead -- so it does not lose and re-grab the lead or hunt the gap. Braking is never reduced below stock."
},
"RainbowMode": {
"title": "Rainbow Mode",
"description": ""
@@ -1118,6 +1130,10 @@
"title": "Record Front Lock",
"description": ""
},
"RoadEdgeLaneChangeEnabled": {
"title": "Block Lane Change: Road Edge Detection",
"description": "Blocks lane changes when the model sees a road edge on the signaled side."
},
"RoadName": {
"title": "Road Name",
"description": ""
@@ -1324,14 +1340,30 @@
"step": 0.1,
"unit": "m/s\u00b2"
},
"ToyotaAutoHold": {
"title": "Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS",
"description": ""
},
"ToyotaDriveMode": {
"title": "Enable drive mode btn link",
"description": ""
},
"ToyotaEnforceStockLongitudinal": {
"title": "Toyota: Enforce Factory Longitudinal Control",
"description": "When enabled, sunnypilot will not take over control of gas and brakes. Factory Toyota longitudinal control will be used."
},
"ToyotaEnhancedBsm": {
"title": "Toyota: Prius TSS2 BSM and some tssp",
"description": ""
},
"ToyotaStopAndGoHack": {
"title": "Toyota: Stop and Go Hack (Alpha)",
"description": "sunnypilot will allow some Toyota/Lexus cars to auto resume during stop and go traffic. This feature is only applicable to certain models that are able to use longitudinal control. This is an alpha feature. Use at your own risk."
},
"ToyotaTSS2Long": {
"title": "Toyota: custom longitudinal for TSS2",
"description": ""
},
"TrainingVersion": {
"title": "Training Version",
"description": ""
+138
View File
@@ -537,6 +537,12 @@
"value": 0
}
]
},
{
"key": "RoadEdgeLaneChangeEnabled",
"widget": "toggle",
"title": "Block Lane Change: Road Edge Detection",
"description": "Blocks the lane change if the model sees a road edge on your signaled side."
}
]
}
@@ -587,6 +593,26 @@
}
]
},
{
"key": "RadarDistance",
"widget": "toggle",
"title": "Radar Distance",
"description": "De-noises the lead sunnypilot follows -- holds it through brief radar flicker/dropout and smooths a jittery (track-flipping) lead -- so it does not lose and re-grab the lead or hunt the gap. Braking is never reduced below stock.",
"visibility": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
],
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "DisengageOnAccelerator",
"widget": "toggle",
@@ -620,6 +646,58 @@
}
]
},
{
"key": "AccelPersonalityEnabled",
"widget": "toggle",
"title": "Enable Acceleration Profiles",
"description": "Enables Eco/Normal/Sport profiles -- a tier-scaled take-off from a stop and a slightly wider, smoother following gap. Braking is never reduced below stock.",
"visibility": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
],
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
}
]
},
{
"key": "AccelPersonality",
"widget": "multiple_button",
"title": "Acceleration Profile",
"description": "Eco takes off gently and keeps the roomiest, smoothest following gap; Sport takes off briskly and follows a touch tighter. Hard-braking authority is always preserved.",
"options": [
{
"value": 0,
"label": "Eco"
},
{
"value": 1,
"label": "Normal"
},
{
"value": 2,
"label": "Sport"
}
],
"enablement": [
{
"type": "capability",
"field": "has_longitudinal_control",
"equals": true
},
{
"type": "param",
"key": "AccelPersonalityEnabled",
"equals": true
}
]
},
{
"key": "IntelligentCruiseButtonManagement",
"widget": "toggle",
@@ -2001,6 +2079,22 @@
"equals": true
}
]
},
{
"key": "PlanplusControl",
"widget": "option",
"title": "Plan Plus Controls",
"description": "Adjust planplus model recentering strength. The higher this number the more aggressively the model will recover to lane center; too high and it will ping-pong.",
"min": 0.0,
"max": 2.0,
"step": 0.1,
"enablement": [
{
"type": "param",
"key": "ShowAdvancedControls",
"equals": true
}
]
}
]
},
@@ -2168,6 +2262,50 @@
"title": "Toyota / Lexus Settings",
"description": "",
"items": [
{
"key": "ToyotaAutoHold",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaEnhancedBsm",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: Prius TSS2 BSM and some tssp",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaTSS2Long",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Toyota: custom longitudinal for TSS2",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaDriveMode",
"widget": "toggle",
"needs_onroad_cycle": true,
"title": "Enable drive mode btn link",
"enablement": [
{
"type": "not_engaged"
}
]
},
{
"key": "ToyotaEnforceStockLongitudinal",
"widget": "toggle",
@@ -24,6 +24,16 @@ sections:
- $ref: '#/macros/longitudinal'
enablement:
- $ref: '#/macros/longitudinal'
- key: RadarDistance
widget: toggle
title: Radar Distance
description: De-noises the lead sunnypilot follows -- holds it through brief radar flicker/dropout and
smooths a jittery (track-flipping) lead -- so it does not lose and re-grab the lead or hunt the gap.
Braking is never reduced below stock.
visibility:
- $ref: '#/macros/longitudinal'
enablement:
- $ref: '#/macros/longitudinal'
- key: DisengageOnAccelerator
widget: toggle
title: Disengage Cruise on Accelerator Pedal
@@ -43,6 +53,32 @@ sections:
label: Relaxed
enablement:
- $ref: '#/macros/longitudinal'
- key: AccelPersonalityEnabled
widget: toggle
title: Enable Acceleration Profiles
description: Enables Eco/Normal/Sport profiles -- a tier-scaled take-off from a stop and a slightly wider,
smoother following gap. Braking is never reduced below stock.
visibility:
- $ref: '#/macros/longitudinal'
enablement:
- $ref: '#/macros/longitudinal'
- key: AccelPersonality
widget: multiple_button
title: Acceleration Profile
description: Eco takes off gently and keeps the roomiest, smoothest following gap; Sport takes off briskly
and follows a touch tighter. Hard-braking authority is always preserved.
options:
- value: 0
label: Eco
- value: 1
label: Normal
- value: 2
label: Sport
enablement:
- $ref: '#/macros/longitudinal'
- type: param
key: AccelPersonalityEnabled
equals: true
- key: IntelligentCruiseButtonManagement
widget: toggle
title: Intelligent Cruise Button Management (ICBM) (Alpha)
@@ -51,6 +51,16 @@ sections:
key: LagdToggle
equals: true
- $ref: '#/macros/advanced_only'
- key: PlanplusControl
widget: option
title: Plan Plus Controls
description: Adjust planplus model recentering strength. The higher this number the more aggressively the model will recover
to lane center; too high and it will ping-pong.
min: 0.0
max: 2.0
step: 0.1
enablement:
- $ref: '#/macros/advanced_only'
- id: lateral_control
title: Lateral Control
description: Neural network lateral control for supported models
@@ -255,3 +255,7 @@ sections:
key: AutoLaneChangeTimer
op: '>'
value: 0
- key: RoadEdgeLaneChangeEnabled
widget: toggle
title: 'Block Lane Change: Road Edge Detection'
description: Blocks the lane change if the model sees a road edge on your signaled side.
@@ -60,6 +60,30 @@ sections:
title: Toyota / Lexus Settings
description: ''
items:
- key: ToyotaAutoHold
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: Auto Brake Hold FOR TSS2 HYBRID CARS'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaEnhancedBsm
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: Prius TSS2 BSM and some tssp'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaTSS2Long
widget: toggle
needs_onroad_cycle: true
title: 'Toyota: custom longitudinal for TSS2'
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaDriveMode
widget: toggle
needs_onroad_cycle: true
title: Enable drive mode btn link
enablement:
- $ref: '#/macros/not_engaged'
- key: ToyotaEnforceStockLongitudinal
widget: toggle
needs_onroad_cycle: true
+3 -2
View File
@@ -45,8 +45,9 @@ class ScrollState(Enum):
class GuiScrollPanel2:
def __init__(self, horizontal: bool = True) -> None:
def __init__(self, horizontal: bool = True, handle_out_of_bounds: bool = True) -> None:
self._horizontal = horizontal
self._handle_out_of_bounds = handle_out_of_bounds
self._state = ScrollState.STEADY
self._offset: rl.Vector2 = rl.Vector2(0, 0)
self._initial_click_event: MouseEvent | None = None
@@ -98,7 +99,7 @@ class GuiScrollPanel2:
# simple exponential return if out of bounds
# out of bounds is handled by snapping, so skip if set
out_of_bounds = self.get_offset() > max_offset or self.get_offset() < min_offset
if out_of_bounds and snap_target is None:
if out_of_bounds and snap_target is None and self._handle_out_of_bounds:
target = max_offset if self.get_offset() > max_offset else min_offset
dt = rl.get_frame_time() or 1e-6
+6 -3
View File
@@ -75,7 +75,6 @@ class _Scroller(Widget):
self._items: list[Widget] = []
self._horizontal = horizontal
self._snap_items = snap_items
assert not self._snap_items or self._horizontal, "Snapping is only supported for horizontal scrolling"
self._spacing = spacing
self._pad = pad
@@ -191,8 +190,12 @@ class _Scroller(Widget):
snap_target: float | None = None
if self._snap_items and visible_items and self._scrolling_to[0] is None:
# TODO: this doesn't handle two small buttons at the edges well
center_pos = self._rect.x + self._rect.width / 2
closest_delta_pos = min((((item.rect.x + item.rect.width / 2) - center_pos) for item in visible_items), key=abs)
if self._horizontal:
center_pos = self._rect.x + self._rect.width / 2
closest_delta_pos = min((((item.rect.x + item.rect.width / 2) - center_pos) for item in visible_items), key=abs)
else:
center_pos = self._rect.y + self._rect.height / 2
closest_delta_pos = min((((item.rect.y + item.rect.height / 2) - center_pos) for item in visible_items), key=abs)
snap_target = self.scroll_panel.get_offset() - closest_delta_pos
return self.scroll_panel.update(self._rect, content_size, snap_target=snap_target)