Long changes

This commit is contained in:
firestar5683
2026-03-25 12:09:35 -05:00
parent 187eb34733
commit 00b8eb2b20
5 changed files with 535 additions and 212 deletions
@@ -1,13 +1,12 @@
#!/usr/bin/env python3
import time
import numpy as np
from openpilot.common.constants import CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.realtime import DT_MDL
from openpilot.common.constants import CV
from openpilot.frogpilot.common.frogpilot_variables import CRUISING_SPEED, THRESHOLD
from openpilot.frogpilot.common.frogpilot_variables import CRUISING_SPEED, THRESHOLD, params_memory
CEStatus = {
"OFF": 0, # Off
@@ -21,51 +20,78 @@ CEStatus = {
"STOP_LIGHT": 8 # Stop light or sign condition
}
def interp(x, xp, fp):
return float(np.interp(x, xp, fp))
def scale_threshold(v_ego):
# Keep StarPilot speed-based lead threshold behavior (v_ego in m/s)
return interp(v_ego, [0.0, 17.9, 26.8, 35.8, 44.7], [0.58, 0.60, 0.62, 0.75, 0.90])
class ConditionalExperimentalMode:
# ===== CONDITIONAL EXPERIMENTAL MODE SPEED-BASED TUNING =====
# Speed ranges: [0-35, 35-55, 55-70, 70+ mph]
FILTER_TIME_CURVES = [0.9, 0.8, 0.6, 0.5]
FILTER_TIME_LEADS = [0.9, 0.8, 0.7, 0.5]
FILTER_TIME_LIGHTS = [0.9, 0.8, 0.75, 0.55]
LIGHT_BOOSTS = [1.0, 1.2, 1.1, 1.0]
LIGHT_MAX_TIME = 9.0
# FILTER TIME CONSTANTS (Lower = More responsive, Higher = Smoother)
# [City, Urban Hwy, Rural Hwy, High Speed]
FILTER_TIME_CURVES = [0.9, 0.8, 0.6, 0.5] # Faster detection at highway speeds
FILTER_TIME_LEADS = [0.9, 0.8, 0.7, 0.5] # Less sensitive at 70+ mph for slow leads
FILTER_TIME_LIGHTS = [0.9, 0.8, 0.75, 0.55] # Less sensitive at 60+ mph for stoplights
# HIGHWAY LIGHT DETECTION MULTIPLIERS
# How much to increase model stop time at highway speeds
LIGHT_BOOSTS = [1.0, 1.2, 1.1, 1.0] # Keep conservative boost for highest speeds
LIGHT_SPEED_LOW = 50 * CV.MPH_TO_MS # 50 mph threshold
LIGHT_SPEED_HIGH = 60 * CV.MPH_TO_MS # 60 mph threshold
LIGHT_MAX_TIME = 9 # Balanced max time preserving city performance
# ===== END TUNING PARAMETERS =====
# Current active values
FILTER_TIME_CURVE = 0.8
FILTER_TIME_LEAD = 0.8
FILTER_TIME_LIGHT = 0.8
LIGHT_BOOST_LOW = 1.15
LIGHT_BOOST_HIGH = 1.2
# Small latch to avoid frame-to-frame mode chatter.
CEM_TRANSITION_GUARD_TIME = 0.50
CEM_TRANSITION_BUFFER_TIME = 0.25
@staticmethod
def get_speed_based_param(speed_mph, param_array):
"""Get parameter value based on current speed using smooth interpolation between breakpoints [0, 35, 55, 70]"""
return interp(speed_mph, [0, 35, 55, 70], param_array)
def __init__(self, FrogPilotPlanner):
self.frogpilot_planner = FrogPilotPlanner
self.curvature_filter = FirstOrderFilter(0.0, self.FILTER_TIME_CURVES[1], DT_MDL)
self.slow_lead_filter = FirstOrderFilter(0.0, self.FILTER_TIME_LEADS[1], DT_MDL)
self.stop_light_filter = FirstOrderFilter(0.0, self.FILTER_TIME_LIGHTS[1], DT_MDL)
# Faster filters with hysteresis for better responsiveness
self.curvature_filter = FirstOrderFilter(0, self.FILTER_TIME_CURVE, DT_MDL)
self.slow_lead_filter = FirstOrderFilter(0, self.FILTER_TIME_LEAD, DT_MDL)
self.stop_light_filter = FirstOrderFilter(0, self.FILTER_TIME_LIGHT, DT_MDL)
self.curve_detected = False
self.slow_lead_detected = False
self.experimental_mode = False
self.stop_light_detected = False
self.prev_experimental_mode = False # For hysteresis
self.mode_hold_until = 0.0
self.mode_false_since = 0.0
def _update_filter_time_constants(self, v_ego):
speed_mph = v_ego * CV.MS_TO_MPH
curve_time = float(np.interp(speed_mph, [0, 35, 55, 70], self.FILTER_TIME_CURVES))
lead_time = float(np.interp(speed_mph, [0, 35, 55, 70], self.FILTER_TIME_LEADS))
light_time = float(np.interp(speed_mph, [0, 35, 55, 70], self.FILTER_TIME_LIGHTS))
self.curvature_filter = FirstOrderFilter(self.curvature_filter.x, curve_time, DT_MDL)
self.slow_lead_filter = FirstOrderFilter(self.slow_lead_filter.x, lead_time, DT_MDL)
self.stop_light_filter = FirstOrderFilter(self.stop_light_filter.x, light_time, DT_MDL)
def update(self, v_ego, sm, frogpilot_toggles):
now = time.monotonic()
if frogpilot_toggles.experimental_mode_via_press:
self.status_value = self.frogpilot_planner.params_memory.get_int("CEStatus")
self.status_value = params_memory.get_int("CEStatus")
else:
self.status_value = CEStatus["OFF"]
if self.status_value not in (CEStatus["USER_DISABLED"], CEStatus["USER_OVERRIDDEN"]) and not sm["carState"].standstill:
self.update_conditions(v_ego, sm, frogpilot_toggles)
triggered = self.check_conditions(v_ego, sm, frogpilot_toggles)
triggered = self.check_conditions(v_ego, sm, frogpilot_toggles)
if triggered:
self.mode_hold_until = now + self.CEM_TRANSITION_GUARD_TIME
self.mode_false_since = 0.0
@@ -76,90 +102,113 @@ class ConditionalExperimentalMode:
transition_buffer_active = self.mode_false_since != 0.0 and (now - self.mode_false_since) < self.CEM_TRANSITION_BUFFER_TIME
self.experimental_mode = triggered or hold_active or transition_buffer_active
self.frogpilot_planner.params_memory.put_int("CEStatus", self.status_value if self.experimental_mode else CEStatus["OFF"])
self.prev_experimental_mode = self.experimental_mode
params_memory.put_int("CEStatus", self.status_value if self.experimental_mode else CEStatus["OFF"])
else:
self.mode_hold_until = 0.0
self.mode_false_since = 0.0
self.experimental_mode &= sm["carState"].standstill and self.frogpilot_planner.model_stopped
self.experimental_mode &= self.status_value != CEStatus["USER_DISABLED"]
self.experimental_mode |= self.status_value == CEStatus["USER_OVERRIDDEN"]
self.experimental_mode = (self.status_value == CEStatus["USER_OVERRIDDEN"] or
(sm["carState"].standstill and self.experimental_mode and self.frogpilot_planner.model_stopped))
self.stop_light_detected &= self.status_value not in (CEStatus["USER_DISABLED"], CEStatus["USER_OVERRIDDEN"])
self.stop_light_filter.x = 0
def check_conditions(self, v_ego, sm, frogpilot_toggles):
if self.curve_detected and frogpilot_toggles.conditional_curves and (not self.frogpilot_planner.frogpilot_following.following_lead or frogpilot_toggles.conditional_curves_lead):
below_speed = frogpilot_toggles.conditional_limit > v_ego >= 1 and not self.frogpilot_planner.frogpilot_following.following_lead
below_speed_with_lead = frogpilot_toggles.conditional_limit_lead > v_ego >= 1 and self.frogpilot_planner.frogpilot_following.following_lead
if below_speed or below_speed_with_lead:
self.status_value = CEStatus["SPEED"]
return True
desired_lane = self.frogpilot_planner.lane_width_left if sm["carState"].leftBlinker else self.frogpilot_planner.lane_width_right
lane_available = desired_lane >= frogpilot_toggles.lane_detection_width or not frogpilot_toggles.conditional_signal_lane_detection
if v_ego < frogpilot_toggles.conditional_signal and (sm["carState"].leftBlinker or sm["carState"].rightBlinker) and not lane_available:
self.status_value = CEStatus["SIGNAL"]
return True
if frogpilot_toggles.conditional_curves and self.curve_detected and (frogpilot_toggles.conditional_curves_lead or not self.frogpilot_planner.frogpilot_following.following_lead):
self.status_value = CEStatus["CURVATURE"]
return True
if self.slow_lead_detected and frogpilot_toggles.conditional_lead:
if frogpilot_toggles.conditional_lead and self.slow_lead_detected and v_ego <= 35.31:
self.status_value = CEStatus["LEAD"]
return True
if (sm["carState"].leftBlinker or sm["carState"].rightBlinker) and v_ego < frogpilot_toggles.conditional_signal:
desired_lane = self.frogpilot_planner.lane_width_left if sm["carState"].leftBlinker else self.frogpilot_planner.lane_width_right
if desired_lane < frogpilot_toggles.lane_detection_width or not frogpilot_toggles.conditional_signal_lane_detection:
self.status_value = CEStatus["SIGNAL"]
return True
below_speed = 1 <= v_ego < frogpilot_toggles.conditional_limit and not self.frogpilot_planner.frogpilot_following.following_lead
below_speed_with_lead = 1 <= v_ego < frogpilot_toggles.conditional_limit_lead and self.frogpilot_planner.frogpilot_following.following_lead
if below_speed or below_speed_with_lead:
self.status_value = CEStatus["SPEED"]
if frogpilot_toggles.conditional_model_stop_time != 0 and self.stop_light_detected:
self.status_value = CEStatus["STOP_LIGHT"]
return True
if self.frogpilot_planner.frogpilot_vcruise.slc.experimental_mode:
self.status_value = CEStatus["SPEED_LIMIT"]
return True
if self.stop_light_detected and frogpilot_toggles.conditional_model_stop_time != 0:
self.status_value = CEStatus["STOP_LIGHT"]
return True
return False
def update_conditions(self, v_ego, sm, frogpilot_toggles):
self._update_filter_time_constants(v_ego)
self.curve_detection(v_ego, frogpilot_toggles)
self.slow_lead(v_ego, frogpilot_toggles)
self.slow_lead(frogpilot_toggles, v_ego)
self.stop_sign_and_light(v_ego, sm, frogpilot_toggles.conditional_model_stop_time)
def curve_detection(self, v_ego, frogpilot_toggles):
self.curvature_filter.update(self.frogpilot_planner.driving_in_curve or self.frogpilot_planner.road_curvature_detected)
self.curve_detected = self.curvature_filter.x >= THRESHOLD and v_ego > CRUISING_SPEED
self.curvature_filter.update(self.frogpilot_planner.road_curvature_detected or self.frogpilot_planner.driving_in_curve)
self.curve_detected = bool(self.curvature_filter.x >= THRESHOLD and v_ego > CRUISING_SPEED)
def slow_lead(self, v_ego, frogpilot_toggles):
def slow_lead(self, frogpilot_toggles, v_ego):
if self.frogpilot_planner.tracking_lead:
slower_lead = (v_ego - self.frogpilot_planner.lead_one.vLead) > CRUISING_SPEED and frogpilot_toggles.conditional_slower_lead
slower_lead |= getattr(self.frogpilot_planner.frogpilot_following, "slower_lead", False) and frogpilot_toggles.conditional_slower_lead
stopped_lead = self.frogpilot_planner.lead_one.vLead < 1 and frogpilot_toggles.conditional_stopped_lead
slower_lead = frogpilot_toggles.conditional_slower_lead and self.frogpilot_planner.frogpilot_following.slower_lead
stopped_lead = frogpilot_toggles.conditional_stopped_lead and self.frogpilot_planner.lead_one.vLead < 1
lead_threshold = scale_threshold(v_ego)
# Adjust threshold based on lead probability for vision-only accuracy
lead_prob = getattr(self.frogpilot_planner.lead_one, 'modelProb', 1.0)
adjusted_threshold = lead_threshold * (1.0 + 0.2 * (1.0 - lead_prob)) # Higher threshold for lower confidence
self.slow_lead_filter.update(slower_lead or stopped_lead)
lead_prob = getattr(self.frogpilot_planner.lead_one, 'modelProb', 1.0)
adjusted_threshold = THRESHOLD * (1.0 + 0.2 * (1.0 - lead_prob))
self.slow_lead_detected = self.slow_lead_filter.x >= adjusted_threshold
self.slow_lead_detected = bool(self.slow_lead_filter.x >= adjusted_threshold)
else:
self.slow_lead_filter.x = 0
self.slow_lead_detected = False
def stop_sign_and_light(self, v_ego, sm, model_time):
if sm["frogpilotCarState"].trafficModeEnabled:
if not sm["frogpilotCarState"].trafficModeEnabled:
speed_mph = v_ego * CV.MS_TO_MPH # Convert m/s to mph
# Interp for smooth scaling in 35-45 mph
bp = [0, 35, 45]
low_filter_time = 0.0 # No filtering under 35 mph
tuned_filter_time_curves = self.FILTER_TIME_CURVES[1] # At 35-55 mph
tuned_filter_time_leads = self.FILTER_TIME_LEADS[1]
tuned_filter_time_lights = self.FILTER_TIME_LIGHTS[1]
low_boost = 1.0
tuned_boost = self.LIGHT_BOOSTS[1]
low_cap_factor = 0.0 # No cap under 35 mph
tuned_cap_factor = 1.0
filter_time_curves = interp(speed_mph, bp, [low_filter_time, low_filter_time, tuned_filter_time_curves])
filter_time_leads = interp(speed_mph, bp, [low_filter_time, low_filter_time, tuned_filter_time_leads])
filter_time_lights = interp(speed_mph, bp, [low_filter_time, low_filter_time, tuned_filter_time_lights])
light_boost = interp(speed_mph, bp, [low_boost, low_boost, tuned_boost])
cap_factor = interp(speed_mph, bp, [low_cap_factor, low_cap_factor, tuned_cap_factor])
# Update filter times with interp
self.curvature_filter = FirstOrderFilter(self.curvature_filter.x, filter_time_curves, DT_MDL)
self.slow_lead_filter = FirstOrderFilter(self.slow_lead_filter.x, filter_time_leads, DT_MDL)
self.stop_light_filter = FirstOrderFilter(self.stop_light_filter.x, filter_time_lights, DT_MDL)
# Disable stoplight detection at very high speeds to prevent false positives
if speed_mph > 75: # Disable above 75 mph
self.stop_light_filter.x = 0
self.stop_light_detected = False
return
# Adjust model time with interp boost and gradual cap
adjusted_model_time = model_time * light_boost
if cap_factor > 0:
adjusted_model_time = min(adjusted_model_time, self.LIGHT_MAX_TIME * cap_factor + model_time * (1 - cap_factor)) # Gradual cap
model_stopping = self.frogpilot_planner.model_length < v_ego * adjusted_model_time
self.stop_light_filter.update(self.frogpilot_planner.model_stopped or model_stopping)
self.stop_light_detected = bool(self.stop_light_filter.x >= THRESHOLD**2 and not self.frogpilot_planner.tracking_lead)
else:
self.stop_light_filter.x = 0
self.stop_light_detected = False
return
speed_mph = v_ego * CV.MS_TO_MPH
if speed_mph > 75:
self.stop_light_filter.x = 0
self.stop_light_detected = False
return
light_boost = float(np.interp(speed_mph, [0, 35, 55, 70], self.LIGHT_BOOSTS))
cap_factor = float(np.interp(speed_mph, [0, 35, 45], [0.0, 0.0, 1.0]))
adjusted_model_time = model_time * light_boost
if cap_factor > 0:
adjusted_model_time = min(adjusted_model_time, self.LIGHT_MAX_TIME * cap_factor + model_time * (1.0 - cap_factor))
model_stopping = self.frogpilot_planner.model_length < v_ego * adjusted_model_time
self.stop_light_filter.update(self.frogpilot_planner.model_stopped or model_stopping)
self.stop_light_detected = self.stop_light_filter.x >= (THRESHOLD ** 2) and not self.frogpilot_planner.tracking_lead
@@ -1,49 +1,58 @@
#!/usr/bin/env python3
import numpy as np
from openpilot.selfdrive.controls.lib.longitudinal_planner import ACCEL_MIN, get_max_accel
from openpilot.selfdrive.controls.lib.longitudinal_planner import A_CRUISE_MIN, get_max_accel
from openpilot.frogpilot.common.frogpilot_variables import CITY_SPEED_LIMIT
def cubic_interp(x, xp, fp):
if x <= xp[0]:
return fp[0]
elif x >= xp[-1]:
return fp[-1]
"""Cubic interpolation using NumPy's native operations for speed."""
# Boundary conditions
if x <= xp[0]:
return fp[0]
elif x >= xp[-1]:
return fp[-1]
i = np.searchsorted(xp, x) - 1
i = max(0, min(i, len(xp) - 2))
t = (x - xp[i]) / float(xp[i + 1] - xp[i])
# Find interval
i = np.searchsorted(xp, x) - 1
i = max(0, min(i, len(xp)-2)) # clamp the index
return fp[i] * (1 - 3 * t ** 2 + 2 * t ** 3) + fp[i + 1] * (3 * t ** 2 - 2 * t ** 3)
# Normalized position
t = (x - xp[i]) / float(xp[i+1] - xp[i])
# Hermite cubic formula
return fp[i]*(1 - 3*t**2 + 2*t**3) + fp[i+1]*(3*t**2 - 2*t**3)
def akima_interp(x, xp, fp):
if x <= xp[0]:
return fp[0]
elif x >= xp[-1]:
return fp[-1]
"""Akima-inspired interpolation with reduced overshoot characteristics."""
if x <= xp[0]:
return fp[0]
elif x >= xp[-1]:
return fp[-1]
i = np.searchsorted(xp, x) - 1
i = max(0, min(i, len(xp) - 2))
t = (x - xp[i]) / float(xp[i + 1] - xp[i])
i = np.searchsorted(xp, x) - 1
i = max(0, min(i, len(xp)-2)) # clamp the index
t2 = t * t
t3 = t2 * t
t4 = t2 * t2
return (fp[i] * (1 - 10 * t3 + 15 * t4 - 6 * t3 * t2)
+ fp[i + 1] * (10 * t3 - 15 * t4 + 6 * t3 * t2))
t = (x - xp[i]) / float(xp[i+1] - xp[i])
A_CRUISE_MIN_ECO = ACCEL_MIN / 2
A_CRUISE_MIN_SPORT = ACCEL_MIN * 2
# Quintic polynomial to reduce overshoot
t2 = t*t
t4 = t2*t2
t3 = t2*t
return (fp[i]*(1 - 10*t3 + 15*t4 - 6*t3*t2)
+ fp[i+1]*(10*t3 - 15*t4 + 6*t3*t2))
A_CRUISE_MIN_ECO = A_CRUISE_MIN / 2
A_CRUISE_MIN_SPORT = A_CRUISE_MIN * 2
# MPH = [0.0, 11, 22, 34, 45, 56, 89]
A_CRUISE_MAX_BP_CUSTOM = [0.0, 5., 10., 15., 20., 25., 40.]
A_CRUISE_MAX_BP_CUSTOM = [0.0, 5., 10., 15., 20., 25., 40.]
A_CRUISE_MAX_VALS_ECO_EV = [1.15, 1.15, 1.15, 1.15, 1.30, 1.30, 1.72]
A_CRUISE_MAX_VALS_STANDARD_EV = [1.25, 1.25, 1.25, 1.25, 1.45, 1.50, 2.00]
A_CRUISE_MAX_VALS_SPORT_EV = [1.35, 1.35, 1.35, 1.35, 1.60, 1.60, 2.10]
A_CRUISE_MAX_VALS_SPORT_PLUS_EV = [1.55, 1.55, 1.55, 1.55, 1.84, 1.84, 2.42]
A_CRUISE_MAX_VALS_ECO_GAS = [2.0, 1.5, 1.0, 0.8, 0.6, 0.4, 0.2]
A_CRUISE_MAX_VALS_SPORT_GAS = [3.0, 2.5, 2.0, 1.5, 1.0, 0.8, 0.6]
A_CRUISE_MAX_VALS_ECO_GAS = [2.0, 1.5, 1.0, 0.8, 0.6, 0.4, 0.2]
A_CRUISE_MAX_VALS_SPORT_GAS = [3.0, 2.5, 2.0, 1.5, 1.0, 0.8, 0.6]
A_CRUISE_MAX_VALS_ECO_TRUCK = [3.00, 1.05, 0.60, 0.50, 0.50, 0.45, 0.35]
A_CRUISE_MAX_VALS_STANDARD_TRUCK = [6.00, 1.10, 0.70, 0.60, 0.55, 0.45, 0.35]
A_CRUISE_MAX_VALS_SPORT_TRUCK = [6.00, 1.15, 0.75, 0.70, 0.60, 0.50, 0.40]
@@ -153,4 +162,4 @@ class FrogPilotAcceleration:
elif frogpilot_toggles.deceleration_profile == DECELERATION_PROFILES["SPORT"]:
self.min_accel = A_CRUISE_MIN_SPORT
else:
self.min_accel = ACCEL_MIN
self.min_accel = A_CRUISE_MIN
+135 -14
View File
@@ -1,20 +1,31 @@
import numpy as np
from cereal import car
import numpy as np
from openpilot.common.realtime import DT_CTRL
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N
from openpilot.common.pid import PIDController
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.common.filter_simple import FirstOrderFilter
from opendbc.car.gm.values import CarControllerParams, GMFlags
CONTROL_N_T_IDX = ModelConstants.T_IDXS[:CONTROL_N]
clip = np.clip
interp = np.interp
LongCtrlState = car.CarControl.Actuators.LongControlState
def apply_deadzone(error, deadzone):
if error > deadzone:
error -= deadzone
elif error < -deadzone:
error += deadzone
else:
error = 0.0
return error
def long_control_state_trans(CP, active, long_control_state, v_ego,
should_stop, brake_pressed, cruise_standstill, frogpilot_toggles):
# Ignore cruise standstill if car has a gas interceptor.
# Ignore cruise standstill if car has a gas interceptor
cruise_standstill = cruise_standstill and not CP.enableGasInterceptorDEPRECATED
stopping_condition = should_stop
starting_condition = (not should_stop and
@@ -48,6 +59,45 @@ def long_control_state_trans(CP, active, long_control_state, v_ego,
long_control_state = LongCtrlState.pid
return long_control_state
def long_control_state_trans_old_long(CP, active, long_control_state, v_ego, v_target,
v_target_1sec, brake_pressed, cruise_standstill, frogpilot_toggles):
accelerating = v_target_1sec > v_target
planned_stop = (v_target < frogpilot_toggles.vEgoStopping and
v_target_1sec < frogpilot_toggles.vEgoStopping and
not accelerating)
stay_stopped = (v_ego < frogpilot_toggles.vEgoStopping and
(brake_pressed or cruise_standstill))
stopping_condition = planned_stop or stay_stopped
starting_condition = (v_target_1sec > frogpilot_toggles.vEgoStarting and
accelerating and
not cruise_standstill and
not brake_pressed)
started_condition = v_ego > frogpilot_toggles.vEgoStarting
if not active:
long_control_state = LongCtrlState.off
else:
if long_control_state in (LongCtrlState.off, LongCtrlState.pid):
long_control_state = LongCtrlState.pid
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif long_control_state == LongCtrlState.stopping:
if starting_condition and CP.startingState:
long_control_state = LongCtrlState.starting
elif starting_condition:
long_control_state = LongCtrlState.pid
elif long_control_state == LongCtrlState.starting:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif started_condition:
long_control_state = LongCtrlState.pid
return long_control_state
class LongControl:
def __init__(self, CP):
self.CP = CP
@@ -56,9 +106,10 @@ class LongControl:
self.pid = PIDController((CP.longitudinalTuning.kpBP, CP.longitudinalTuning.kpV),
(CP.longitudinalTuning.kiBP, CP.longitudinalTuning.kiV),
rate=1 / DT_CTRL)
# Preserve legacy behavior when no feedforward gain is provided (default of 0.0).
# Preserve legacy behaviour when no feedforward gain is provided (default of 0.0)
kf = getattr(CP.longitudinalTuning, 'kfDEPRECATED', 0.0)
self.feedforward_gain = kf if kf != 0.0 else 1.0
self.v_pid = 0.0
self._mode_setup()
self.last_output_accel = 0.0
self.last_a_target = 0.0
@@ -67,8 +118,9 @@ class LongControl:
CP.brand == "gm" and CP.enableGasInterceptorDEPRECATED and (CP.flags & GMFlags.PEDAL_LONG.value)
)
def update_mpc_mode(self, experimental_mode: bool):
def update_mpc_mode(self, experimental_mode):
new_mode = 'blended' if experimental_mode else 'acc'
if self.transitioning and self.prev_mode == 'blended' and self.current_mode == 'acc':
self.mode_transition_timer = 0.0
@@ -77,6 +129,7 @@ class LongControl:
self.transitioning = True
self.mode_transition_timer = 0.0
self.mode_transition_filter.x = self.last_output_accel
self.current_mode = new_mode
if self.transitioning:
@@ -103,9 +156,9 @@ class LongControl:
self.integrator_hold_frames = 0
return False
handoff_threshold = np.interp(v_ego, [0.0, 4.0, 12.0, 25.0], [0.35, 0.45, 0.55, 0.70])
handoff_threshold = interp(v_ego, [0.0, 4.0, 12.0, 25.0], [0.35, 0.45, 0.55, 0.70])
if abs(a_target - self.last_a_target) > handoff_threshold:
hold_frames = int(round(np.interp(v_ego, [0.0, 4.0, 12.0, 25.0], [25.0, 20.0, 14.0, 10.0])))
hold_frames = int(round(interp(v_ego, [0.0, 4.0, 12.0, 25.0], [25.0, 20.0, 14.0, 10.0])))
self.integrator_hold_frames = max(self.integrator_hold_frames, hold_frames)
self.last_a_target = a_target
@@ -146,18 +199,19 @@ class LongControl:
else: # LongCtrlState.pid
error = a_target - CS.aEgo
self.update_mpc_mode(self.experimental_mode)
feedforward = a_target * self.feedforward_gain
freeze_integrator = self._get_pedal_long_freeze(a_target, error, CS.vEgo, accel_limits)
raw_output_accel = self.pid.update(error, speed=CS.vEgo,
feedforward=a_target * self.feedforward_gain,
raw_output_accel = self.pid.update(error, speed=CS.vEgo, feedforward=feedforward,
freeze_integrator=freeze_integrator)
if self.transitioning and self.prev_mode == 'acc' and self.current_mode == 'blended':
if raw_output_accel < 0 and raw_output_accel < self.last_output_accel:
progress = min(1.0, self.mode_transition_timer / self.mode_transition_duration)
# Soften transition at low urgency, but keep sharp for high decel.
urgency_denom = max(1e-3, abs(CarControllerParams.ACCEL_MIN))
urgency = abs(raw_output_accel / urgency_denom)
urgency_smooth = min(1.0, urgency ** 0.4)
# Soften transition at low urgency, but keep sharp for high decel
# 20% smoother for chill decel (lower exponent)
urgency = abs(raw_output_accel / CarControllerParams.ACCEL_MIN)
urgency_smooth = min(1.0, urgency ** 0.4) # 20% smoother for chill decel
blend_factor = 1.0 - (1.0 - progress) * (1.0 - urgency_smooth)
output_accel = self.last_output_accel + (raw_output_accel - self.last_output_accel) * blend_factor
else:
@@ -165,5 +219,72 @@ class LongControl:
else:
output_accel = raw_output_accel
self.last_output_accel = np.clip(output_accel, accel_limits[0], accel_limits[1])
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
return self.last_output_accel
def reset_old_long(self, v_pid):
"""Reset PID controller and change setpoint"""
self.pid.reset()
self.v_pid = v_pid
self.last_a_target = 0.0
self.integrator_hold_frames = 0
def update_old_long(self, active, CS, long_plan, accel_limits, t_since_plan, frogpilot_toggles):
"""Update longitudinal control. This updates the state machine and runs a PID loop"""
# Interp control trajectory
speeds = long_plan.speeds
if len(speeds) == CONTROL_N:
v_target_now = interp(t_since_plan, CONTROL_N_T_IDX, speeds)
a_target_now = interp(t_since_plan, CONTROL_N_T_IDX, long_plan.accels)
v_target = interp(frogpilot_toggles.longitudinalActuatorDelay + t_since_plan, CONTROL_N_T_IDX, speeds)
a_target = 2 * (v_target - v_target_now) / frogpilot_toggles.longitudinalActuatorDelay - a_target_now
v_target_1sec = interp(frogpilot_toggles.longitudinalActuatorDelay + t_since_plan + 1.0, CONTROL_N_T_IDX, speeds)
else:
v_target = 0.0
v_target_now = 0.0
v_target_1sec = 0.0
a_target = 0.0
self.pid.neg_limit = accel_limits[0]
self.pid.pos_limit = accel_limits[1]
output_accel = self.last_output_accel
self.long_control_state = long_control_state_trans_old_long(self.CP, active, self.long_control_state, CS.vEgo,
v_target, v_target_1sec, CS.brakePressed,
CS.cruiseState.standstill, frogpilot_toggles)
if self.long_control_state == LongCtrlState.off:
self.reset_old_long(CS.vEgo)
output_accel = 0.
elif self.long_control_state == LongCtrlState.stopping:
if output_accel > frogpilot_toggles.stopAccel:
output_accel = min(output_accel, 0.0)
output_accel -= frogpilot_toggles.stoppingDecelRate * DT_CTRL
self.reset_old_long(CS.vEgo)
elif self.long_control_state == LongCtrlState.starting:
output_accel = frogpilot_toggles.startAccel
self.reset_old_long(CS.vEgo)
elif self.long_control_state == LongCtrlState.pid:
self.v_pid = v_target_now
# Toyota starts braking more when it thinks you want to stop
# Freeze the integrator so we don't accelerate to compensate, and don't allow positive acceleration
# TODO too complex, needs to be simplified and tested on toyotas
prevent_overshoot = not self.CP.stoppingControl and CS.vEgo < 1.5 and v_target_1sec < 0.7 and v_target_1sec < self.v_pid
deadzone = interp(CS.vEgo, self.CP.longitudinalTuning.deadzoneBP, self.CP.longitudinalTuning.deadzoneV)
error = self.v_pid - CS.vEgo
error_deadzone = apply_deadzone(error, deadzone)
freeze_integrator = prevent_overshoot or self._get_pedal_long_freeze(a_target, error_deadzone, CS.vEgo, accel_limits)
feedforward = a_target * self.feedforward_gain
output_accel = self.pid.update(error_deadzone, speed=CS.vEgo,
feedforward=feedforward,
freeze_integrator=freeze_integrator)
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
return self.last_output_accel
@@ -16,9 +16,6 @@ from openpilot.common.swaglog import cloudlog
# WARNING: imports outside of constants will not trigger a rebuild
from openpilot.selfdrive.modeld.constants import index_function
# Keep in sync with selfdrive/controls/radard.py
_LEAD_ACCEL_TAU = 1.5
if __name__ == '__main__': # generating code
from openpilot.third_party.acados.acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver
else:
@@ -41,29 +38,60 @@ COST_DIM = COST_E_DIM + 1
CONSTR_DIM = 4
# ===== VOACC SPEED-BASED TUNING PARAMETERS =====
# City: Emergency-responsive | Highway: Rubber-banding prevention
# Speed ranges: [0-35, 35-55, 55-70, 70+ mph]
SPEED_BREAKPOINTS = [0, 35, 55, 70]
# SPEED BREAKPOINTS (mph)
SPEED_BREAKPOINTS = [0, 35, 55, 70] # 4 ranges: 0-35, 35-55, 55-70, 70+
# ===== CHANGE THESE VALUES FOR DIFFERENT SPEEDS =====
# RESPONSIVENESS TO LEAD CARS (Lower = More responsive, Higher = More stable)
# [City Emergency, Urban Hwy, Rural Hwy, High Speed]
X_EGO_OBSTACLE_COSTS = [3.0, 3.0, 2.5, 2.0] # Less aggressive at low speeds, closer to original
# JERK CONTROL (Lower = More jerky/responsive, Higher = Smoother/conservative)
# [City Emergency, Urban Hwy, Rural Hwy, High Speed]
J_EGO_COSTS = [5.0, 4.75, 4.5, 4.0] # Reverted to original 5.0 at low speeds
# ACCELERATION CHANGE PENALTIES (Lower = More responsive, Higher = Smoother)
# [City Emergency, Urban Hwy, Rural Hwy, High Speed]
A_CHANGE_COSTS = [200, 195, 180, 170] # Reverted to original 200 at low speeds
# SMOOTHING FILTERS - Speed-adaptive for optimal responsiveness
# Lower = More responsive, Higher = Smoother
LEAD_FILTER_TIME_LOW = 0.8 # Under 40 mph: Fast response for city emergency braking
LEAD_FILTER_TIME_HIGH = 1.2 # Over 40 mph: Faster response to prevent highway gaps
SPEED_FILTER_THRESHOLD = 40 * CV.MPH_TO_MS # 40 mph threshold
# DISTANCE ADAPTATION STRENGTH (How much penalties increase when close to lead)
# [City, Urban Hwy, Rural Hwy, High Speed]
X_EGO_OBSTACLE_COSTS = [3.0, 3.0, 2.5, 2.0]
J_EGO_COSTS = [5.0, 4.75, 4.5, 4.0]
A_CHANGE_COSTS = [200.0, 195.0, 180.0, 170.0]
LEAD_FILTER_TIME_LOW = 0.8
LEAD_FILTER_TIME_HIGH = 1.2
DIST_ADAPTS = [0.04, 0.06, 0.06, 0.05]
DIST_ADAPTS = [0.04, 0.06, 0.06, 0.05] # Balanced across speeds
# Current active values
# ===== END TUNING PARAMETERS =====
# Function to get parameter value based on current speed
def get_speed_based_param(speed_mph, param_array):
"""Get parameter value based on current speed using smooth interpolation"""
return float(np.interp(speed_mph, SPEED_BREAKPOINTS, param_array))
# Current active values (set based on speed)
X_EGO_OBSTACLE_COST = 2.75
J_EGO_COST = 5.5
A_CHANGE_COST = 250.0
LEAD_FILTER_TIME = 2.0
DIST_ADAPT = 0.06
X_EGO_COST = 0.
V_EGO_COST = 0.
A_EGO_COST = 0.
J_EGO_COST = 5.5
A_CHANGE_COST = 250.0
DANGER_ZONE_COST = 100.
CRASH_DISTANCE = .25
LEAD_DANGER_FACTOR = 0.75
LIMIT_COST = 1e6
ACADOS_SOLVER_TYPE = 'SQP_RTI'
# Default lead acceleration decay set to 50% at 1s
LEAD_ACCEL_TAU = 1.5
# Fewer timestamps don't hurt performance and lead to
@@ -80,11 +108,6 @@ STOP_DISTANCE = 6.0
CRUISE_MIN_ACCEL = -1.2
CRUISE_MAX_ACCEL = 1.6
def get_speed_based_param(speed_mph, param_array):
return float(np.interp(speed_mph, SPEED_BREAKPOINTS, param_array))
def get_jerk_factor(aggressive_jerk_acceleration=0.5, aggressive_jerk_danger=0.5, aggressive_jerk_speed=0.5,
standard_jerk_acceleration=1.0, standard_jerk_danger=1.0, standard_jerk_speed=1.0,
relaxed_jerk_acceleration=1.0, relaxed_jerk_danger=1.0, relaxed_jerk_speed=1.0,
@@ -132,11 +155,9 @@ def get_T_FOLLOW(aggressive_follow=1.25, standard_follow=1.45, relaxed_follow=1.
def get_stopped_equivalence_factor(v_lead):
return (v_lead**2) / (2 * COMFORT_BRAKE)
def get_safe_obstacle_distance(v_ego, t_follow, stop_distance=STOP_DISTANCE):
return (v_ego**2) / (2 * COMFORT_BRAKE) + t_follow * v_ego + stop_distance
def desired_follow_distance(v_ego, v_lead, t_follow=None, stop_distance=STOP_DISTANCE):
if t_follow is None:
t_follow = get_T_FOLLOW()
@@ -216,11 +237,12 @@ def gen_long_ocp():
# from an obstacle at every timestep. This obstacle can be a lead car
# or other object. In e2e mode we can use x_position targets as a cost
# instead.
accel_change = a_ego - prev_a
costs = [((x_obstacle - x_ego) - (desired_dist_comfort)) / (v_ego + 10.),
x_ego,
v_ego,
a_ego,
a_ego - prev_a,
accel_change,
j_ego]
ocp.model.cost_y_expr = vertcat(*costs)
ocp.model.cost_y_expr_e = vertcat(*costs[:-1])
@@ -279,24 +301,24 @@ class LongitudinalMpc:
self.dt = dt
self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.source = SOURCES[2]
# Initialize smoothing filters with default time constants
self.current_filter_time = LEAD_FILTER_TIME_LOW
self.prev_filter_time = self.current_filter_time
self.lead_a_filter = FirstOrderFilter(0.0, self.current_filter_time, self.dt)
self.lead_v_filter = FirstOrderFilter(0.0, self.current_filter_time, self.dt)
# Slew-limited filter factor to avoid abrupt 0.50↔1.00 jumps
self.filter_time_factor = 1.0
self.prev_filter_time_factor = 1.0
self.slew_per_sec = 1.0
# Instance variables to avoid global modifications
self.current_x_ego_cost = X_EGO_OBSTACLE_COSTS[0]
self.current_j_ego_cost = J_EGO_COSTS[0]
self.current_a_change_cost = A_CHANGE_COSTS[0]
self.current_dist_adapt = DIST_ADAPTS[0]
# Initialize acceleration limits to prevent AttributeError
self.cruise_min_a = ACCEL_MIN
self.max_a = min(ACCEL_MAX, 1.2)
self.stop_distance = STOP_DISTANCE
self.reset()
def reset(self):
@@ -349,29 +371,40 @@ class LongitudinalMpc:
uncertainty=0.0, accel_reengage=False, panic_bypass=False, stop_distance=STOP_DISTANCE):
self.stop_distance = max(float(stop_distance), 4.0)
speed_mph = v_ego * CV.MS_TO_MPH
# Update parameters based on current speed with interpolation for smooth scaling
speed_mph = v_ego * CV.MS_TO_MPH # Convert m/s to mph
# Use speed-based parameters for smooth scaling across all breakpoints
self.current_x_ego_cost = get_speed_based_param(speed_mph, X_EGO_OBSTACLE_COSTS)
self.current_j_ego_cost = get_speed_based_param(speed_mph, J_EGO_COSTS)
self.current_a_change_cost = get_speed_based_param(speed_mph, A_CHANGE_COSTS)
# For dist_adapt, start from 0.0 under low speeds while enabling full smooth transitions
dist_adapt_array = [0.0, DIST_ADAPTS[1], DIST_ADAPTS[2], DIST_ADAPTS[3]]
self.current_dist_adapt = get_speed_based_param(speed_mph, dist_adapt_array)
# Update filter time constants with interp and recreate filters if needed
if speed_mph < 47:
self.current_filter_time = 0.0
self.current_filter_time = 0.0
else:
self.current_filter_time = float(np.interp(speed_mph, [47, 65], [0.0, LEAD_FILTER_TIME_HIGH]))
if abs(self.current_filter_time - self.prev_filter_time) > 0.1:
self.lead_a_filter = FirstOrderFilter(self.lead_a_filter.x, self.current_filter_time, self.dt)
self.lead_v_filter = FirstOrderFilter(self.lead_v_filter.x, self.current_filter_time, self.dt)
self.current_filter_time = np.interp(speed_mph, [47, 65], [0.0, LEAD_FILTER_TIME_HIGH])
if abs(self.current_filter_time - getattr(self, 'prev_filter_time', 0)) > 0.1: # Only update if significant change
# Recreate filters with new time constant while preserving current values
current_a = self.lead_a_filter.x if hasattr(self.lead_a_filter, 'x') else 0.0
current_v = self.lead_v_filter.x if hasattr(self.lead_v_filter, 'x') else 0.0
self.lead_a_filter = FirstOrderFilter(current_a, self.current_filter_time, self.dt)
self.lead_v_filter = FirstOrderFilter(current_v, self.current_filter_time, self.dt)
self.prev_filter_time = self.current_filter_time
# Adaptive jerk factors for distance with interp scaling
dist_factor = 1.0 + self.current_dist_adapt * (20.0 / max(lead_dist, 5.0))
acceleration_jerk *= dist_factor
danger_jerk *= dist_factor
speed_jerk *= dist_factor
# Scene complexity adjustment based on model uncertainty
prev_filter_time_factor = getattr(self, 'prev_filter_time_factor', 1.0)
# Target factor from uncertainty
if uncertainty <= 0.45:
tgt_factor = 1.0
elif uncertainty >= 0.70:
@@ -381,15 +414,21 @@ class LongitudinalMpc:
if accel_reengage:
tgt_factor = min(tgt_factor, 0.5)
# Hard bypass of smoothing when approaching fast or magnitude trips
if panic_bypass:
tgt_factor = 0.0
# Slew-limit changes to avoid step-wise filter jumps
max_step = self.slew_per_sec * self.dt
delta = np.clip(tgt_factor - self.filter_time_factor, -max_step, max_step)
self.filter_time_factor += float(delta)
filter_time_factor = float(self.filter_time_factor)
# When uncertainty is moderately elevated, allow accel but cap jerk by increasing jerk cost
if 0.45 <= uncertainty < 0.60:
speed_jerk *= float(np.interp(uncertainty, [0.45, 0.60], [1.2, 1.5]))
scale = float(np.interp(uncertainty, [0.45, 0.60], [1.2, 1.5]))
speed_jerk *= scale
if self.mode == 'acc':
a_change_cost = acceleration_jerk if prev_accel_constraint else 0
@@ -403,11 +442,14 @@ class LongitudinalMpc:
raise NotImplementedError(f'Planner mode {self.mode} not recognized in planner cost set')
self.set_cost_weights(cost_weights, constraint_cost_weights)
if abs(self.filter_time_factor - self.prev_filter_time_factor) > 0.05:
new_filter_time = self.current_filter_time * self.filter_time_factor
self.lead_a_filter = FirstOrderFilter(self.lead_a_filter.x, new_filter_time, self.dt)
self.lead_v_filter = FirstOrderFilter(self.lead_v_filter.x, new_filter_time, self.dt)
self.prev_filter_time_factor = self.filter_time_factor
# Adjust filter time constants for complex scenes
if abs(filter_time_factor - getattr(self, 'prev_filter_time_factor', 1.0)) > 0.05:
current_a = self.lead_a_filter.x if hasattr(self.lead_a_filter, 'x') else 0.0
current_v = self.lead_v_filter.x if hasattr(self.lead_v_filter, 'x') else 0.0
new_filter_time = self.current_filter_time * filter_time_factor
self.lead_a_filter = FirstOrderFilter(current_a, new_filter_time, self.dt)
self.lead_v_filter = FirstOrderFilter(current_v, new_filter_time, self.dt)
self.prev_filter_time_factor = filter_time_factor
def set_cur_state(self, v, a):
v_prev = self.x0[1]
@@ -420,9 +462,11 @@ class LongitudinalMpc:
@staticmethod
def extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau, v_ego=0.0):
speed_mph = v_ego * CV.MS_TO_MPH
exp_weight = float(np.interp(speed_mph, [0, 20, 35], [1.0, 1.0, 0.0]))
bp = [0, 20, 35]
exp_weight = np.interp(speed_mph, bp, [1.0, 1.0, 0.0]) # Full exp at <20, blend to constant at 35
if exp_weight > 0.0:
if exp_weight > 0:
# Exponential decay component
a_lead_traj_exp = a_lead * np.exp(-a_lead_tau * (T_IDXS**2)/2.)
v_lead_traj_exp = np.clip(v_lead + np.cumsum(T_DIFFS * a_lead_traj_exp), 0.0, 1e8)
x_lead_traj_exp = x_lead + np.cumsum(T_DIFFS * v_lead_traj_exp)
@@ -430,11 +474,13 @@ class LongitudinalMpc:
x_lead_traj_exp = np.zeros_like(T_IDXS)
v_lead_traj_exp = np.zeros_like(T_IDXS)
# Constant acceleration component
v_lead_traj_const = np.clip(v_lead + a_lead * T_IDXS, 0.0, 1e8)
x_lead_traj_const = x_lead + v_lead * T_IDXS + 0.5 * a_lead * T_IDXS**2
v_lead_traj = exp_weight * v_lead_traj_exp + (1.0 - exp_weight) * v_lead_traj_const
x_lead_traj = exp_weight * x_lead_traj_exp + (1.0 - exp_weight) * x_lead_traj_const
# Blend based on weight
v_lead_traj = exp_weight * v_lead_traj_exp + (1 - exp_weight) * v_lead_traj_const
x_lead_traj = exp_weight * x_lead_traj_exp + (1 - exp_weight) * x_lead_traj_const
lead_xv = np.column_stack((x_lead_traj, v_lead_traj))
return lead_xv
@@ -447,10 +493,11 @@ class LongitudinalMpc:
a_lead = lead.aLeadK
a_lead_tau = lead.aLeadTau
else:
# Fake a fast lead car, so mpc can keep running in the same mode
x_lead = 50.0
v_lead = v_ego + 10.0
a_lead = 0.0
a_lead_tau = _LEAD_ACCEL_TAU
a_lead_tau = LEAD_ACCEL_TAU
# MPC will not converge if immediate crash is expected
# Clip lead distance to what is still possible to brake for
@@ -458,13 +505,17 @@ class LongitudinalMpc:
x_lead = np.clip(x_lead, min_x_lead, 1e8)
v_lead = np.clip(v_lead, 0.0, 1e8)
a_lead = np.clip(a_lead, -10., 5.)
# Apply smoothing filters with interp scaling
self.lead_a_filter.update(a_lead)
self.lead_v_filter.update(v_lead)
lead_xv = self.extrapolate_lead(x_lead, self.lead_v_filter.x, self.lead_a_filter.x, a_lead_tau, v_ego)
a_lead = self.lead_a_filter.x
v_lead = self.lead_v_filter.x
lead_xv = self.extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau, v_ego)
return lead_xv
def set_accel_limits(self, min_a, max_a):
# TODO this sets a max accel limit, but the minimum limit is only for cruise decel
# needs refactor
self.cruise_min_a = min_a
self.max_a = max_a
+146 -53
View File
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
import math
import time
import numpy as np
import time
import cereal.messaging as messaging
from opendbc.car.interfaces import ACCEL_MIN, ACCEL_MAX
@@ -12,13 +12,14 @@ from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, get_accel_from_plan
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N
from openpilot.selfdrive.car.cruise import V_CRUISE_UNSET
from openpilot.common.swaglog import cloudlog
from openpilot.frogpilot.common.frogpilot_variables import MINIMUM_LATERAL_ACCELERATION
LON_MPC_STEP = 0.2 # first step is 0.2s
A_CRUISE_MIN = -1.0
A_CRUISE_MAX_BP = [0.0, 5., 10., 15., 20., 25., 40.]
A_CRUISE_MAX_VALS = [1.125, 1.125, 1.125, 1.125, 1.25, 1.25, 1.5]
CONTROL_N_T_IDX = ModelConstants.T_IDXS[:CONTROL_N]
@@ -59,6 +60,44 @@ def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
return [a_target[0], min(a_target[1], a_x_allowed)]
def get_accel_from_plan_classic(CP, speeds, accels, vEgoStopping):
if len(speeds) == CONTROL_N:
v_target_now = np.interp(DT_MDL, CONTROL_N_T_IDX, speeds)
a_target_now = np.interp(DT_MDL, CONTROL_N_T_IDX, accels)
v_target = np.interp(CP.longitudinalActuatorDelay + DT_MDL, CONTROL_N_T_IDX, speeds)
if v_target != v_target_now:
a_target = 2 * (v_target - v_target_now) / CP.longitudinalActuatorDelay - a_target_now
else:
a_target = a_target_now
v_target_1sec = np.interp(CP.longitudinalActuatorDelay + DT_MDL + 1.0, CONTROL_N_T_IDX, speeds)
else:
v_target = 0.0
v_target_1sec = 0.0
a_target = 0.0
should_stop = (v_target < vEgoStopping and
v_target_1sec < vEgoStopping)
return a_target, should_stop
def get_accel_from_plan(speeds, accels, action_t=DT_MDL, vEgoStopping=0.05):
if len(speeds) == CONTROL_N:
v_now = speeds[0]
a_now = accels[0]
v_target = np.interp(action_t, CONTROL_N_T_IDX, speeds)
a_target = 2 * (v_target - v_now) / (action_t) - a_now
v_target_1sec = np.interp(action_t + 1.0, CONTROL_N_T_IDX, speeds)
else:
v_target = 0.0
v_target_1sec = 0.0
a_target = 0.0
should_stop = (v_target < vEgoStopping and
v_target_1sec < vEgoStopping)
return a_target, should_stop
class LongitudinalPlanner:
def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL):
self.CP = CP
@@ -67,10 +106,12 @@ class LongitudinalPlanner:
self.dt = dt
self.allow_throttle = True
self.mode = 'acc'
self.generation = None
self.a_desired = init_a
self.v_desired_filter = FirstOrderFilter(init_v, 2.0, self.dt)
self.v_model_error = 0.0
self.prev_accel_clip = [ACCEL_MIN, ACCEL_MAX]
self.output_a_target = 0.0
self.output_should_stop = False
@@ -80,12 +121,18 @@ class LongitudinalPlanner:
self.j_desired_trajectory = np.zeros(CONTROL_N)
self.solverExecutionTime = 0.0
self.uncert_slow = FirstOrderFilter(0.0, 1.6, self.dt)
self.uncert_fast = FirstOrderFilter(0.0, 0.9, self.dt)
# ---- Rubberband mitigation state ----
# Two uncertainty tracks (slow/fast) for asymmetric gating
self.uncert_slow = FirstOrderFilter(0.0, 1.6, self.dt) # ~lam=0.6
self.uncert_fast = FirstOrderFilter(0.0, 0.9, self.dt) # faster cool-down for accel decisions
# Lead stability tracking
self.prev_lead_dist = None
self.last_big_brake_t = 0.0
self.stable_lead = False
# Smoothed lead distance
self.lead_dist_f = None
# Uncertainty slope tracking
self._uncert_last = 0.0
self._uncert_last_t = None
@@ -93,6 +140,11 @@ class LongitudinalPlanner:
def mlsim(self):
return self.generation in ("v8", "v10", "v11", "v12")
def get_mpc_mode(self) -> str:
if not self.mlsim:
return self.mode
return getattr(self.mpc, 'mode', 'acc')
@staticmethod
def get_model_speed_error(model_msg, v_ego):
if len(model_msg.temporalPose.trans):
@@ -115,18 +167,17 @@ class LongitudinalPlanner:
v = np.zeros(len(T_IDXS_MPC))
a = np.zeros(len(T_IDXS_MPC))
j = np.zeros(len(T_IDXS_MPC))
if len(model_msg.meta.disengagePredictions.gasPressProbs) > 1:
throttle_prob = model_msg.meta.disengagePredictions.gasPressProbs[1]
else:
throttle_prob = 1.0
# FrogPilot variables
if frogpilot_toggles.taco_tune:
max_lat_accel = np.interp(v_ego, [5, 10, 20], [1.5, 2.0, 3.0])
curvatures = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.orientationRate.z) / np.clip(v, 0.3, 100.0)
max_v = np.sqrt(max_lat_accel / (np.abs(curvatures) + 1e-3)) - 2.0
v = np.minimum(max_v, v)
if len(model_msg.meta.disengagePredictions.gasPressProbs) > 1:
throttle_prob = model_msg.meta.disengagePredictions.gasPressProbs[1]
else:
throttle_prob = 1.0
return x, v, a, j, throttle_prob
def update(self, sm, frogpilot_toggles):
@@ -157,57 +208,77 @@ class LongitudinalPlanner:
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
if self.mpc.mode == 'acc':
accel_clip = [sm['frogpilotPlan'].minAcceleration, sm['frogpilotPlan'].maxAcceleration]
accel_limits = [sm['frogpilotPlan'].minAcceleration, sm['frogpilotPlan'].maxAcceleration]
steer_angle_without_offset = sm['carState'].steeringAngleDeg - sm['liveParameters'].angleOffsetDeg
if not sm['frogpilotPlan'].cscControllingSpeed:
accel_clip = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_clip, self.CP)
accel_limits_turns = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_limits, self.CP)
else:
accel_clip = [ACCEL_MIN, ACCEL_MAX]
accel_limits = [ACCEL_MIN, ACCEL_MAX]
accel_limits_turns = [ACCEL_MIN, ACCEL_MAX]
if reset_state:
self.v_desired_filter.x = v_ego
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
self.a_desired = np.clip(sm['carState'].aEgo, accel_clip[0], accel_clip[1])
self.a_desired = np.clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1])
# Prevent divergence, smooth in current v_ego
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
model_error = self.get_model_speed_error(sm['modelV2'], v_ego)
x, v, a, j, throttle_prob = self.parse_model(sm['modelV2'], model_error, v_ego, frogpilot_toggles)
# Compute model v_ego error
self.v_model_error = self.get_model_speed_error(sm['modelV2'], v_ego)
x, v, a, j, throttle_prob = self.parse_model(sm['modelV2'], self.v_model_error, v_ego, frogpilot_toggles)
# Don't clip at low speeds since throttle_prob doesn't account for creep
self.allow_throttle = throttle_prob > ALLOW_THROTTLE_THRESHOLD or v_ego <= MIN_ALLOW_THROTTLE_SPEED
self.allow_throttle &= not sm['frogpilotPlan'].disableThrottle
if not self.allow_throttle:
clipped_accel_coast = max(accel_coast, accel_clip[0])
clipped_accel_coast_interp = np.interp(v_ego, [MIN_ALLOW_THROTTLE_SPEED, MIN_ALLOW_THROTTLE_SPEED*2], [accel_clip[1], clipped_accel_coast])
accel_clip[1] = min(accel_clip[1], clipped_accel_coast_interp)
clipped_accel_coast = max(accel_coast, accel_limits_turns[0])
clipped_accel_coast_interp = np.interp(v_ego, [MIN_ALLOW_THROTTLE_SPEED, MIN_ALLOW_THROTTLE_SPEED*2], [accel_limits_turns[1], clipped_accel_coast])
accel_limits_turns[1] = min(accel_limits_turns[1], clipped_accel_coast_interp)
if force_slow_decel:
v_cruise = 0.0
# clip limits, cannot init MPC outside of bounds
accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05)
accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05)
accel_clip[0] = min(accel_clip[0], self.a_desired + 0.05)
accel_clip[1] = max(accel_clip[1], self.a_desired - 0.05)
self.lead_one = sm['radarState'].leadOne
self.lead_two = sm['radarState'].leadTwo
lead_one = sm['radarState'].leadOne
lead_dist = lead_one.dRel if lead_one.status else 50.0
lead_dist = self.lead_one.dRel if self.lead_one.status else 50.0
# Smooth lead distance (EMA) to avoid chatter in thresholds
alpha = max(0.02, min(0.15, 0.05 + 0.002 * v_ego))
if self.lead_dist_f is None:
self.lead_dist_f = float(lead_dist)
else:
self.lead_dist_f += alpha * (float(lead_dist) - self.lead_dist_f)
# Lead stability estimation and recent-brake timer
now_t = time.monotonic()
v_rel = (v_ego - lead_one.vLead) if lead_one.status else 0.0
# relative speed (ego - lead) positive when closing
v_rel = (v_ego - self.lead_one.vLead) if self.lead_one.status else 0.0
if self.prev_lead_dist is None:
d_rel_dot = 0.0
else:
d_rel_dot = (lead_dist - self.prev_lead_dist) / max(self.dt, 1e-3)
self.prev_lead_dist = lead_dist
# Remember time of last non-trivial model brake risk
if 'raw_brake_max' in locals() and raw_brake_max is not None and raw_brake_max > 0.02:
self.last_big_brake_t = now_t
# Stable lead heuristic (short window, cheap to compute)
recently_braked = (now_t - self.last_big_brake_t) < 0.7
self.stable_lead = (
self.lead_one.status and
abs(v_rel) < 0.5 and
abs(d_rel_dot) < 0.5 and
not recently_braked
)
# Calculate scene uncertainty from model desire prediction entropy and disengage predictions
uncertainty = 0.0
raw_brake_max = 0.0
if hasattr(sm['modelV2'], 'meta'):
# Desire prediction entropy (maneuver uncertainty), normalized to [0, 1]
desire_entropy = 0.0
if hasattr(sm['modelV2'].meta, 'desirePrediction'):
desire_probs = sm['modelV2'].meta.desirePrediction
@@ -218,37 +289,40 @@ class LongitudinalPlanner:
p = probs / total
entropy = -np.sum(p * np.log(p + 1e-10))
max_entropy = np.log(len(p))
desire_entropy = float(entropy / max(max_entropy, 1e-6))
desire_entropy = float(entropy / max(max_entropy, 1e-6)) # normalized entropy in [0,1]
else:
desire_entropy = 0.0 # guard against all-zero vector
# Disengage prediction risk (intervention likelihood)
disengage_risk = 0.0
raw_brake_max = -1.0
lam = -1.0
if hasattr(sm['modelV2'].meta, 'disengagePredictions'):
# Use brake press probabilities as primary risk indicator
brake_probs = sm['modelV2'].meta.disengagePredictions.brakePressProbs
if len(brake_probs) > 0:
# Exponentially decayed max over the full horizon
probs = np.asarray(brake_probs, dtype=float)
# Clip tiny brake blips so they don't inflate uncertainty
if float(np.max(probs)) < 0.015:
probs = probs * 0.5
raw_brake_max = float(np.max(probs))
# Time vector assuming model horizon step = DT_MDL
t = np.arange(len(probs), dtype=float) * DT_MDL
lam = 0.6
lam = 0.6 # decay rate per second (tunable: 0.50.9 typical)
weights = np.exp(-lam * t)
disengage_risk = float(np.max(probs * weights))
# Combined uncertainty metric (range roughly 0..2), with dual-track filtering
raw_uncertainty = desire_entropy + disengage_risk
# Update filters
self.uncert_slow.update(raw_uncertainty)
self.uncert_fast.update(raw_uncertainty)
# Use a more permissive track for accel decisions
uncertainty = self.uncert_slow.x
uncertainty_accel = min(self.uncert_slow.x, self.uncert_fast.x)
if raw_brake_max > 0.02:
self.last_big_brake_t = now_t
recently_braked = (now_t - self.last_big_brake_t) < 0.7
self.stable_lead = (
lead_one.status and
abs(v_rel) < 0.5 and
abs(d_rel_dot) < 0.5 and
not recently_braked
)
# --- Slope-based panic bypass ---
if self._uncert_last_t is None:
uncert_slope = 0.0
else:
@@ -257,10 +331,15 @@ class LongitudinalPlanner:
self._uncert_last = uncertainty
self._uncert_last_t = now_t
closing_fast = lead_one.status and (v_ego - lead_one.vLead) > 0.5
closing_fast = (self.lead_one.status and (v_ego - self.lead_one.vLead) > 0.5)
# Trigger if either slope is high or magnitude is high; require a valid lead and closing
panic_bypass = closing_fast and (uncert_slope > UNCERT_SLOPE_TRIG or uncertainty >= UNCERT_MAG_TRIG)
if panic_bypass:
cloudlog.error(f"LON_SLOPE slope={uncert_slope:.3f} uncertainty={uncertainty:.3f} v_ego={v_ego:.2f}")
try:
cloudlog.error(f"LON_SLOPE; slope={uncert_slope:.3f}/s; uncertainty={uncertainty:.3f}; v_ego={v_ego:.2f}; v_rel={(v_ego - self.lead_one.vLead) if self.lead_one.status else 0.0:.2f}; lead_dist={self.lead_dist_f if self.lead_dist_f is not None else -1:.2f}; trigger=True")
except Exception:
pass
self.mpc.set_weights(sm['frogpilotPlan'].accelerationJerk,
sm['frogpilotPlan'].dangerJerk,
@@ -272,10 +351,12 @@ class LongitudinalPlanner:
uncertainty=uncertainty,
panic_bypass=panic_bypass,
stop_distance=getattr(frogpilot_toggles, "stop_distance", 6.0))
self.mpc.set_accel_limits(accel_clip[0], accel_clip[1])
self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1])
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
# Keep StarPilot behavior as the primary gate: desired follow distance implies
# lead tracking in ACC mode. trackingLead is treated as an additive hint.
# After deciding the MPC mode via get_mpc_mode(), ensure MPC uses that mode when not mlsim
dec_mpc_mode = self.get_mpc_mode()
if not self.mlsim:
self.mpc.mode = dec_mpc_mode
tracking_lead = sm['frogpilotPlan'].desiredFollowDistance > 0
if hasattr(sm['frogpilotPlan'], 'trackingLead'):
tracking_lead = tracking_lead or bool(sm['frogpilotPlan'].trackingLead)
@@ -283,6 +364,7 @@ class LongitudinalPlanner:
sm['frogpilotPlan'].dangerFactor, sm['frogpilotPlan'].tFollow,
personality=sm['selfdriveState'].personality, tracking_lead=tracking_lead)
self.a_desired_trajectory_full = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
self.v_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
self.j_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC[:-1], self.mpc.j_solution)
@@ -292,31 +374,42 @@ class LongitudinalPlanner:
if self.fcw:
cloudlog.info("FCW triggered")
# Safety checks for rubber-banding mitigation
max_jerk = np.max(np.abs(self.mpc.j_solution))
max_accel_change = np.max(np.abs(np.diff(self.mpc.a_solution)))
if max_jerk > 5.0: # m/s^3
cloudlog.warning(f"High jerk detected: {max_jerk:.2f} m/s^3")
if max_accel_change > 2.0: # m/s^2
cloudlog.warning(f"High acceleration change: {max_accel_change:.2f} m/s^2")
# Interpolate 0.05 seconds and save as starting point for next iteration
a_prev = self.a_desired
self.a_desired = float(np.interp(self.dt, CONTROL_N_T_IDX, self.a_desired_trajectory))
self.v_desired_filter.x = self.v_desired_filter.x + self.dt * (self.a_desired + a_prev) / 2.0
if lead_one.status:
rel_v = max(0.0, v_ego - lead_one.vLead)
# Anticipatory pre-brake to avoid "coming in hot" when closing on a lead
if self.lead_one.status:
rel_v = max(0.0, v_ego - self.lead_one.vLead)
# dynamic time headway adds a small buffer when uncertainty is elevated
base_th = 1.6
th = base_th + 0.6 * max(0.0, uncertainty - 0.42)
desired_gap = th * v_ego
if self.lead_dist_f is not None and self.lead_dist_f < desired_gap and rel_v > 0.5:
if (self.lead_dist_f is not None and self.lead_dist_f < desired_gap and rel_v > 0.5):
k_rel, k_unc = 0.04, 0.20
pre_brake = k_rel * rel_v + k_unc * max(0.0, uncertainty - 0.42)
self.a_desired = float(self.a_desired - min(pre_brake, 0.06))
pre_brake = min(pre_brake, 0.06)
self.a_desired = float(self.a_desired - pre_brake)
# Small deadzone around zero accel to kill micro-dithers
if -0.05 < self.a_desired < 0.05:
self.a_desired = 0.0
action_t = frogpilot_toggles.longitudinalActuatorDelay + DT_MDL
output_a_target_mpc, output_should_stop_mpc = get_accel_from_plan(self.v_desired_trajectory, self.a_desired_trajectory, CONTROL_N_T_IDX,
action_t=action_t, vEgoStopping=frogpilot_toggles.vEgoStopping)
output_a_target_mpc, output_should_stop_mpc = get_accel_from_plan(self.v_desired_trajectory, self.a_desired_trajectory,
action_t=action_t, vEgoStopping=frogpilot_toggles.vEgoStopping)
output_a_target_e2e = sm['modelV2'].action.desiredAcceleration
output_should_stop_e2e = sm['modelV2'].action.shouldStop
# Keep StarPilot behavior: for tinygrad v10/v11/v12 in experimental mode, blend with model action output.
if self.mode == 'acc' or self.generation == 'v9':
output_a_target = output_a_target_mpc
self.output_should_stop = output_should_stop_mpc
@@ -325,9 +418,9 @@ class LongitudinalPlanner:
self.output_should_stop = output_should_stop_e2e or output_should_stop_mpc
for idx in range(2):
accel_clip[idx] = np.clip(accel_clip[idx], self.prev_accel_clip[idx] - 0.05, self.prev_accel_clip[idx] + 0.05)
self.output_a_target = np.clip(output_a_target, accel_clip[0], accel_clip[1])
self.prev_accel_clip = accel_clip
accel_limits_turns[idx] = np.clip(accel_limits_turns[idx], self.prev_accel_clip[idx] - 0.05, self.prev_accel_clip[idx] + 0.05)
self.output_a_target = np.clip(output_a_target, accel_limits_turns[0], accel_limits_turns[1])
self.prev_accel_clip = accel_limits_turns
def publish(self, sm, pm):
plan_send = messaging.new_message('longitudinalPlan')