mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-26 14:02:06 +08:00
2675d43adb
Remove duplicate STEER_ANGLE_SATURATION_THRESHOLD import Cleaned up an unnecessary duplicate import of STEER_ANGLE_SATURATION_THRESHOLD from latcontrol_angle_torque. This simplifies the module imports and prevents potential redundancy or confusion. Refactor lateral control to combine torque and angle logic Merged functionalities of LatControlTorque and LatControlAngle into a single LatControlAngleTorque class. Refactored code to utilize methods from both parent classes, reducing duplication and improving maintainability. Add angle-torque hybrid lateral control for Hyundai CAN FD Introduces `LatControlAngleTorque` to enable hybrid angle and torque-based steering for specific Hyundai models. Updates related logic in carcontroller, interface, and controlsd to accommodate this new lateral control method. Adjusts torque parameters for enhanced control in supported models.
261 lines
11 KiB
Python
Executable File
261 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import math
|
|
import threading
|
|
import time
|
|
from typing import SupportsFloat
|
|
|
|
from cereal import car, log
|
|
import cereal.messaging as messaging
|
|
from openpilot.common.conversions import Conversions as CV
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
|
|
from openpilot.common.swaglog import cloudlog
|
|
|
|
from opendbc.car.car_helpers import interfaces
|
|
from opendbc.car.vehicle_model import VehicleModel
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature
|
|
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
|
|
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
|
|
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
|
|
from openpilot.selfdrive.controls.lib.latcontrol_angle_torque import LatControlAngleTorque
|
|
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
|
|
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
|
|
from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
|
|
|
|
from openpilot.sunnypilot.selfdrive.controls.controlsd_ext import ControlsExt
|
|
|
|
State = log.SelfdriveState.OpenpilotState
|
|
LaneChangeState = log.LaneChangeState
|
|
LaneChangeDirection = log.LaneChangeDirection
|
|
|
|
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
|
|
|
|
|
|
class Controls(ControlsExt):
|
|
def __init__(self) -> None:
|
|
self.params = Params()
|
|
cloudlog.info("controlsd is waiting for CarParams")
|
|
self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams)
|
|
cloudlog.info("controlsd got CarParams")
|
|
|
|
# Initialize sunnypilot controlsd extension
|
|
ControlsExt.__init__(self, self.CP, self.params)
|
|
|
|
self.CI = interfaces[self.CP.carFingerprint](self.CP, self.CP_SP)
|
|
|
|
self.sm = messaging.SubMaster(['liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
|
|
'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
|
|
'driverMonitoringState', 'onroadEvents', 'driverAssistance'] + self.sm_services_ext,
|
|
poll='selfdriveState')
|
|
self.pm = messaging.PubMaster(['carControl', 'controlsState'] + self.pm_services_ext)
|
|
|
|
self.steer_limited_by_controls = False
|
|
self.curvature = 0.0
|
|
self.desired_curvature = 0.0
|
|
|
|
self.pose_calibrator = PoseCalibrator()
|
|
self.calibrated_pose: Pose | None = None
|
|
|
|
self.LoC = LongControl(self.CP)
|
|
self.VM = VehicleModel(self.CP)
|
|
self.LaC: LatControl
|
|
if self.CP.steerControlType == car.CarParams.SteerControlType.angle and self.CP.lateralTuning.which() == 'torque':
|
|
self.LaC = LatControlAngleTorque(self.CP, self.CP_SP, self.CI)
|
|
elif self.CP.steerControlType == car.CarParams.SteerControlType.angle:
|
|
self.LaC = LatControlAngle(self.CP, self.CP_SP, self.CI)
|
|
elif self.CP.lateralTuning.which() == 'pid':
|
|
self.LaC = LatControlPID(self.CP, self.CP_SP, self.CI)
|
|
elif self.CP.lateralTuning.which() == 'torque':
|
|
self.LaC = LatControlTorque(self.CP, self.CP_SP, self.CI)
|
|
|
|
def update(self):
|
|
self.sm.update(15)
|
|
if self.sm.updated["liveCalibration"]:
|
|
self.pose_calibrator.feed_live_calib(self.sm['liveCalibration'])
|
|
if self.sm.updated["livePose"]:
|
|
device_pose = Pose.from_live_pose(self.sm['livePose'])
|
|
self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose)
|
|
|
|
def state_control(self):
|
|
CS = self.sm['carState']
|
|
|
|
# Update VehicleModel
|
|
lp = self.sm['liveParameters']
|
|
x = max(lp.stiffnessFactor, 0.1)
|
|
sr = max(lp.steerRatio, 0.1)
|
|
self.VM.update_params(x, sr)
|
|
|
|
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
|
|
self.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
|
|
|
|
# Update Torque Params
|
|
if self.CP.lateralTuning.which() == 'torque':
|
|
torque_params = self.sm['liveTorqueParameters']
|
|
if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:
|
|
self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
|
|
torque_params.frictionCoefficientFiltered)
|
|
|
|
self.LaC.extension.update_model_v2(self.sm['modelV2'])
|
|
|
|
long_plan = self.sm['longitudinalPlan']
|
|
model_v2 = self.sm['modelV2']
|
|
|
|
CC = car.CarControl.new_message()
|
|
CC.enabled = self.sm['selfdriveState'].enabled
|
|
|
|
# Check which actuators can be enabled
|
|
standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, 0.3) or CS.standstill
|
|
|
|
# Get which state to use for active lateral control
|
|
_lat_active = self.get_lat_active(self.sm)
|
|
|
|
CC.latActive = _lat_active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \
|
|
(not standstill or self.CP.steerAtStandstill)
|
|
CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl
|
|
|
|
actuators = CC.actuators
|
|
actuators.longControlState = self.LoC.long_control_state
|
|
|
|
# Enable blinkers while lane changing
|
|
if model_v2.meta.laneChangeState != LaneChangeState.off:
|
|
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
|
|
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
|
|
|
|
if not CC.latActive:
|
|
self.LaC.reset()
|
|
if not CC.longActive:
|
|
self.LoC.reset()
|
|
|
|
# accel PID loop
|
|
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS)
|
|
actuators.accel = float(self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits))
|
|
|
|
# Steering PID loop and lateral MPC
|
|
# Reset desired curvature to current to avoid violating the limits on engage
|
|
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
|
|
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
|
|
|
|
actuators.curvature = self.desired_curvature
|
|
steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
|
|
self.steer_limited_by_controls, self.desired_curvature,
|
|
self.calibrated_pose, curvature_limited) # TODO what if not available
|
|
actuators.torque = float(steer)
|
|
actuators.steeringAngleDeg = float(steeringAngleDeg)
|
|
# Ensure no NaNs/Infs
|
|
for p in ACTUATOR_FIELDS:
|
|
attr = getattr(actuators, p)
|
|
if not isinstance(attr, SupportsFloat):
|
|
continue
|
|
|
|
if not math.isfinite(attr):
|
|
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
|
|
setattr(actuators, p, 0.0)
|
|
|
|
return CC, lac_log
|
|
|
|
def publish(self, CC, lac_log):
|
|
CS = self.sm['carState']
|
|
|
|
# Orientation and angle rates can be useful for carcontroller
|
|
# Only calibrated (car) frame is relevant for the carcontroller
|
|
CC.currentCurvature = self.curvature
|
|
if self.calibrated_pose is not None:
|
|
CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist()
|
|
CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist()
|
|
|
|
CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
|
|
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise)
|
|
|
|
speeds = self.sm['longitudinalPlan'].speeds
|
|
if len(speeds):
|
|
CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
|
|
|
|
hudControl = CC.hudControl
|
|
hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS)
|
|
hudControl.speedVisible = CC.enabled
|
|
hudControl.lanesVisible = CC.enabled
|
|
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
|
|
hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1
|
|
hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual
|
|
|
|
hudControl.rightLaneVisible = True
|
|
hudControl.leftLaneVisible = True
|
|
if self.sm.valid['driverAssistance']:
|
|
hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture
|
|
hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture
|
|
|
|
if self.sm['selfdriveState'].active:
|
|
CO = self.sm['carOutput']
|
|
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
|
|
self.steer_limited_by_controls = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
|
|
STEER_ANGLE_SATURATION_THRESHOLD
|
|
else:
|
|
self.steer_limited_by_controls = abs(CC.actuators.torque - CO.actuatorsOutput.torque) > 1e-2
|
|
|
|
# TODO: both controlsState and carControl valids should be set by
|
|
# sm.all_checks(), but this creates a circular dependency
|
|
|
|
# controlsState
|
|
dat = messaging.new_message('controlsState')
|
|
dat.valid = CS.canValid
|
|
cs = dat.controlsState
|
|
|
|
cs.curvature = self.curvature
|
|
cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
|
|
cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
|
|
cs.desiredCurvature = self.desired_curvature
|
|
cs.longControlState = self.LoC.long_control_state
|
|
cs.upAccelCmd = float(self.LoC.pid.p)
|
|
cs.uiAccelCmd = float(self.LoC.pid.i)
|
|
cs.ufAccelCmd = float(self.LoC.pid.f)
|
|
cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0.) or
|
|
(self.sm['selfdriveState'].state == State.softDisabling))
|
|
|
|
lat_tuning = self.CP.lateralTuning.which()
|
|
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
|
|
cs.lateralControlState.angleState = lac_log
|
|
elif lat_tuning == 'pid':
|
|
cs.lateralControlState.pidState = lac_log
|
|
elif lat_tuning == 'torque':
|
|
cs.lateralControlState.torqueState = lac_log
|
|
|
|
self.pm.send('controlsState', dat)
|
|
|
|
# carControl
|
|
cc_send = messaging.new_message('carControl')
|
|
cc_send.valid = CS.canValid
|
|
cc_send.carControl = CC
|
|
self.pm.send('carControl', cc_send)
|
|
|
|
def params_thread(self, evt):
|
|
while not evt.is_set():
|
|
self.get_params_sp()
|
|
|
|
time.sleep(0.1)
|
|
|
|
def run(self):
|
|
rk = Ratekeeper(100, print_delay_threshold=None)
|
|
e = threading.Event()
|
|
t = threading.Thread(target=self.params_thread, args=(e,))
|
|
try:
|
|
t.start()
|
|
while True:
|
|
self.update()
|
|
CC, lac_log = self.state_control()
|
|
self.publish(CC, lac_log)
|
|
self.run_ext(self.sm, self.pm)
|
|
rk.monitor_time()
|
|
finally:
|
|
e.set()
|
|
t.join()
|
|
|
|
|
|
def main():
|
|
config_realtime_process(4, Priority.CTRL_HIGH)
|
|
controls = Controls()
|
|
controls.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|