Files
opendbc-meb/opendbc/car/volkswagen/carstate.py
Jason Wen c4f5a4f9d2 Merge branch 'upstream/opendbc/master' into sync-20260401
# Conflicts:
#	docs/CARS.md
#	opendbc/car/car_helpers.py
#	opendbc/car/chrysler/carcontroller.py
#	opendbc/car/chrysler/values.py
#	opendbc/car/honda/carcontroller.py
#	opendbc/car/honda/hondacan.py
#	opendbc/car/hyundai/hyundaicanfd.py
#	opendbc/car/rivian/interface.py
2026-04-02 21:05:06 -04:00

348 lines
17 KiB
Python

from opendbc.can import CANParser
from opendbc.car import Bus, structs
from opendbc.car.interfaces import CarStateBase
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.volkswagen.values import DBC, CanBus, NetworkLocation, TransmissionType, GearShifter, \
CarControllerParams, VolkswagenFlags
ButtonType = structs.CarState.ButtonEvent.Type
class CarState(CarStateBase):
def __init__(self, CP, CP_SP):
super().__init__(CP, CP_SP)
self.frame = 0
self.eps_init_complete = False
self.CCP = CarControllerParams(CP)
self.button_states = {button.event_type: False for button in self.CCP.BUTTONS}
self.esp_hold_confirmation = False
self.upscale_lead_car_signal = False
self.eps_stock_values = False
self.acc_type = 0
def update_button_enable(self, buttonEvents: list[structs.CarState.ButtonEvent]):
if not self.CP.pcmCruise:
for b in buttonEvents:
# Enable OP long on falling edge of enable buttons
if b.type in (ButtonType.setCruise, ButtonType.resumeCruise) and not b.pressed:
return True
return False
def create_button_events(self, pt_cp, buttons):
button_events = []
for button in buttons:
state = pt_cp.vl[button.can_addr][button.can_msg] in button.values
if self.button_states[button.event_type] != state:
event = structs.CarState.ButtonEvent()
event.type = button.event_type
event.pressed = state
button_events.append(event)
self.button_states[button.event_type] = state
return button_events
def update(self, can_parsers) -> tuple[structs.CarState, structs.CarStateSP]:
pt_cp = can_parsers[Bus.pt]
cam_cp = can_parsers[Bus.cam]
ext_cp = pt_cp if self.CP.networkLocation == NetworkLocation.fwdCamera else cam_cp
alt_cp = can_parsers[Bus.alt]
if self.CP.flags & VolkswagenFlags.PQ:
return self.update_pq(pt_cp, cam_cp, ext_cp)
elif self.CP.flags & VolkswagenFlags.MLB:
return self.update_mlb(pt_cp, cam_cp, ext_cp, alt_cp)
ret = structs.CarState()
ret_sp = structs.CarStateSP()
if self.CP.transmissionType == TransmissionType.direct:
ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Motor_EV_01"]["MO_Waehlpos"], None))
elif self.CP.transmissionType == TransmissionType.manual:
if bool(pt_cp.vl["Gateway_72"]["BCM1_Rueckfahrlicht_Schalter"]):
ret.gearShifter = GearShifter.reverse
else:
ret.gearShifter = GearShifter.drive
else:
ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Gateway_73"]["GE_Fahrstufe"], None))
if True:
# MQB-specific
if self.CP.flags & VolkswagenFlags.KOMBI_PRESENT:
self.upscale_lead_car_signal = bool(pt_cp.vl["Kombi_03"]["KBI_Variante"]) # Analog vs digital instrument cluster
self.parse_wheel_speeds(ret,
pt_cp.vl["ESP_19"]["ESP_VL_Radgeschw_02"],
pt_cp.vl["ESP_19"]["ESP_VR_Radgeschw_02"],
pt_cp.vl["ESP_19"]["ESP_HL_Radgeschw_02"],
pt_cp.vl["ESP_19"]["ESP_HR_Radgeschw_02"],
)
if self.CP.flags & VolkswagenFlags.STOCK_HCA_PRESENT:
ret.carFaultedNonCritical = bool(cam_cp.vl["HCA_01"]["EA_Ruckfreigabe"]) or cam_cp.vl["HCA_01"]["EA_ACC_Sollstatus"] > 0 # EA
ret.brake = pt_cp.vl["ESP_05"]["ESP_Bremsdruck"] / 250.0 # FIXME: this is pressure in Bar, not sure what OP expects
brake_pedal_pressed = bool(pt_cp.vl["Motor_14"]["MO_Fahrer_bremst"])
brake_pressure_detected = bool(pt_cp.vl["ESP_05"]["ESP_Fahrer_bremst"])
ret.brakePressed = brake_pedal_pressed or brake_pressure_detected
ret.parkingBrake = bool(pt_cp.vl["Kombi_01"]["KBI_Handbremse"]) # FIXME: need to include an EPB check as well
ret.doorOpen = any([pt_cp.vl["Gateway_72"]["ZV_FT_offen"],
pt_cp.vl["Gateway_72"]["ZV_BT_offen"],
pt_cp.vl["Gateway_72"]["ZV_HFS_offen"],
pt_cp.vl["Gateway_72"]["ZV_HBFS_offen"],
pt_cp.vl["Gateway_72"]["ZV_HD_offen"]])
if self.CP.enableBsm:
# Infostufe: BSM LED on, Warnung: BSM LED flashing
ret.leftBlindspot = bool(ext_cp.vl["SWA_01"]["SWA_Infostufe_SWA_li"]) or bool(ext_cp.vl["SWA_01"]["SWA_Warnung_SWA_li"])
ret.rightBlindspot = bool(ext_cp.vl["SWA_01"]["SWA_Infostufe_SWA_re"]) or bool(ext_cp.vl["SWA_01"]["SWA_Warnung_SWA_re"])
ret.stockFcw = bool(ext_cp.vl["ACC_10"]["AWV2_Freigabe"])
ret.stockAeb = bool(ext_cp.vl["ACC_10"]["ANB_Teilbremsung_Freigabe"]) or bool(ext_cp.vl["ACC_10"]["ANB_Zielbremsung_Freigabe"])
self.acc_type = ext_cp.vl["ACC_06"]["ACC_Typ"]
self.esp_hold_confirmation = bool(pt_cp.vl["ESP_21"]["ESP_Haltebestaetigung"])
acc_limiter_mode = ext_cp.vl["ACC_02"]["ACC_Gesetzte_Zeitluecke"] == 0
speed_limiter_mode = bool(pt_cp.vl["TSK_06"]["TSK_Limiter_ausgewaehlt"])
ret.cruiseState.available = pt_cp.vl["TSK_06"]["TSK_Status"] in (2, 3, 4, 5)
ret.cruiseState.enabled = pt_cp.vl["TSK_06"]["TSK_Status"] in (3, 4, 5)
ret.cruiseState.speed = ext_cp.vl["ACC_02"]["ACC_Wunschgeschw_02"] * CV.KPH_TO_MS if self.CP.pcmCruise else 0
ret.accFaulted = pt_cp.vl["TSK_06"]["TSK_Status"] in (6, 7)
ret.leftBlinker = bool(pt_cp.vl["Blinkmodi_02"]["Comfort_Signal_Left"])
ret.rightBlinker = bool(pt_cp.vl["Blinkmodi_02"]["Comfort_Signal_Right"])
# Shared logic
ret.vEgoCluster = pt_cp.vl["Kombi_01"]["KBI_angez_Geschw"] * CV.KPH_TO_MS
self.parse_mlb_mqb_steering_state(ret, pt_cp)
ret.gasPressed = pt_cp.vl["Motor_20"]["MO_Fahrpedalrohwert_01"] > 0
ret.espActive = bool(pt_cp.vl["ESP_21"]["ESP_Eingriff"])
ret.espDisabled = pt_cp.vl["ESP_21"]["ESP_Tastung_passiv"] != 0
ret.seatbeltUnlatched = pt_cp.vl["Airbag_02"]["AB_Gurtschloss_FA"] != 3
ret.standstill = ret.vEgoRaw == 0
ret.cruiseState.standstill = self.CP.pcmCruise and self.esp_hold_confirmation
ret.cruiseState.nonAdaptive = acc_limiter_mode or speed_limiter_mode
if ret.cruiseState.speed > 90:
ret.cruiseState.speed = 0
self.eps_stock_values = pt_cp.vl["LH_EPS_03"]
self.ldw_stock_values = cam_cp.vl["LDW_02"] if self.CP.networkLocation == NetworkLocation.fwdCamera else {}
self.gra_stock_values = pt_cp.vl["GRA_ACC_01"]
ret.buttonEvents = self.create_button_events(pt_cp, self.CCP.BUTTONS)
ret.lowSpeedAlert = self.update_low_speed_alert(ret.vEgo)
self.frame += 1
return ret, ret_sp
def update_pq(self, pt_cp, cam_cp, ext_cp) -> tuple[structs.CarState, structs.CarStateSP]:
ret = structs.CarState()
ret_sp = structs.CarStateSP()
# vEgo obtained from Bremse_1 vehicle speed rather than Bremse_3 wheel speeds because Bremse_3 isn't present on NSF
ret.vEgoRaw = pt_cp.vl["Bremse_1"]["BR1_Rad_kmh"] * CV.KPH_TO_MS
ret.vEgo, ret.aEgo = self.update_speed_kf(ret.vEgoRaw)
ret.standstill = ret.vEgoRaw == 0
# Update EPS position and state info. For signed values, VW sends the sign in a separate signal.
ret.steeringAngleDeg = pt_cp.vl["Lenkhilfe_3"]["LH3_BLW"] * (1, -1)[int(pt_cp.vl["Lenkhilfe_3"]["LH3_BLWSign"])]
ret.steeringRateDeg = pt_cp.vl["Lenkwinkel_1"]["LW1_Lenk_Gesch"] * (1, -1)[int(pt_cp.vl["Lenkwinkel_1"]["LW1_Gesch_Sign"])]
ret.steeringTorque = pt_cp.vl["Lenkhilfe_3"]["LH3_LM"] * (1, -1)[int(pt_cp.vl["Lenkhilfe_3"]["LH3_LMSign"])]
ret.steeringPressed = abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE
hca_status = self.CCP.hca_status_values.get(pt_cp.vl["Lenkhilfe_2"]["LH2_Sta_HCA"])
ret.steerFaultTemporary, ret.steerFaultPermanent = self.update_hca_state(hca_status)
# Update gas, brakes, and gearshift.
ret.gasPressed = pt_cp.vl["Motor_3"]["MO3_Pedalwert"] > 0
ret.brake = pt_cp.vl["Bremse_5"]["BR5_Bremsdruck"] / 250.0 # FIXME: this is pressure in Bar, not sure what OP expects
ret.brakePressed = bool(pt_cp.vl["Motor_2"]["MO2_BLS"])
ret.parkingBrake = bool(pt_cp.vl["Kombi_1"]["Bremsinfo"])
# Update gear and/or clutch position data.
if self.CP.transmissionType == TransmissionType.automatic:
ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Getriebe_1"]["GE1_Wahl_Pos"], None))
elif self.CP.transmissionType == TransmissionType.manual:
reverse_light = bool(pt_cp.vl["Gate_Komf_1"]["GK1_Rueckfahr"])
if reverse_light:
ret.gearShifter = GearShifter.reverse
else:
ret.gearShifter = GearShifter.drive
# Update door and trunk/hatch lid open status.
ret.doorOpen = any([pt_cp.vl["Gate_Komf_1"]["GK1_Fa_Tuerkont"],
pt_cp.vl["Gate_Komf_1"]["BSK_BT_geoeffnet"],
pt_cp.vl["Gate_Komf_1"]["BSK_HL_geoeffnet"],
pt_cp.vl["Gate_Komf_1"]["BSK_HR_geoeffnet"],
pt_cp.vl["Gate_Komf_1"]["BSK_HD_Hauptraste"]])
# Update seatbelt fastened status.
ret.seatbeltUnlatched = not bool(pt_cp.vl["Airbag_1"]["Gurtschalter_Fahrer"])
# Consume blind-spot monitoring info/warning LED states, if available.
# Infostufe: BSM LED on, Warnung: BSM LED flashing
if self.CP.enableBsm:
ret.leftBlindspot = bool(ext_cp.vl["SWA_1"]["SWA_Infostufe_SWA_li"]) or bool(ext_cp.vl["SWA_1"]["SWA_Warnung_SWA_li"])
ret.rightBlindspot = bool(ext_cp.vl["SWA_1"]["SWA_Infostufe_SWA_re"]) or bool(ext_cp.vl["SWA_1"]["SWA_Warnung_SWA_re"])
# Consume factory LDW data relevant for factory SWA (Lane Change Assist)
# and capture it for forwarding to the blind spot radar controller
self.ldw_stock_values = cam_cp.vl["LDW_Status"] if self.CP.networkLocation == NetworkLocation.fwdCamera else {}
# Stock FCW is considered active if the release bit for brake-jerk warning
# is set. Stock AEB considered active if the partial braking or target
# braking release bits are set.
# Refer to VW Self Study Program 890253: Volkswagen Driver Assistance
# Systems, chapters on Front Assist with Braking and City Emergency
# Braking for the 2016 Passat NMS
# TODO: deferred until we can collect data on pre-MY2016 behavior, AWV message may be shorter with fewer signals
ret.stockFcw = False
ret.stockAeb = False
# Update ACC radar status.
self.acc_type = ext_cp.vl["ACC_System"]["ACS_Typ_ACC"]
ret.cruiseState.available = bool(pt_cp.vl["Motor_5"]["MO5_GRA_Hauptsch"])
ret.cruiseState.enabled = pt_cp.vl["Motor_2"]["MO2_Sta_GRA"] in (1, 2)
if self.CP.pcmCruise:
ret.accFaulted = ext_cp.vl["ACC_GRA_Anzeige"]["ACA_StaACC"] in (6, 7)
else:
ret.accFaulted = pt_cp.vl["Motor_2"]["MO2_Sta_GRA"] == 3
# Update ACC setpoint. When the setpoint reads as 255, the driver has not
# yet established an ACC setpoint, so treat it as zero.
ret.cruiseState.speed = ext_cp.vl["ACC_GRA_Anzeige"]["ACA_V_Wunsch"] * CV.KPH_TO_MS
if ret.cruiseState.speed > 70: # 255 kph in m/s == no current setpoint
ret.cruiseState.speed = 0
# Update button states for turn signals and ACC controls, capture all ACC button state/config for passthrough
ret.leftBlinker, ret.rightBlinker = self.update_blinker_from_stalk(300, pt_cp.vl["Gate_Komf_1"]["GK1_Blinker_li"],
pt_cp.vl["Gate_Komf_1"]["GK1_Blinker_re"])
ret.buttonEvents = self.create_button_events(pt_cp, self.CCP.BUTTONS)
self.gra_stock_values = pt_cp.vl["GRA_Neu"]
# Additional safety checks performed in CarInterface.
ret.espDisabled = bool(pt_cp.vl["Bremse_1"]["BR1_ESPASR_passive"])
ret.lowSpeedAlert = self.update_low_speed_alert(ret.vEgo)
self.frame += 1
return ret, ret_sp
def update_mlb(self, pt_cp, cam_cp, ext_cp, alt_cp) -> structs.CarState:
ret = structs.CarState()
ret_sp = structs.CarStateSP()
self.parse_wheel_speeds(ret,
pt_cp.vl["ESP_03"]["ESP_VL_Radgeschw"],
pt_cp.vl["ESP_03"]["ESP_VR_Radgeschw"],
pt_cp.vl["ESP_03"]["ESP_HL_Radgeschw"],
pt_cp.vl["ESP_03"]["ESP_HR_Radgeschw"],
)
ret.gasPressed = pt_cp.vl["Motor_03"]["MO_Fahrpedalrohwert_01"] > 0
ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(alt_cp.vl["Getriebe_03"]["GE_Waehlhebel"], None))
# TODO: We don't have a true mainswitch state yet, might need stateful tracking on LS_01 if momentary-press is a thing
# TSK_04.TSK_Status_GRA_ACC_02 0 = not engaged, 1 = engaged, 2 = engaged with driver accel override, 3 = fault
ret.cruiseState.available = alt_cp.vl["TSK_04"]["TSK_Status_GRA_ACC_02"] in (0, 1, 2)
ret.cruiseState.available = alt_cp.vl["TSK_04"]["TSK_Status_GRA_ACC_02"] in (1, 2)
ret.accFaulted = alt_cp.vl["TSK_04"]["TSK_Status_GRA_ACC_02"] == 3
ret.cruiseState.speed = ext_cp.vl["ACC_02"]["ACC_Wunschgeschw_02"] * CV.KPH_TO_MS
self.parse_mlb_mqb_steering_state(ret, pt_cp)
ret.brake = pt_cp.vl["ESP_05"]["ESP_Bremsdruck"] / 250.0
brake_pedal_pressed = bool(pt_cp.vl["Motor_03"]["MO_Fahrer_bremst"])
brake_pressure_detected = bool(pt_cp.vl["ESP_05"]["ESP_Fahrer_bremst"])
ret.brakePressed = brake_pedal_pressed or brake_pressure_detected
ret.parkingBrake = bool(pt_cp.vl["Kombi_01"]["KBI_Handbremse"])
ret.espDisabled = pt_cp.vl["ESP_01"]["ESP_Tastung_passiv"] != 0
ret.leftBlinker = bool(pt_cp.vl["BCM"]["BLINKER_LEFT"])
ret.rightBlinker = bool(pt_cp.vl["BCM"]["BLINKER_RIGHT"])
ret.seatbeltUnlatched = bool(pt_cp.vl["Airbag_01"]["AB_Gurtwarn_VF"])
ret.doorOpen = any([alt_cp.vl["Gateway_05"]["FT_Tuer_geoeffnet"],
alt_cp.vl["Gateway_05"]["BT_Tuer_geoeffnet"],
alt_cp.vl["Gateway_05"]["HL_Tuer_geoeffnet"],
alt_cp.vl["Gateway_05"]["HR_Tuer_geoeffnet"]])
# Consume blind-spot monitoring info/warning LED states, if available.
# Infostufe: BSM LED on, Warnung: BSM LED flashing
if self.CP.enableBsm:
ret.leftBlindspot = bool(ext_cp.vl["SWA_01"]["SWA_Infostufe_SWA_li"]) or bool(ext_cp.vl["SWA_01"]["SWA_Warnung_SWA_li"])
ret.rightBlindspot = bool(ext_cp.vl["SWA_01"]["SWA_Infostufe_SWA_re"]) or bool(ext_cp.vl["SWA_01"]["SWA_Warnung_SWA_re"])
self.ldw_stock_values = cam_cp.vl["LDW_02"] if self.CP.networkLocation == NetworkLocation.fwdCamera else {}
self.gra_stock_values = pt_cp.vl["LS_01"]
ret.buttonEvents = self.create_button_events(pt_cp, self.CCP.BUTTONS)
ret.cruiseState.standstill = self.CP.pcmCruise and self.esp_hold_confirmation
ret.standstill = ret.vEgoRaw == 0
self.frame += 1
return ret, ret_sp
def update_low_speed_alert(self, v_ego: float) -> bool:
# Low speed steer alert hysteresis logic
if (self.CP.minSteerSpeed - 1e-3) > CarControllerParams.DEFAULT_MIN_STEER_SPEED and v_ego < (self.CP.minSteerSpeed + 1.):
self.low_speed_alert = True
elif v_ego > (self.CP.minSteerSpeed + 2.):
self.low_speed_alert = False
return self.low_speed_alert
def parse_mlb_mqb_steering_state(self, ret, pt_cp, drive_mode=True):
ret.steeringAngleDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradwinkel"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradwinkel"])]
ret.steeringRateDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradw_Geschw"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradw_Geschw"])]
ret.steeringTorque = pt_cp.vl["LH_EPS_03"]["EPS_Lenkmoment"] * (1, -1)[int(pt_cp.vl["LH_EPS_03"]["EPS_VZ_Lenkmoment"])]
ret.steeringPressed = abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE
hca_status = self.CCP.hca_status_values.get(pt_cp.vl["LH_EPS_03"]["EPS_HCA_Status"])
ret.steerFaultTemporary, ret.steerFaultPermanent = self.update_hca_state(hca_status, drive_mode)
return
def update_hca_state(self, hca_status, drive_mode=True):
# Treat FAULT as temporary for worst likely EPS recovery time, for cars without factory Lane Assist
# DISABLED means the EPS hasn't been configured to support Lane Assist
self.eps_init_complete = self.eps_init_complete or (hca_status in ("DISABLED", "READY", "ACTIVE") or self.frame > 600)
perm_fault = drive_mode and hca_status == "DISABLED" or (self.eps_init_complete and hca_status == "FAULT")
temp_fault = drive_mode and hca_status in ("REJECTED", "PREEMPTED") or not self.eps_init_complete
return temp_fault, perm_fault
@staticmethod
def get_can_parsers(CP, CP_SP):
if CP.flags & VolkswagenFlags.PQ:
return CarState.get_can_parsers_pq(CP)
# manually configure some optional and variable-rate/edge-triggered messages
pt_messages, cam_messages, alt_messages = [], [], []
if not CP.flags & VolkswagenFlags.MLB:
pt_messages += [
("Blinkmodi_02", 1) # From J519 BCM (sent at 1Hz when no lights active, 50Hz when active)
]
if CP.flags & VolkswagenFlags.STOCK_HCA_PRESENT:
cam_messages += [
("HCA_01", 1), # From R242 Driver assistance camera, 50Hz if steering/1Hz if not
]
return {
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], pt_messages, CanBus(CP).pt),
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], cam_messages, CanBus(CP).cam),
Bus.alt: CANParser(DBC[CP.carFingerprint][Bus.pt], alt_messages, CanBus(CP).alt),
}
@staticmethod
def get_can_parsers_pq(CP):
return {
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).pt),
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).cam),
Bus.alt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).alt),
}