Files
StarPilot/selfdrive/controls/lib/longcontrol.py
T
Woohyun Rho 1e36f01748 fixed: Implement urgency-based rate limiting for longitudinal control
Modified the acceleration and deceleration smoothing logic in longcontrol.py to dynamically adjust the rate of acceleration change based on deceleration urgency. This enhances responsiveness during urgent braking scenarios while maintaining smooth acceleration.

Generated by Copilot

(cherry picked from commit f6cbb584a3420b2f27af3c911106f34f43aa3549)
(cherry picked from commit ccf722452e61bb62869d2d6885c0679d9ffa2c8f)
2025-11-24 13:50:05 +09:00

270 lines
12 KiB
Python

from cereal import car
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, apply_deadzone
from openpilot.selfdrive.controls.lib.pid import PIDController
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.car.gm.values import CarControllerParams
CONTROL_N_T_IDX = ModelConstants.T_IDXS[:CONTROL_N]
LongCtrlState = car.CarControl.Actuators.LongControlState
def long_control_state_trans(CP, active, long_control_state, v_ego,
should_stop, brake_pressed, cruise_standstill, frogpilot_toggles):
# Ignore cruise standstill if car has a gas interceptor
cruise_standstill = cruise_standstill and not CP.enableGasInterceptor
stopping_condition = should_stop
starting_condition = (not should_stop and
not cruise_standstill and
not brake_pressed)
started_condition = v_ego > frogpilot_toggles.vEgoStarting
if not active:
long_control_state = LongCtrlState.off
else:
if long_control_state == LongCtrlState.off:
if not starting_condition:
long_control_state = LongCtrlState.stopping
else:
if starting_condition and CP.startingState:
long_control_state = LongCtrlState.starting
else:
long_control_state = LongCtrlState.pid
elif long_control_state == LongCtrlState.stopping:
if starting_condition and CP.startingState:
long_control_state = LongCtrlState.starting
elif starting_condition:
long_control_state = LongCtrlState.pid
elif long_control_state in [LongCtrlState.starting, LongCtrlState.pid]:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif started_condition:
long_control_state = LongCtrlState.pid
return long_control_state
def long_control_state_trans_old_long(CP, active, long_control_state, v_ego, v_target,
v_target_1sec, brake_pressed, cruise_standstill, frogpilot_toggles):
accelerating = v_target_1sec > v_target
planned_stop = (v_target < frogpilot_toggles.vEgoStopping and
v_target_1sec < frogpilot_toggles.vEgoStopping and
not accelerating)
stay_stopped = (v_ego < frogpilot_toggles.vEgoStopping and
(brake_pressed or cruise_standstill))
stopping_condition = planned_stop or stay_stopped
starting_condition = (v_target_1sec > frogpilot_toggles.vEgoStarting and
accelerating and
not cruise_standstill and
not brake_pressed)
started_condition = v_ego > frogpilot_toggles.vEgoStarting
if not active:
long_control_state = LongCtrlState.off
else:
if long_control_state in (LongCtrlState.off, LongCtrlState.pid):
long_control_state = LongCtrlState.pid
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif long_control_state == LongCtrlState.stopping:
if starting_condition and CP.startingState:
long_control_state = LongCtrlState.starting
elif starting_condition:
long_control_state = LongCtrlState.pid
elif long_control_state == LongCtrlState.starting:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif started_condition:
long_control_state = LongCtrlState.pid
return long_control_state
class LongControl:
def __init__(self, CP):
self.CP = CP
self.long_control_state = LongCtrlState.off
self.experimental_mode = False
pos_p_limit = 0.0 # if params("NoPositivePResponse") else None # put parameter-based control here
self.pid = PIDController((CP.longitudinalTuning.kpBP, CP.longitudinalTuning.kpV),
(CP.longitudinalTuning.kiBP, CP.longitudinalTuning.kiV),
rate=1 / DT_CTRL, pos_p_limit=pos_p_limit)
# Preserve legacy behaviour when no feedforward gain is provided (default of 0.0)
kf = getattr(CP.longitudinalTuning, 'kfDEPRECATED', 0.0)
self.feedforward_gain = kf if kf != 0.0 else 1.0
self.v_pid = 0.0
self._mode_setup()
self.last_output_accel = 0.0
def update_mpc_mode(self, experimental_mode):
new_mode = 'blended' if experimental_mode else 'acc'
if self.transitioning and self.prev_mode == 'blended' and self.current_mode == 'acc':
self.mode_transition_timer = 0.0
if new_mode != self.current_mode:
self.prev_mode = self.current_mode
self.transitioning = True
self.mode_transition_timer = 0.0
self.mode_transition_filter.x = self.last_output_accel
self.current_mode = new_mode
if self.transitioning:
self.mode_transition_timer += DT_CTRL
if self.mode_transition_timer >= self.mode_transition_duration:
self.transitioning = False
def _mode_setup(self):
self.prev_mode = 'acc'
self.current_mode = 'acc'
self.mode_transition_filter = FirstOrderFilter(0.0, 0.5, DT_CTRL)
self.mode_transition_timer = 0.0
self.mode_transition_duration = 1.0
self.transitioning = False
def reset(self):
self.pid.reset()
def update(self, active, CS, a_target, should_stop, accel_limits, frogpilot_toggles):
"""Update longitudinal control. This updates the state machine and runs a PID loop"""
self.pid.neg_limit = accel_limits[0]
self.pid.pos_limit = accel_limits[1]
self.long_control_state = long_control_state_trans(self.CP, active, self.long_control_state, CS.vEgo,
should_stop, CS.brakePressed,
CS.cruiseState.standstill, frogpilot_toggles)
if self.long_control_state == LongCtrlState.off:
self.reset()
output_accel = 0.
elif self.long_control_state == LongCtrlState.stopping:
output_accel = self.last_output_accel
if output_accel > frogpilot_toggles.stopAccel:
output_accel = min(output_accel, 0.0)
output_accel -= frogpilot_toggles.stoppingDecelRate * DT_CTRL
self.reset()
elif self.long_control_state == LongCtrlState.starting:
output_accel = (a_target if frogpilot_toggles.human_acceleration else frogpilot_toggles.startAccel)
self.reset()
else: # LongCtrlState.pid
error = a_target - CS.aEgo
self.update_mpc_mode(self.experimental_mode)
feedforward = a_target * self.feedforward_gain
raw_output_accel = self.pid.update(error, speed=CS.vEgo, feedforward=feedforward)
if self.transitioning and self.prev_mode == 'acc' and self.current_mode == 'blended':
if raw_output_accel < 0 and raw_output_accel < self.last_output_accel:
progress = min(1.0, self.mode_transition_timer / self.mode_transition_duration)
# Soften transition at low urgency, but keep sharp for high decel
# 20% smoother for chill decel (lower exponent)
urgency = abs(raw_output_accel / CarControllerParams.ACCEL_MIN)
urgency_smooth = min(1.0, urgency ** 0.4) # 20% smoother for chill decel
blend_factor = 1.0 - (1.0 - progress) * (1.0 - urgency_smooth)
output_accel = self.last_output_accel + (raw_output_accel - self.last_output_accel) * blend_factor
else:
output_accel = raw_output_accel
else:
output_accel = raw_output_accel
if self.long_control_state == LongCtrlState.pid:
# Smooth acceleration and deceleration with urgency-based rate limiting
base_rate = 1.0
if output_accel < self.last_output_accel: # Deceleration requested
decel_needed = self.last_output_accel - output_accel
# Use a safe default for ACCEL_MIN if not available, to prevent division by zero
max_decel = abs(CarControllerParams.ACCEL_MIN) if CarControllerParams.ACCEL_MIN != 0 else 4.0
urgency = min(1.0, decel_needed / max_decel)
# Adjust rate based on urgency (1.0 m/s^3 for low urgency, up to 4.0 m/s^3 for high urgency)
max_rate = 1.0 + 3.0 * urgency
else:
max_rate = base_rate # Acceleration is always smooth
max_accel_change = max_rate * DT_CTRL
output_accel = clip(output_accel,
self.last_output_accel - max_accel_change,
self.last_output_accel + max_accel_change)
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
return self.last_output_accel
def reset_old_long(self, v_pid):
"""Reset PID controller and change setpoint"""
self.pid.reset()
self.v_pid = v_pid
def update_old_long(self, active, CS, long_plan, accel_limits, t_since_plan, frogpilot_toggles):
"""Update longitudinal control. This updates the state machine and runs a PID loop"""
# Interp control trajectory
speeds = long_plan.speeds
if len(speeds) == CONTROL_N:
v_target_now = interp(t_since_plan, CONTROL_N_T_IDX, speeds)
a_target_now = interp(t_since_plan, CONTROL_N_T_IDX, long_plan.accels)
v_target = interp(frogpilot_toggles.longitudinalActuatorDelay + t_since_plan, CONTROL_N_T_IDX, speeds)
a_target = 2 * (v_target - v_target_now) / frogpilot_toggles.longitudinalActuatorDelay - a_target_now
v_target_1sec = interp(frogpilot_toggles.longitudinalActuatorDelay + t_since_plan + 1.0, CONTROL_N_T_IDX, speeds)
else:
v_target = 0.0
v_target_now = 0.0
v_target_1sec = 0.0
a_target = 0.0
self.pid.neg_limit = accel_limits[0]
self.pid.pos_limit = accel_limits[1]
output_accel = self.last_output_accel
self.long_control_state = long_control_state_trans_old_long(self.CP, active, self.long_control_state, CS.vEgo,
v_target, v_target_1sec, CS.brakePressed,
CS.cruiseState.standstill, frogpilot_toggles)
if self.long_control_state == LongCtrlState.off:
self.reset_old_long(CS.vEgo)
output_accel = 0.
elif self.long_control_state == LongCtrlState.stopping:
if output_accel > frogpilot_toggles.stopAccel:
output_accel = min(output_accel, 0.0)
output_accel -= frogpilot_toggles.stoppingDecelRate * DT_CTRL
self.reset_old_long(CS.vEgo)
elif self.long_control_state == LongCtrlState.starting:
output_accel = frogpilot_toggles.startAccel
self.reset_old_long(CS.vEgo)
elif self.long_control_state == LongCtrlState.pid:
self.v_pid = v_target_now
# Toyota starts braking more when it thinks you want to stop
# Freeze the integrator so we don't accelerate to compensate, and don't allow positive acceleration
# TODO too complex, needs to be simplified and tested on toyotas
prevent_overshoot = not self.CP.stoppingControl and CS.vEgo < 1.5 and v_target_1sec < 0.7 and v_target_1sec < self.v_pid
deadzone = interp(CS.vEgo, self.CP.longitudinalTuning.deadzoneBP, self.CP.longitudinalTuning.deadzoneV)
freeze_integrator = prevent_overshoot
error = self.v_pid - CS.vEgo
error_deadzone = apply_deadzone(error, deadzone)
feedforward = a_target * self.feedforward_gain
output_accel = self.pid.update(error_deadzone, speed=CS.vEgo,
feedforward=feedforward,
freeze_integrator=freeze_integrator)
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
return self.last_output_accel