Volkswagen ID.4 longitudinal (#3424)

* VW ID.4 port

* VW MEB: safety vehicle model + tests

* safety tests

* cleanup

* fix test

* fix

* safety

* use curvature

* no pid

* clean long

* longitudinal

* add fcw

* allowance=80

* add carstate signals

* typo

* inline power

* add power tests

* fix mutation test

* update route

* flags

* merge master

* camera harness

* add blinker controls

* add tests

* rm rx

* fix ruff

* fix

* revert: blinker controls

* brake fault

* Update opendbc/car/volkswagen/interface.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* Update opendbc/car/volkswagen/carstate.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* curvature limits

* rm stopAccel

* add pid

* starting

* pid correction

* apply ford limits

* only startaccel

* curvature std limits

* dedup tesla

* add ford safety

* integrator

* misra

* use lat accel

* ruff

* add ford limit fields

* tests

* simple ford curvature

* add test

* reduce

* lower jerk

* limit winddown

* lateral only

* Update opendbc/car/ford/carcontroller.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* comment

* unify max lat accel

* add test

* fix

* cleanup tests

* fix

* Revert "limit winddown"

This reverts commit 289e610f09.

* add back rt

* update lines

* update lines

* add is_zero from meb

* Revert "add is_zero from meb"

This reverts commit 1c97e6f658.

* note

* rm

* use curv=0

* no ford

* use power

* split from long

* fix

* comments

* cleanup

* fix

* comment

* long port

* comment

* reorder tests

* config struct

* direct config

* Revert "direct config"

This reverts commit 66b1ef5d9a.

* align

* fix

* Update opendbc/car/lateral.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* steerLimit

* steerLimit

* inactive

* inactive

* change to rt jerk value

* add rolling window

* group to struct

* rename new limit method

* more

* more

* add second speed source

* rm long acc_main_on

* align curvature to actual

* merge lateral

* extend

* rm old comment

* undeprecate curvature

* only op long

* Update opendbc/safety/modes/volkswagen_meb.h

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* Update opendbc/safety/modes/volkswagen_meb.h

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* allow debug

* redundant speed in cmd checks

* undeprecate curvature

* fix test

* whitelist cancel

* mutations

* merge vw-id4 into id4-long

* safety cleanup

* lil radar cleanup

* rm radar parser

* bring back stock acc

* mebcan

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
This commit is contained in:
Daniel Koepping
2026-06-05 23:50:49 -07:00
committed by GitHub
parent 4cd4b8cc18
commit 4634ef32c6
7 changed files with 428 additions and 51 deletions

View File

@@ -45,14 +45,17 @@ class CarController(CarControllerBase):
self.CCS = pqcan
elif CP.flags & VolkswagenFlags.MLB:
self.CCS = mlbcan
elif CP.flags & VolkswagenFlags.MEB:
self.CCS = mebcan
else:
self.CCS = mqbcan
self.apply_torque_last = 0
self.apply_curvature_last = 0.
self.steering_power_last = 0
self.accel_last = 0.
self.long_override_counter = 0
self.long_disabled_counter = 0
self.lead_distance_bars_last = None
self.distance_bar_frame = 0
self.gra_acc_counter_last = None
self.hca_mitigation = HCAMitigation(self.CCP)
@@ -95,7 +98,7 @@ class CarController(CarControllerBase):
apply_curvature = 0. # inactive curvature
steering_power = 0
can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_curvature, hca_enabled, steering_power))
can_sends.append(mebcan.create_steering_control(self.packer_pt, self.CAN.pt, apply_curvature, hca_enabled, steering_power))
self.apply_curvature_last = apply_curvature
self.steering_power_last = steering_power
@@ -129,12 +132,34 @@ class CarController(CarControllerBase):
if self.CP.openpilotLongitudinalControl:
if self.frame % self.CCP.ACC_CONTROL_STEP == 0:
acc_control = self.CCS.acc_control_value(CS.out.cruiseState.available, CS.out.accFaulted, CC.longActive)
accel = float(np.clip(actuators.accel, self.CCP.ACCEL_MIN, self.CCP.ACCEL_MAX) if CC.longActive else 0)
stopping = actuators.longControlState == LongCtrlState.stopping
starting = actuators.longControlState == LongCtrlState.pid and (CS.esp_hold_confirmation or CS.out.vEgo < self.CP.vEgoStopping)
can_sends.extend(self.CCS.create_acc_accel_control(self.packer_pt, self.CAN.pt, CS.acc_type, CC.longActive, accel,
acc_control, stopping, starting, CS.esp_hold_confirmation))
if self.CP.flags & VolkswagenFlags.MEB:
starting = actuators.longControlState == LongCtrlState.starting and CS.out.vEgo <= self.CP.vEgoStarting
accel = float(np.clip(actuators.accel, self.CCP.ACCEL_MIN, self.CCP.ACCEL_MAX) if CC.enabled else 0)
long_override = CC.cruiseControl.override or CS.out.gasPressed
self.long_override_counter = min(self.long_override_counter + 1, 5) if long_override else 0
long_override_begin = long_override and self.long_override_counter < 5
self.long_disabled_counter = min(self.long_disabled_counter + 1, 5) if not CC.enabled else 0
long_disabling = not CC.enabled and self.long_disabled_counter < 5
acc_control = mebcan.acc_control_value(CS.out.cruiseState.available, CS.out.accFaulted, CC.enabled, long_override)
acc_hold_type = mebcan.acc_hold_type(CS.out.cruiseState.available, CS.out.accFaulted, CC.enabled, starting, stopping,
CS.esp_hold_confirmation, long_override, long_override_begin, long_disabling)
can_sends.extend(mebcan.create_acc_accel_control(self.packer_pt, self.CAN.pt, CS.acc_type, CC.enabled,
4.0, 4.0, 0., 0.,
accel, acc_control, acc_hold_type, stopping, starting, CS.esp_hold_confirmation,
CS.out.vEgoRaw * CV.MS_TO_KPH, long_override, CS.travel_assist_available))
self.accel_last = accel
else:
acc_control = self.CCS.acc_control_value(CS.out.cruiseState.available, CS.out.accFaulted, CC.longActive)
accel = float(np.clip(actuators.accel, self.CCP.ACCEL_MIN, self.CCP.ACCEL_MAX) if CC.longActive else 0)
starting = actuators.longControlState == LongCtrlState.pid and (CS.esp_hold_confirmation or CS.out.vEgo < self.CP.vEgoStopping)
can_sends.extend(self.CCS.create_acc_accel_control(self.packer_pt, self.CAN.pt, CS.acc_type, CC.longActive, accel,
acc_control, stopping, starting, CS.esp_hold_confirmation))
#if self.aeb_available:
# if self.frame % self.CCP.AEB_CONTROL_STEP == 0:
@@ -151,16 +176,32 @@ class CarController(CarControllerBase):
can_sends.append(self.CCS.create_lka_hud_control(self.packer_pt, self.CAN.pt, CS.ldw_stock_values, CC.latActive,
CS.out.steeringPressed, hud_alert, hud_control))
if hud_control.leadDistanceBars != self.lead_distance_bars_last:
self.distance_bar_frame = self.frame
if self.frame % self.CCP.ACC_HUD_STEP == 0 and self.CP.openpilotLongitudinalControl:
lead_distance = 0
if hud_control.leadVisible and self.frame * DT_CTRL > 1.0: # Don't display lead until we know the scaling factor
lead_distance = 512 if CS.upscale_lead_car_signal else 8
acc_hud_status = self.CCS.acc_hud_status_value(CS.out.cruiseState.available, CS.out.accFaulted, CC.longActive)
# FIXME: PQ may need to use the on-the-wire mph/kmh toggle to fix rounding errors
# FIXME: Detect clusters with vEgoCluster offsets and apply an identical vCruiseCluster offset
set_speed = hud_control.setSpeed * CV.MS_TO_KPH
can_sends.append(self.CCS.create_acc_hud_control(self.packer_pt, self.CAN.pt, acc_hud_status, set_speed,
lead_distance, hud_control.leadDistanceBars))
if self.CP.flags & VolkswagenFlags.MEB:
fcw_alert = hud_control.visualAlert == VisualAlert.fcw
show_distance_bars = self.frame - self.distance_bar_frame < 400
lead_distance = 0
if hud_control.leadVisible and self.frame * DT_CTRL > 1.0:
lead_distance = 8
acc_hud_status = mebcan.acc_hud_status_value(CS.out.cruiseState.available, CS.out.accFaulted, CC.enabled,
CC.cruiseControl.override or CS.out.gasPressed)
can_sends.append(mebcan.create_acc_hud_control(self.packer_pt, self.CAN.pt, acc_hud_status, hud_control.setSpeed * CV.MS_TO_KPH,
hud_control.leadVisible, hud_control.leadDistanceBars + 1, show_distance_bars,
CS.esp_hold_confirmation, lead_distance, 0, fcw_alert))
else:
lead_distance = 0
if hud_control.leadVisible and self.frame * DT_CTRL > 1.0: # Don't display lead until we know the scaling factor
lead_distance = 512 if CS.upscale_lead_car_signal else 8
acc_hud_status = self.CCS.acc_hud_status_value(CS.out.cruiseState.available, CS.out.accFaulted, CC.longActive)
# FIXME: PQ may need to use the on-the-wire mph/kmh toggle to fix rounding errors
# FIXME: Detect clusters with vEgoCluster offsets and apply an identical vCruiseCluster offset
set_speed = hud_control.setSpeed * CV.MS_TO_KPH
can_sends.append(self.CCS.create_acc_hud_control(self.packer_pt, self.CAN.pt, acc_hud_status, set_speed,
lead_distance, hud_control.leadDistanceBars))
# **** Stock ACC Button Controls **************************************** #
@@ -173,7 +214,9 @@ class CarController(CarControllerBase):
new_actuators.torque = self.apply_torque_last / self.CCP.STEER_MAX
new_actuators.torqueOutputCan = self.apply_torque_last
new_actuators.curvature = self.apply_curvature_last
new_actuators.accel = self.accel_last
self.lead_distance_bars_last = hud_control.leadDistanceBars
self.gra_acc_counter_last = CS.gra_stock_values["COUNTER"]
self.frame += 1
return new_actuators, can_sends

