feat: Squash all pre-brand features into pre

This commit is contained in:
Rick Lan
2026-05-27 20:34:10 +08:00
parent 5e71386e5c
commit 542a25ca7b
32 changed files with 814 additions and 33 deletions
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Honda",
"key": "dp_honda_nidec_stock_long",
"type": "toggle_item",
"title": lambda: tr("Use Stock Longitudinal (Nidec)"),
"description": lambda: tr("Let the Honda Nidec ACC handle gas and brake instead of openpilot. Lateral control (steering) still runs through openpilot."),
"brands": ["honda"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Toyota / Lexus",
"key": "dp_toyota_stock_lon",
"type": "toggle_item",
"title": lambda: tr("Use Stock Longitudinal Control"),
"description": lambda: tr("Let the car's built-in ACC handle gas and brake instead of openpilot. Lateral control (steering) still runs through openpilot."),
"brands": ["toyota"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Toyota / Lexus",
"key": "dp_toyota_tss1_sng",
"type": "toggle_item",
"title": lambda: tr("Enable Stop-and-Go on TSS1"),
"description": lambda: tr("Restores stop-and-go behavior on Toyota Safety Sense 1.0 vehicles, allowing openpilot to resume from a full stop without driver intervention."),
"brands": ["toyota"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
+15
View File
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "VAG",
"key": "dp_vag_a0_sng",
"type": "toggle_item",
"title": lambda: tr("Enable Stop-and-Go on A0 Platform"),
"description": lambda: tr("Restores stop-and-go behavior on VAG A0 platform vehicles (Polo, Fabia, Ibiza, etc.), allowing openpilot to resume from a full stop without driver intervention."),
"brands": ["volkswagen"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "VAG",
"key": "dp_vag_avoid_eps_lockout",
"type": "toggle_item",
"title": lambda: tr("Avoid EPS Lockout"),
"description": lambda: tr("Scale steering torque down at low speeds to avoid EPS lockout."),
"brands": ["volkswagen"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Toyota / Lexus",
"key": "dp_toyota_door_auto_lock_unlock",
"type": "toggle_item",
"title": lambda: tr("Door Auto Lock/Unlock"),
"description": lambda: tr("Enable openpilot to auto-lock doors above 20 km/h and auto-unlock when shifting to Park."),
"brands": ["toyota"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
+3
View File
@@ -91,6 +91,9 @@ class Bus(StrEnum):
party = auto()
ap_party = auto()
sdsu = auto()
zss = auto()
def rate_limit(new_value, last_value, dw_step, up_step):
return float(np.clip(new_value, last_value + dw_step, last_value + up_step))
+33 -6
View File
@@ -56,6 +56,9 @@ class CarInterface(CarInterfaceBase):
ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.hondaNidec)]
ret.openpilotLongitudinalControl = True
if dp_params & structs.DPFlags.HondaNidecStockLong:
ret.openpilotLongitudinalControl = False
ret.pcmCruise = True
if candidate == CAR.HONDA_CRV_5G:
@@ -93,13 +96,25 @@ class CarInterface(CarInterfaceBase):
ret.longitudinalTuning.kiV = [1.2, 0.8, 0.5]
# Disable control if EPS mod detected
eps_modified = False
for fw in car_fw:
if fw.ecu == "eps" and b"," in fw.fwVersion:
ret.dashcamOnly = True
# ret.dashcamOnly = True
eps_modified = True
if candidate == CAR.HONDA_CIVIC:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560], [0, 2560]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[1.1], [0.33]]
if eps_modified:
# stock request input values: 0x0000, 0x00DE, 0x014D, 0x01EF, 0x0290, 0x0377, 0x0454, 0x0610, 0x06EE
# stock request output values: 0x0000, 0x0917, 0x0DC5, 0x1017, 0x119F, 0x140B, 0x1680, 0x1680, 0x1680
# modified request output values: 0x0000, 0x0917, 0x0DC5, 0x1017, 0x119F, 0x140B, 0x1680, 0x2880, 0x3180
# stock filter output values: 0x009F, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108
# modified filter output values: 0x009F, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0400, 0x0480
# note: max request allowed is 4096, but request is capped at 3840 in firmware, so modifications result in 2x max
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560, 8000], [0, 2560, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.3], [0.1]]
else:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560], [0, 2560]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[1.1], [0.33]]
elif candidate in (CAR.HONDA_CIVIC_BOSCH, CAR.HONDA_CIVIC_BOSCH_DIESEL):
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 4096], [0, 4096]] # TODO: determine if there is a dead zone at the top end
@@ -114,7 +129,10 @@ class CarInterface(CarInterfaceBase):
elif candidate == CAR.HONDA_ACCORD:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 4096], [0, 4096]] # TODO: determine if there is a dead zone at the top end
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.6], [0.18]]
if eps_modified:
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.3], [0.09]]
else:
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.6], [0.18]]
elif candidate == CAR.ACURA_ILX:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 3840], [0, 3840]] # TODO: determine if there is a dead zone at the top end
@@ -126,8 +144,15 @@ class CarInterface(CarInterfaceBase):
ret.wheelSpeedFactor = 1.025
elif candidate == CAR.HONDA_CRV_5G:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 3840], [0, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.64], [0.192]]
if eps_modified:
# stock request input values: 0x0000, 0x00DB, 0x01BB, 0x0296, 0x0377, 0x0454, 0x0532, 0x0610, 0x067F
# stock request output values: 0x0000, 0x0500, 0x0A15, 0x0E6D, 0x1100, 0x1200, 0x129A, 0x134D, 0x1400
# modified request output values: 0x0000, 0x0500, 0x0A15, 0x0E6D, 0x1100, 0x1200, 0x1ACD, 0x239A, 0x2800
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560, 10000], [0, 2560, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.21], [0.07]]
else:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 3840], [0, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.64], [0.192]]
ret.wheelSpeedFactor = 1.025
elif candidate == CAR.HONDA_CRV_HYBRID:
@@ -219,6 +244,8 @@ class CarInterface(CarInterfaceBase):
ret.safetyConfigs[-1].safetyParam |= HondaSafetyFlags.RADARLESS.value
if candidate in HONDA_BOSCH_CANFD:
ret.safetyConfigs[-1].safetyParam |= HondaSafetyFlags.BOSCH_CANFD.value
if not ret.openpilotLongitudinalControl and candidate not in HONDA_BOSCH:
ret.safetyConfigs[-1].safetyParam |= HondaSafetyFlags.NIDEC_STOCK_LONG.value
# min speed to enable ACC. if car can do stop and go, then set enabling speed
# to a negative value, so it won't matter. Otherwise, add 0.5 mph margin to not
+1
View File
@@ -56,6 +56,7 @@ class HondaSafetyFlags(IntFlag):
NIDEC_ALT = 4
RADARLESS = 8
BOSCH_CANFD = 16
NIDEC_STOCK_LONG = 32
class HondaFlags(IntFlag):
@@ -154,6 +154,13 @@ class CarInterface(CarInterfaceBase):
if candidate in (CAR.KIA_OPTIMA_H,):
ret.dashcamOnly = True
# w/ SMDPS, allow steering to 0
if 0x2AA in fingerprint[0]:
ret.minSteerSpeed = 0.
print("----------------------------------------------")
print("dragonpilot: SMDPS detected!")
print("----------------------------------------------")
return ret
@staticmethod
+6
View File
@@ -22,4 +22,10 @@ CarParamsT = capnp.lib.capnp._StructModule
class DPFlags:
LatALKA = 1
ExtRadar = 2
ToyotaLockCtrl = 2 ** 2
ToyotaTSS1SnG = 2 ** 3
ToyotaStockLon = 2 ** 4
VagA0SnG = 2 ** 5
VagAvoidEPSLockout = 2 ** 6
HondaNidecStockLong = 2 ** 7
pass
+5 -3
View File
@@ -33,6 +33,8 @@ class CarInterface(CarInterfaceBase):
ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.subaru)]
if ret.flags & SubaruFlags.GLOBAL_GEN2:
ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.GEN2.value
elif ret.flags & (SubaruFlags.IMPREZA_2018 | SubaruFlags.HYBRID) :
ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.IMPREZA_2018.value
ret.steerLimitTimer = 0.4
ret.steerActuatorDelay = 0.1
@@ -51,11 +53,11 @@ class CarInterface(CarInterfaceBase):
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.0025, 0.1], [0.00025, 0.01]]
elif candidate == CAR.SUBARU_IMPREZA:
ret.steerActuatorDelay = 0.4 # end-to-end angle controller
ret.steerActuatorDelay = 0.1 # end-to-end angle controller
ret.lateralTuning.init('pid')
ret.lateralTuning.pid.kf = 0.00005
ret.lateralTuning.pid.kf = 0.00003
ret.lateralTuning.pid.kiBP, ret.lateralTuning.pid.kpBP = [[0., 20.], [0., 20.]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.2, 0.3], [0.02, 0.03]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.12, 0.18], [0.012, 0.018]]
elif candidate == CAR.SUBARU_IMPREZA_2020:
ret.lateralTuning.init('pid')
+11 -2
View File
@@ -26,6 +26,12 @@ class CarControllerParams:
elif CP.carFingerprint == CAR.SUBARU_IMPREZA_2020:
self.STEER_DELTA_UP = 35
self.STEER_MAX = 1439
self.STEER_DELTA_UP = 35
self.STEER_DELTA_DOWN = 70
elif CP.carFingerprint == CAR.SUBARU_IMPREZA:
self.STEER_MAX = 3071
self.STEER_DELTA_UP = 60
self.STEER_DELTA_DOWN = 60
else:
self.STEER_MAX = 2047
@@ -57,7 +63,7 @@ class SubaruSafetyFlags(IntFlag):
GEN2 = 1
LONG = 2
PREGLOBAL_REVERSED_DRIVER_TORQUE = 4
IMPREZA_2018 = 8
class SubaruFlags(IntFlag):
# Detected flags
@@ -74,6 +80,8 @@ class SubaruFlags(IntFlag):
HYBRID = 32
LKAS_ANGLE = 64
# rick
IMPREZA_2018 = 2 ** 10
GLOBAL_ES_ADDR = 0x787
GEN2_ES_BUTTONS_DID = b'\x11\x30'
@@ -143,7 +151,8 @@ class CAR(Platforms):
SubaruCarDocs("Subaru Crosstrek 2018-19", video="https://youtu.be/Agww7oE1k-s?t=26"),
SubaruCarDocs("Subaru XV 2018-19", video="https://youtu.be/Agww7oE1k-s?t=26"),
],
CarSpecs(mass=1568, wheelbase=2.67, steerRatio=15),
CarSpecs(mass=1568, wheelbase=2.67, steerRatio=13.5),
flags=SubaruFlags.IMPREZA_2018,
)
SUBARU_IMPREZA_2020 = SubaruPlatformConfig(
[
+1
View File
@@ -270,6 +270,7 @@ routes = [
CarTestRoute("6719965b0e1d1737/2023-02-09--22-44-05", TOYOTA.TOYOTA_CHR_TSS2), # hybrid
CarTestRoute("6719965b0e1d1737/2023-08-29--06-40-05", TOYOTA.TOYOTA_CHR_TSS2), # hybrid, openpilot longitudinal, radar disabled
CarTestRoute("14623aae37e549f3/2021-10-24--01-20-49", TOYOTA.TOYOTA_PRIUS_V),
CarTestRoute("ea8fbe72b96a185c|2023-02-22--09-20-34", TOYOTA.TOYOTA_CHR_TSS2), # openpilot longitudinal, with Radar Filter
CarTestRoute("202c40641158a6e5/2021-09-21--09-43-24", VOLKSWAGEN.VOLKSWAGEN_ARTEON_MK1),
CarTestRoute("2c68dda277d887ac/2021-05-11--15-22-20", VOLKSWAGEN.VOLKSWAGEN_ATLAS_MK1),
@@ -2,6 +2,7 @@ import math
import numpy as np
from opendbc.car import Bus, make_tester_present_msg, rate_limit, structs, ACCELERATION_DUE_TO_GRAVITY, DT_CTRL
from opendbc.car.lateral import apply_meas_steer_torque_limits, apply_std_steer_angle_limits, common_fault_avoidance
from opendbc.car.can_definitions import CanData
from opendbc.car.carlog import carlog
from opendbc.car.common.filter_simple import FirstOrderFilter, HighPassFilter
from opendbc.car.common.pid import PIDController
@@ -32,6 +33,17 @@ MAX_STEER_RATE_FRAMES = 17 # tx control frames needed before torque can be cut
# EPS allows user torque above threshold for 50 frames before permanently faulting
MAX_USER_TORQUE = 500
# Lock / unlock door commands - Credit goes to AlexandreSato!
from opendbc.car.common.conversions import Conversions as CV
LOCK_SPEED = 20 * CV.KPH_TO_MS
LOCK_UNLOCK_CAN_ID = 0x750
UNLOCK_CMD = b'\x40\x05\x30\x11\x00\x40\x00\x00'
LOCK_CMD = b'\x40\x05\x30\x11\x00\x80\x00\x00'
from cereal import car
PARK = car.CarState.GearShifter.park
DRIVE = car.CarState.GearShifter.drive
def get_long_tune(CP, params):
if CP.carFingerprint in TSS2_CAR:
@@ -75,6 +87,8 @@ class CarController(CarControllerBase):
self.secoc_acc_message_counter = 0
self.secoc_prev_reset_counter = 0
self.doors_locked = False
def update(self, CC, CS, now_nanos):
actuators = CC.actuators
stopping = actuators.longControlState == LongCtrlState.stopping
@@ -184,6 +198,9 @@ class CarController(CarControllerBase):
if not should_resume and CS.out.cruiseState.standstill:
self.standstill_req = True
if self.standstill_req and (self.CP.flags & ToyotaFlags.TSS1_SNG.value):
self.standstill_req = False
if self.frame % 3 == 0:
# Press distance button until we are at the correct bar length. Only change while enabled to avoid skipping startup popup
if self.frame % 6 == 0 and self.CP.openpilotLongitudinalControl:
@@ -247,6 +264,10 @@ class CarController(CarControllerBase):
elif net_acceleration_request_min > 0.3:
self.permit_braking = False
# rick - do not do delay compensation for non-TSS2 vehicles (e.g. car with sDSU?), assign the value back to actuators.accel
# from Jason, see https://github.com/sunnypilot/opendbc/compare/dd2016f77f8467ca2f7934db1b8c6d73164b3df7...f90b75b1531d0ef949c1c7fb8c175059448a2a97#diff-dc03b1fc7156134429efc0cdced75bc227d0ceb8bbd0c55763022fb9db6794d9
if not self.CP.carFingerprint in TSS2_CAR:
pcm_accel_cmd = actuators.accel
pcm_accel_cmd = float(np.clip(pcm_accel_cmd, self.params.ACCEL_MIN, self.params.ACCEL_MAX))
main_accel_cmd = 0. if self.CP.flags & ToyotaFlags.SECOC.value else pcm_accel_cmd
@@ -292,11 +313,11 @@ class CarController(CarControllerBase):
hud_control.rightLaneVisible, hud_control.leftLaneDepart,
hud_control.rightLaneDepart, lat_active, CS.lkas_hud))
if (self.frame % 100 == 0 or send_ui) and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if (self.frame % 100 == 0 or send_ui) and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value and not self.CP.flags & ToyotaFlags.RADAR_FILTER.value:
can_sends.append(toyotacan.create_fcw_command(self.packer, fcw_alert))
# keep radar disabled
if self.frame % 20 == 0 and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if self.frame % 20 == 0 and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value and not self.CP.flags & ToyotaFlags.RADAR_FILTER.value:
can_sends.append(make_tester_present_msg(0x750, 0, 0xF))
new_actuators = actuators.as_builder()
@@ -305,5 +326,13 @@ class CarController(CarControllerBase):
new_actuators.steeringAngleDeg = self.last_angle
new_actuators.accel = self.accel
if self.CP.flags & ToyotaFlags.LOCK_CTRL.value:
if not self.doors_locked and CS.out.gearShifter == DRIVE and CS.out.vEgo >= LOCK_SPEED:
can_sends.append(CanData(LOCK_UNLOCK_CAN_ID, LOCK_CMD, 0))
self.doors_locked = True
elif self.doors_locked and CS.out.gearShifter == PARK:
can_sends.append(CanData(LOCK_UNLOCK_CAN_ID, UNLOCK_CMD, 0))
self.doors_locked = False
self.frame += 1
return new_actuators, can_sends
+49 -5
View File
@@ -52,12 +52,32 @@ class CarState(CarStateBase):
self.gvc = 0.0
self.secoc_synchronization = None
# radar filter (mainly for CHR/Camry)
# the idea is to place a Panda in between Radar and camera/body (engine room) to block 0x343 (longitudinal)
# depends on the firmware, we should be able to read most CAN directly from cp (not cp_cam, its empty)
self.dp_radar_filter = bool(self.CP.flags & ToyotaFlags.RADAR_FILTER.value)
from opendbc.car.toyota.sdsu import SDSU
self.sdsu = SDSU(CP.flags)
# rick - dsu_bypass from cydia2020: https://github.com/cydia2020/toyota-dsu-reroute-harness/
# the idea is to "re-route" the DSU to Panda CAN2 (Which connects to ADAS Camera)
# * when comma device is not available, the DSU message can still communicate with ADAS Camera, and over to car.
# * when comma device is active, CAN message of DSU and ADAS camera will the be blocked by Panda, only forward some CAN messages over to car (from CAN0).
self.dp_dsu_bypass = self.CP.flags & ToyotaFlags.DSU_BYPASS.value
from opendbc.car.toyota.zss import ZSS
self.zss = ZSS(CP.flags)
def update(self, can_parsers) -> structs.CarState:
cp = can_parsers[Bus.pt]
cp_cam = can_parsers[Bus.cam]
ret = structs.CarState()
cp_acc = cp_cam if self.CP.carFingerprint in (TSS2_CAR - RADAR_ACC_CAR) else cp
if self.dp_dsu_bypass:
cp_acc = cp_cam
else:
cp_acc = cp_cam if self.CP.carFingerprint in (TSS2_CAR - RADAR_ACC_CAR) else cp
if not self.CP.flags & ToyotaFlags.SECOC.value:
self.gvc = cp.vl["VSC1S07"]["GVC"]
@@ -77,7 +97,7 @@ class CarState(CarStateBase):
else:
ret.gasPressed = cp.vl["PCM_CRUISE"]["GAS_RELEASED"] == 0 # TODO: these also have GAS_PEDAL, come back and unify
can_gear = int(cp.vl["GEAR_PACKET"]["GEAR"])
if not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value or self.dp_radar_filter:
ret.stockAeb = bool(cp_acc.vl["PRE_COLLISION"]["PRECOLLISION_ACTIVE"] and cp_acc.vl["PRE_COLLISION"]["FORCE"] < -1e-5)
self.parse_wheel_speeds(ret,
@@ -116,6 +136,10 @@ class CarState(CarStateBase):
# we could use the override bit from dbc, but it's triggered at too high torque values
ret.steeringPressed = abs(ret.steeringTorque) > STEER_THRESHOLD
if self.zss.enabled:
self.zss.set_values(can_parsers[Bus.zss])
ret.steeringAngleDeg = self.zss.get_steering_angle_deg(cp.vl["PCM_CRUISE"]["CRUISE_ACTIVE"], ret.steeringAngleDeg)
# Check EPS LKA/LTA fault status
ret.steerFaultTemporary = cp.vl["EPS_STATUS"]["LKA_STATE"] in TEMP_STEER_FAULTS
ret.steerFaultPermanent = cp.vl["EPS_STATUS"]["LKA_STATE"] in PERM_STEER_FAULTS
@@ -146,8 +170,10 @@ class CarState(CarStateBase):
conversion_factor = CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS
ret.cruiseState.speedCluster = cluster_set_speed * conversion_factor
if self.CP.carFingerprint in TSS2_CAR and not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
self.acc_type = cp_acc.vl["ACC_CONTROL"]["ACC_TYPE"]
if (self.CP.carFingerprint in TSS2_CAR and not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value) or \
self.dp_dsu_bypass:
if not (self.CP.flags & ToyotaFlags.SDSU.value):
self.acc_type = cp_acc.vl["ACC_CONTROL"]["ACC_TYPE"]
ret.stockFcw = bool(cp_acc.vl["PCS_HUD"]["FCW"])
# some TSS2 cars have low speed lockout permanently set, so ignore on those cars
@@ -195,6 +221,16 @@ class CarState(CarStateBase):
buttonEvents += create_button_events(self.distance_button, prev_distance_button, {1: ButtonType.gapAdjustCruise})
elif self.sdsu.enabled:
# The follow distance button signal as forwarded by the sdsu
self.sdsu.update_states(can_parsers[Bus.sdsu])
prev_distance_button = self.distance_button
self.distance_button = self.sdsu.dist_btn
buttonEvents += create_button_events(self.distance_button, prev_distance_button, {1: ButtonType.gapAdjustCruise})
ret.buttonEvents = buttonEvents
# dp - ALKA: Toyota requires main ON to use ACC/LKA, use main as switch
@@ -208,7 +244,15 @@ class CarState(CarStateBase):
("BLINKERS_STATE", float('nan')),
]
return {
parsers = {
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], pt_messages, 0),
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], 2),
}
if CP.flags & ToyotaFlags.SDSU:
parsers[Bus.sdsu] = CANParser("toyota_sdsu", [("SDSU", 100)], 0)
if CP.flags & ToyotaFlags.ZSS:
parsers[Bus.zss] = CANParser("toyota_zss", [("SECONDARY_STEER_ANGLE", float('nan'))], 0)
return parsers
+58 -5
View File
@@ -3,7 +3,7 @@ from opendbc.car.toyota.carstate import CarState
from opendbc.car.toyota.carcontroller import CarController
from opendbc.car.toyota.radar_interface import RadarInterface
from opendbc.car.toyota.values import Ecu, CAR, DBC, ToyotaFlags, CarControllerParams, TSS2_CAR, RADAR_ACC_CAR, MIN_ACC_SPEED, \
EPS_SCALE, ANGLE_CONTROL_CAR, ToyotaSafetyFlags, UNSUPPORTED_DSU_CAR
EPS_SCALE, ANGLE_CONTROL_CAR, ToyotaSafetyFlags, UNSUPPORTED_DSU_CAR, NO_DSU_CAR
from opendbc.car.disable_ecu import disable_ecu
from opendbc.car.interfaces import CarInterfaceBase
@@ -60,13 +60,32 @@ class CarInterface(CarInterfaceBase):
if Ecu.hybrid in found_ecus:
ret.flags |= ToyotaFlags.HYBRID.value
# 0x343 should not be present on bus 2 on cars other than TSS2_CAR unless we are re-routing DSU
dsu_bypass = False
if (0x343 in fingerprint[2] or 0x4CB in fingerprint[2]) and candidate not in TSS2_CAR:
print("----------------------------------------------")
print("dragonpilot: DSU_BYPASS detected!")
print("----------------------------------------------")
# rick - disable for now, breaks TOYOTA_AVALON_2019 model tests.
# dsu_bypass = True
# ret.flags |= ToyotaFlags.DSU_BYPASS.value
if 0x23 in fingerprint[0]:
print("----------------------------------------------")
print("dragonpilot: ZSS detected!")
print("----------------------------------------------")
ret.flags |= ToyotaFlags.ZSS.value
if candidate == CAR.TOYOTA_PRIUS:
stop_and_go = True
# Only give steer angle deadzone to for bad angle sensor prius
for fw in car_fw:
if fw.ecu == "eps" and not fw.fwVersion == b'8965B47060\x00\x00\x00\x00\x00\x00':
ret.steerActuatorDelay = 0.25
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning, steering_angle_deadzone_deg=0.2)
if ret.flags & ToyotaFlags.ZSS.value:
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning)
else:
ret.steerActuatorDelay = 0.25
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning, steering_angle_deadzone_deg=0.2)
elif candidate in (CAR.LEXUS_RX, CAR.LEXUS_RX_TSS2):
stop_and_go = True
@@ -96,13 +115,40 @@ class CarInterface(CarInterfaceBase):
if alpha_long:
ret.flags |= ToyotaFlags.DISABLE_RADAR.value
# RADAR_ACC_CAR = CHR TSS2 / RAV4 TSS2
# NO_DSU_CAR = CAMRY / CHR
if 0x2FF in fingerprint[0] or 0x2AA in fingerprint[0]:
print("----------------------------------------------")
print("dragonpilot: RADAR_FILTER detected!")
print("----------------------------------------------")
ret.alphaLongitudinalAvailable = False
ret.flags |= ToyotaFlags.RADAR_FILTER.value | ToyotaFlags.DISABLE_RADAR.value
sdsu_active = False
if not (candidate in (RADAR_ACC_CAR | NO_DSU_CAR)) and 0x2FF in fingerprint[0]:
print("----------------------------------------------")
print("dragonpilot: SDSU detected!")
print("----------------------------------------------")
sdsu_active = True
stop_and_go = True
ret.flags |= ToyotaFlags.SDSU.value
ret.alphaLongitudinalAvailable = False
# openpilot longitudinal enabled by default:
# - TSS2 cars with camera sending ACC_CONTROL where we can block it
# openpilot longitudinal behind alpha long toggle:
# - TSS2 radar ACC cars (disables radar)
ret.openpilotLongitudinalControl = (candidate in (TSS2_CAR - RADAR_ACC_CAR) or
bool(ret.flags & ToyotaFlags.DISABLE_RADAR.value))
bool(ret.flags & ToyotaFlags.DISABLE_RADAR.value) or \
sdsu_active or \
dsu_bypass)
if dp_params & structs.DPFlags.ToyotaStockLon:
ret.openpilotLongitudinalControl = False
ret.alphaLongitudinalAvailable = False
ret.autoResumeSng = ret.openpilotLongitudinalControl
@@ -124,12 +170,19 @@ class CarInterface(CarInterfaceBase):
if ret.flags & ToyotaFlags.HYBRID.value:
ret.longitudinalActuatorDelay = 0.05
if dp_params & structs.DPFlags.ToyotaLockCtrl:
ret.flags |= ToyotaFlags.LOCK_CTRL.value
ret.safetyConfigs[0].safetyParam |= ToyotaSafetyFlags.LOCK_CTRL.value
if dp_params & structs.DPFlags.ToyotaTSS1SnG:
ret.flags |= ToyotaFlags.TSS1_SNG.value
return ret
@staticmethod
def init(CP, can_recv, can_send, communication_control=None):
# disable radar if alpha longitudinal toggled on radar-ACC car
if CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if not CP.flags & ToyotaFlags.RADAR_FILTER.value and CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if communication_control is None:
communication_control = bytes([uds.SERVICE_TYPE.COMMUNICATION_CONTROL, uds.CONTROL_TYPE.ENABLE_RX_DISABLE_TX, uds.MESSAGE_TYPE.NORMAL])
disable_ecu(can_recv, can_send, bus=0, addr=0x750, sub_addr=0xf, com_cont_req=communication_control)
+18
View File
@@ -0,0 +1,18 @@
# Copyright (c) 2025, Rick Lan
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, and/or sublicense, for non-commercial purposes only, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
# Commercial use (e.g. use in a product, service, or activity intended to generate revenue) is prohibited without explicit written permission from the copyright holder.
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from opendbc.car.toyota.values import ToyotaFlags
from opendbc.can.parser import CANParser
class SDSU:
def __init__(self, flags: int):
self.enabled = flags & ToyotaFlags.SDSU.value
self.dist_btn = 0
def update_states(self, cp: CANParser):
self.dist_btn = cp.vl["SDSU"]["FD_BUTTON"]
@@ -57,11 +57,14 @@ class ToyotaSafetyFlags(IntFlag):
LTA = (4 << 8)
SECOC = (8 << 8)
UNSUPPORTED_DSU = (16 << 8) # dp - use DSU_CRUISE (0x365) for ACC main instead of PCM_CRUISE_2 (0x1D3)
LOCK_CTRL = (32 << 8)
class ToyotaFlags(IntFlag):
# Detected flags
HYBRID = 1
# use legacy id
SDSU = 2
DISABLE_RADAR = 4
# Static flags
@@ -81,6 +84,11 @@ class ToyotaFlags(IntFlag):
# no resume button press required
NO_STOP_TIMER_DEPRECATED = 256
LOCK_CTRL = 2 ** 13
TSS1_SNG = 2 ** 14
RADAR_FILTER = 2 ** 15
DSU_BYPASS = 2 ** 16
ZSS = 2 ** 17
def dbc_dict(pt, radar):
return {Bus.pt: pt, Bus.radar: radar}
+81
View File
@@ -0,0 +1,81 @@
from opendbc.car.toyota.values import ToyotaFlags
from opendbc.can.parser import CANParser
ANGLE_DIFF_THRESHOLD = 4.0
THRESHOLD_COUNT = 10
class ZSS:
def __init__(self, flags: int):
self.enabled = flags & ToyotaFlags.ZSS.value
# self._alka_enabled = flags & ToyotaFlags.ALKA.value
self._offset_compute_once = True
# self._alka_active_prev = False
self._cruise_active_prev = False
self._offset = 0.
self._threshold_count = 0
self._zss_value = 0.
self._print_allow = True
def set_values(self, cp: CANParser):
self._zss_value = cp.vl["SECONDARY_STEER_ANGLE"]["ZORRO_STEER"]
def get_enabled(self):
return self.enabled
def get_steering_angle_deg(self, cruise_active, stock_steering_angle_deg):
# off, fall back to stock
if not self.enabled:
return stock_steering_angle_deg
# when lka control is off, use stock
# alka_active = self._is_alka_active(main_on)
if not cruise_active: # and not alka_active:
return stock_steering_angle_deg
# lka just activated
# if not self._offset_compute_once and ((alka_active and not self._alka_active_prev) or (cruise_active and not self._cruise_active_prev)):
if not self._offset_compute_once and (cruise_active and not self._cruise_active_prev):
self._threshold_count = 0
self._offset_compute_once = True
self._print_allow = True
# self._alka_active_prev = alka_active
self._cruise_active_prev = cruise_active
# compute offset when required
if self._offset_compute_once:
self._offset_compute_once = not self._compute_offset(stock_steering_angle_deg)
# error checking
if self._threshold_count >= THRESHOLD_COUNT:
if self._print_allow:
print("ZSS: Too many large diff, fallback to stock.")
self._print_allow = False
return stock_steering_angle_deg
if self._offset_compute_once:
print("ZSS: Compute offset required, fallback to stock.")
return stock_steering_angle_deg
zss_steering_angle_deg = self._zss_value - self._offset
angle_diff = abs(stock_steering_angle_deg - zss_steering_angle_deg)
if angle_diff > ANGLE_DIFF_THRESHOLD:
print(f"ZSS: Diff too large ({angle_diff}), fallback to stock. ")
# if self._is_alka_active(main_on) or cruise_active:
if cruise_active:
self._threshold_count += 1
return stock_steering_angle_deg
return zss_steering_angle_deg
# def _is_alka_active(self, main_on):
# return self._alka_enabled and main_on != 0
def _compute_offset(self, steering_angle_deg):
if abs(steering_angle_deg) > 1e-3 and abs(self._zss_value) > 1e-3:
self._offset = self._zss_value - steering_angle_deg
print(f"ZSS: offset computed: {self._offset}")
return True
return False
@@ -31,6 +31,8 @@ class CarController(CarControllerBase):
self.eps_timer_soft_disable_alert = False
self.hca_frame_timer_running = 0
self.hca_frame_same_torque = 0
self._dp_vag_a0_sng = bool(self.CP.flags & VolkswagenFlags.A0SnG)
self.dp_avoid_eps_lockout = CP.flags & VolkswagenFlags.AVOID_EPS_LOCKOUT
def update(self, CC, CS, now_nanos):
actuators = CC.actuators
@@ -50,7 +52,13 @@ class CarController(CarControllerBase):
# of HCA disabled; this is done whenever output happens to be zero.
if CC.latActive:
new_torque = int(round(actuators.torque * self.CCP.STEER_MAX))
if self.dp_avoid_eps_lockout:
#根據速度縮放new_torque扭力上限
torque_scale = np.interp(CS.out.vEgo, [0.4, 3.5, 4.0], [0.8, 0.95, 1.0])
scaled_steer_max = self.CCP.STEER_MAX * torque_scale
new_torque = int(round(actuators.torque * scaled_steer_max))
else:
new_torque = int(round(actuators.torque * self.CCP.STEER_MAX))
apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.CCP)
self.hca_frame_timer_running += self.CCP.STEER_STEP
if self.apply_torque_last == apply_torque:
@@ -119,11 +127,22 @@ class CarController(CarControllerBase):
lead_distance, hud_control.leadDistanceBars))
# **** Stock ACC Button Controls **************************************** #
if self._dp_vag_a0_sng:
if self.CP.pcmCruise and CS.gra_stock_values["COUNTER"] != self.gra_acc_counter_last:
standing_resume_spam = CS.out.standstill
spam_window = self.frame % 50 < 15
gra_send_ready = self.CP.pcmCruise and CS.gra_stock_values["COUNTER"] != self.gra_acc_counter_last
if gra_send_ready and (CC.cruiseControl.cancel or CC.cruiseControl.resume):
can_sends.append(self.CCS.create_acc_buttons_control(self.packer_pt, self.CAN.ext, CS.gra_stock_values,
cancel=CC.cruiseControl.cancel, resume=CC.cruiseControl.resume))
send_cancel = CC.cruiseControl.cancel
send_resume = CC.cruiseControl.resume or (standing_resume_spam and spam_window)
if send_cancel or send_resume:
can_sends.append(self.CCS.create_acc_buttons_control(self.packer_pt, self.CAN.ext, CS.gra_stock_values,
cancel=send_cancel, resume=send_resume))
else:
gra_send_ready = self.CP.pcmCruise and CS.gra_stock_values["COUNTER"] != self.gra_acc_counter_last
if gra_send_ready and (CC.cruiseControl.cancel or CC.cruiseControl.resume):
can_sends.append(self.CCS.create_acc_buttons_control(self.packer_pt, self.CAN.ext, CS.gra_stock_values,
cancel=CC.cruiseControl.cancel, resume=CC.cruiseControl.resume))
new_actuators = actuators.as_builder()
new_actuators.torque = self.apply_torque_last / self.CCP.STEER_MAX
@@ -31,7 +31,13 @@ class CarInterface(CarInterfaceBase):
else:
ret.networkLocation = NetworkLocation.fwdCamera
ret.dashcamOnly = is_release # Release support needs HCA timeout fix, safety validation
# The PQ port is in dashcam-only mode due to a fixed six-minute maximum timer on HCA steering. An unsupported
# EPS flash update to work around this timer, and enable steering down to zero, is available from:
# https://github.com/pd0wm/pq-flasher
# It is documented in a four-part blog series:
# https://blog.willemmelching.nl/carhacking/2022/01/02/vw-part1/
# Panda ALLOW_DEBUG firmware required.
# ret.dashcamOnly = is_release # Release support needs HCA timeout fix, safety validation
elif ret.flags & VolkswagenFlags.MLB:
# Set global MLB parameters
@@ -102,4 +108,10 @@ class CarInterface(CarInterfaceBase):
safety_configs.insert(0, get_safety_config(structs.CarParams.SafetyModel.noOutput))
ret.safetyConfigs = safety_configs
if dp_params & structs.DPFlags.VagA0SnG:
ret.flags |= VolkswagenFlags.A0SnG.value
if dp_params & structs.DPFlags.VagAvoidEPSLockout:
ret.flags |= VolkswagenFlags.AVOID_EPS_LOCKOUT.value
return ret
@@ -191,6 +191,8 @@ class VolkswagenFlags(IntFlag):
PQ = 2
MLB = 8
A0SnG = 2 ** 10
AVOID_EPS_LOCKOUT = 2 ** 11
@dataclass
class VolkswagenMLBPlatformConfig(PlatformConfig):
+8
View File
@@ -0,0 +1,8 @@
BO_ 767 SDSU: 8 XXX
SG_ FD_BUTTON : 7|1@0+ (1,0) [0|1] "" XXX
SG_ STATE : 23|4@0+ (1,0) [0|15] "" XXX
CM_ "BO_ SDSU: The sDSU is a modified DSU for use in TSS-P Toyotas. Learn more: https://github.com/RetroPilot/ocelot/tree/main/firmware/smart_dsu";
CM_ SG_ 767 FD_BUTTON "The follow distance button signal as forwarded by the sdsu";
VAL_ 767 STATE 7 "STATE_AEB_CTRL" 6 "FAULT_INVALID" 5 "FAULT_TIMEOUT" 4 "FAULT_STARTUP" 3 "FAULT_SCE" 2 "FAULT_SEND" 1 "FAULT_BAD_CHECKSUM" 0 "NO_FAULT" ;
+4
View File
@@ -0,0 +1,4 @@
BO_ 35 SECONDARY_STEER_ANGLE: 8 XXX
SG_ ZORRO_STEER : 7|24@0- (0.004901594652,0) [-500|500] "" XXX
CM_ "BO_ SECONDARY_STEER_ANGLE: ZSS is a high-precision steering angle sensor that can replace the lower resolution sensor in most TSS1 Toyotas. Learn more: https://github.com/commaai/openpilot/wiki/Toyota-Lexus#zorro-steering-sensor-zss";
+13 -2
View File
@@ -32,6 +32,7 @@ static bool honda_fwd_brake = false;
static bool honda_bosch_long = false;
static bool honda_bosch_radarless = false;
static bool honda_bosch_canfd = false;
static bool honda_nidec_stock_long = false;
typedef enum {HONDA_NIDEC, HONDA_BOSCH} HondaHw;
static HondaHw honda_hw = HONDA_NIDEC;
@@ -279,6 +280,10 @@ static bool honda_tx_hook(const CANPacket_t *msg) {
static safety_config honda_nidec_init(uint16_t param) {
alka_allowed = true; // dp - ALKA enabled for Honda Nidec
// Steering-only (stock long)
static CanMsg HONDA_N_STOCK_LONG_TX_MSGS[] = {{0xE4, 0, 5, .check_relay = true}, {0x194, 0, 4, .check_relay = true},
{0x33D, 0, 5, .check_relay = true}};
// 0x1FA is dynamically forwarded based on stock AEB
// 0xE4 is steering on all cars except CRV and RDX, 0x194 for CRV and RDX,
// 0x1FA is brake control, 0x30C is acc hud, 0x33D is lkas hud
@@ -286,6 +291,7 @@ static safety_config honda_nidec_init(uint16_t param) {
{0x30C, 0, 8, .check_relay = true}, {0x33D, 0, 5, .check_relay = true}};
const uint16_t HONDA_PARAM_NIDEC_ALT = 4;
const uint16_t HONDA_PARAM_NIDEC_STOCK_LONG = 32; // matches python HondaSafetyFlags.NIDEC_STOCK_LONG
honda_hw = HONDA_NIDEC;
honda_brake = 0;
@@ -295,6 +301,7 @@ static safety_config honda_nidec_init(uint16_t param) {
honda_bosch_long = false;
honda_bosch_radarless = false;
honda_bosch_canfd = false;
honda_nidec_stock_long = GET_FLAG(param, HONDA_PARAM_NIDEC_STOCK_LONG);
safety_config ret;
@@ -318,7 +325,11 @@ static safety_config honda_nidec_init(uint16_t param) {
SET_RX_CHECKS(honda_nidec_common_rx_checks, ret);
}
SET_TX_MSGS(HONDA_N_TX_MSGS, ret);
if (honda_nidec_stock_long) {
SET_TX_MSGS(HONDA_N_STOCK_LONG_TX_MSGS, ret);
} else {
SET_TX_MSGS(HONDA_N_TX_MSGS, ret);
}
return ret;
}
@@ -418,7 +429,7 @@ static bool honda_nidec_fwd_hook(int bus_num, int addr) {
if (bus_num == 2) {
// forwarded if stock AEB is active
bool is_brake_msg = addr == 0x1FA;
block_msg = is_brake_msg && !honda_fwd_brake;
block_msg = is_brake_msg && !honda_fwd_brake && !honda_nidec_stock_long;
}
return block_msg;
+13 -1
View File
@@ -72,6 +72,8 @@
static bool subaru_gen2 = false;
static bool subaru_longitudinal = false;
// rick
static bool subaru_impreza_2018 = false;
static uint32_t subaru_get_checksum(const CANPacket_t *msg) {
return (uint8_t)msg->data[0];
@@ -135,6 +137,8 @@ static void subaru_rx_hook(const CANPacket_t *msg) {
static bool subaru_tx_hook(const CANPacket_t *msg) {
const TorqueSteeringLimits SUBARU_STEERING_LIMITS = SUBARU_STEERING_LIMITS_GENERATOR(2047, 50, 70);
const TorqueSteeringLimits SUBARU_GEN2_STEERING_LIMITS = SUBARU_STEERING_LIMITS_GENERATOR(1000, 40, 40);
// rick
const TorqueSteeringLimits SUBARU_IMPREZA_2018_STEERING_LIMITS = SUBARU_STEERING_LIMITS_GENERATOR(3071, 60, 60);
const LongitudinalLimits SUBARU_LONG_LIMITS = {
.min_gas = 808, // appears to be engine braking
@@ -156,7 +160,11 @@ static bool subaru_tx_hook(const CANPacket_t *msg) {
bool steer_req = (msg->data[3] >> 5) & 1U;
const TorqueSteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS : SUBARU_STEERING_LIMITS;
// const TorqueSteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS : SUBARU_STEERING_LIMITS;
// rick
const TorqueSteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS :
(subaru_impreza_2018 ? SUBARU_IMPREZA_2018_STEERING_LIMITS: SUBARU_STEERING_LIMITS);
violation |= steer_torque_cmd_checks(desired_torque, steer_req, limits);
}
@@ -239,6 +247,10 @@ static safety_config subaru_init(uint16_t param) {
subaru_gen2 = GET_FLAG(param, SUBARU_PARAM_GEN2);
// rick
const uint16_t SUBARU_PARAM_IMPREZA_2018 = 8;
subaru_impreza_2018 = GET_FLAG(param, SUBARU_PARAM_IMPREZA_2018);
#ifdef ALLOW_DEBUG
const uint16_t SUBARU_PARAM_LONGITUDINAL = 2;
subaru_longitudinal = GET_FLAG(param, SUBARU_PARAM_LONGITUDINAL);
+24 -1
View File
@@ -65,6 +65,9 @@ static int toyota_dbc_eps_torque_factor = 100; // conversion factor for STEER_
// dp - use DSU_CRUISE (0x365) for ACC main instead of PCM_CRUISE_2 (0x1D3)
static bool toyota_unsupported_dsu = false;
// lock ctrl
static bool toyota_lock_ctrl = false;
static uint32_t toyota_compute_checksum(const CANPacket_t *msg) {
int len = GET_LEN(msg);
uint8_t checksum = (uint8_t)(msg->addr) + (uint8_t)((unsigned int)(msg->addr) >> 8U) + (uint8_t)(len);
@@ -330,7 +333,8 @@ static bool toyota_tx_hook(const CANPacket_t *msg) {
}
// UDS: Only tester present ("\x0F\x02\x3E\x00\x00\x00\x00\x00") allowed on diagnostics address
if (msg->addr == 0x750U) {
// dp - lock_ctrl bypasses this restriction to allow any UDS message
if ((msg->addr == 0x750U) && !toyota_lock_ctrl) {
// this address is sub-addressed. only allow tester present to radar (0xF)
bool invalid_uds_msg = (GET_BYTES(msg, 0, 4) != 0x003E020FU) || (GET_BYTES(msg, 4, 4) != 0x0U);
if (invalid_uds_msg) {
@@ -377,11 +381,16 @@ static safety_config toyota_init(uint16_t param) {
const uint32_t TOYOTA_PARAM_UNSUPPORTED_DSU = 16UL << TOYOTA_PARAM_OFFSET;
toyota_unsupported_dsu = GET_FLAG(param, TOYOTA_PARAM_UNSUPPORTED_DSU);
// lock ctrl
const uint32_t TOYOTA_PARAM_LOCK_CTRL = 32UL << TOYOTA_PARAM_OFFSET;
toyota_lock_ctrl = GET_FLAG(param, TOYOTA_PARAM_LOCK_CTRL);
toyota_alt_brake = GET_FLAG(param, TOYOTA_PARAM_ALT_BRAKE);
toyota_stock_longitudinal = GET_FLAG(param, TOYOTA_PARAM_STOCK_LONGITUDINAL);
toyota_lta = GET_FLAG(param, TOYOTA_PARAM_LTA);
toyota_dbc_eps_torque_factor = param & TOYOTA_EPS_FACTOR;
// upstream TX selection + dp features via tx_ext (lock_ctrl, long_filter)
safety_config ret;
if (toyota_secoc) {
if (toyota_stock_longitudinal) {
@@ -428,6 +437,19 @@ static safety_config toyota_init(uint16_t param) {
return ret;
}
// dp - tx_ext hook for dragonpilot-specific TX messages (lock_ctrl)
static TxExtResult toyota_tx_ext_hook(const CANPacket_t *msg) {
TxExtResult result = {.allowed = false, .check_relay = false};
int len = GET_LEN(msg);
// dp - lock_ctrl: allow any UDS message on 0x750 (stock long already has tester present only)
if ((msg->addr == 0x750U) && toyota_lock_ctrl && (msg->bus == 0U) && (len == 8)) {
result.allowed = true;
}
return result;
}
// dp - rx_ext hook for optional messages (placeholder)
static void toyota_rx_ext_hook(const CANPacket_t *msg) {
if (alka_allowed && ((alternative_experience & ALT_EXP_ALKA) != 0)) {
@@ -452,6 +474,7 @@ const safety_hooks toyota_hooks = {
.rx = toyota_rx_hook,
.rx_ext = toyota_rx_ext_hook,
.tx = toyota_tx_hook,
.tx_ext = toyota_tx_ext_hook,
.get_checksum = toyota_get_checksum,
.compute_checksum = toyota_compute_checksum,
.get_quality_flag_valid = toyota_get_quality_flag_valid,
@@ -338,6 +338,47 @@ class TestHondaNidecPcmSafety(HondaPcmEnableBase, TestHondaNidecSafetyBase):
pass
class TestHondaNidecStockLongSafety(HondaPcmEnableBase, TestHondaNidecSafetyBase):
"""
Covers Honda Nidec with stock longitudinal control
"""
TX_MSGS = [[0xE4, 0], [0x194, 0], [0x33D, 0]]
FWD_BLACKLISTED_ADDRS = {2: [0xE4, 0x194, 0x33D]}
RELAY_MALFUNCTION_ADDRS = {0: (0xE4, 0x194, 0x33D)}
def setUp(self):
self.packer = CANPackerSafety("honda_civic_touring_2016_can_generated")
self.safety = libsafety_py.libsafety
self.safety.set_safety_hooks(CarParams.SafetyModel.hondaNidec, HondaSafetyFlags.NIDEC_STOCK_LONG)
self.safety.init_tests()
# Nidec doesn't disengage on falling edge of cruise
def test_disable_control_allowed_from_cruise(self):
pass
# No-op longitudinal tests (stock handles it)
def test_brake_safety_check(self):
pass
def test_acc_hud_safety_check(self):
pass
def test_fwd_hook(self):
pass
def test_honda_fwd_brake_latching(self):
pass
def _send_brake_msg(self, brake, aeb_req=0, bus=0):
pass
def _rx_brake_msg(self, brake, aeb_req=0):
pass
def _send_acc_hud_msg(self, pcm_gas, pcm_speed):
pass
class TestHondaNidecPcmAltSafety(TestHondaNidecPcmSafety):
"""
Covers the Honda Nidec safety mode with alt SCM messages
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Tests for the dp Honda Nidec stock-longitudinal feature.
When `dp_honda_nidec_stock_long` is set:
- python: honda/interface.py flips `openpilotLongitudinalControl` to False and
sets the `HondaSafetyFlags.NIDEC_STOCK_LONG` (= 32) safety param.
- panda: honda.h reads `HONDA_PARAM_NIDEC_STOCK_LONG` and switches the TX
whitelist from HONDA_N_TX_MSGS to HONDA_N_STOCK_LONG_TX_MSGS, which
drops brake (0x1FA) and ACC HUD (0x30C).
These tests exercise the panda side: with the flag set, brake/ACC-hud TX must
be rejected unconditionally; steering + LKAS hud remain allowed. All other
Nidec PCM safety behaviour (RX checks, button enable, etc.) is inherited
verbatim from TestHondaNidecPcmSafety.
"""
import unittest
import numpy as np
from opendbc.car.honda.values import HondaSafetyFlags
from opendbc.car.structs import CarParams
from opendbc.safety.tests.libsafety import libsafety_py
import opendbc.safety.tests.common as common
from opendbc.safety.tests.common import CANPackerSafety
from opendbc.safety.tests.test_honda import TestHondaNidecPcmSafety
class TestHondaNidecPcmStockLongSafety(TestHondaNidecPcmSafety):
"""Honda Nidec lateral-only mode: stock ACC owns gas/brake."""
TX_MSGS = [[0xE4, 0], [0x194, 0], [0x33D, 0]]
FWD_BLACKLISTED_ADDRS = {2: [0xE4, 0x194, 0x33D]}
RELAY_MALFUNCTION_ADDRS = {0: (0xE4, 0x194, 0x33D)}
def setUp(self):
self.packer = CANPackerSafety("honda_civic_touring_2016_can_generated")
self.safety = libsafety_py.libsafety
self.safety.set_safety_hooks(CarParams.SafetyModel.hondaNidec, HondaSafetyFlags.NIDEC_STOCK_LONG)
self.safety.init_tests()
def test_brake_safety_check(self):
# Stock ACC owns longitudinal — every brake TX must be rejected.
for brake in np.arange(0, self.MAX_BRAKE + 10, 1):
for controls_allowed in (True, False):
self.safety.set_controls_allowed(controls_allowed)
self.assertFalse(
self._tx(self._send_brake_msg(brake)),
f"brake={brake} controls_allowed={controls_allowed} should be rejected",
)
def test_acc_hud_safety_check(self):
# Stock ACC owns longitudinal — every ACC HUD TX must be rejected.
for controls_allowed in (True, False):
self.safety.set_controls_allowed(controls_allowed)
for pcm_gas in (0, 100, self.MAX_GAS):
for pcm_speed in (0, 50, 100):
self.assertFalse(
self._tx(self._send_acc_hud_msg(pcm_gas, pcm_speed)),
f"acc_hud gas={pcm_gas} speed={pcm_speed} controls_allowed={controls_allowed} should be rejected",
)
def test_honda_fwd_brake_latching(self):
# AEB-forwarding logic doesn't apply: openpilot never TX's its own brake to compare against.
pass
def test_fwd_hook(self):
# Base Nidec test mutates FWD_BLACKLISTED_ADDRS to include 0x1FA/0x30C, which are
# not in our TX list. Run the simpler common.CarSafetyTest version directly.
common.CarSafetyTest.test_fwd_hook(self)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""
Tests for tx_ext hook functionality.
tx_ext allows additional TX messages beyond the base whitelist, with per-message
check_relay control. This is used for dragonpilot-specific features:
- lock_ctrl: allows any UDS message on 0x750 (not just tester present)
"""
import unittest
from opendbc.car.toyota.values import ToyotaSafetyFlags
from opendbc.car.structs import CarParams
from opendbc.safety.tests.libsafety import libsafety_py
from opendbc.safety.tests.common import CANPackerSafety
class TestTxExtBase(unittest.TestCase):
"""Base test class for tx_ext functionality."""
TX_MSGS = [] # Required by test_tx_hook_on_wrong_safety_mode
safety: libsafety_py.LibSafety
packer: CANPackerSafety
@classmethod
def setUpClass(cls):
if cls.__name__ == "TestTxExtBase":
raise unittest.SkipTest("Base class")
def _tx(self, msg):
return self.safety.safety_tx_hook(msg)
def _rx(self, msg):
return self.safety.safety_rx_hook(msg)
class TestTxExtToyotaLockCtrl(TestTxExtBase):
"""Test lock_ctrl feature for Toyota.
lock_ctrl allows any UDS message on 0x750, not just tester present.
This is used for door lock control via diagnostic messages.
"""
EPS_SCALE = 73
def setUp(self):
self.packer = CANPackerSafety("toyota_nodsu_pt_generated")
self.safety = libsafety_py.libsafety
def _uds_msg(self, data: bytes):
"""Create a UDS message on 0x750."""
return libsafety_py.make_CANPacket(0x750, 0, data)
def _tester_present_msg(self):
"""Valid tester present message (always allowed without lock_ctrl)."""
return self._uds_msg(b"\x0F\x02\x3E\x00\x00\x00\x00\x00")
def _lock_ctrl_msg(self):
"""Example lock control message (only allowed with lock_ctrl)."""
return self._uds_msg(b"\x0F\x03\x30\x01\x00\x00\x00\x00")
def _random_uds_msg(self):
"""Random UDS message (only allowed with lock_ctrl)."""
return self._uds_msg(b"\x0F\x05\xAA\xBB\xCC\xDD\x00\x00")
def test_lock_ctrl_disabled_only_tester_present(self):
"""Without lock_ctrl, only tester present is allowed on 0x750."""
# OP long mode (0x750 in whitelist), no lock_ctrl
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, self.EPS_SCALE)
self.safety.init_tests()
# Tester present should be allowed
self.assertTrue(self._tx(self._tester_present_msg()))
# Other UDS messages should be blocked
self.assertFalse(self._tx(self._lock_ctrl_msg()))
self.assertFalse(self._tx(self._random_uds_msg()))
def test_lock_ctrl_enabled_allows_any_uds(self):
"""With lock_ctrl, any UDS message is allowed on 0x750."""
# OP long mode with lock_ctrl
param = self.EPS_SCALE | ToyotaSafetyFlags.LOCK_CTRL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
# All UDS messages should be allowed
self.assertTrue(self._tx(self._tester_present_msg()))
self.assertTrue(self._tx(self._lock_ctrl_msg()))
self.assertTrue(self._tx(self._random_uds_msg()))
def test_lock_ctrl_stock_long_allows_uds(self):
"""With lock_ctrl + stock_long, 0x750 is allowed via tx_ext."""
# Stock long mode (0x750 NOT in base whitelist) with lock_ctrl
param = self.EPS_SCALE | ToyotaSafetyFlags.STOCK_LONGITUDINAL | ToyotaSafetyFlags.LOCK_CTRL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
# All UDS messages should be allowed via tx_ext
self.assertTrue(self._tx(self._tester_present_msg()))
self.assertTrue(self._tx(self._lock_ctrl_msg()))
self.assertTrue(self._tx(self._random_uds_msg()))
def test_lock_ctrl_stock_long_without_flag_blocks_uds(self):
"""Without lock_ctrl, stock_long blocks all 0x750 messages."""
# Stock long mode without lock_ctrl
param = self.EPS_SCALE | ToyotaSafetyFlags.STOCK_LONGITUDINAL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
# 0x750 is not in stock long whitelist, should be blocked
self.assertFalse(self._tx(self._tester_present_msg()))
self.assertFalse(self._tx(self._lock_ctrl_msg()))
class TestTxExtRelayCheck(TestTxExtBase):
"""Test check_relay behavior for tx_ext messages.
Messages allowed via tx_ext can have check_relay=true or false.
- lock_ctrl 0x750: check_relay=false (no relay check)
"""
EPS_SCALE = 73
def setUp(self):
self.packer = CANPackerSafety("toyota_nodsu_pt_generated")
self.safety = libsafety_py.libsafety
def _uds_msg_raw(self):
"""Create raw UDS message for relay testing."""
return libsafety_py.make_CANPacket(0x750, 0, b"\x00" * 8)
def _wait_for_relay_timeout(self):
"""Wait for relay transition timeout (1 second = 100 ticks at 100Hz)."""
for _ in range(200):
self.safety.safety_tick_current_safety_config()
def test_lock_ctrl_no_relay_malfunction(self):
"""lock_ctrl 0x750 should not trigger relay malfunction when received."""
param = self.EPS_SCALE | ToyotaSafetyFlags.STOCK_LONGITUDINAL | ToyotaSafetyFlags.LOCK_CTRL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
self._wait_for_relay_timeout()
# Receiving 0x750 should NOT trigger relay malfunction
# (check_relay=false for lock_ctrl)
self.assertFalse(self.safety.get_relay_malfunction())
self._rx(self._uds_msg_raw())
self.assertFalse(self.safety.get_relay_malfunction())
class TestTxExtDisabledBrands(unittest.TestCase):
"""Test that tx_ext returns not allowed for brands without tx_ext hook."""
TX_MSGS = [] # Required by test_tx_hook_on_wrong_safety_mode
def test_hyundai_no_tx_ext(self):
"""Hyundai should not have any tx_ext allowances."""
safety = libsafety_py.libsafety
safety.set_safety_hooks(CarParams.SafetyModel.hyundai, 0)
safety.init_tests()
# Random message should not be allowed via tx_ext
msg = libsafety_py.make_CANPacket(0x750, 0, b"\x00" * 8)
self.assertFalse(safety.safety_tx_hook(msg))
def test_honda_no_tx_ext(self):
"""Honda should not have any tx_ext allowances."""
safety = libsafety_py.libsafety
safety.set_safety_hooks(CarParams.SafetyModel.hondaNidec, 0)
safety.init_tests()
# Random message should not be allowed via tx_ext
msg = libsafety_py.make_CANPacket(0x750, 0, b"\x00" * 8)
self.assertFalse(safety.safety_tx_hook(msg))
if __name__ == "__main__":
unittest.main()
+18
View File
@@ -107,6 +107,24 @@ class Car:
if self.params.get_bool("dp_lat_alka"):
dp_params |= structs.DPFlags.LatALKA
if self.params.get_bool("dp_toyota_door_auto_lock_unlock"):
dp_params |= structs.DPFlags.ToyotaLockCtrl
if self.params.get_bool("dp_toyota_tss1_sng"):
dp_params |= structs.DPFlags.ToyotaTSS1SnG
if self.params.get_bool("dp_toyota_stock_lon"):
dp_params |= structs.DPFlags.ToyotaStockLon
if self.params.get_bool("dp_vag_a0_sng"):
dp_params |= structs.DPFlags.VagA0SnG
if self.params.get_bool("dp_vag_avoid_eps_lockout"):
dp_params |= structs.DPFlags.VagAvoidEPSLockout
if self.params.get_bool("dp_honda_nidec_stock_long"):
dp_params |= structs.DPFlags.HondaNidecStockLong
dp_fingerprint = str(self.params.get("dp_dev_model_selected") or "")
# Defend against stale/garbage values (e.g. "0" persisted from older versions).
if dp_fingerprint and dp_fingerprint not in interfaces: