Files
onepilot/selfdrive/controls/controlsd.py
github-actions[bot] 36ff86f8a4 sunnypilot vtesla release
date: 2026-04-25T20:12:51
master commit: bb6c0f73e726caadf39c6857b7af750aaea142a0
2026-04-25 20:13:00 +08:00

275 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import math
from numbers import Number
from cereal import car, log
import cereal.messaging as messaging
from openpilot.common.constants import CV
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, DT_CTRL, Priority, Ratekeeper
from openpilot.common.swaglog import cloudlog
from opendbc.car.car_helpers import interfaces
from opendbc.car.vehicle_model import VehicleModel
from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.modeld.modeld import LAT_SMOOTH_SECONDS
from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
from openpilot.sunnypilot.selfdrive.controls.controlsd_ext import ControlsExt
# ==========================================
# ++ 新增:匯入 HTD 模組 ++
# ==========================================
from openpilot.sunnypilot.selfdrive.controls.lib.human_turn_detection import HumanTurnDetection, HTDState
State = log.SelfdriveState.OpenpilotState
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
class Controls(ControlsExt):
def __init__(self) -> None:
self.params = Params()
cloudlog.info("controlsd is waiting for CarParams")
self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams)
cloudlog.info("controlsd got CarParams")
# Initialize sunnypilot controlsd extension and base model state
ControlsExt.__init__(self, self.CP, self.params)
self.CI = interfaces[self.CP.carFingerprint](self.CP, self.CP_SP)
self.sm = messaging.SubMaster(['liveDelay', 'liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
'driverMonitoringState', 'onroadEvents', 'driverAssistance', 'liveDelay'] + self.sm_services_ext,
poll='selfdriveState')
self.pm = messaging.PubMaster(['carControl', 'controlsState'] + self.pm_services_ext)
self.steer_limited_by_safety = False
self.curvature = 0.0
self.desired_curvature = 0.0
self.pose_calibrator = PoseCalibrator()
self.calibrated_pose: Pose | None = None
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CP_SP, self.CI, DT_CTRL)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP, self.CP_SP, self.CI, DT_CTRL)
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CP_SP, self.CI, DT_CTRL)
self.LaC = ControlsExt.initialize_lateral_control(self, self.LaC, self.CI, DT_CTRL)
# ==========================================
# ++ 新增:初始化 HTD ++
# ==========================================
self.htd = HumanTurnDetection()
self.htd_state = HTDState.INACTIVE
def update(self):
self.sm.update(15)
if self.sm.updated["liveCalibration"]:
self.pose_calibrator.feed_live_calib(self.sm['liveCalibration'])
if self.sm.updated["livePose"]:
device_pose = Pose.from_live_pose(self.sm['livePose'])
self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose)
def state_control(self):
CS = self.sm['carState']
# Update VehicleModel
lp = self.sm['liveParameters']
x = max(lp.stiffnessFactor, 0.1)
sr = max(lp.steerRatio, 0.1)
self.VM.update_params(x, sr)
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
self.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
# Update Torque Params
if self.CP.lateralTuning.which() == 'torque':
torque_params = self.sm['liveTorqueParameters']
if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:
self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
torque_params.frictionCoefficientFiltered)
self.LaC.extension.update_limits()
self.LaC.extension.update_model_v2(self.sm['modelV2'])
self.LaC.extension.update_lateral_lag(self.lat_delay)
long_plan = self.sm['longitudinalPlan']
model_v2 = self.sm['modelV2']
CC = car.CarControl.new_message()
CC.enabled = self.sm['selfdriveState'].enabled
# Check which actuators can be enabled
standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, 0.3) or CS.standstill
# Get which state to use for active lateral control
_lat_active = self.get_lat_active(self.sm)
# ==========================================
# ++ 新增HTD (人工接管大轉彎判斷) 邏輯 ++
# ==========================================
# 1. 永遠執行 update 以記錄扭力與角度數據
htd_allowed, self.htd_state = self.htd.update(
_lat_active,
CS.cruiseState.enabled,
CS.steeringAngleDeg,
CS.steeringTorque,
CS.vEgo,
CS.steeringPressed
)
# 2. 如果 HTD 有在 UI 啟用,則根據判斷結果覆寫 _lat_active
if self.htd._enabled:
_lat_active = _lat_active and htd_allowed
# ==========================================
CC.latActive = _lat_active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \
(not standstill or self.CP.steerAtStandstill)
CC.longActive = CC.enabled and (self.CP.openpilotLongitudinalControl or not self.CP_SP.pcmCruiseSpeed)
actuators = CC.actuators
actuators.longControlState = self.LoC.long_control_state
# Enable blinkers while lane changing
if model_v2.meta.laneChangeState != LaneChangeState.off:
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
if not CC.latActive:
self.LaC.reset()
if not CC.longActive:
self.LoC.reset()
# accel PID loop
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, self.CP_SP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS)
override_longitudinal = any(e.overrideLongitudinal for e in self.sm['onroadEvents'])
actuators.accel = float(self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop,
pid_accel_limits, freeze_integrator=override_longitudinal))
# Steering PID loop and lateral MPC
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
lat_delay = self.sm["liveDelay"].lateralDelay + LAT_SMOOTH_SECONDS
actuators.curvature = self.desired_curvature
steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
self.steer_limited_by_safety, self.desired_curvature,
self.calibrated_pose, curvature_limited, lat_delay)
actuators.torque = float(steer)
actuators.steeringAngleDeg = float(steeringAngleDeg)
# Ensure no NaNs/Infs
for p in ACTUATOR_FIELDS:
attr = getattr(actuators, p)
if not isinstance(attr, Number):
continue
if not math.isfinite(attr):
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
return CC, lac_log
def publish(self, CC, lac_log):
CS = self.sm['carState']
# Orientation and angle rates can be useful for carcontroller
CC.currentCurvature = self.curvature
if self.calibrated_pose is not None:
CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist()
CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist()
CC.cruiseControl.override = CC.enabled and not CC.longActive and (self.CP.openpilotLongitudinalControl or not self.CP_SP.pcmCruiseSpeed)
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise)
CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and not self.sm['longitudinalPlan'].shouldStop
hudControl = CC.hudControl
hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS)
hudControl.speedVisible = CC.enabled
hudControl.lanesVisible = CC.enabled
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1
hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
if self.sm.valid['driverAssistance']:
hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture
hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture
if self.get_lat_active(self.sm):
CO = self.sm['carOutput']
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.steer_limited_by_safety = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
STEER_ANGLE_SATURATION_THRESHOLD
else:
self.steer_limited_by_safety = abs(CC.actuators.torque - CO.actuatorsOutput.torque) > 1e-2
# controlsState
dat = messaging.new_message('controlsState')
dat.valid = CS.canValid
cs = dat.controlsState
cs.curvature = self.curvature
cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
cs.desiredCurvature = self.desired_curvature
cs.longControlState = self.LoC.long_control_state
cs.upAccelCmd = float(self.LoC.pid.p)
cs.uiAccelCmd = float(self.LoC.pid.i)
cs.ufAccelCmd = float(self.LoC.pid.f)
cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0.) or
(self.sm['selfdriveState'].state == State.softDisabling))
lat_tuning = self.CP.lateralTuning.which()
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
cs.lateralControlState.angleState = lac_log
elif lat_tuning == 'pid':
cs.lateralControlState.pidState = lac_log
elif lat_tuning == 'torque':
cs.lateralControlState.torqueState = lac_log
self.pm.send('controlsState', dat)
# carControl
cc_send = messaging.new_message('carControl')
cc_send.valid = CS.canValid
cc_send.carControl = CC
self.pm.send('carControl', cc_send)
def run(self):
rk = Ratekeeper(100, print_delay_threshold=None)
while True:
self.update()
CC, lac_log = self.state_control()
self.publish(CC, lac_log)
self.get_params_sp(self.sm)
self.run_ext(self.sm, self.pm)
rk.monitor_time()
def main():
config_realtime_process(4, Priority.CTRL_HIGH)
controls = Controls()
controls.run()
if __name__ == "__main__":
main()