View File

@@ -296,7 +296,8 @@ class CarState(CarStateBase):
else:
ret.cruiseState.nonAdaptive = bool(pt_cp.vl["Motor_51"]["TSK_Limiter_ausgewaehlt"])
accFaulted = pt_cp.vl["Motor_51"]["TSK_Status"] in (6, 7)
ret.accFaulted = self.update_acc_fault(accFaulted, parking_brake=ret.parkingBrake, drive_mode=drive_mode)
ret.accFaulted = self.update_acc_fault(accFaulted, parking_brake=ret.parkingBrake, drive_mode=drive_mode,
brake_pressed=ret.brakePressed)
ret.leftBlinker, ret.rightBlinker = self.update_blinker_from_stalk(240, pt_cp.vl["SMLS_01"]["BH_Blinker_li"],
pt_cp.vl["SMLS_01"]["BH_Blinker_re"])
@@ -398,12 +399,13 @@ class CarState(CarStateBase):
temp_fault = drive_mode and hca_status in ("REJECTED", "PREEMPTED") or not self.eps_init_complete
return temp_fault, perm_fault
def update_acc_fault(self, acc_fault, parking_brake=False, drive_mode=True, recovery_frames_max=100):
def update_acc_fault(self, acc_fault, parking_brake=False, drive_mode=True, brake_pressed=False, recovery_frames_max=300):
# Ignore FAULT when not in drive mode and parked
# do not show misleading error during ignition in parked state
# grant a short time to recover a normal cruise state
# after hard brake, stock system prevents acc engage for ~3 seconds
fault = acc_fault
if parking_brake and not drive_mode:
if (parking_brake and not drive_mode) or brake_pressed:
fault = False
self.cruise_recovery_timer = self.frame
elif self.frame - self.cruise_recovery_timer < recovery_frames_max:

View File

@@ -66,9 +66,7 @@ class CarInterface(CarInterfaceBase):
if 0x3DC in fingerprint[0]: # Gateway_73
ret.flags |= VolkswagenFlags.ALT_GEAR.value
ret.radarUnavailable = True
# only allow gateway harness for now
# only allow gateway harness to escalate Emergency Assist
ret.dashcamOnly = is_release or ret.networkLocation == NetworkLocation.fwdCamera
else:
@@ -111,10 +109,14 @@ class CarInterface(CarInterfaceBase):
ret.lateralTuning.pid.kiV = [0.2]
# Global longitudinal tuning defaults, can be overridden per-vehicle
ret.alphaLongitudinalAvailable = not (ret.flags & VolkswagenFlags.MEB) and \
(ret.networkLocation == NetworkLocation.gateway or docs)
if alpha_long and not (ret.flags & VolkswagenFlags.MEB):
# Proof-of-concept, prep for E2E only. No radar points available. Panda ALLOW_DEBUG firmware required.
if ret.flags & VolkswagenFlags.MEB:
ret.longitudinalActuatorDelay = 0.5
ret.longitudinalTuning.kiBP = [0., 30.]
ret.longitudinalTuning.kiV = [0.4, 0.]
ret.alphaLongitudinalAvailable = ret.networkLocation == NetworkLocation.gateway or docs
if alpha_long:
ret.openpilotLongitudinalControl = True
safety_configs[0].safetyParam |= VolkswagenSafetyFlags.LONG_CONTROL.value
if ret.transmissionType == TransmissionType.manual:
@@ -126,10 +128,13 @@ class CarInterface(CarInterfaceBase):
ret.steerActuatorDelay = 0.07
ret.pcmCruise = not ret.openpilotLongitudinalControl
if not (ret.flags & VolkswagenFlags.MEB):
if ret.flags & VolkswagenFlags.MEB:
ret.startingState = True
ret.startAccel = 0.8
else:
ret.stopAccel = -0.55
ret.vEgoStopping = 0.5
ret.vEgoStarting = 0.1
ret.vEgoStopping = 0.5
ret.autoResumeSng = ret.minEnableSpeed == -1
CAN = CanBus(fingerprint=fingerprint)

View File

