Volkswagen ID.4 lateral (#3366)

* VW ID.4 port

* VW MEB: safety vehicle model + tests

* safety tests

* cleanup

* fix test

* fix

* safety

* use curvature

* no pid

* clean long

* longitudinal

* add fcw

* allowance=80

* add carstate signals

* typo

* inline power

* add power tests

* fix mutation test

* update route

* flags

* merge master

* camera harness

* add blinker controls

* add tests

* rm rx

* fix ruff

* fix

* revert: blinker controls

* brake fault

* Update opendbc/car/volkswagen/interface.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* Update opendbc/car/volkswagen/carstate.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* curvature limits

* rm stopAccel

* add pid

* starting

* pid correction

* apply ford limits

* only startaccel

* curvature std limits

* dedup tesla

* add ford safety

* integrator

* misra

* use lat accel

* ruff

* add ford limit fields

* tests

* simple ford curvature

* add test

* reduce

* lower jerk

* limit winddown

* lateral only

* Update opendbc/car/ford/carcontroller.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* comment

* unify max lat accel

* add test

* fix

* cleanup tests

* fix

* Revert "limit winddown"

This reverts commit 289e610f09.

* add back rt

* update lines

* update lines

* add is_zero from meb

* Revert "add is_zero from meb"

This reverts commit 1c97e6f658.

* note

* rm

* use curv=0

* no ford

* use power

* split from long

* fix

* comments

* cleanup

* fix

* comment

* reorder tests

* config struct

* direct config

* Revert "direct config"

This reverts commit 66b1ef5d9a.

* align

* fix

* Update opendbc/car/lateral.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* steerLimit

* inactive

* change to rt jerk value

* add rolling window

* group to struct

* rename new limit method

* more

* add second speed source

* rm long acc_main_on

* align curvature to actual

* Update opendbc/safety/modes/volkswagen_meb.h

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* Update opendbc/safety/modes/volkswagen_meb.h

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* allow debug

* redundant speed in cmd checks

* undeprecate curvature

* fix test

* whitelist cancel

* mutations

* set steeringCurvature

* Revert "set steeringCurvature"

This reverts commit 9c44a92519.

* gap

* KLR_01 frequency

* comment

* add tuning

* comments

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
This commit is contained in:
Daniel Koepping
2026-06-05 20:40:30 -07:00
committed by GitHub
parent 7343ffe79d
commit 4cd4b8cc18
21 changed files with 951 additions and 29 deletions

View File

@@ -384,6 +384,7 @@
|Volkswagen|Golf R 2015-19|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)|
|Volkswagen|Golf SportsVan 2015-20|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)|
|Volkswagen|Grand California 2019-24|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)|
|Volkswagen|ID.4 2021-23|Adaptive Cruise Control (ACC) & Lane Assist|[Dashcam mode](#dashcam)|
|Volkswagen|Jetta 2015-18|Adaptive Cruise Control (ACC) & Lane Assist|[Dashcam mode](#dashcam)|
|Volkswagen|Jetta 2019-23|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)|
|Volkswagen|Jetta GLI 2021-23|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)|

View File

@@ -277,6 +277,7 @@ routes = [
CarTestRoute("6719965b0e1d1737/2023-08-29--06-40-05", TOYOTA.TOYOTA_CHR_TSS2), # hybrid, openpilot longitudinal, radar disabled
CarTestRoute("14623aae37e549f3/2021-10-24--01-20-49", TOYOTA.TOYOTA_PRIUS_V),
CarTestRoute("aebd8f1d4ea16066/00000009--b31e222338", VOLKSWAGEN.VOLKSWAGEN_ID4_MK1),
CarTestRoute("202c40641158a6e5/2021-09-21--09-43-24", VOLKSWAGEN.VOLKSWAGEN_ARTEON_MK1),
CarTestRoute("2c68dda277d887ac/2021-05-11--15-22-20", VOLKSWAGEN.VOLKSWAGEN_ATLAS_MK1),
CarTestRoute("ffcd23abbbd02219/2024-02-28--14-59-38", VOLKSWAGEN.VOLKSWAGEN_CADDY_MK3),

View File

@@ -82,7 +82,7 @@ def _make_car_test(car_name):
assert len(car_params.longitudinalTuning.kiV) == len(car_params.longitudinalTuning.kiBP)
# Lateral sanity checks
if car_params.steerControlType != structs.CarParams.SteerControlType.angle:
if car_params.steerControlType not in (structs.CarParams.SteerControlType.angle, structs.CarParams.SteerControlType.curvature):
tune = car_params.lateralTuning
if tune.which() == 'pid':
if car_name != MOCK.MOCK:

View File

@@ -24,6 +24,7 @@ legend = ["LAT_ACCEL_FACTOR", "MAX_LAT_ACCEL_MEASURED", "FRICTION"]
"TESLA_MODEL_X" = [nan, 2.5, nan]
# Guess
"VOLKSWAGEN_ID4_MK1" = [nan, 2.5, nan]
"FORD_BRONCO_SPORT_MK1" = [nan, 1.5, nan]
"FORD_ESCAPE_MK4" = [nan, 1.5, nan]
"FORD_ESCAPE_MK4_5" = [nan, 1.5, nan]

View File

@@ -4,7 +4,7 @@ from opendbc.car import Bus, DT_CTRL, structs
from opendbc.car.lateral import apply_driver_steer_torque_limits
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.interfaces import CarControllerBase
from opendbc.car.volkswagen import mlbcan, mqbcan, pqcan
from opendbc.car.volkswagen import mebcan, mlbcan, mqbcan, pqcan
from opendbc.car.volkswagen.values import CanBus, CarControllerParams, VolkswagenFlags
VisualAlert = structs.CarControl.HUDControl.VisualAlert
@@ -45,10 +45,14 @@ class CarController(CarControllerBase):
self.CCS = pqcan
elif CP.flags & VolkswagenFlags.MLB:
self.CCS = mlbcan
elif CP.flags & VolkswagenFlags.MEB:
self.CCS = mebcan
else:
self.CCS = mqbcan
self.apply_torque_last = 0
self.apply_curvature_last = 0.
self.steering_power_last = 0
self.gra_acc_counter_last = None
self.hca_mitigation = HCAMitigation(self.CCP)
@@ -61,14 +65,49 @@ class CarController(CarControllerBase):
if self.frame % self.CCP.STEER_STEP == 0:
apply_torque = 0
if CC.latActive:
new_torque = int(round(actuators.torque * self.CCP.STEER_MAX))
apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.CCP)
if self.CP.flags & VolkswagenFlags.MEB:
# Logic to avoid HCA refused state:
# * steering power as counter and near zero before OP lane assist deactivation
# MEB rack can be used continuously without time limits
# maximum real steering angle change ~ 120-130 deg/s
apply_torque = self.hca_mitigation.update(apply_torque, self.apply_torque_last)
hca_enabled = apply_torque != 0
self.apply_torque_last = apply_torque
can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_torque, hca_enabled))
if CC.latActive:
hca_enabled = True
# compensate the gap between measured and current curvature
apply_curvature = actuators.curvature + (CS.curvature_meas - CC.currentCurvature)
apply_curvature = self.CCP.CURVATURE_LIMITS.apply_limits(apply_curvature, self.apply_curvature_last, CS.out.vEgoRaw, CS.curvature_meas,
CC.latActive, self.CCP.STEER_STEP)
min_power = max(self.steering_power_last - self.CCP.STEERING_POWER_STEP, self.CCP.STEERING_POWER_MIN)
max_power = min(self.steering_power_last + self.CCP.STEERING_POWER_STEP, self.CCP.STEERING_POWER_MAX)
target_power_driver = int(np.interp(CS.out.steeringTorque, [self.CCP.STEER_DRIVER_ALLOWANCE, self.CCP.STEER_DRIVER_MAX],
[self.CCP.STEERING_POWER_MAX, self.CCP.STEERING_POWER_MIN]))
target_power = int(np.interp(CS.out.vEgo, [0., 0.5], [self.CCP.STEERING_POWER_MIN, target_power_driver]))
steering_power = min(max(target_power, min_power), max_power)
else:
if self.steering_power_last > 0: # keep HCA alive until steering power has reduced to zero
hca_enabled = True
apply_curvature = float(np.clip(CS.curvature_meas, -self.CCP.CURVATURE_MAX, self.CCP.CURVATURE_MAX))
steering_power = max(self.steering_power_last - self.CCP.STEERING_POWER_STEP, 0)
else:
hca_enabled = False
apply_curvature = 0. # inactive curvature
steering_power = 0
can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_curvature, hca_enabled, steering_power))
self.apply_curvature_last = apply_curvature
self.steering_power_last = steering_power
else:
if CC.latActive:
new_torque = int(round(actuators.torque * self.CCP.STEER_MAX))
apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.CCP)
apply_torque = self.hca_mitigation.update(apply_torque, self.apply_torque_last)
hca_enabled = apply_torque != 0
self.apply_torque_last = apply_torque
can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_torque, hca_enabled))
if self.CP.flags & VolkswagenFlags.STOCK_HCA_PRESENT:
# Pacify VW Emergency Assist driver inactivity detection by changing its view of driver steering input torque
@@ -79,6 +118,13 @@ class CarController(CarControllerBase):
ea_simulated_torque = CS.out.steeringTorque
can_sends.append(self.CCS.create_eps_update(self.packer_pt, self.CAN.cam, CS.eps_stock_values, ea_simulated_torque))
# Emergency Assist intervention
if self.CP.flags & VolkswagenFlags.MEB and self.CP.flags & VolkswagenFlags.STOCK_KLR_PRESENT:
# send capacitive steering wheel hands-on message to keep ACC resume active and control Emergency Assist
if self.frame % self.CCP.KLR_01_STEP == 0:
can_sends.append(mebcan.create_capacitive_wheel_touch(self.packer_pt, self.CAN.cam, CC.latActive, CS.klr_stock_values))
can_sends.append(mebcan.create_capacitive_wheel_touch(self.packer_pt, self.CAN.pt, CC.latActive, CS.klr_stock_values))
# **** Acceleration Controls ******************************************** #
if self.CP.openpilotLongitudinalControl:
@@ -126,6 +172,7 @@ class CarController(CarControllerBase):
new_actuators = actuators.as_builder()
new_actuators.torque = self.apply_torque_last / self.CCP.STEER_MAX
new_actuators.torqueOutputCan = self.apply_torque_last
new_actuators.curvature = self.apply_curvature_last
self.gra_acc_counter_last = CS.gra_stock_values["COUNTER"]
self.frame += 1

View File

@@ -13,12 +13,15 @@ class CarState(CarStateBase):
super().__init__(CP)
self.frame = 0
self.eps_init_complete = False
self.cruise_recovery_timer = 0
self.CCP = CarControllerParams(CP)
self.button_states = {button.event_type: False for button in self.CCP.BUTTONS}
self.esp_hold_confirmation = False
self.upscale_lead_car_signal = False
self.eps_stock_values = False
self.acc_type = 0
self.travel_assist_available = False
self.curvature_meas = 0.
def update_button_enable(self, buttonEvents: list[structs.CarState.ButtonEvent]):
if not self.CP.pcmCruise:
@@ -52,6 +55,8 @@ class CarState(CarStateBase):
return self.update_pq(pt_cp, cam_cp, ext_cp)
elif self.CP.flags & VolkswagenFlags.MLB:
return self.update_mlb(pt_cp, cam_cp, ext_cp, alt_cp)
elif self.CP.flags & VolkswagenFlags.MEB:
return self.update_meb(pt_cp, cam_cp, ext_cp)
ret = structs.CarState()
@@ -229,6 +234,90 @@ class CarState(CarStateBase):
self.frame += 1
return ret
def update_meb(self, pt_cp, cam_cp, ext_cp) -> structs.CarState:
ret = structs.CarState()
self.parse_wheel_speeds(ret,
pt_cp.vl["ESC_51"]["VL_Radgeschw"],
pt_cp.vl["ESC_51"]["VR_Radgeschw"],
pt_cp.vl["ESC_51"]["HL_Radgeschw"],
pt_cp.vl["ESC_51"]["HR_Radgeschw"],
)
if self.CP.flags & VolkswagenFlags.KOMBI_PRESENT:
ret.vEgoCluster = pt_cp.vl["Kombi_01"]["KBI_angez_Geschw"] * CV.KPH_TO_MS
ret.standstill = ret.vEgoRaw == 0
# Update EPS position and state info. For signed values, VW sends the sign in a separate signal.
ret.steeringAngleDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradwinkel"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradwinkel"])]
ret.steeringRateDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradw_Geschw"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradw_Geschw"])]
ret.steeringTorque = pt_cp.vl["LH_EPS_03"]["EPS_Lenkmoment"] * (1, -1)[int(pt_cp.vl["LH_EPS_03"]["EPS_VZ_Lenkmoment"])]
ret.steeringPressed = self.update_steering_pressed(abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE, 5)
self.curvature_meas = -pt_cp.vl["QFK_01"]["Curvature"] * (1, -1)[int(pt_cp.vl["QFK_01"]["Curvature_VZ"])]
ret.yawRate = -pt_cp.vl["ESC_50"]["Yaw_Rate"] * (1, -1)[int(pt_cp.vl["ESC_50"]["Yaw_Rate_Sign"])] * CV.DEG_TO_RAD
if self.CP.flags & VolkswagenFlags.ALT_GEAR:
ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Gateway_73"]["GE_Fahrstufe"], None))
else:
ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Getriebe_11"]["GE_Fahrstufe"], None))
drive_mode = ret.gearShifter == GearShifter.drive
hca_status = self.CCP.hca_status_values.get(pt_cp.vl["QFK_01"]["LatCon_HCA_Status"])
ret.steerFaultTemporary, ret.steerFaultPermanent = self.update_hca_state(hca_status, drive_mode)
ret.carFaultedNonCritical = cam_cp.vl["EA_01"]["EA_Funktionsstatus"] in (3, 4, 5, 6)
ret.gasPressed = pt_cp.vl["Motor_51"]["Accel_Pedal_Pressure"] > 0
ret.brakePressed = bool(pt_cp.vl["Motor_14"]["MO_Fahrer_bremst"])
ret.parkingBrake = pt_cp.vl["ESC_50"]["EPB_Status"] in (1, 4) # EPB closing or closed (candidate for all platforms)
#ret.parkingBrake = pt_cp.vl["Gateway_73"]["EPB_Status"] in (1, 4) # this signal is not working for newer models
ret.seatbeltUnlatched = pt_cp.vl["Airbag_02"]["AB_Gurtschloss_FA"] != 3
doors = pt_cp.vl["ZV_02"] if bool(pt_cp.vl["Gateway_72"]["ZV_02_alt"]) else pt_cp.vl["Gateway_72"]
ret.doorOpen = any([doors["ZV_FT_offen"],
doors["ZV_BT_offen"],
doors["ZV_HFS_offen"],
doors["ZV_HBFS_offen"],
doors["ZV_HD_offen"]])
ret.espDisabled = bool(pt_cp.vl["ESP_21"]["ESP_Tastung_passiv"])
ret.espActive = bool(pt_cp.vl["ESP_21"]["ESP_Eingriff"])
self.acc_type = ext_cp.vl["ACC_18"]["ACC_Typ"]
self.esp_hold_confirmation = bool(pt_cp.vl["ESC_50"]["Standstill"])
self.travel_assist_available = bool(cam_cp.vl["TA_01"]["Travel_Assist_Available"])
ret.stockFcw = bool(ext_cp.vl["AWV_03"]["FCW_Active"])
ret.cruiseState.available = pt_cp.vl["Motor_51"]["TSK_Status"] in (2, 3, 4, 5)
ret.cruiseState.enabled = pt_cp.vl["Motor_51"]["TSK_Status"] in (3, 4, 5)
ret.cruiseState.standstill = self.CP.pcmCruise and self.esp_hold_confirmation
if self.CP.pcmCruise:
ret.cruiseState.nonAdaptive = bool(ext_cp.vl["MEB_ACC_01"]["ACC_Limiter_Mode"])
ret.cruiseState.speed = ext_cp.vl["MEB_ACC_01"]["ACC_Wunschgeschw_02"] * CV.KPH_TO_MS
if ret.cruiseState.speed > 90: # 255 kph in m/s == no current setpoint
ret.cruiseState.speed = 0
else:
ret.cruiseState.nonAdaptive = bool(pt_cp.vl["Motor_51"]["TSK_Limiter_ausgewaehlt"])
accFaulted = pt_cp.vl["Motor_51"]["TSK_Status"] in (6, 7)
ret.accFaulted = self.update_acc_fault(accFaulted, parking_brake=ret.parkingBrake, drive_mode=drive_mode)
ret.leftBlinker, ret.rightBlinker = self.update_blinker_from_stalk(240, pt_cp.vl["SMLS_01"]["BH_Blinker_li"],
pt_cp.vl["SMLS_01"]["BH_Blinker_re"])
if self.CP.enableBsm:
ret.leftBlindspot = (bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Info_Left"]) or
bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Warn_Left"]))
ret.rightBlindspot = (bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Info_Right"]) or
bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Warn_Right"]))
self.eps_stock_values = pt_cp.vl["LH_EPS_03"]
self.ldw_stock_values = cam_cp.vl["LDW_02"] if self.CP.networkLocation == NetworkLocation.fwdCamera else {}
self.gra_stock_values = pt_cp.vl["GRA_ACC_01"]
self.klr_stock_values = pt_cp.vl["KLR_01"] if self.CP.flags & VolkswagenFlags.STOCK_KLR_PRESENT else {}
ret.buttonEvents = self.create_button_events(pt_cp, self.CCP.BUTTONS)
ret.lowSpeedAlert = self.update_low_speed_alert(ret.vEgo)
self.frame += 1
return ret
def update_mlb(self, pt_cp, cam_cp, ext_cp, alt_cp) -> structs.CarState:
ret = structs.CarState()
@@ -309,10 +398,24 @@ class CarState(CarStateBase):
temp_fault = drive_mode and hca_status in ("REJECTED", "PREEMPTED") or not self.eps_init_complete
return temp_fault, perm_fault
def update_acc_fault(self, acc_fault, parking_brake=False, drive_mode=True, recovery_frames_max=100):
# Ignore FAULT when not in drive mode and parked
# do not show misleading error during ignition in parked state
# grant a short time to recover a normal cruise state
fault = acc_fault
if parking_brake and not drive_mode:
fault = False
self.cruise_recovery_timer = self.frame
elif self.frame - self.cruise_recovery_timer < recovery_frames_max:
fault = False
return fault
@staticmethod
def get_can_parsers(CP):
if CP.flags & VolkswagenFlags.PQ:
return CarState.get_can_parsers_pq(CP)
elif CP.flags & VolkswagenFlags.MEB:
return CarState.get_can_parsers_meb(CP)
# manually configure some optional and variable-rate/edge-triggered messages
pt_messages, cam_messages, alt_messages = [], [], []
@@ -339,3 +442,23 @@ class CarState(CarStateBase):
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).cam),
Bus.alt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).alt),
}
@staticmethod
def get_can_parsers_meb(CP):
pt_messages = [
# frequency changes too much for the CANParser to figure out
("Blinkmodi_02", 1), # From J519 BCM (sent at 1Hz when no lights active, 50Hz when active)
("SMLS_01", 1), # From Stalk Controls
]
if CP.networkLocation == NetworkLocation.fwdCamera:
pt_messages.append(("AWV_03", 1)) # Front Collision Detection (1 Hz when inactive, 50 Hz when active)
cam_messages = []
if CP.networkLocation == NetworkLocation.gateway:
cam_messages.append(("AWV_03", 1)) # Front Collision Detection (1 Hz when inactive, 50 Hz when active)
return {
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], pt_messages, CanBus(CP).pt),
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], cam_messages, CanBus(CP).cam),
Bus.alt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).alt),
}

View File

@@ -366,6 +366,18 @@ FW_VERSIONS = {
b'\xf1\x875Q0907572S \xf1\x890780',
],
},
CAR.VOLKSWAGEN_ID4_MK1: {
(Ecu.srs, 0x715, None): [
b'\xf1\x871EA959655EA\xf1\x890376',
b'\xf1\x875WA959655R \xf1\x890717',
],
(Ecu.eps, 0x712, None): [
b'\xf1\x871EA907144AQ\xf1\x895033\xf1\x82\x000_BH0A0_ON',
],
(Ecu.fwdRadar, 0x757, None): [
b'\xf1\x871EA907572H \xf1\x890234',
],
},
CAR.VOLKSWAGEN_JETTA_MK6: {
(Ecu.srs, 0x715, None): [
b'\xf1\x875C0959655M \xf1\x890726\xf1\x82\t00NB1108--------24',

View File

@@ -40,6 +40,37 @@ class CarInterface(CarInterfaceBase):
ret.networkLocation = NetworkLocation.gateway
ret.dashcamOnly = is_release # Release support needs HCA timeout fix, safety validation, revised J533 harness
elif ret.flags & VolkswagenFlags.MEB:
# Set global MEB parameters
safety_configs = [get_safety_config(structs.CarParams.SafetyModel.volkswagenMeb)]
ret.transmissionType = TransmissionType.direct
ret.steerControlType = structs.CarParams.SteerControlType.curvature
ret.steerAtStandstill = True
ret.lateralTuning.init('pid')
ret.lateralTuning.pid.kpBP = [10., 40.]
ret.lateralTuning.pid.kpV = [0., 1.45]
ret.lateralTuning.pid.kiBP = [10., 40.]
ret.lateralTuning.pid.kiV = [0., 0.12]
ret.lateralTuning.pid.kf = 1.
if any(msg in fingerprint[1] for msg in (0x520, 0x86, 0xFD, 0x13D)): # Airbag_02, LWI_01, ESP_21, QFK_01
ret.networkLocation = NetworkLocation.gateway
else:
ret.networkLocation = NetworkLocation.fwdCamera
ret.enableBsm = 0x24C in fingerprint[0] # MEB_Side_Assist_01
if 0x25D in fingerprint[0]: # KLR_01
ret.flags |= VolkswagenFlags.STOCK_KLR_PRESENT.value
if 0x3DC in fingerprint[0]: # Gateway_73
ret.flags |= VolkswagenFlags.ALT_GEAR.value
ret.radarUnavailable = True
# only allow gateway harness for now
ret.dashcamOnly = is_release or ret.networkLocation == NetworkLocation.fwdCamera
else:
# Set global MQB parameters
safety_configs = [get_safety_config(structs.CarParams.SafetyModel.volkswagen)]
@@ -68,6 +99,9 @@ class CarInterface(CarInterfaceBase):
if ret.flags & VolkswagenFlags.PQ or ret.flags & VolkswagenFlags.MLB:
ret.steerActuatorDelay = 0.2
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning)
elif ret.flags & VolkswagenFlags.MEB:
ret.steerActuatorDelay = 0.3
ret.steerLimitTimer = 0.1
else:
ret.steerActuatorDelay = 0.1
ret.lateralTuning.pid.kpBP = [0.]
@@ -77,9 +111,9 @@ class CarInterface(CarInterfaceBase):
ret.lateralTuning.pid.kiV = [0.2]
# Global longitudinal tuning defaults, can be overridden per-vehicle
ret.alphaLongitudinalAvailable = ret.networkLocation == NetworkLocation.gateway or docs
if alpha_long:
ret.alphaLongitudinalAvailable = not (ret.flags & VolkswagenFlags.MEB) and \
(ret.networkLocation == NetworkLocation.gateway or docs)
if alpha_long and not (ret.flags & VolkswagenFlags.MEB):
# Proof-of-concept, prep for E2E only. No radar points available. Panda ALLOW_DEBUG firmware required.
ret.openpilotLongitudinalControl = True
safety_configs[0].safetyParam |= VolkswagenSafetyFlags.LONG_CONTROL.value
@@ -92,9 +126,10 @@ class CarInterface(CarInterfaceBase):
ret.steerActuatorDelay = 0.07
ret.pcmCruise = not ret.openpilotLongitudinalControl
ret.stopAccel = -0.55
ret.vEgoStarting = 0.1
ret.vEgoStopping = 0.5
if not (ret.flags & VolkswagenFlags.MEB):
ret.stopAccel = -0.55
ret.vEgoStopping = 0.5
ret.vEgoStarting = 0.1
ret.autoResumeSng = ret.minEnableSpeed == -1
CAN = CanBus(fingerprint=fingerprint)

View File

@@ -0,0 +1,93 @@
def create_steering_control(packer, bus, apply_curvature, lkas_enabled, power=0):
values = {
"Curvature": abs(apply_curvature), # in rad/m
"Curvature_VZ": 1 if apply_curvature > 0 and lkas_enabled else 0,
"Power": power if lkas_enabled else 0,
"RequestStatus": 4 if lkas_enabled else 2,
"HighSendRate": lkas_enabled,
}
return packer.make_can_msg("HCA_03", bus, values)
def create_eps_update(packer, bus, eps_stock_values, ea_simulated_torque):
values = {s: eps_stock_values[s] for s in [
"COUNTER", # Sync counter value to EPS output
"EPS_Lenkungstyp", # EPS rack type
"EPS_Berechneter_LW", # Absolute raw steering angle
"EPS_VZ_BLW", # Raw steering angle sign
"EPS_HCA_Status", # EPS HCA control status
]}
values.update({
# Absolute driver torque input and sign, with EA inactivity mitigation
"EPS_Lenkmoment": abs(ea_simulated_torque),
"EPS_VZ_Lenkmoment": 1 if ea_simulated_torque < 0 else 0,
})
return packer.make_can_msg("LH_EPS_03", bus, values)
def create_lka_hud_control(packer, bus, ldw_stock_values, lat_active, steering_pressed, hud_alert, hud_control, sound_alert=False):
display_mode = 1 if lat_active else 0 # travel assist style showing yellow lanes when op is active
values = {}
if len(ldw_stock_values):
values = {s: ldw_stock_values[s] for s in [
"LDW_SW_Warnung_links", # Blind spot in warning mode on left side due to lane departure
"LDW_SW_Warnung_rechts", # Blind spot in warning mode on right side due to lane departure
"LDW_Seite_DLCTLC", # Direction of most likely lane departure (left or right)
"LDW_DLC", # Lane departure, distance to line crossing
"LDW_TLC", # Lane departure, time to line crossing
]}
values.update({
"LDW_Gong": sound_alert,
"LDW_Status_LED_gelb": 1 if lat_active and steering_pressed else 0,
"LDW_Status_LED_gruen": 1 if lat_active and not steering_pressed else 0,
"LDW_Lernmodus_links": 3 + display_mode if hud_control.leftLaneDepart else 1 + hud_control.leftLaneVisible + display_mode,
"LDW_Lernmodus_rechts": 3 + display_mode if hud_control.rightLaneDepart else 1 + hud_control.rightLaneVisible + display_mode,
"LDW_Texte": hud_alert,
})
return packer.make_can_msg("LDW_02", bus, values)
def create_acc_buttons_control(packer, bus, gra_stock_values, cancel=False, resume=False, up=False, down=False):
values = {s: gra_stock_values[s] for s in [
"GRA_Hauptschalter", # ACC button, on/off
"GRA_Typ_Hauptschalter", # ACC main button type
"GRA_Codierung", # ACC button configuration/coding
"GRA_Tip_Stufe_2", # unknown related to stalk type
"GRA_ButtonTypeInfo", # unknown related to stalk type
]}
values.update({
"COUNTER": (gra_stock_values["COUNTER"] + 1) % 16,
"GRA_Abbrechen": cancel,
"GRA_Tip_Wiederaufnahme": resume or up,
"GRA_Tip_Setzen": down,
})
return packer.make_can_msg("GRA_ACC_01", bus, values)
def create_capacitive_wheel_touch(packer, bus, lat_active, klr_stock_values):
# KLR_01: capacitive steering wheel hands-on signal
# Sent by J1158
# Consumed by R242 (camera) for lane assist hands on and J428 (ACC / Travel Assist / Emergency Assist)
# for resume and EA control
values = {s: klr_stock_values[s] for s in [
"COUNTER",
"KLR_Touchintensitaet_1",
"KLR_Touchintensitaet_2",
"KLR_Touchintensitaet_3",
"KLR_Touchauswertung",
]}
if lat_active:
values.update({
"COUNTER": (klr_stock_values["COUNTER"] + 1) % 16,
"KLR_Touchintensitaet_1": 80,
"KLR_Touchintensitaet_2": 200,
"KLR_Touchintensitaet_3": 10,
"KLR_Touchauswertung": 10,
})
return packer.make_can_msg("KLR_01", bus, values)

View File

@@ -243,6 +243,8 @@ VOLKSWAGEN_MQB_MEB_CONSTANTS: dict[int, list[int]] = {
0xC5, 0x91, 0x0F, 0x27, 0x34, 0x04, 0x7F, 0x02], # EA_02
0x20A: [0x9D, 0xE8, 0x36, 0xA1, 0xCA, 0x3B, 0x1D, 0x33,
0xE0, 0xD5, 0xBB, 0x5F, 0xAE, 0x3C, 0x31, 0x9F], # EML_06
0x25D: [0xDA, 0x6B, 0x0E, 0xB2, 0x78, 0xBD, 0x5A, 0x81,
0x7B, 0xD6, 0x41, 0x39, 0x76, 0xB6, 0xD7, 0x35], # KLR_01
0x26B: [0xCE, 0xCC, 0xBD, 0x69, 0xA1, 0x3C, 0x18, 0x76,
0x0F, 0x04, 0xF2, 0x3A, 0x93, 0x24, 0x19, 0x51], # TA_01
0x30C: [0x0F] * 16, # ACC_02

View File

@@ -3,6 +3,7 @@ from dataclasses import dataclass, field
from enum import Enum, IntFlag, StrEnum
from opendbc.car import Bus, CanBusBase, CarSpecs, DbcDict, PlatformConfig, Platforms, structs, uds
from opendbc.car.lateral import CurvatureSteeringLimits
from opendbc.can import CANDefine
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.docs_definitions import CarFootnote, CarHarness, CarDocs, CarParts, Column
@@ -99,6 +100,37 @@ class CarControllerParams:
"laneAssistDeactivTrailer": 5, # "Lane Assist: no function with trailer"
}
elif CP.flags & VolkswagenFlags.MEB:
self.LDW_STEP = 10 # LDW_02 message frequency 10Hz
self.ACC_HUD_STEP = 6
self.KLR_01_STEP = 6 # KLR_01 message frequency 17Hz
self.STEER_DRIVER_ALLOWANCE = 100 # Driver torque 1.0 Nm, begin steering reduction from MAX
self.STEER_DRIVER_MAX = 300 # Driver torque 3.0 Nm, stop steering reduction at MIN
self.STEERING_POWER_MAX = 50 # HCA_03 maximum steering power, percentage
self.STEERING_POWER_MIN = 4 # HCA_03 minimum steering power, percentage
self.STEERING_POWER_STEP = 2 # HCA_03 steering power counter steps
self.CURVATURE_MAX = 0.195 # Max curvature for steering command, m^-1
self.CURVATURE_LIMITS = CurvatureSteeringLimits(self.CURVATURE_MAX)
self.shifter_values = can_define.dv["Getriebe_11"]["GE_Fahrstufe"]
self.hca_status_values = can_define.dv["QFK_01"]["LatCon_HCA_Status"]
self.BUTTONS = [
Button(structs.CarState.ButtonEvent.Type.setCruise, "GRA_ACC_01", "GRA_Tip_Setzen", [1]),
Button(structs.CarState.ButtonEvent.Type.resumeCruise, "GRA_ACC_01", "GRA_Tip_Wiederaufnahme", [1]),
Button(structs.CarState.ButtonEvent.Type.accelCruise, "GRA_ACC_01", "GRA_Tip_Hoch", [1]),
Button(structs.CarState.ButtonEvent.Type.decelCruise, "GRA_ACC_01", "GRA_Tip_Runter", [1]),
Button(structs.CarState.ButtonEvent.Type.cancel, "GRA_ACC_01", "GRA_Abbrechen", [1]),
Button(structs.CarState.ButtonEvent.Type.gapAdjustCruise, "GRA_ACC_01", "GRA_Verstellung_Zeitluecke", [3]),
]
self.LDW_MESSAGES = {
"none": 0, # Nothing to display
"laneAssistTakeOverUrgent": 4, # "Lane Assist: Please Take Over Steering" (red)
"laneAssistTakeOver": 8, # "Lane Assist: Please Take Over Steering" (white)
}
else:
self.LDW_STEP = 10 # LDW_02 message frequency 10Hz
self.ACC_HUD_STEP = 6 # ACC_02 message frequency 16Hz
@@ -184,10 +216,13 @@ class VolkswagenFlags(IntFlag):
# Detected flags
STOCK_HCA_PRESENT = 1
KOMBI_PRESENT = 4
ALT_GEAR = 32
STOCK_KLR_PRESENT = 64
# Static flags
PQ = 2
MLB = 8
MEB = 16
@dataclass
@@ -209,6 +244,16 @@ class VolkswagenMQBPlatformConfig(PlatformConfig):
wmis: set[WMI] = field(default_factory=set)
@dataclass
class VolkswagenMEBPlatformConfig(PlatformConfig):
dbc_dict: DbcDict = field(default_factory=lambda: {Bus.pt: 'vw_meb'})
chassis_codes: set[str] = field(default_factory=set)
wmis: set[WMI] = field(default_factory=set)
def init(self):
self.flags |= VolkswagenFlags.MEB
@dataclass
class VolkswagenPQPlatformConfig(VolkswagenMQBPlatformConfig):
dbc_dict: DbcDict = field(default_factory=lambda: {Bus.pt: 'vw_pq'})
@@ -269,7 +314,7 @@ class VWCarDocs(CarDocs):
# FW_VERSIONS for that existing CAR.
class CAR(Platforms):
config: VolkswagenMQBPlatformConfig | VolkswagenPQPlatformConfig
config: VolkswagenMQBPlatformConfig | VolkswagenPQPlatformConfig | VolkswagenMEBPlatformConfig
VOLKSWAGEN_ARTEON_MK1 = VolkswagenMQBPlatformConfig(
[
@@ -331,6 +376,14 @@ class CAR(Platforms):
chassis_codes={"5G", "AU", "BA", "BE"},
wmis={WMI.VOLKSWAGEN_MEXICO_CAR, WMI.VOLKSWAGEN_EUROPE_CAR},
)
VOLKSWAGEN_ID4_MK1 = VolkswagenMEBPlatformConfig(
[
VWCarDocs("Volkswagen ID.4 2021-23"),
],
VolkswagenCarSpecs(mass=2224, wheelbase=2.77),
chassis_codes={"E2"},
wmis={WMI.VOLKSWAGEN_USA_SUV, WMI.VOLKSWAGEN_EUROPE_CAR, WMI.VOLKSWAGEN_EUROPE_SUV},
)
VOLKSWAGEN_JETTA_MK6 = VolkswagenPQPlatformConfig(
[VWCarDocs("Volkswagen Jetta 2015-18")],
VolkswagenCarSpecs(mass=1518, wheelbase=2.65, minSteerSpeed=50 * CV.KPH_TO_MS, minEnableSpeed=20 * CV.KPH_TO_MS),

View File

@@ -232,6 +232,7 @@ BO_ 258 ESC_50: 48 XXX
SG_ Regen_Braking : 123|1@1+ (1,0) [0|7] "" XXX
SG_ Standstill : 171|1@0+ (1,0) [0|1] "" XXX
SG_ Longitudinal_Speed : 181|10@1+ (0.25,0) [0|255] "Unit_KilometerPerHour" XXX
SG_ EPB_Status : 317|3@1+ (1,0) [0|7] "" XXX
BO_ 261 VMM_01: 8 XXX
SG_ CRC : 0|8@1+ (1,0) [0|255] "" XXX

View File

@@ -56,6 +56,7 @@
} while (0);
#define UPDATE_VEHICLE_SPEED(val_ms) (update_sample(&vehicle_speed, ROUND((val_ms) * VEHICLE_SPEED_FACTOR)))
#define UPDATE_VEHICLE_SPEED_2(val_ms) (update_sample(&vehicle_speed_2, ROUND((val_ms) * VEHICLE_SPEED_FACTOR)))
uint32_t GET_BYTES(const CANPacket_t *msg, int start, int len);
@@ -138,6 +139,8 @@ typedef struct {
const uint32_t frequency; // Hz
const int max_curvature_error; // rad/m * curvature_to_can, max deviation from measured curvature (0 disables)
const float curvature_error_min_speed; // min speed for the curvature error check [m/s]
const int max_steer_power; // max steer power if EPS supports it (0 disables)
const bool inactive_curvature_is_zero; // true resets desired to 0 on violation, false resets to measured curvature
} CurvatureSteeringLimits;
// parameters for lateral accel/jerk angle limiting using a simple vehicle model
@@ -239,7 +242,7 @@ bool steer_torque_cmd_checks(int desired_torque, int steer_req, const TorqueStee
bool steer_angle_cmd_checks(int desired_angle, bool steer_control_enabled, const AngleSteeringLimits limits);
bool steer_angle_cmd_checks_vm(int desired_angle, bool steer_control_enabled, const AngleSteeringLimits limits,
const AngleSteeringParams params);
bool steer_curvature_cmd_checks(int desired_curvature, bool steer_control_enabled, const CurvatureSteeringLimits limits);
bool steer_curvature_cmd_checks(int desired_curvature, int steer_power, bool steer_control_enabled, const CurvatureSteeringLimits limits);
bool longitudinal_accel_checks(int desired_accel, const LongitudinalLimits limits);
bool longitudinal_speed_checks(int desired_speed, const LongitudinalLimits limits);
bool longitudinal_gas_checks(int desired_gas, const LongitudinalLimits limits);
@@ -263,6 +266,7 @@ extern bool steering_disengage;
extern bool steering_disengage_prev;
extern bool cruise_engaged_prev;
extern struct sample_t vehicle_speed;
extern struct sample_t vehicle_speed_2;
extern bool vehicle_moving;
extern bool acc_main_on; // referred to as "ACC off" in ISO 15622:2018
extern int cruise_button_prev;
@@ -294,6 +298,7 @@ typedef struct {
uint32_t rt_msgs;
uint32_t rt_msgs_prev;
uint32_t ts_check_last;
int steer_power_last;
struct sample_t meas; // last 6 steer curvatures
} CurvatureSteeringState;
extern CurvatureSteeringState curvature_state;
@@ -354,6 +359,7 @@ extern const safety_hooks tesla_hooks;
extern const safety_hooks toyota_hooks;
extern const safety_hooks volkswagen_mlb_hooks;
extern const safety_hooks volkswagen_mqb_hooks;
extern const safety_hooks volkswagen_meb_hooks;
extern const safety_hooks volkswagen_pq_hooks;
extern const safety_hooks rivian_hooks;
extern const safety_hooks psa_hooks;

View File

@@ -238,7 +238,7 @@ bool steer_angle_cmd_checks(int desired_angle, bool steer_control_enabled, const
}
// Safety checks for curvature-based steering commands
bool steer_curvature_cmd_checks(int desired_curvature, bool steer_control_enabled, const CurvatureSteeringLimits limits) {
bool steer_curvature_cmd_checks(int desired_curvature, int steer_power, bool steer_control_enabled, const CurvatureSteeringLimits limits) {
// Highway curves are rolled in the direction of the turn, add tolerance to compensate
static const float MAX_LATERAL_ACCEL = ISO_LATERAL_ACCEL + (EARTH_G * AVERAGE_ROAD_ROLL); // ~3.6 m/s^2
// Lower than ISO 11270 lateral jerk limit, which is 5.0 m/s^3
@@ -247,6 +247,8 @@ bool steer_curvature_cmd_checks(int desired_curvature, bool steer_control_enable
const float fudged_speed = SAFETY_MAX((vehicle_speed.min / VEHICLE_SPEED_FACTOR) - 1.0, 1.0);
bool violation = false;
speed_mismatch_check((float)vehicle_speed_2.values[0] / VEHICLE_SPEED_FACTOR);
if (controls_allowed && steer_control_enabled) {
// *** absolute curvature cap ***
violation |= safety_max_limit_check(desired_curvature, limits.max_curvature, -limits.max_curvature);
@@ -282,14 +284,21 @@ bool steer_curvature_cmd_checks(int desired_curvature, bool steer_control_enable
violation |= desired_curvature != 0;
}
// No curvature control allowed when controls are not allowed
if (!controls_allowed) {
violation |= steer_control_enabled;
// *** steer power check ***
// allow power winddown after disengage to prevent EPS fault
if (limits.max_steer_power != 0) {
violation |= safety_max_limit_check(steer_power, limits.max_steer_power, 0);
violation |= (steer_power != 0) && !steer_control_enabled;
violation |= !controls_allowed && (steer_power != 0) && (steer_power >= curvature_state.steer_power_last);
curvature_state.steer_power_last = steer_power;
} else {
// No curvature control allowed when controls are not allowed
violation |= !controls_allowed && steer_control_enabled;
}
// reset to zero if either controls is not allowed or there's a violation
// reset to zero or measured curvature depending on EPS expectation
if (violation || !controls_allowed) {
curvature_state.desired_last = 0;
curvature_state.desired_last = limits.inactive_curvature_is_zero ? 0 : curvature_state.meas.values[0];
}
return violation;

View File

@@ -92,6 +92,8 @@ static const CurvatureSteeringLimits FORD_STEERING_LIMITS = {
.frequency = 20, // Hz
.max_curvature_error = 100, // 0.002 rad/m * curvature_to_can
.curvature_error_min_speed = 10.0, // m/s
.max_steer_power = 0, // disabled, Ford has no steed power signal
.inactive_curvature_is_zero = true, // Ford EPS expects curvature=0 when inactive
};
static void ford_rx_hook(const CANPacket_t *msg) {
@@ -113,7 +115,7 @@ static void ford_rx_hook(const CANPacket_t *msg) {
// Disable controls if speeds from ABS and PCM ECUs are too far apart.
// Signal: Veh_V_ActlEng
float filtered_pcm_speed = ((msg->data[6] << 8) | msg->data[7]) * 0.01 * KPH_TO_MS;
speed_mismatch_check(filtered_pcm_speed);
UPDATE_VEHICLE_SPEED_2(filtered_pcm_speed);
}
// Update vehicle yaw rate
@@ -233,7 +235,7 @@ static bool ford_tx_hook(const CANPacket_t *msg) {
// Check angle error and steer_control_enabled
int desired_curvature = raw_curvature - FORD_INACTIVE_CURVATURE; // /FORD_STEERING_LIMITS.curvature_to_can to get real curvature
violation |= steer_curvature_cmd_checks(desired_curvature, steer_control_enabled, FORD_STEERING_LIMITS);
violation |= steer_curvature_cmd_checks(desired_curvature, 0, steer_control_enabled, FORD_STEERING_LIMITS);
if (violation) {
tx = false;
@@ -254,7 +256,7 @@ static bool ford_tx_hook(const CANPacket_t *msg) {
// Check angle error and steer_control_enabled
int desired_curvature = raw_curvature - FORD_INACTIVE_CURVATURE; // /FORD_STEERING_LIMITS.curvature_to_can to get real curvature
violation |= steer_curvature_cmd_checks(desired_curvature, steer_control_enabled, FORD_STEERING_LIMITS);
violation |= steer_curvature_cmd_checks(desired_curvature, 0, steer_control_enabled, FORD_STEERING_LIMITS);
if (violation) {
tx = false;

View File

@@ -0,0 +1,170 @@
#pragma once
#include "opendbc/safety/declarations.h"
#include "opendbc/safety/modes/volkswagen_common.h"
#define MSG_ESC_51 0xFCU // RX, for wheel speeds
#define MSG_ESP_21 0xFDU // RX, redundant vehicle speed source
#define MSG_HCA_03 0x303U
#define MSG_QFK_01 0x13DU
#define MSG_Motor_51 0x10BU // RX for TSK state and accel pedal
#define MSG_KLR_01 0x25DU // TX, for capacitive steering wheel
static uint32_t volkswagen_meb_compute_crc(const CANPacket_t *msg) {
int len = GET_LEN(msg);
uint8_t crc = 0xFFU;
for (int i = 1; i < len; i++) {
crc ^= (uint8_t)msg->data[i];
crc = volkswagen_crc8_lut_8h2f[crc];
}
uint8_t counter = volkswagen_mqb_meb_get_counter(msg);
if (msg->addr == MSG_LH_EPS_03) {
crc ^= (uint8_t[]){0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5, 0xF5}[counter];
} else if (msg->addr == MSG_GRA_ACC_01) {
crc ^= (uint8_t[]){0x6A, 0x38, 0xB4, 0x27, 0x22, 0xEF, 0xE1, 0xBB, 0xF8, 0x80, 0x84, 0x49, 0xC7, 0x9E, 0x1E, 0x2B}[counter];
} else if (msg->addr == MSG_QFK_01) {
crc ^= (uint8_t[]){0x20, 0xCA, 0x68, 0xD5, 0x1B, 0x31, 0xE2, 0xDA, 0x08, 0x0A, 0xD4, 0xDE, 0x9C, 0xE4, 0x35, 0x5B}[counter];
} else if (msg->addr == MSG_ESC_51) {
crc ^= (uint8_t[]){0x77, 0x5C, 0xA0, 0x89, 0x4B, 0x7C, 0xBB, 0xD6, 0x1F, 0x6C, 0x4F, 0xF6, 0x20, 0x2B, 0x43, 0xDD}[counter];
} else if (msg->addr == MSG_ESP_21) {
crc ^= (uint8_t[]){0xB4, 0xEF, 0xF8, 0x49, 0x1E, 0xE5, 0xC2, 0xC0, 0x97, 0x19, 0x3C, 0xC9, 0xF1, 0x98, 0xD6, 0x61}[counter];
} else if (msg->addr == MSG_Motor_51) {
crc ^= (uint8_t[]){0x77, 0x5C, 0xA0, 0x89, 0x4B, 0x7C, 0xBB, 0xD6, 0x1F, 0x6C, 0x4F, 0xF6, 0x20, 0x2B, 0x43, 0xDD}[counter];
} else {
// Undefined CAN message, CRC check expected to fail
}
crc = volkswagen_crc8_lut_8h2f[crc];
return (uint8_t)(crc ^ 0xFFU);
}
static safety_config volkswagen_meb_init(uint16_t param) {
// Transmit of GRA_ACC_01 is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration
static const CanMsg VOLKSWAGEN_MEB_TX_MSGS[] = {
{MSG_HCA_03, 0, 24, .check_relay = true},
{MSG_GRA_ACC_01, 0, 8, .check_relay = false},
{MSG_GRA_ACC_01, 2, 8, .check_relay = false},
{MSG_LDW_02, 0, 8, .check_relay = true},
{MSG_KLR_01, 0, 8, .check_relay = false},
{MSG_KLR_01, 2, 8, .check_relay = true},
};
static RxCheck volkswagen_meb_rx_checks[] = {
{.msg = {{MSG_LH_EPS_03, 0, 8, 100U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_MOTOR_14, 0, 8, 10U, .ignore_checksum = true, .ignore_counter = true, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_GRA_ACC_01, 0, 8, 33U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_QFK_01, 0, 32, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_Motor_51, 0, 32, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_ESC_51, 0, 48, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_ESP_21, 0, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
};
volkswagen_common_init();
SAFETY_UNUSED(param);
return BUILD_SAFETY_CFG(volkswagen_meb_rx_checks, VOLKSWAGEN_MEB_TX_MSGS);
}
static void volkswagen_meb_rx_hook(const CANPacket_t *msg) {
if (msg->bus == 0U) {
// Update in-motion state by sampling wheel speeds
if (msg->addr == MSG_ESC_51) {
uint32_t fl = msg->data[8] | (msg->data[9] << 8);
uint32_t fr = msg->data[10] | (msg->data[11] << 8);
uint32_t rl = msg->data[12] | (msg->data[13] << 8);
uint32_t rr = msg->data[14] | (msg->data[15] << 8);
vehicle_moving = (fr > 0U) || (rr > 0U) || (rl > 0U) || (fl > 0U);
UPDATE_VEHICLE_SPEED((fr + rr + rl + fl) / 4.0 * 0.0075 * KPH_TO_MS);
}
// Check vehicle speed with redundant source
if (msg->addr == MSG_ESP_21) {
// Signal: ESP_v_Signal
float esp_speed = ((msg->data[5] << 8) | msg->data[4]) * 0.01 * KPH_TO_MS;
UPDATE_VEHICLE_SPEED_2(esp_speed);
}
if (msg->addr == MSG_QFK_01) {
int current_curvature = ((msg->data[6] & 0x7FU) << 8) | msg->data[5];
current_curvature *= GET_BIT(msg, 55U) ? 1 : -1;
update_sample(&curvature_state.meas, current_curvature);
}
if (msg->addr == MSG_LH_EPS_03) {
update_sample(&torque_driver, volkswagen_mlb_mqb_driver_input_torque(msg));
}
if (msg->addr == MSG_Motor_51) {
int acc_status = (msg->data[11] & 0x07U);
bool cruise_engaged = (acc_status == 3) || (acc_status == 4) || (acc_status == 5);
pcm_cruise_check(cruise_engaged);
int accel_pedal_value = ((msg->data[1] >> 4) & 0x0FU) | ((msg->data[2] & 0x1FU) << 4);
gas_pressed = accel_pedal_value > 0;
}
if (msg->addr == MSG_GRA_ACC_01) {
// Always exit controls on rising edge of Cancel
if (GET_BIT(msg, 13U)) {
controls_allowed = false;
}
}
if (msg->addr == MSG_MOTOR_14) {
brake_pressed = GET_BIT(msg, 28U);
}
}
}
static bool volkswagen_meb_tx_hook(const CANPacket_t *msg) {
// MEB is lateral-only; no openpilot longitudinal TX messages are allowed.
bool tx = true;
if (msg->addr == MSG_HCA_03) {
const CurvatureSteeringLimits VOLKSWAGEN_MEB_STEERING_LIMITS = {
.max_curvature = 29105,
.curvature_to_can = 149253.7313f,
.frequency = 50, // Hz
.max_curvature_error = 0, // disabled, MEB doesn't track rack
.curvature_error_min_speed = 0.0, // disabled
.max_steer_power = 125,
.inactive_curvature_is_zero = false, // MEB winds down with measured curvature
};
int desired_curvature_raw = GET_BYTES(msg, 3, 2) & 0x7FFFU;
bool desired_curvature_sign = GET_BIT(msg, 39U);
if (!desired_curvature_sign) {
desired_curvature_raw *= -1;
}
bool steer_req = (((msg->data[1] >> 4) & 0x0FU) == 4U);
int steer_power = msg->data[2];
if (steer_curvature_cmd_checks(desired_curvature_raw, steer_power, steer_req, VOLKSWAGEN_MEB_STEERING_LIMITS)) {
tx = false;
}
}
if ((msg->addr == MSG_GRA_ACC_01) && !controls_allowed) {
// only allow cancel button: bit 13
if (!GET_BIT(msg, 13U)) {
tx = false;
}
}
return tx;
}
const safety_hooks volkswagen_meb_hooks = {
.init = volkswagen_meb_init,
.rx = volkswagen_meb_rx_hook,
.tx = volkswagen_meb_tx_hook,
.get_counter = volkswagen_mqb_meb_get_counter,
.get_checksum = volkswagen_mqb_meb_get_checksum,
.compute_checksum = volkswagen_meb_compute_crc,
};

View File

@@ -23,6 +23,7 @@
#include "opendbc/safety/modes/nissan.h"
#include "opendbc/safety/modes/volkswagen_mlb.h"
#include "opendbc/safety/modes/volkswagen_mqb.h"
#include "opendbc/safety/modes/volkswagen_meb.h"
#include "opendbc/safety/modes/volkswagen_pq.h"
#include "opendbc/safety/modes/elm327.h"
#include "opendbc/safety/modes/body.h"
@@ -53,6 +54,7 @@ bool steering_disengage;
bool steering_disengage_prev;
bool cruise_engaged_prev = false;
struct sample_t vehicle_speed;
struct sample_t vehicle_speed_2;
bool vehicle_moving = false;
bool acc_main_on = false; // referred to as "ACC off" in ISO 15622:2018
int cruise_button_prev = 0;
@@ -415,6 +417,7 @@ int set_safety_hooks(uint16_t mode, uint16_t param) {
{SAFETY_CHRYSLER_CUSW, &chrysler_cusw_hooks},
{SAFETY_PSA, &psa_hooks},
{SAFETY_SUBARU_PREGLOBAL, &subaru_preglobal_hooks},
{SAFETY_VOLKSWAGEN_MEB, &volkswagen_meb_hooks},
{SAFETY_VOLKSWAGEN_MLB, &volkswagen_mlb_hooks},
{SAFETY_VOLKSWAGEN_PQ, &volkswagen_pq_hooks},
{SAFETY_ALLOUTPUT, &alloutput_hooks},
@@ -445,6 +448,7 @@ int set_safety_hooks(uint16_t mode, uint16_t param) {
curvature_state.rt_msgs = 0;
curvature_state.rt_msgs_prev = 0;
curvature_state.ts_check_last = 0;
curvature_state.steer_power_last = 0;
ts_torque_check_last = 0;
ts_steer_req_mismatch_last = 0;
valid_steer_req_count = 0;
@@ -452,6 +456,7 @@ int set_safety_hooks(uint16_t mode, uint16_t param) {
// reset samples
reset_sample(&vehicle_speed);
reset_sample(&vehicle_speed_2);
reset_sample(&torque_meas);
reset_sample(&torque_driver);
reset_sample(&angle_meas);

View File

@@ -9,6 +9,7 @@ from collections.abc import Callable
from opendbc.can import CANPacker
from opendbc.safety import ALTERNATIVE_EXPERIENCE
from opendbc.safety.tests.libsafety import libsafety_py
from opendbc.car.lateral import MAX_LATERAL_ACCEL, MAX_LATERAL_JERK
MAX_WRONG_COUNTERS = 5
MAX_SAMPLE_VALS = 6
@@ -822,6 +823,114 @@ class AngleSteeringSafetyTest(VehicleSpeedSafetyTest):
self.assertTrue(self._tx(self._angle_cmd_msg(0, True, increment_timer=False)))
class CurvatureSteeringSafetyTest(VehicleSpeedSafetyTest):
MAX_CURVATURE: float
MAX_CURVATURE_TEST: float
CURVATURE_TO_CAN: float
SEND_RATE: float
@classmethod
def setUpClass(cls):
if cls.__name__ == "CurvatureSteeringSafetyTest":
cls.safety = None
raise unittest.SkipTest
@abc.abstractmethod
def _curvature_cmd_msg(self, curvature: float, steer_req: bool):
pass
@abc.abstractmethod
def _curvature_meas_msg(self, curvature: float):
pass
def _set_prev_desired_curvature(self, curvature: float):
curvature_can = int(round(curvature * self.CURVATURE_TO_CAN))
self.safety.set_desired_curvature_last(curvature_can)
def _reset_curvature_measurement(self, curvature: float):
for _ in range(MAX_SAMPLE_VALS):
self._rx(self._curvature_meas_msg(curvature))
def _reset_speed_measurement(self, speed: float):
for _ in range(MAX_SAMPLE_VALS):
self._rx(self._speed_msg(speed))
self._rx(self._speed_msg_2(speed))
def test_curvature_measurements(self):
self._common_measurement_test(self._curvature_meas_msg, -self.MAX_CURVATURE, self.MAX_CURVATURE, self.CURVATURE_TO_CAN,
self.safety.get_curvature_meas_min, self.safety.get_curvature_meas_max)
def test_curvature_limit(self):
v = 1
for sign in (1, -1):
max_curvature = self.MAX_CURVATURE_TEST * sign
max_curvature_rate = MAX_LATERAL_JERK / v**2
max_curvature_delta = max_curvature_rate * self.SEND_RATE * sign
self._reset_speed_measurement(v)
self.safety.set_controls_allowed(True)
self._set_prev_desired_curvature(max_curvature)
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature, True)), f"{v} {max_curvature} {max_curvature_delta}")
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature - max_curvature_delta, True)), f"{v} {max_curvature} {max_curvature_delta}")
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature, True)), f"{v} {max_curvature} {max_curvature_delta}")
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self._tx(self._curvature_cmd_msg(max_curvature + max_curvature_delta, True)), f"{v} {max_curvature} {max_curvature_delta}")
def test_iso_accel_limit(self):
speeds = [2., 5., 10., 15., 50.]
for v in speeds:
for sign in (1, -1):
max_curvature = np.clip(((MAX_LATERAL_ACCEL / (v - 1)**2) * sign), -self.MAX_CURVATURE_TEST, self.MAX_CURVATURE_TEST)
max_curvature_rate = MAX_LATERAL_JERK / (v - 1)**2
max_curvature_delta = max_curvature_rate * self.SEND_RATE * sign
self._reset_speed_measurement(v)
self.safety.set_controls_allowed(True)
self._set_prev_desired_curvature(max_curvature)
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature, True)), f"{v} {max_curvature} {max_curvature_delta}")
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature - max_curvature_delta, True)), f"{v} {max_curvature} {max_curvature_delta}")
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature, True)), f"{v} {max_curvature} {max_curvature_delta}")
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self._tx(self._curvature_cmd_msg(max_curvature + max_curvature_delta, True)), f"{v} {max_curvature} {max_curvature_delta}")
def test_iso_jerk_limit(self):
speeds = [2., 5., 10., 15., 50.]
for v in speeds:
max_curvature_rate = MAX_LATERAL_JERK / (v - 1)**2
max_curvature_delta = max_curvature_rate * self.SEND_RATE
self._reset_speed_measurement(v)
self.safety.set_controls_allowed(True)
self._set_prev_desired_curvature(max_curvature_delta)
self.assertTrue(self._tx(self._curvature_cmd_msg(max_curvature_delta, True)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._tx(self._curvature_cmd_msg(0, True)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._tx(self._curvature_cmd_msg(-max_curvature_delta, True)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self._tx(self._curvature_cmd_msg(max_curvature_delta, True)))
# after violation, prev is reset to 0, going past the jerk limit must fail
self.safety.set_controls_allowed(True)
self.assertFalse(self._tx(self._curvature_cmd_msg(2 * max_curvature_delta, True)))
class SafetyTest(SafetyTestBase):
TX_MSGS: list[list[int]] = []
SCANNED_ADDRS = [*range(0x800), # Entire 11-bit CAN address space
@@ -909,7 +1018,7 @@ class SafetyTest(SafetyTestBase):
continue
if {attr, current_test}.issubset({'TestHyundaiLongitudinalSafety', 'TestHyundaiLongitudinalSafetyCameraSCC', 'TestHyundaiSafetyFCEVLong'}):
continue
volkswagen_shared = ('TestVolkswagenMqb', 'TestVolkswagenMlb')
volkswagen_shared = ('TestVolkswagenMqb', 'TestVolkswagenMlb', 'TestVolkswagenMeb')
if attr.startswith(volkswagen_shared) and current_test.startswith(volkswagen_shared):
continue

View File

@@ -621,7 +621,7 @@ def main():
("opendbc/safety/lateral.h", 188, "boundary"),
("opendbc/safety/lateral.h", 212, "boundary"),
("opendbc/safety/lateral.h", 213, "boundary"),
("opendbc/safety/lateral.h", 354, "arithmetic"),
("opendbc/safety/lateral.h", 363, "arithmetic"),
}
survivors = [r for r in survivors if (str(r.site.origin_file.relative_to(ROOT)), r.site.origin_line, r.site.mutator) not in known_survivors]

View File

@@ -105,6 +105,7 @@ class TestFordSafetyBase(common.CarSafetyTest):
def _reset_curvature_measurement(self, curvature, speed):
for _ in range(6):
self._rx(self._speed_msg(speed))
self._rx(self._speed_msg_2(speed))
self._rx(self._yaw_rate_msg(curvature, speed))
# Driver brake pedal
@@ -200,6 +201,18 @@ class TestFordSafetyBase(common.CarSafetyTest):
}
return self.packer.make_can_msg_safety("Steering_Data_FD1", bus, values)
def test_rx_hook_speed_mismatch(self):
for speed in np.arange(0, 40, 0.5):
for speed_delta in np.arange(-5, 5, 0.1):
speed_2 = round(max(speed + speed_delta, 0), 1)
self._rx(self._speed_msg(speed))
self._rx(self._speed_msg_2(speed_2))
self.safety.set_controls_allowed(True)
self._tx(self._lat_ctl_msg(True, 0, 0, 0, 0))
within_delta = abs(speed - speed_2) <= common.MAX_SPEED_DELTA
self.assertEqual(self.safety.get_controls_allowed(), within_delta)
def test_rx_hook(self):
# checksum, counter, and quality flag checks
for quality_flag in [True, False]:

View File

@@ -1,10 +1,250 @@
#!/usr/bin/env python3
import numpy as np
import unittest
from opendbc.car.structs import CarParams
from opendbc.safety.tests.libsafety import libsafety_py
import opendbc.safety.tests.common as common
from opendbc.safety.tests.common import CANPackerSafety
# MEB message IDs
MSG_LH_EPS_03 = 0x9F
MSG_ESC_51 = 0xFC
MSG_Motor_51 = 0x10B
MSG_GRA_ACC_01 = 0x12B
MSG_QFK_01 = 0x13D
MSG_KLR_01 = 0x25D
MSG_HCA_03 = 0x303
MSG_LDW_02 = 0x397
MSG_MOTOR_14 = 0x3BE
class TestVolkswagenMebSafetyBase(common.CarSafetyTest, common.CurvatureSteeringSafetyTest):
STANDSTILL_THRESHOLD = 0
RELAY_MALFUNCTION_ADDRS = {0: (MSG_HCA_03, MSG_LDW_02), 2: (MSG_KLR_01,)}
MAX_CURVATURE = 29105
MAX_CURVATURE_TEST = 0.195
CURVATURE_TO_CAN = 149253.7313
MAX_POWER = 125
MAX_POWER_TEST = 50
SEND_RATE = 0.02
LATERAL_FREQUENCY = 50 # Hz
cnt_curvature_cmd = 0
def _set_prev_desired_power(self, power: int):
# init with local tx sequence
prev_allowed = self.safety.get_controls_allowed()
self.safety.set_controls_allowed(True)
self._tx(self._curvature_cmd_msg(0, steer_req=True, power=power))
self.safety.set_controls_allowed(prev_allowed)
def test_power_limit(self):
max_power_can = self.MAX_POWER
max_power = self.MAX_POWER_TEST
self._set_prev_desired_power(max_power_can)
self.safety.set_controls_allowed(True)
self.assertTrue(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power + 1)))
self.safety.set_controls_allowed(True)
self.assertTrue(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power - 1)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=False, power=max_power)))
self.safety.set_controls_allowed(True)
self.assertTrue(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=False, power=-max_power)))
def test_power_without_control(self):
max_power_can = self.MAX_POWER
max_power = self.MAX_POWER_TEST
self._set_prev_desired_power(max_power_can)
self.safety.set_controls_allowed(False)
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=False, power=max_power)))
self.assertTrue(self._tx(self._curvature_cmd_msg(0, steer_req=False, power=0)))
self._set_prev_desired_power(max_power - 1)
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=False, power=max_power)))
# When controls are not allowed, steer_req=True is gated only by the power check:
# steady or rising power is disallowed, but a strictly decreasing power (steering
# wind-down at disengagement) is permitted so the EPS torque authority ramps to zero
self._set_prev_desired_power(max_power - 1)
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power))) # rising power
self.assertFalse(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power))) # steady power
self.assertTrue(self._tx(self._curvature_cmd_msg(0, steer_req=True, power=max_power - 1))) # decreasing power
def _speed_msg(self, speed_mps: float):
spd_kph = speed_mps * 3.6
values = {f"{s}_Radgeschw": spd_kph for s in ("VL", "VR", "HL", "HR")}
return self.packer.make_can_msg_safety("ESC_51", 0, values)
def _speed_msg_2(self, speed_mps: float):
values = {"ESP_v_Signal": speed_mps * 3.6}
return self.packer.make_can_msg_safety("ESP_21", 0, values)
def _motor_14_msg(self, brake):
values = {"MO_Fahrer_bremst": brake}
return self.packer.make_can_msg_safety("Motor_14", 0, values)
def _user_brake_msg(self, brake):
return self._motor_14_msg(brake)
def _user_gas_msg(self, gas):
values = {"Accel_Pedal_Pressure": 1 if gas else 0, "TSK_Status": 3}
return self.packer.make_can_msg_safety("Motor_51", 0, values)
def _vehicle_moving_msg(self, speed_mps: float):
return self._speed_msg(speed_mps)
def _curvature_meas_msg(self, curvature):
values = {"Curvature": abs(curvature), "Curvature_VZ": curvature > 0}
return self.packer.make_can_msg_safety("QFK_01", 0, values)
def _curvature_cmd_msg(self, curvature, steer_req=1, power=50, increment_timer=True):
if increment_timer:
self.safety.set_timer(self.cnt_curvature_cmd * int(1e6 / self.LATERAL_FREQUENCY))
self.__class__.cnt_curvature_cmd += 1
values = {
"Curvature": abs(curvature),
"Curvature_VZ": curvature > 0,
"RequestStatus": 4 if steer_req else 0,
"Power": power,
}
return self.packer.make_can_msg_safety("HCA_03", 0, values)
def _tsk_status_msg(self, enable, main_switch=True):
if main_switch:
tsk_status = 3 if enable else 2
else:
tsk_status = 0
values = {"TSK_Status": tsk_status}
return self.packer.make_can_msg_safety("Motor_51", 0, values)
def _pcm_status_msg(self, enable):
return self._tsk_status_msg(enable)
def _torque_driver_msg(self, torque):
values = {"EPS_Lenkmoment": abs(torque), "EPS_VZ_Lenkmoment": torque < 0}
return self.packer.make_can_msg_safety("LH_EPS_03", 0, values)
def _button_msg(self, cancel=0, resume=0, _set=0, bus=2):
values = {"GRA_Abbrechen": cancel, "GRA_Tip_Setzen": _set, "GRA_Tip_Wiederaufnahme": resume}
return self.packer.make_can_msg_safety("GRA_ACC_01", bus, values)
def test_curvature_measurements(self):
self._rx(self._curvature_meas_msg(0.15))
self._rx(self._curvature_meas_msg(-0.1))
self._rx(self._curvature_meas_msg(0))
self._rx(self._curvature_meas_msg(0))
self._rx(self._curvature_meas_msg(0))
self._rx(self._curvature_meas_msg(0))
self.assertEqual(int(-0.1 * self.CURVATURE_TO_CAN), self.safety.get_curvature_meas_min())
self.assertEqual(int(0.15 * self.CURVATURE_TO_CAN), self.safety.get_curvature_meas_max())
self._rx(self._curvature_meas_msg(0))
self.assertEqual(0, self.safety.get_curvature_meas_max())
self.assertEqual(int(-0.1 * self.CURVATURE_TO_CAN), self.safety.get_curvature_meas_min())
self._rx(self._curvature_meas_msg(0))
self.assertEqual(0, self.safety.get_curvature_meas_max())
self.assertEqual(0, self.safety.get_curvature_meas_min())
def test_brake_signal(self):
self._rx(self._user_brake_msg(False))
self.assertFalse(self.safety.get_brake_pressed_prev())
self._rx(self._user_brake_msg(True))
self.assertTrue(self.safety.get_brake_pressed_prev())
def test_torque_driver_measurements(self):
for t in (0, 100, -100, 250, -250):
self._rx(self._torque_driver_msg(t))
def test_main_switch_off_disables_controls(self):
self.safety.set_controls_allowed(True)
self._rx(self._tsk_status_msg(False, main_switch=False))
self.assertFalse(self.safety.get_controls_allowed())
def test_cancel_button_rising_edge(self):
self.safety.set_controls_allowed(True)
self._rx(self._button_msg(cancel=1, bus=0))
self.assertFalse(self.safety.get_controls_allowed())
def test_rx_hook_speed_mismatch(self):
for speed in np.arange(0, 40, 0.5):
for speed_delta in np.arange(-5, 5, 0.1):
speed_2 = round(max(speed + speed_delta, 0), 1)
self._rx(self._speed_msg(speed))
self._rx(self._speed_msg_2(speed_2))
self.safety.set_controls_allowed(True)
self._tx(self._curvature_cmd_msg(0, steer_req=True))
within_delta = abs(speed - speed_2) <= common.MAX_SPEED_DELTA
self.assertEqual(self.safety.get_controls_allowed(), within_delta)
class TestVolkswagenMebStockSafety(TestVolkswagenMebSafetyBase):
FWD_BLACKLISTED_ADDRS = {0: [MSG_KLR_01], 2: [MSG_HCA_03, MSG_LDW_02]}
TX_MSGS = [[MSG_HCA_03, 0], [MSG_LDW_02, 0], [MSG_GRA_ACC_01, 0], [MSG_GRA_ACC_01, 2],
[MSG_KLR_01, 0], [MSG_KLR_01, 2]]
def setUp(self):
self.packer = CANPackerSafety("vw_meb")
self.safety = libsafety_py.libsafety
self.safety.set_safety_hooks(CarParams.SafetyModel.volkswagenMeb, 0)
self.safety.init_tests()
def test_curvature_violation(self):
# if violation occurs, MEB resets desired_curvature_last to measured curvature
meas = self.MAX_CURVATURE_TEST / 4
self.safety.set_controls_allowed(True)
self._reset_curvature_measurement(meas)
self._set_prev_desired_curvature(0)
# cause a violation by sending a command far from prev=0
self.assertFalse(self._tx(self._curvature_cmd_msg(self.MAX_CURVATURE_TEST, steer_req=True, power=50)))
# prev should track curvature_meas
self.assertEqual(self.safety.get_curvature_meas_min(), self.safety.get_desired_curvature_last())
def test_curvature_cmd_when_not_steering(self):
# Tests that only a zero curvature is allowed while the steer
# actuation bit is 0, regardless of controls allowed or meas
for controls_allowed in (True, False):
self.safety.set_controls_allowed(controls_allowed)
for steer_req in (True, False):
for curvature_meas in np.arange(-self.MAX_CURVATURE_TEST, self.MAX_CURVATURE_TEST, self.MAX_CURVATURE_TEST / 5):
self._reset_curvature_measurement(curvature_meas)
for curvature_cmd in np.arange(-self.MAX_CURVATURE_TEST, self.MAX_CURVATURE_TEST, self.MAX_CURVATURE_TEST / 5):
self._set_prev_desired_curvature(curvature_cmd)
# controls_allowed is checked if actuation bit is 1, else the curvature must be zero (inactive)
should_tx = controls_allowed if steer_req else round(curvature_cmd * self.CURVATURE_TO_CAN) == 0
self.assertEqual(should_tx, self._tx(self._curvature_cmd_msg(curvature_cmd, steer_req=steer_req, power=50 if steer_req else 0)))
def test_spam_cancel_safety_check(self):
self.safety.set_controls_allowed(0)
self.assertTrue(self._tx(self._button_msg(cancel=1)))
self.assertFalse(self._tx(self._button_msg(resume=1)))
self.assertFalse(self._tx(self._button_msg(_set=1)))
self.safety.set_controls_allowed(1)
self.assertTrue(self._tx(self._button_msg(resume=1)))
# ZAS_Kl_15=1
class TestVolkswagenMebIgnition(unittest.TestCase):
TX_MSGS: list = []
@@ -18,7 +258,6 @@ class TestVolkswagenMebIgnition(unittest.TestCase):
{"Klemmen_Status_01_BZ": counter,
"ZAS_Kl_15": ign})
# ZAS_Kl_15=1
def test_ignition_on(self):
for i in range(16):
self.safety.init_tests()