@@ -69,11 +69,197 @@ def create_acc_buttons_control(packer, bus, gra_stock_values, cancel=False, resu
return packer.make_can_msg("GRA_ACC_01", bus, values)
ACCEL_INACTIVE = 3.01
ACCEL_OVERRIDE = 0.00
ACC_CTRL_ERROR = 6
ACC_CTRL_OVERRIDE = 4
ACC_CTRL_ACTIVE = 3
ACC_CTRL_ENABLED = 2
ACC_CTRL_DISABLED = 0
ACC_HMS_RAMP_RELEASE = 5
ACC_HMS_RELEASE = 4
ACC_HMS_HOLD = 1
ACC_HMS_NO_REQUEST = 0
ACC_HUD_ERROR = 6
ACC_HUD_OVERRIDE = 4
ACC_HUD_ACTIVE = 3
ACC_HUD_ENABLED = 2
ACC_HUD_DISABLED = 0
def acc_control_value(main_switch_on, acc_faulted, long_active, override):
if acc_faulted:
acc_control = ACC_CTRL_ERROR # error state
elif long_active:
if override:
acc_control = ACC_CTRL_OVERRIDE # overriding
else:
acc_control = ACC_CTRL_ACTIVE # active long control state
elif main_switch_on:
acc_control = ACC_CTRL_ENABLED # long control ready
else:
acc_control = ACC_CTRL_DISABLED # long control deactivated state
return acc_control
def acc_hold_type(main_switch_on, acc_faulted, long_active, starting, stopping, esp_hold, override, override_begin, long_disabling):
# warning: car is reacting to hold mechanic even with long control off
if acc_faulted:
acc_hold_type = ACC_HMS_NO_REQUEST # no hold request
elif not long_active:
if long_disabling:
acc_hold_type = ACC_HMS_RAMP_RELEASE # ramp release of requests right after disabling long control (prevents car error with EPB at low speed)
else:
acc_hold_type = ACC_HMS_NO_REQUEST # no hold request
elif override:
if override_begin:
acc_hold_type = ACC_HMS_RAMP_RELEASE # ramp release of requests at the beginning of override (prevents car error with EPB at low speed)
else:
acc_hold_type = ACC_HMS_NO_REQUEST # overriding / no request
elif starting:
acc_hold_type = ACC_HMS_RELEASE # release request and startup
elif stopping or esp_hold:
acc_hold_type = ACC_HMS_HOLD # hold or hold request
else:
acc_hold_type = ACC_HMS_NO_REQUEST # no hold request
return acc_hold_type
def create_acc_accel_control(packer, bus, acc_type, acc_enabled, upper_jerk, lower_jerk, upper_control_limit, lower_control_limit,
accel, acc_control, acc_hold_type, stopping, starting, esp_hold, speed, override, travel_assist_available):
# active longitudinal control disables one pedal driving (regen mode) while using overriding mechanism
# error mitigation when stopping or stopped: (newer gen cars can be very sensitive)
# - send 0 m stopping distance for cars in kind of parameterized stopping mode (stopping accel -0.2 seen for those cars)
# -> this mode is seen for different cars with same firmware radars so could be a coded operational mode
# - jerk and control limits values set to 0 when fully stopped
# - set accel to 0 / no stop accel for full stop (seems to be compatible with old (non 0 stop accel) and new gen, because HMS state holds the car anyways)
# - stopping command sent as long as actually stopping
commands = []
# ACC_Anhalteweg: when stopping: MEB: values <> 0 the car can execute a hard brake probably if target is too close, MQBEvo: value 0 results in hard brake
terminal_rollout = 0
full_stop = stopping and esp_hold
full_stop_no_start = esp_hold and not starting
actually_stopping = stopping and not esp_hold
if acc_enabled:
if override: # the car expects a non inactive accel while overriding
acceleration = ACCEL_OVERRIDE # original ACC still sends active accel in this case (seamless experience)
elif full_stop:
acceleration = ACCEL_INACTIVE # inactive accel, newer gen >2024 error of not neutral value
else:
acceleration = accel
else:
acceleration = ACCEL_INACTIVE # inactive accel
values = {
"ACC_Typ": acc_type,
"ACC_Status_ACC": acc_control,
"ACC_StartStopp_Info": acc_enabled,
"ACC_Sollbeschleunigung_02": acceleration,
"ACC_zul_Regelabw_unten": lower_control_limit if acc_control in (ACC_CTRL_ACTIVE, ACC_CTRL_OVERRIDE) and not full_stop_no_start else 0,
"ACC_zul_Regelabw_oben": upper_control_limit if acc_control in (ACC_CTRL_ACTIVE, ACC_CTRL_OVERRIDE) and not full_stop_no_start else 0,
"ACC_neg_Sollbeschl_Grad_02": lower_jerk if acc_control in (ACC_CTRL_ACTIVE, ACC_CTRL_OVERRIDE) and not full_stop_no_start else 0,
"ACC_pos_Sollbeschl_Grad_02": upper_jerk if acc_control in (ACC_CTRL_ACTIVE, ACC_CTRL_OVERRIDE) and not full_stop_no_start else 0,
"ACC_Anfahren": starting,
"ACC_Anhalten": 1 if actually_stopping else 0,
"ACC_Anhalteweg": terminal_rollout if actually_stopping else 20.46,
"ACC_Anforderung_HMS": acc_hold_type,
"ACC_AKTIV_regelt": 1 if acc_control == ACC_CTRL_ACTIVE else 0,
"Speed": speed,
"SET_ME_0XFE": 0xFE,
"SET_ME_0X1": 0x1,
"SET_ME_0X9": 0x9,
}
commands.append(packer.make_can_msg("ACC_18", bus, values))
if travel_assist_available:
# satisfy car to prevent errors when pressing Travel Assist Button
values_ta = {
"Travel_Assist_Status": 4 if acc_enabled else 2,
"Travel_Assist_Request": 0,
"Travel_Assist_Available": 1,
}
commands.append(packer.make_can_msg("TA_01", bus, values_ta))
return commands
def acc_hud_status_value(main_switch_on, acc_faulted, long_active, override):
if acc_faulted:
acc_hud_control = ACC_HUD_ERROR # error state
elif long_active:
if override:
acc_hud_control = ACC_HUD_OVERRIDE # overriding
else:
acc_hud_control = ACC_HUD_ACTIVE # active
elif main_switch_on:
acc_hud_control = ACC_HUD_ENABLED # inactive
else:
acc_hud_control = ACC_HUD_DISABLED # deactivated
return acc_hud_control
def get_desired_gap(distance_bars, desired_gap, current_gap_signal):
# mapping desired gap to correct signal of corresponding distance bar
gap = 0
if distance_bars == current_gap_signal:
gap = desired_gap
return gap
def create_acc_hud_control(packer, bus, acc_control, set_speed, lead_visible, distance_bars, show_distance_bars, esp_hold, distance, desired_gap, fcw_alert):
values = {
"ACC_Status_ACC": acc_control,
"ACC_Tempolimit": 0,
"ACC_Wunschgeschw_02": set_speed if set_speed < 250 else 327.36,
"ACC_Gesetzte_Zeitluecke": distance_bars, # 5 distance bars available (3 are used by OP)
"ACC_Display_Prio": 0 if fcw_alert and acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 1, # probably keeping warning in front
"ACC_Optischer_Fahrerhinweis": 1 if fcw_alert and acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 0, # enables optical warning
"ACC_Akustischer_Fahrerhinweis": 3 if fcw_alert and acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 0, # enables sound warning
"ACC_Texte_Zusatzanz_02": 11 if fcw_alert and acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 0, # type of warning: Break!
"ACC_Abstandsindex_02": 569, # seems to be default for MEB but is not static in every case
"ACC_EGO_Fahrzeug": 2 if fcw_alert and acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else
(1 if acc_control == ACC_HUD_ACTIVE else 0), # red car warn symbol for fcw
"Lead_Type_Detected": 1 if lead_visible else 0, # object should be displayed
"Lead_Type": 3 if lead_visible else 0, # displaying a car
"Lead_Distance": distance if lead_visible else 0, # hud distance of object
"ACC_Enabled": 1 if acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 0,
"ACC_Standby_Override": 1 if acc_control != ACC_HUD_ACTIVE else 0,
"Street_Color": 1 if acc_control in (ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 0, # light grey (1) or dark (0) street
"Lead_Brightness": 3 if acc_control == ACC_HUD_ACTIVE else 0, # object shows in color
"Zeitluecke_1": get_desired_gap(distance_bars, desired_gap, 1), # desired distance to lead object for distance bar 1
"Zeitluecke_2": get_desired_gap(distance_bars, desired_gap, 2), # desired distance to lead object for distance bar 2
"Zeitluecke_3": get_desired_gap(distance_bars, desired_gap, 3), # desired distance to lead object for distance bar 3
"Zeitluecke_4": get_desired_gap(distance_bars, desired_gap, 4), # desired distance to lead object for distance bar 4
"Zeitluecke_5": get_desired_gap(distance_bars, desired_gap, 5), # desired distance to lead object for distance bar 5
"Zeitluecke_Farbe": 1 if acc_control in (ACC_HUD_ENABLED, ACC_HUD_ACTIVE, ACC_HUD_OVERRIDE) else 0, # yellow (1) or white (0) time gap
"ACC_Anzeige_Zeitluecke": show_distance_bars if acc_control != ACC_HUD_DISABLED else 0, # show distance bar selection
"SET_ME_0X1": 0x1, # unknown
"SET_ME_0X6A": 0x6A, # unknown
"SET_ME_0XFFFF": 0xFFFF, # unknown
"SET_ME_0X7FFF": 0x7FFF, # unknown
}
return packer.make_can_msg("MEB_ACC_01", bus, values)
def create_capacitive_wheel_touch(packer, bus, lat_active, klr_stock_values):
# KLR_01: capacitive steering wheel hands-on signal
# Sent by J1158
# Consumed by R242 (camera) for lane assist hands on and J428 (ACC / Travel Assist / Emergency Assist)
# for resume and EA control
values = {s: klr_stock_values[s] for s in [
"COUNTER",
"KLR_Touchintensitaet_1",

View File

@@ -246,7 +246,7 @@ class VolkswagenMQBPlatformConfig(PlatformConfig):
@dataclass
class VolkswagenMEBPlatformConfig(PlatformConfig):
dbc_dict: DbcDict = field(default_factory=lambda: {Bus.pt: 'vw_meb'})
dbc_dict: DbcDict = field(default_factory=lambda: {Bus.pt: 'vw_meb', Bus.radar: 'vw_meb'})
chassis_codes: set[str] = field(default_factory=set)
wmis: set[WMI] = field(default_factory=set)

View File

@@ -4,11 +4,14 @@
#include "opendbc/safety/modes/volkswagen_common.h"
#define MSG_ESC_51 0xFCU // RX, for wheel speeds
#define MSG_ACC_18 0x14DU // TX by OP, ACC control instructions to the drivetrain coordinator
#define MSG_ESP_21 0xFDU // RX, redundant vehicle speed source
#define MSG_HCA_03 0x303U
#define MSG_MEB_ACC_01 0x300U // TX by OP, ACC HUD data to the instrument cluster
#define MSG_QFK_01 0x13DU
#define MSG_Motor_51 0x10BU // RX for TSK state and accel pedal
#define MSG_KLR_01 0x25DU // TX, for capacitive steering wheel
#define MSG_TA_01 0x26BU // TX by OP, Travel Assist status
static uint32_t volkswagen_meb_compute_crc(const CANPacket_t *msg) {
@@ -43,7 +46,7 @@ static uint32_t volkswagen_meb_compute_crc(const CANPacket_t *msg) {
static safety_config volkswagen_meb_init(uint16_t param) {
// Transmit of GRA_ACC_01 is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration
static const CanMsg VOLKSWAGEN_MEB_TX_MSGS[] = {
static const CanMsg VOLKSWAGEN_MEB_STOCK_TX_MSGS[] = {
{MSG_HCA_03, 0, 24, .check_relay = true},
{MSG_GRA_ACC_01, 0, 8, .check_relay = false},
{MSG_GRA_ACC_01, 2, 8, .check_relay = false},
@@ -52,6 +55,16 @@ static safety_config volkswagen_meb_init(uint16_t param) {
{MSG_KLR_01, 2, 8, .check_relay = true},
};
static const CanMsg VOLKSWAGEN_MEB_LONG_TX_MSGS[] = {
{MSG_HCA_03, 0, 24, .check_relay = true},
{MSG_LDW_02, 0, 8, .check_relay = true},
{MSG_KLR_01, 0, 8, .check_relay = false},
{MSG_KLR_01, 2, 8, .check_relay = true},
{MSG_MEB_ACC_01, 0, 48, .check_relay = true},
{MSG_ACC_18, 0, 32, .check_relay = true},
{MSG_TA_01, 0, 8, .check_relay = true},
};
static RxCheck volkswagen_meb_rx_checks[] = {
{.msg = {{MSG_LH_EPS_03, 0, 8, 100U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}},
{.msg = {{MSG_MOTOR_14, 0, 8, 10U, .ignore_checksum = true, .ignore_counter = true, .ignore_quality_flag = true}, { 0 }, { 0 }}},
@@ -63,9 +76,15 @@ static safety_config volkswagen_meb_init(uint16_t param) {
};
volkswagen_common_init();
SAFETY_UNUSED(param);
return BUILD_SAFETY_CFG(volkswagen_meb_rx_checks, VOLKSWAGEN_MEB_TX_MSGS);
#ifdef ALLOW_DEBUG
volkswagen_longitudinal = GET_FLAG(param, FLAG_VOLKSWAGEN_LONG_CONTROL);
#else
SAFETY_UNUSED(param);
#endif
return volkswagen_longitudinal ? BUILD_SAFETY_CFG(volkswagen_meb_rx_checks, VOLKSWAGEN_MEB_LONG_TX_MSGS) : \
BUILD_SAFETY_CFG(volkswagen_meb_rx_checks, VOLKSWAGEN_MEB_STOCK_TX_MSGS);
}
static void volkswagen_meb_rx_hook(const CANPacket_t *msg) {
@@ -100,14 +119,34 @@ static void volkswagen_meb_rx_hook(const CANPacket_t *msg) {
if (msg->addr == MSG_Motor_51) {
int acc_status = (msg->data[11] & 0x07U);
bool cruise_engaged = (acc_status == 3) || (acc_status == 4) || (acc_status == 5);
acc_main_on = cruise_engaged || (acc_status == 2);
pcm_cruise_check(cruise_engaged);
if (!volkswagen_longitudinal) {
pcm_cruise_check(cruise_engaged);
}
if (!acc_main_on) {
controls_allowed = false;
}
int accel_pedal_value = ((msg->data[1] >> 4) & 0x0FU) | ((msg->data[2] & 0x1FU) << 4);
gas_pressed = accel_pedal_value > 0;
}
if (msg->addr == MSG_GRA_ACC_01) {
// If using openpilot longitudinal, enter controls on falling edge of Set or Resume with main switch on
// Signal: GRA_ACC_01.GRA_Tip_Setzen
// Signal: GRA_ACC_01.GRA_Tip_Wiederaufnahme
if (volkswagen_longitudinal) {
bool set_button = GET_BIT(msg, 16U);
bool resume_button = GET_BIT(msg, 19U);
if ((volkswagen_set_button_prev && !set_button) || (volkswagen_resume_button_prev && !resume_button)) {
controls_allowed = acc_main_on;
}
volkswagen_set_button_prev = set_button;
volkswagen_resume_button_prev = resume_button;
}
// Always exit controls on rising edge of Cancel
if (GET_BIT(msg, 13U)) {
controls_allowed = false;
@@ -121,9 +160,26 @@ static void volkswagen_meb_rx_hook(const CANPacket_t *msg) {
}
static bool volkswagen_meb_tx_hook(const CANPacket_t *msg) {
// MEB is lateral-only; no openpilot longitudinal TX messages are allowed.
// acceleration in m/s2 * 1000 to avoid floating point math
const LongitudinalLimits VOLKSWAGEN_MEB_LONG_LIMITS = {
.max_accel = 2000,
.min_accel = -3500,
.inactive_accel = 3010, // VW sends one increment above the max range when inactive
};
bool tx = true;
// Safety check for MSG_ACC_18 acceleration requests
if (msg->addr == MSG_ACC_18) {
// Signal: ACC_18.ACC_Sollbeschleunigung_02 (acceleration in m/s2, scale 0.005, offset -7.22)
int desired_accel = ((((msg->data[4] & 0x7U) << 8) | msg->data[3]) * 5U) - 7220U;
// allow ACCEL_OVERRIDE (0) while controls are allowed even when the driver is on the gas
bool accel_override = controls_allowed && (desired_accel == 0);
if (!accel_override && longitudinal_accel_checks(desired_accel, VOLKSWAGEN_MEB_LONG_LIMITS)) {
tx = false;
}
}
if (msg->addr == MSG_HCA_03) {
const CurvatureSteeringLimits VOLKSWAGEN_MEB_STEERING_LIMITS = {
.max_curvature = 29105,

View File

@@ -3,17 +3,24 @@ import numpy as np
import unittest
from opendbc.car.structs import CarParams
from opendbc.car.volkswagen.values import VolkswagenSafetyFlags
from opendbc.safety.tests.libsafety import libsafety_py
import opendbc.safety.tests.common as common
from opendbc.safety.tests.common import CANPackerSafety
MAX_ACCEL = 2.0
MIN_ACCEL = -3.5
# MEB message IDs
MSG_LH_EPS_03 = 0x9F
MSG_ESC_51 = 0xFC
MSG_Motor_51 = 0x10B
MSG_GRA_ACC_01 = 0x12B
MSG_QFK_01 = 0x13D
MSG_ACC_18 = 0x14D
MSG_KLR_01 = 0x25D
MSG_TA_01 = 0x26B
MSG_MEB_ACC_01 = 0x300
MSG_HCA_03 = 0x303
MSG_LDW_02 = 0x397
MSG_MOTOR_14 = 0x3BE
@@ -124,6 +131,10 @@ class TestVolkswagenMebSafetyBase(common.CarSafetyTest, common.CurvatureSteering
}
return self.packer.make_can_msg_safety("HCA_03", 0, values)
def _accel_msg(self, accel):
values = {"ACC_Sollbeschleunigung_02": accel}
return self.packer.make_can_msg_safety("ACC_18", 0, values)
def _tsk_status_msg(self, enable, main_switch=True):
if main_switch:
tsk_status = 3 if enable else 2
@@ -194,18 +205,6 @@ class TestVolkswagenMebSafetyBase(common.CarSafetyTest, common.CurvatureSteering
within_delta = abs(speed - speed_2) <= common.MAX_SPEED_DELTA
self.assertEqual(self.safety.get_controls_allowed(), within_delta)
class TestVolkswagenMebStockSafety(TestVolkswagenMebSafetyBase):
FWD_BLACKLISTED_ADDRS = {0: [MSG_KLR_01], 2: [MSG_HCA_03, MSG_LDW_02]}
TX_MSGS = [[MSG_HCA_03, 0], [MSG_LDW_02, 0], [MSG_GRA_ACC_01, 0], [MSG_GRA_ACC_01, 2],
[MSG_KLR_01, 0], [MSG_KLR_01, 2]]
def setUp(self):
self.packer = CANPackerSafety("vw_meb")
self.safety = libsafety_py.libsafety
self.safety.set_safety_hooks(CarParams.SafetyModel.volkswagenMeb, 0)
self.safety.init_tests()
def test_curvature_violation(self):
# if violation occurs, MEB resets desired_curvature_last to measured curvature
meas = self.MAX_CURVATURE_TEST / 4
@@ -236,6 +235,18 @@ class TestVolkswagenMebStockSafety(TestVolkswagenMebSafetyBase):
should_tx = controls_allowed if steer_req else round(curvature_cmd * self.CURVATURE_TO_CAN) == 0
self.assertEqual(should_tx, self._tx(self._curvature_cmd_msg(curvature_cmd, steer_req=steer_req, power=50 if steer_req else 0)))
class TestVolkswagenMebStockSafety(TestVolkswagenMebSafetyBase):
TX_MSGS = [[MSG_HCA_03, 0], [MSG_LDW_02, 0], [MSG_GRA_ACC_01, 0], [MSG_GRA_ACC_01, 2],
[MSG_KLR_01, 0], [MSG_KLR_01, 2]]
FWD_BLACKLISTED_ADDRS = {0: [MSG_KLR_01], 2: [MSG_HCA_03, MSG_LDW_02]}
def setUp(self):
self.packer = CANPackerSafety("vw_meb")
self.safety = libsafety_py.libsafety
self.safety.set_safety_hooks(CarParams.SafetyModel.volkswagenMeb, 0)
self.safety.init_tests()
def test_spam_cancel_safety_check(self):
self.safety.set_controls_allowed(0)
self.assertTrue(self._tx(self._button_msg(cancel=1)))
@@ -244,6 +255,80 @@ class TestVolkswagenMebStockSafety(TestVolkswagenMebSafetyBase):
self.safety.set_controls_allowed(1)
self.assertTrue(self._tx(self._button_msg(resume=1)))
class TestVolkswagenMebLongSafety(TestVolkswagenMebSafetyBase):
TX_MSGS = [[MSG_HCA_03, 0], [MSG_LDW_02, 0], [MSG_MEB_ACC_01, 0], [MSG_ACC_18, 0],
[MSG_TA_01, 0], [MSG_KLR_01, 0], [MSG_KLR_01, 2]]
FWD_BLACKLISTED_ADDRS = {0: [MSG_KLR_01],
2: [MSG_HCA_03, MSG_LDW_02, MSG_MEB_ACC_01, MSG_ACC_18, MSG_TA_01]}
RELAY_MALFUNCTION_ADDRS = {0: (MSG_HCA_03, MSG_LDW_02, MSG_MEB_ACC_01, MSG_ACC_18, MSG_TA_01),
2: (MSG_KLR_01,)}
ALLOW_OVERRIDE = True
ACCEL_OVERRIDE = 0
INACTIVE_ACCEL = 3.01
def setUp(self):
self.packer = CANPackerSafety("vw_meb")
self.safety = libsafety_py.libsafety
self.safety.set_safety_hooks(CarParams.SafetyModel.volkswagenMeb, VolkswagenSafetyFlags.LONG_CONTROL)
self.safety.init_tests()
# stock cruise controls are entirely bypassed under openpilot longitudinal control
def test_disable_control_allowed_from_cruise(self):
pass
def test_enable_control_allowed_from_cruise(self):
pass
def test_cruise_engaged_prev(self):
pass
def test_set_and_resume_buttons(self):
for button in ["set", "resume"]:
# ACC main switch must be on, engage on falling edge
self.safety.set_controls_allowed(0)
self._rx(self._tsk_status_msg(False, main_switch=False))
self._rx(self._button_msg(_set=(button == "set"), resume=(button == "resume"), bus=0))
self.assertFalse(self.safety.get_controls_allowed(), f"controls allowed on {button} with main switch off")
self._rx(self._tsk_status_msg(False, main_switch=True))
self._rx(self._button_msg(_set=(button == "set"), resume=(button == "resume"), bus=0))
self.assertFalse(self.safety.get_controls_allowed(), f"controls allowed on {button} rising edge")
self._rx(self._button_msg(bus=0))
self.assertTrue(self.safety.get_controls_allowed(), f"controls not allowed on {button} falling edge")
def test_cancel_button(self):
# Disable on rising edge of cancel button
self._rx(self._tsk_status_msg(False, main_switch=True))
self.safety.set_controls_allowed(1)
self._rx(self._button_msg(cancel=True, bus=0))
self.assertFalse(self.safety.get_controls_allowed(), "controls allowed after cancel")
def test_main_switch(self):
# Disable as soon as main switch turns off
self._rx(self._tsk_status_msg(False, main_switch=True))
self.safety.set_controls_allowed(1)
self._rx(self._tsk_status_msg(False, main_switch=False))
self.assertFalse(self.safety.get_controls_allowed(), "controls allowed after ACC main switch off")
def test_accel_safety_check(self):
for controls_allowed in [True, False]:
for accel in np.concatenate((np.arange(MIN_ACCEL - 2, MAX_ACCEL + 2, 0.03), [0, self.INACTIVE_ACCEL])):
accel = round(accel, 2)
is_inactive_accel = accel == self.INACTIVE_ACCEL
send = (controls_allowed and MIN_ACCEL <= accel <= MAX_ACCEL) or is_inactive_accel
self.safety.set_controls_allowed(controls_allowed)
self.assertEqual(send, self._tx(self._accel_msg(accel)), (controls_allowed, accel))
def test_accel_override_with_gas(self):
if not self.ALLOW_OVERRIDE:
pass
self.safety.set_controls_allowed(True)
self.safety.set_gas_pressed_prev(True)
self.assertTrue(self._tx(self._accel_msg(self.ACCEL_OVERRIDE)))
self.assertFalse(self._tx(self._accel_msg(MAX_ACCEL)))
# ZAS_Kl_15=1
class TestVolkswagenMebIgnition(unittest.TestCase):
TX_MSGS: list = []