This commit is contained in:
firestar5683
2026-05-04 19:30:51 -05:00
parent 7837a3c1aa
commit aed0b7fdb9
18 changed files with 1612 additions and 0 deletions
@@ -0,0 +1 @@
@@ -0,0 +1,64 @@
from numpy import clip, interp
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.tesla.preap.nap_conf import (
ACCEL_MAX,
PEDAL_BP,
PEDAL_DI_MIN,
PEDAL_DI_ZERO,
PEDAL_MAX_VALUES,
REGEN_MAX,
nap_conf,
)
PEDAL_RAMP_RATE_UP = 5.0
PEDAL_RAMP_RATE_DOWN = 2.5
ACCEL_DEADBAND = 0.15
PEDAL_HYST_GAP = 1.0
TORQUE_LEVEL_ACC = 0.0
TORQUE_LEVEL_DECEL = -30.0
ZERO_TORQUE_MIN_SPEED = 10.0 * CV.MPH_TO_MS
class PedalZeroTorque:
def __init__(self):
self.value = PEDAL_DI_ZERO
self._best_torque = TORQUE_LEVEL_DECEL
def update(self, torque_level: float, current_pedal_di: float, v_ego: float) -> None:
if v_ego < ZERO_TORQUE_MIN_SPEED:
return
if TORQUE_LEVEL_DECEL < torque_level < TORQUE_LEVEL_ACC and abs(torque_level) < abs(self._best_torque):
self.value = current_pedal_di
self._best_torque = torque_level
def get(self, v_ego: float) -> float:
if v_ego < 5.0 * CV.MPH_TO_MS:
return PEDAL_DI_ZERO
return self.value
_zero_torque = PedalZeroTorque()
def get_zero_torque() -> PedalZeroTorque:
return _zero_torque
def compute_pedal_command(accel_request: float, v_ego: float, prev_pedal_di: float) -> tuple[float, float]:
max_pedal_value = float(interp(v_ego, PEDAL_BP, PEDAL_MAX_VALUES))
zero_torque_di = _zero_torque.get(v_ego)
if abs(accel_request) < ACCEL_DEADBAND:
accel_request = 0.0
pedal_di = float(interp(accel_request, [REGEN_MAX, 0.0, ACCEL_MAX], [PEDAL_DI_MIN, zero_torque_di, max_pedal_value]))
pedal_di = float(clip(pedal_di, PEDAL_DI_MIN, max_pedal_value))
pedal_di = float(clip(pedal_di, prev_pedal_di - PEDAL_RAMP_RATE_DOWN, prev_pedal_di + PEDAL_RAMP_RATE_UP))
if abs(pedal_di - prev_pedal_di) < PEDAL_HYST_GAP:
pedal_di = prev_pedal_di
return nap_conf.di_to_pedal(pedal_di), pedal_di
@@ -0,0 +1 @@
@@ -0,0 +1,90 @@
import numpy as np
from opendbc.can import CANPacker
from opendbc.car import Bus
from opendbc.car.carlog import carlog
from opendbc.car.tesla.pedal.controller import get_zero_torque
from opendbc.car.tesla.preap.interface import get_preap_accel_limits
from opendbc.car.tesla.preap.nap_conf import PEDAL_DI_MIN, PEDAL_DI_ZERO, nap_conf
from opendbc.car.tesla.preap.teslacan import TeslaCANPreAP
from opendbc.car.tesla.preap.virtual_das import VirtualDAS
from opendbc.car.tesla.values import CANBUS, CruiseButtons
ENGAGE_GRACE_FRAMES = 50
def init_preap_can(dbc_names):
tesla_can = TeslaCANPreAP(CANPacker(dbc_names[Bus.party]))
tesla_can.pedal_can_bus = nap_conf.pedal_can_bus
return tesla_can
class PreAPLongController:
def __init__(self):
self.prev_pedal_di = 0.0
self.prev_requested_long = False
self.preap_long_engage_frame = -1_000_000
self.engage_a_max = 0.0
self.vdas = VirtualDAS(dt=0.02)
def update(self, CC, CS, frame: int, tesla_can, can_bus_party: int):
can_sends = []
actuators = CC.actuators
requested_long = CS.cruiseEnabled and CS.enableLongControl
long_active = requested_long and CC.longActive
use_pedal = nap_conf.use_pedal and self._pedal_ready()
if (not self.prev_requested_long) and requested_long:
self.preap_long_engage_frame = frame
zero_torque_di = get_zero_torque().get(CS.out.vEgo)
self.prev_pedal_di = max(CS.pedal_interceptor_value, zero_torque_di)
self.vdas.reset(a_init=0.0, pedal_di_init=self.prev_pedal_di)
_, self.engage_a_max = get_preap_accel_limits(CS.out.vEgo)
if use_pedal:
pedal_button_press = CS.cruise_buttons != CS.prev_cruise_buttons and CS.cruise_buttons != CruiseButtons.IDLE
if ((not self.prev_requested_long) and requested_long) or (self.prev_requested_long and not requested_long) or pedal_button_press:
CS.preap_cc_cancel_needed = True
self.prev_requested_long = requested_long
if frame % 2 == 0:
if use_pedal:
get_zero_torque().update(CS.pedal.torque_level, self.prev_pedal_di, CS.out.vEgo)
if requested_long and use_pedal:
try:
if CS.out.gasPressed:
self.prev_pedal_di = max(CS.pedal_interceptor_value, PEDAL_DI_ZERO)
can_sends.append(tesla_can.create_pedal_command(0, enable=0))
elif long_active:
accel_request = float(actuators.accel)
engage_elapsed_frames = frame - self.preap_long_engage_frame
in_engage_grace = engage_elapsed_frames < ENGAGE_GRACE_FRAMES
if in_engage_grace:
accel_request = max(0.0, min(accel_request, (engage_elapsed_frames / ENGAGE_GRACE_FRAMES) * self.engage_a_max))
self.prev_pedal_di = self.vdas.update(
accel_request, CS.out.vEgo, self.prev_pedal_di, a_ego=CS.out.aEgo,
freeze_integrator=in_engage_grace, orientation_ned=list(CC.orientationNED),
)
can_sends.append(tesla_can.create_pedal_command(nap_conf.di_to_pedal(self.prev_pedal_di), enable=1))
else:
zero_torque_di = get_zero_torque().get(CS.out.vEgo)
self.prev_pedal_di = zero_torque_di
can_sends.append(tesla_can.create_pedal_command(nap_conf.di_to_pedal(zero_torque_di), enable=1))
except Exception:
carlog.exception("Pre-AP pedal command failed; sending disabled")
can_sends.append(tesla_can.create_pedal_command(nap_conf.di_to_pedal(PEDAL_DI_ZERO), enable=0))
self.prev_pedal_di = 0.0
else:
if nap_conf.use_pedal:
can_sends.append(tesla_can.create_pedal_command(nap_conf.di_to_pedal(PEDAL_DI_ZERO), enable=0))
self.prev_pedal_di = 0.0
return can_sends
@staticmethod
def _pedal_ready() -> bool:
return nap_conf.pedal_calibrated and abs(nap_conf.pedal_factor) > 1e-6
@@ -0,0 +1,129 @@
import copy
import math
import time
from cereal import custom
from opendbc.can import CANParser
from opendbc.car import Bus, structs
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.tesla.preap.engagement import PreAPEngagement
from opendbc.car.tesla.preap.nap_conf import PEDAL_DI_PRESSED, nap_conf
from opendbc.car.tesla.preap.pedal_feedback import PedalFeedback
from opendbc.car.tesla.values import CANBUS, DBC, GEAR_MAP, STEER_THRESHOLD
_DOORS = ("DOOR_STATE_FL", "DOOR_STATE_FR", "DOOR_STATE_RL", "DOOR_STATE_RR", "DOOR_STATE_FrontTrunk", "BOOT_STATE")
def _current_time_millis() -> int:
return int(round(time.time() * 1000))
def update_preap(cs, can_parsers):
cp_ap_party = can_parsers[Bus.ap_party]
cp_pt = can_parsers[Bus.pt]
cp_chassis = can_parsers[Bus.chassis]
ret = structs.CarState()
fp_ret = custom.StarPilotCarState.new_message()
ret.vEgoRaw = cp_chassis.vl["ESP_B"]["ESP_vehicleSpeed"] * CV.KPH_TO_MS
ret.vEgo, ret.aEgo = cs.update_speed_kf(ret.vEgoRaw)
ret.gasPressed = cp_pt.vl["DI_torque1"]["DI_pedalPos"] > PEDAL_DI_PRESSED
real_brake_pressed = cp_chassis.vl["BrakeMessage"]["driverBrakeStatus"] == 2
ret.brake = 0
ret.brakePressed = real_brake_pressed
epas_status = cp_chassis.vl["EPAS_sysStatus"]
cs.hands_on_level = epas_status["EPAS_handsOnLevel"]
ret.steeringAngleDeg = -epas_status["EPAS_internalSAS"]
ret.steeringRateDeg = -cp_chassis.vl["STW_ANGLHP_STAT"]["StW_AnglHP_Spd"]
ret.steeringTorque = -epas_status["EPAS_torsionBarTorque"]
ret.steeringPressed = cs.update_steering_pressed(abs(ret.steeringTorque) > STEER_THRESHOLD, 5)
eac_status = cs.can_define.dv["EPAS_sysStatus"]["EPAS_eacStatus"].get(int(epas_status["EPAS_eacStatus"]), None)
ret.steerFaultPermanent = eac_status == "EAC_FAULT"
ret.steerFaultTemporary = False
eac_error_code = cs.can_define.dv["EPAS_sysStatus"]["EPAS_eacErrorCode"].get(int(epas_status["EPAS_eacErrorCode"]), None)
ret.steeringDisengage = cs.hands_on_level >= 3 or (eac_status == "EAC_INHIBITED" and eac_error_code in (
"EAC_ERROR_HIGH_ANGLE_REQ", "EAC_ERROR_HIGH_ANGLE_RATE_REQ", "EAC_ERROR_HIGH_ANGLE_SAFETY", "EAC_ERROR_HIGH_ANGLE_RATE_SAFETY",
))
cs.engagement.handle_steering_disengage(ret.steeringDisengage)
cruise_state = cs.can_define.dv["DI_state"]["DI_cruiseState"].get(int(cp_chassis.vl["DI_state"]["DI_cruiseState"]), None)
cs.di_cruise_state = cruise_state or "OFF"
speed_units = cs.can_define.dv["DI_state"]["DI_speedUnits"].get(int(cp_chassis.vl["DI_state"]["DI_speedUnits"]), None)
if speed_units is not None:
cs.speed_units = speed_units
pedal_transform_valid = math.isfinite(nap_conf.pedal_factor) and abs(nap_conf.pedal_factor) > 1e-6
use_pedal = nap_conf.use_pedal and cs.CP.openpilotLongitudinalControl and pedal_transform_valid
pedal_long_allowed = use_pedal
long_control_allowed = True if not use_pedal else pedal_long_allowed
ret.cruiseState.available = True
if cs.enableLongControl and use_pedal:
ret.cruiseState.speed = cs.pedal_speed_kph * CV.KPH_TO_MS
elif speed_units == "KPH":
ret.cruiseState.speed = max(cp_chassis.vl["DI_state"]["DI_digitalSpeed"] * CV.KPH_TO_MS, 1e-3)
elif speed_units == "MPH":
ret.cruiseState.speed = max(cp_chassis.vl["DI_state"]["DI_digitalSpeed"] * CV.MPH_TO_MS, 1e-3)
ret.cruiseState.standstill = False
ret.standstill = cruise_state == "STANDSTILL"
ret.accFaulted = cruise_state == "FAULT"
ret.gearShifter = GEAR_MAP[cs.can_define.dv["DI_torque2"]["DI_gear"].get(int(cp_chassis.vl["DI_torque2"]["DI_gear"]), "DI_GEAR_INVALID")]
ret.doorOpen = any((cs.can_define.dv["GTW_carState"][door].get(int(cp_chassis.vl["GTW_carState"][door]), "OPEN") == "OPEN") for door in _DOORS)
ret.leftBlinker = cp_chassis.vl["GTW_carState"]["BC_indicatorLStatus"] == 1
ret.rightBlinker = cp_chassis.vl["GTW_carState"]["BC_indicatorRStatus"] == 1
ret.seatbeltUnlatched = False
ret.stockAeb = False
ret.stockLkas = False
cs.prev_cruise_buttons = cs.cruise_buttons
cs.cruise_buttons = int(cp_chassis.vl["STW_ACTN_RQ"]["SpdCtrlLvr_Stat"])
cs.msg_stw_actn_req = copy.copy(cp_chassis.vl["STW_ACTN_RQ"])
curr_time_ms = _current_time_millis()
ret.buttonEvents = cs.engagement.process_buttons(
cs.cruise_buttons, cs.prev_cruise_buttons, curr_time_ms, ret.vEgo, cs.speed_units,
use_pedal, pedal_long_allowed, long_control_allowed, real_brake_pressed, cs.di_cruise_state,
)
ret.brakePressed = False
can_engage = cs.engagement.check_can_engage(ret.doorOpen, ret.gearShifter, ret.seatbeltUnlatched)
ret.cruiseState.enabled = cs.engagement.cruiseEnabled and can_engage
cs.cruiseEnabled = cs.engagement.cruiseEnabled
cs.enableLongControl = cs.engagement.enableLongControl
cs.enableJustCC = cs.engagement.enableJustCC
cs.pedal_speed_kph = cs.engagement.pedal_speed_kph
cs.preap_cc_cancel_needed = cs.engagement.preap_cc_cancel_needed
cs.preap_cc_engage_needed = cs.engagement.preap_cc_engage_needed
if nap_conf.use_pedal:
gas_sensor = cp_ap_party.vl.get("GAS_SENSOR", {})
cs.pedal.update(gas_sensor, curr_time_ms)
cs.pedal.update_torque(cp_pt.vl.get("DI_torque1", {}))
cs.pedal_interceptor_value = cs.pedal.interceptor_value
cs.pedal_timeout = cs.pedal.timeout
if use_pedal:
ret.gasPressed = cs.pedal.gas_pressed
cs.cruise_enabled_prev = ret.cruiseState.enabled
return ret, fp_ret
def get_preap_can_parsers(CP):
chassis_messages = [
("ESP_B", 0), ("BrakeMessage", 0), ("DI_state", 0), ("DI_torque2", 0),
("GTW_carState", 0), ("STW_ANGLHP_STAT", 0), ("EPAS_sysStatus", 0), ("STW_ACTN_RQ", 0),
]
pt_messages = [("DI_torque1", 0)]
pedal_messages = [("GAS_SENSOR", 50)] if nap_conf.use_pedal else []
return {
Bus.party: CANParser(DBC[CP.carFingerprint][Bus.party], [], CANBUS.party),
Bus.ap_party: CANParser(DBC[CP.carFingerprint][Bus.party], pedal_messages, nap_conf.pedal_can_bus),
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], pt_messages, CANBUS.party),
Bus.chassis: CANParser(DBC[CP.carFingerprint][Bus.chassis], chassis_messages, CANBUS.party),
}
@@ -0,0 +1,16 @@
ACCEL_PREAP_BP = [0.0, 1.3, 7.5, 15.0, 25.0, 40.0]
ACCEL_PREAP_FOLLOW = [0.35, 0.55, 0.80, 0.75, 0.65, 0.50]
ACCEL_PREAP_PROFILES = {
0: [0.3, 0.8, 1.1, 1.0, 0.85, 0.7],
1: [0.3, 0.7, 1.0, 0.9, 0.8, 0.65],
2: [0.3, 0.6, 0.9, 0.8, 0.7, 0.55],
}
PEDAL_LONG_K_BP = [0.0, 3.0, 6.0, 35.0]
PEDAL_LONG_KP_V = [0.0, 0.0, 0.0, 0.0]
PEDAL_LONG_KI_V = [0.05, 0.08, 0.10, 0.15]
VDAS_INNER_K_BP = [0.0, 5.0, 35.0]
VDAS_INNER_KP_V = [0.0, 0.0, 0.0]
VDAS_INNER_KI_V = [0.3, 0.2, 0.15]
VDAS_FUTURE_T_BP = [2.0, 5.0]
VDAS_FUTURE_T_V = [0.30, 0.55]
VDAS_AEGO_FILTER_RC = 0.25
@@ -0,0 +1,148 @@
from opendbc.car import structs
from opendbc.car.carlog import carlog
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.tesla.values import CruiseButtons
ButtonType = structs.CarState.ButtonEvent.Type
CANCEL_ECHO_WINDOW_MS = 600
SPOOF_ECHO_WINDOW_MS = 300
class PreAPEngagement:
def __init__(self, double_pull_enabled: bool, double_pull_window_ms: int):
self.enableDoublePull = double_pull_enabled
self.double_pull_window_ms = double_pull_window_ms
self.cruiseEnabled = False
self.enableLongControl = False
self.enableJustCC = False
self.pending_enable = False
self.stalk_pull_time_ms = 0
self.prev_stalk_pull_time_ms = -1000
self.pedal_speed_kph = 0.0
self.preap_cc_cancel_needed = False
self.preap_cc_engage_needed = False
self.preap_last_cc_spoof_ms = 0
self.preap_brake_pressed_prev = False
self.last_stalk_non_cancel_ms = -10000
self.prev_steering_disengage = False
def handle_steering_disengage(self, steering_disengage: bool) -> None:
if steering_disengage and not self.prev_steering_disengage:
self.cruiseEnabled = False
self.enableLongControl = False
self.enableJustCC = False
self.pending_enable = False
self.pedal_speed_kph = 0.0
self.stalk_pull_time_ms = 0
self.prev_stalk_pull_time_ms = -1000
self.prev_steering_disengage = steering_disengage
def process_buttons(self, cruise_buttons: int, prev_cruise_buttons: int, curr_time_ms: int, v_ego: float, speed_units: str,
use_pedal: bool, pedal_long_allowed: bool, long_control_allowed: bool, real_brake_pressed: bool,
di_cruise_state: str = "OFF") -> list[structs.CarState.ButtonEvent]:
self.preap_cc_cancel_needed = False
self.preap_cc_engage_needed = False
button_events: list[structs.CarState.ButtonEvent] = []
if cruise_buttons == CruiseButtons.MAIN and prev_cruise_buttons != CruiseButtons.MAIN:
if self.enableDoublePull:
self._handle_double_pull(curr_time_ms, v_ego, speed_units, use_pedal, pedal_long_allowed, long_control_allowed, di_cruise_state)
else:
self.cruiseEnabled = True
self.enableLongControl = long_control_allowed
self.enableJustCC = not long_control_allowed
self.pedal_speed_kph = self._capture_target_speed(v_ego, speed_units) if pedal_long_allowed else 0.0
if not use_pedal and di_cruise_state == "STANDBY":
self.preap_cc_engage_needed = True
self.preap_last_cc_spoof_ms = curr_time_ms
if cruise_buttons != prev_cruise_buttons:
button_events.append(self._make_button_event(cruise_buttons, prev_cruise_buttons, curr_time_ms, v_ego, speed_units, use_pedal))
if self.pending_enable and (curr_time_ms - self.stalk_pull_time_ms > self.double_pull_window_ms):
self.pending_enable = False
brake_rising_edge = real_brake_pressed and not self.preap_brake_pressed_prev
if use_pedal and brake_rising_edge and self.cruiseEnabled and self.enableLongControl:
self.enableLongControl = False
self.enableJustCC = True
self.pending_enable = False
self.pedal_speed_kph = 0.0
self.preap_brake_pressed_prev = real_brake_pressed
return button_events
def check_can_engage(self, door_open: bool, gear_shifter, seatbelt_unlatched: bool) -> bool:
can_engage = not door_open and gear_shifter == structs.CarState.GearShifter.drive and not seatbelt_unlatched
if not can_engage:
self.cruiseEnabled = False
self.enableLongControl = False
self.enableJustCC = False
self.pending_enable = False
return can_engage
def _handle_double_pull(self, curr_time_ms: int, v_ego: float, speed_units: str, use_pedal: bool,
pedal_long_allowed: bool, long_control_allowed: bool, di_cruise_state: str) -> None:
self.prev_stalk_pull_time_ms = self.stalk_pull_time_ms
self.stalk_pull_time_ms = curr_time_ms
double_pull = (self.stalk_pull_time_ms - self.prev_stalk_pull_time_ms) < self.double_pull_window_ms
self.cruiseEnabled = True
self.pending_enable = False
self.enableLongControl = long_control_allowed if double_pull else False
self.enableJustCC = not self.enableLongControl
self.pedal_speed_kph = self._capture_target_speed(v_ego, speed_units) if pedal_long_allowed and double_pull else 0.0
if double_pull:
if not use_pedal:
self.preap_cc_engage_needed = True
self.preap_last_cc_spoof_ms = curr_time_ms
else:
self.pending_enable = True
if not use_pedal:
self.preap_cc_cancel_needed = True
self.preap_last_cc_spoof_ms = curr_time_ms
def _make_button_event(self, cruise_buttons: int, prev_cruise_buttons: int, curr_time_ms: int,
v_ego: float, speed_units: str, use_pedal: bool) -> structs.CarState.ButtonEvent:
be = structs.CarState.ButtonEvent()
be.pressed = cruise_buttons != CruiseButtons.IDLE
state = cruise_buttons if be.pressed else prev_cruise_buttons
if state == CruiseButtons.MAIN:
be.type = ButtonType.setCruise
if be.pressed:
self.last_stalk_non_cancel_ms = curr_time_ms
elif state == CruiseButtons.CANCEL:
is_echo = (self.cruiseEnabled and (curr_time_ms - self.last_stalk_non_cancel_ms) < CANCEL_ECHO_WINDOW_MS) or \
((curr_time_ms - self.preap_last_cc_spoof_ms) < SPOOF_ECHO_WINDOW_MS)
be.type = ButtonType.unknown if is_echo else ButtonType.cancel
if not is_echo:
self.cruiseEnabled = False
self.enableLongControl = False
self.enableJustCC = False
self.pending_enable = False
self.pedal_speed_kph = 0.0
self.stalk_pull_time_ms = 0
self.prev_stalk_pull_time_ms = -1000
elif CruiseButtons.is_accel(state):
be.type = ButtonType.accelCruise
if be.pressed and use_pedal and self.enableLongControl:
speed_uom_kph = CV.MPH_TO_KPH if speed_units == "MPH" else 1.0
actual_kph = int(v_ego * CV.MS_TO_KPH / speed_uom_kph + 0.5) * speed_uom_kph
self.pedal_speed_kph = min(max(self.pedal_speed_kph, actual_kph) + (5 * speed_uom_kph if state == CruiseButtons.RES_ACCEL_2ND else speed_uom_kph), 270.0)
elif CruiseButtons.is_decel(state):
be.type = ButtonType.decelCruise
if be.pressed and use_pedal and self.enableLongControl:
speed_uom_kph = CV.MPH_TO_KPH if speed_units == "MPH" else 1.0
self.pedal_speed_kph = max(self.pedal_speed_kph - (5 * speed_uom_kph if state == CruiseButtons.DECEL_2ND else speed_uom_kph), 0.0)
else:
be.type = ButtonType.unknown
return be
@staticmethod
def _capture_target_speed(v_ego: float, speed_units: str) -> float:
speed_uom_kph = CV.MPH_TO_KPH if speed_units == "MPH" else 1.0
return max(int(v_ego * CV.MS_TO_KPH / speed_uom_kph + 0.5) * speed_uom_kph, 0.0)
@@ -0,0 +1,11 @@
SPEED_BP = [0.0, 5.0, 12.0, 20.0, 30.0, 40.0]
ACCEL_BP = [-1.5, -1.0, -0.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5]
DEFAULT_TABLE = [
[-5.00, -3.33, -1.67, 0.00, 10.00, 20.00, 30.00, 40.00, 50.00],
[-5.00, -3.33, -1.67, 0.00, 11.60, 23.20, 34.80, 46.40, 58.00],
[-5.00, -3.33, -1.67, 0.00, 13.20, 26.40, 39.60, 52.80, 66.00],
[-5.00, -3.33, -1.67, 0.00, 14.80, 29.60, 44.40, 59.20, 74.00],
[-5.00, -3.33, -1.67, 0.00, 16.40, 32.80, 49.20, 65.60, 82.00],
[-5.00, -3.33, -1.67, 0.00, 18.00, 36.00, 54.00, 72.00, 90.00],
]
@@ -0,0 +1,56 @@
import numpy as np
from opendbc.car import get_safety_config, structs
from opendbc.car.tesla.preap.constants import ACCEL_PREAP_BP, ACCEL_PREAP_PROFILES, PEDAL_LONG_K_BP, PEDAL_LONG_KI_V, PEDAL_LONG_KP_V
from opendbc.car.tesla.preap.nap_conf import nap_conf
PREAP_FLAG_ENABLE_PEDAL = 1
PREAP_FLAG_RADAR_EMULATION = 2
PREAP_FLAG_RADAR_BEHIND_NOSECONE = 4
def get_preap_accel_limits(current_speed: float) -> tuple[float, float]:
try:
from openpilot.common.params import Params
personality = Params().get_int("LongitudinalPersonality")
except Exception:
personality = 1
profile = ACCEL_PREAP_PROFILES.get(personality, ACCEL_PREAP_PROFILES[1])
return -1.5, float(np.interp(current_speed, ACCEL_PREAP_BP, profile))
def get_preap_params(ret: structs.CarParams) -> structs.CarParams:
safety_flags = 0
if nap_conf.use_pedal:
safety_flags |= PREAP_FLAG_ENABLE_PEDAL
if nap_conf.radar_enabled:
safety_flags |= PREAP_FLAG_RADAR_EMULATION
if nap_conf.radar_behind_nosecone:
safety_flags |= PREAP_FLAG_RADAR_BEHIND_NOSECONE
use_pedal = nap_conf.use_pedal and nap_conf.pedal_calibrated
ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.teslaPreap, safety_flags)]
ret.radarUnavailable = not nap_conf.radar_enabled
ret.steerControlType = structs.CarParams.SteerControlType.angle
ret.openpilotLongitudinalControl = use_pedal
ret.pcmCruise = not use_pedal
ret.alphaLongitudinalAvailable = False
ret.steerLimitTimer = 0.4
ret.steerActuatorDelay = 0.1
ret.steerAtStandstill = True
ret.vEgoStopping = 0.1
ret.vEgoStarting = 0.1
ret.stoppingDecelRate = 1.0
if use_pedal:
ret.longitudinalTuning.kpBP = PEDAL_LONG_K_BP
ret.longitudinalTuning.kpV = PEDAL_LONG_KP_V
ret.longitudinalTuning.kiBP = PEDAL_LONG_K_BP
ret.longitudinalTuning.kiV = PEDAL_LONG_KI_V
try:
ret.longitudinalTuning.kf = 1.0
except AttributeError:
pass
ret.longitudinalActuatorDelay = 0.4
return ret
@@ -0,0 +1,89 @@
from openpilot.common.params import Params
from opendbc.car.tesla.preap.nap_params import NAPParamKeys
_params = Params()
PEDAL_DI_MIN = -5.0
PEDAL_DI_ZERO = 0.0
PEDAL_DI_PRESSED = 2.0
ACCEL_MAX = 2.5
REGEN_MAX = -1.5
PEDAL_BP = [0.0, 5.0, 12.0, 20.0, 30.0, 40.0]
PEDAL_MAX_VALUES = [50.0, 58.0, 66.0, 74.0, 82.0, 90.0]
def transform_di_to_pedal(val: float, pedal_zero: float, pedal_factor: float) -> float:
return pedal_zero + (val - PEDAL_DI_ZERO) / (pedal_factor or 1.0)
def transform_pedal_to_di(val: float, pedal_zero: float, pedal_factor: float) -> float:
return PEDAL_DI_ZERO + (val - pedal_zero) * (pedal_factor or 1.0)
class NAPConf:
@property
def adaptive_accel(self) -> bool:
return _params.get_bool(NAPParamKeys.ADAPTIVE_ACCEL)
@property
def follow_distance(self) -> int:
return max(1, min(7, _params.get_int(NAPParamKeys.FOLLOW_DISTANCE)))
@property
def use_pedal(self) -> bool:
return _params.get_bool(NAPParamKeys.PEDAL_ENABLED)
@property
def pedal_calibrated(self) -> bool:
if not _params.get_bool(NAPParamKeys.PEDAL_CALIB_DONE):
return False
return abs(self.pedal_factor) > 1e-6
@property
def pedal_can_zero(self) -> bool:
return _params.get_int(NAPParamKeys.PEDAL_CAN_BUS) == 0
@property
def pedal_can_bus(self) -> int:
return 0 if self.pedal_can_zero else 2
@property
def pedal_factor(self) -> float:
return _params.get_float(NAPParamKeys.PEDAL_CALIB_FACTOR)
@property
def pedal_zero(self) -> float:
return _params.get_float(NAPParamKeys.PEDAL_CALIB_ZERO)
@property
def radar_enabled(self) -> bool:
return _params.get_bool(NAPParamKeys.RADAR_ENABLED)
@property
def radar_behind_nosecone(self) -> bool:
return _params.get_bool(NAPParamKeys.RADAR_BEHIND_NOSECONE)
@property
def radar_offset(self) -> float:
return _params.get_float(NAPParamKeys.RADAR_OFFSET)
@property
def double_pull_enabled(self) -> bool:
return True
@property
def double_pull_window_ms(self) -> int:
return 400
def get_pedal_profile_values(self) -> list[float]:
return PEDAL_MAX_VALUES
def di_to_pedal(self, val: float) -> float:
return transform_di_to_pedal(val, self.pedal_zero, self.pedal_factor)
def pedal_to_di(self, val: float) -> float:
return transform_pedal_to_di(val, self.pedal_zero, self.pedal_factor)
nap_conf = NAPConf()
@@ -0,0 +1,15 @@
class NAPParamKeys:
ADAPTIVE_ACCEL = "NAPAdaptiveAccel"
FOLLOW_DISTANCE = "NAPFollowDistance"
FORCE_PRE_AP = "NAPForcePreAP"
PEDAL_ENABLED = "NAPPedalEnabled"
PEDAL_PROFILE = "NAPPedalProfile"
PEDAL_CAN_BUS = "NAPPedalCanBus"
PEDAL_CALIB_DONE = "NAPPedalCalibDone"
PEDAL_CALIB_MIN = "NAPPedalCalibMin"
PEDAL_CALIB_MAX = "NAPPedalCalibMax"
PEDAL_CALIB_FACTOR = "NAPPedalCalibFactor"
PEDAL_CALIB_ZERO = "NAPPedalCalibZero"
RADAR_ENABLED = "NAPRadarEnabled"
RADAR_BEHIND_NOSECONE = "NAPRadarBehindNosecone"
RADAR_OFFSET = "NAPRadarOffset"
@@ -0,0 +1,41 @@
from opendbc.car.tesla.preap.nap_conf import PEDAL_DI_PRESSED, nap_conf
PEDAL_TIMEOUT_MS = 500
class PedalFeedback:
def __init__(self):
self.interceptor_value = 0.0
self.interceptor_value2 = 0.0
self.interceptor_state = 0
self.idx = 0
self.prev_idx = 0
self.last_seen_ms = 0
self.available = False
self.timeout = True
self.torque_level = 0.0
def update(self, gas_sensor_msg, curr_time_ms: int) -> bool:
if not gas_sensor_msg:
return False
self.prev_idx = self.idx
self.interceptor_value = float(nap_conf.pedal_to_di(float(gas_sensor_msg.get("INTERCEPTOR_GAS", 0.0))))
self.interceptor_value2 = float(nap_conf.pedal_to_di(float(gas_sensor_msg.get("INTERCEPTOR_GAS2", 0.0))))
self.interceptor_state = int(gas_sensor_msg.get("STATE", 0))
self.idx = int(gas_sensor_msg.get("IDX", 0))
if self.idx != self.prev_idx:
self.last_seen_ms = curr_time_ms
self.timeout = (curr_time_ms - self.last_seen_ms) > PEDAL_TIMEOUT_MS
self.available = (not self.timeout) and (self.interceptor_state == 0)
return True
def update_torque(self, di_torque1_msg) -> None:
self.torque_level = di_torque1_msg.get("DI_torqueMotor", 0.0)
@property
def gas_pressed(self) -> bool:
return self.interceptor_value > PEDAL_DI_PRESSED
@@ -0,0 +1,52 @@
from opendbc.car.tesla.values import CruiseButtons
_PHASE_IDLE = 0
_PHASE_ENGAGING = 1
CANCEL_DELAY_FRAMES = 10
CC_ENGAGE_TIMEOUT_FRAMES = 50
class StockCCSpoofer:
def __init__(self):
self.cc_engage_phase = _PHASE_IDLE
self.cc_engage_start_frame = 0
self.cancel_pending = False
self.cancel_frame = -1_000_000
def update(self, CS, frame: int, tesla_can, can_bus_party: int):
can_sends = []
if getattr(CS, "preap_cc_cancel_needed", False):
self.cancel_pending = True
self.cancel_frame = frame
self.cc_engage_phase = _PHASE_IDLE
CS.preap_cc_cancel_needed = False
if getattr(CS, "preap_cc_engage_needed", False) and self.cc_engage_phase == _PHASE_IDLE:
self.cc_engage_phase = _PHASE_ENGAGING
self.cc_engage_start_frame = frame
CS.preap_cc_engage_needed = False
if self.cancel_pending and (frame - self.cancel_frame) >= CANCEL_DELAY_FRAMES and frame % 10 == 0:
sent = self._send(CS, tesla_can, can_bus_party, CruiseButtons.CANCEL)
if sent is not None:
can_sends.append(sent)
self.cancel_pending = False
elif self.cc_engage_phase == _PHASE_ENGAGING and frame % 10 == 0:
if (frame - self.cc_engage_start_frame) >= CC_ENGAGE_TIMEOUT_FRAMES or getattr(CS, "di_cruise_state", "OFF") == "ENABLED":
self.cc_engage_phase = _PHASE_IDLE
else:
sent = self._send(CS, tesla_can, can_bus_party, CruiseButtons.SET_ACCEL)
if sent is not None:
can_sends.append(sent)
return can_sends
@staticmethod
def _send(CS, tesla_can, can_bus_party: int, button: int):
msg_stw = getattr(CS, "msg_stw_actn_req", None)
if msg_stw is None:
return None
counter = (int(msg_stw.get("MC_STW_ACTN_RQ", 0)) + 1) % 16
return tesla_can.create_action_request(button, can_bus_party, counter, msg_stw)
@@ -0,0 +1,93 @@
import struct
from ctypes import create_string_buffer
from opendbc.car.tesla.values import CANBUS
PEDAL_M1 = 0.050796813
PEDAL_M2 = 0.101593626
PEDAL_D = -22.85856576
GAS_COMMAND_ID = 0x551
_STW_CRC_POLY = 0x1D
_STW_DEFAULTS = {
"VSL_Enbl_Rq": 1, "DTR_Dist_Rq": 0, "TurnIndLvr_Stat": 0,
"HiBmLvr_Stat": 0, "WprWashSw_Psd": 0, "WprWash_R_Sw_Posn_V2": 0,
"StW_Lvr_Stat": 0, "StW_Cond_Flt": 0, "StW_Cond_Psd": 0,
"HrnSw_Psd": 0, "StW_Sw00_Psd": 0, "StW_Sw01_Psd": 0,
"StW_Sw02_Psd": 0, "StW_Sw03_Psd": 0, "StW_Sw04_Psd": 0,
"StW_Sw05_Psd": 0, "StW_Sw06_Psd": 0,
}
def _crc8_stw(data: bytes) -> int:
crc = 0xFF
for b in data:
crc ^= b
for _ in range(8):
crc = ((crc << 1) ^ _STW_CRC_POLY) & 0xFF if (crc & 0x80) else (crc << 1) & 0xFF
return crc ^ 0xFF
class TeslaCANPreAP:
def __init__(self, packer):
self.packer = packer
self.pedal_can_bus = 2
self.pedal_idx = 0
@staticmethod
def checksum(msg_id: int, dat: bytes) -> int:
return ((msg_id & 0xFF) + ((msg_id >> 8) & 0xFF) + sum(dat)) & 0xFF
def create_steering_control(self, counter: int, angle: float, enabled: bool):
values = {
"DAS_steeringControlCounter": counter,
"DAS_steeringAngleRequest": -angle,
"DAS_steeringHapticRequest": 0,
"DAS_steeringControlType": 1 if enabled else 0,
}
data = self.packer.make_can_msg("DAS_steeringControl", CANBUS.party, values)[1]
values["DAS_steeringControlChecksum"] = self.checksum(0x488, data[:3])
return self.packer.make_can_msg("DAS_steeringControl", CANBUS.party, values)
def create_epas_control(self, counter: int, mode: int):
values = {
"EPB_epasEACAllow": mode,
"EPB_epasControlCounter": counter,
"EPB_epasControlChecksum": 0,
}
data = self.packer.make_can_msg("EPB_epasControl", CANBUS.party, values)[1]
values["EPB_epasControlChecksum"] = self.checksum(0x214, data)
return self.packer.make_can_msg("EPB_epasControl", CANBUS.party, values)
def create_pedal_command(self, accel_command: float, enable: int = 1, pedal_can_bus: int | None = None):
if pedal_can_bus is None:
pedal_can_bus = self.pedal_can_bus
idx = self.pedal_idx
self.pedal_idx = (self.pedal_idx + 1) % 16
if enable == 1:
int_cmd1 = max(0, min(65534, int((accel_command - PEDAL_D) / PEDAL_M1)))
int_cmd2 = max(0, min(65534, int((accel_command - PEDAL_D) / PEDAL_M2)))
else:
int_cmd1 = 0
int_cmd2 = 0
msg = create_string_buffer(6)
struct.pack_into("BBBBB", msg, 0,
(int_cmd1 >> 8) & 0xFF, int_cmd1 & 0xFF,
(int_cmd2 >> 8) & 0xFF, int_cmd2 & 0xFF,
((enable << 7) + idx) & 0xFF)
struct.pack_into("B", msg, 5, self.checksum(GAS_COMMAND_ID, msg.raw))
return GAS_COMMAND_ID, bytes(msg.raw), pedal_can_bus
def create_action_request(self, button_to_press: int, bus: int, counter: int, msg_stw=None):
values = {"MC_STW_ACTN_RQ": counter, "CRC_STW_ACTN_RQ": 0, "SpdCtrlLvr_Stat": button_to_press}
if msg_stw is not None:
for key, default in _STW_DEFAULTS.items():
values[key] = msg_stw.get(key, default)
else:
values.update(_STW_DEFAULTS)
data = self.packer.make_can_msg("STW_ACTN_RQ", bus, values)[1]
values["CRC_STW_ACTN_RQ"] = _crc8_stw(data[:7])
return self.packer.make_can_msg("STW_ACTN_RQ", bus, values)
@@ -0,0 +1,139 @@
import math
from numpy import clip, interp
from opendbc.car.common.filter_simple import FirstOrderFilter, HighPassFilter
from opendbc.car.common.pid import PIDController
from opendbc.car.tesla.pedal.controller import PEDAL_RAMP_RATE_DOWN, PEDAL_RAMP_RATE_UP, get_zero_torque
from opendbc.car.tesla.preap.constants import (
VDAS_AEGO_FILTER_RC,
VDAS_FUTURE_T_BP,
VDAS_FUTURE_T_V,
VDAS_INNER_K_BP,
VDAS_INNER_KI_V,
VDAS_INNER_KP_V,
)
from opendbc.car.tesla.preap.ff_table_default import ACCEL_BP, DEFAULT_TABLE, SPEED_BP
from opendbc.car.tesla.preap.nap_conf import ACCEL_MAX, PEDAL_BP, PEDAL_DI_MIN, PEDAL_MAX_VALUES, REGEN_MAX
PID_ERROR_DEADBAND = 0.1
GRAVITY = 9.81
PITCH_LP_RC = 0.5
PITCH_HP_RC1 = 0.1
PITCH_HP_RC2 = 1.0
MAX_PITCH_COMPENSATION = 1.5
class GradeEstimator:
def __init__(self, dt: float = 0.02):
self.pitch_lp = FirstOrderFilter(0.0, PITCH_LP_RC, dt)
self.pitch_hp = HighPassFilter(0.0, PITCH_HP_RC1, PITCH_HP_RC2, dt)
def update(self, orientation_ned: list[float]) -> tuple[float, float]:
if len(orientation_ned) < 2:
return 0.0, 0.0
pitch = orientation_ned[1]
self.pitch_lp.update(pitch)
self.pitch_hp.update(pitch)
grade_accel = math.sin(self.pitch_lp.x) * GRAVITY
pitch_comp = float(clip(math.sin(self.pitch_hp.x) * GRAVITY, -MAX_PITCH_COMPENSATION, MAX_PITCH_COMPENSATION))
return grade_accel, pitch_comp
def reset(self) -> None:
self.pitch_lp.x = 0.0
self.pitch_hp.x = 0.0
self.pitch_hp._f1.x = 0.0
self.pitch_hp._f2.x = 0.0
class JerkLimiter:
def __init__(self, j_max: float = 2.5, dt: float = 0.02):
self.j_max = j_max
self.dt = dt
self.a_limited = 0.0
def update(self, a_cmd: float) -> float:
da_max = self.j_max * self.dt
self.a_limited += float(clip(a_cmd - self.a_limited, -da_max, da_max))
return self.a_limited
def reset(self, a_init: float = 0.0) -> None:
self.a_limited = a_init
class FeedforwardModel:
def __init__(self):
self.speed_bp = list(SPEED_BP)
self.accel_bp = list(ACCEL_BP)
self.table = [list(row) for row in DEFAULT_TABLE]
def get(self, a_cmd: float, v_ego: float, zero_torque_di: float) -> float:
si = float(interp(v_ego, self.speed_bp, range(len(self.speed_bp))))
si_lo = int(si)
si_hi = min(si_lo + 1, len(self.speed_bp) - 1)
sf = si - si_lo
di_lo = float(interp(a_cmd, self.accel_bp, self.table[si_lo]))
di_hi = float(interp(a_cmd, self.accel_bp, self.table[si_hi]))
base_di = di_lo + sf * (di_hi - di_lo)
if a_cmd < 0:
blend = float(clip((a_cmd - REGEN_MAX) / (0.0 - REGEN_MAX), 0.0, 1.0))
else:
blend = float(1.0 - a_cmd / ACCEL_MAX)
return base_di + zero_torque_di * blend
class VirtualDAS:
def __init__(self, dt: float = 0.02):
self.dt = dt
self.jerk_limiter = JerkLimiter(dt=dt)
self.ff_model = FeedforwardModel()
self.grade_estimator = GradeEstimator(dt=dt)
self.inner_pid = PIDController(
k_p=(VDAS_INNER_K_BP, VDAS_INNER_KP_V),
k_i=(VDAS_INNER_K_BP, VDAS_INNER_KI_V),
k_f=0.0,
pos_limit=PEDAL_RAMP_RATE_UP,
neg_limit=-PEDAL_RAMP_RATE_DOWN,
rate=1.0 / dt,
)
self.a_ego_filter = FirstOrderFilter(0.0, VDAS_AEGO_FILTER_RC, dt)
self.prev_a_ego_filtered = 0.0
self.prev_pedal_di = 0.0
def update(self, a_cmd: float, v_ego: float, prev_pedal_di: float, a_ego: float = 0.0,
freeze_integrator: bool = False, orientation_ned: list[float] | None = None) -> float:
a_limited = self.jerk_limiter.update(a_cmd)
grade_accel, pitch_comp = self.grade_estimator.update(orientation_ned or [])
ff_di = self._feedforward(a_limited, v_ego) + pitch_comp
a_ego_corrected = a_ego - grade_accel
a_ego_filtered = self.a_ego_filter.update(a_ego_corrected)
j_ego = (a_ego_filtered - self.prev_a_ego_filtered) / self.dt
self.prev_a_ego_filtered = a_ego_filtered
future_t = float(interp(v_ego, VDAS_FUTURE_T_BP, VDAS_FUTURE_T_V))
error = a_limited - (a_ego_filtered + j_ego * future_t)
if abs(error) < PID_ERROR_DEADBAND:
error = 0.0
pedal_di = ff_di + float(self.inner_pid.update(error, speed=v_ego, freeze_integrator=freeze_integrator))
max_pedal_value = float(interp(v_ego, PEDAL_BP, PEDAL_MAX_VALUES))
pedal_di = float(clip(pedal_di, PEDAL_DI_MIN, max_pedal_value))
pedal_di = float(clip(pedal_di, prev_pedal_di - PEDAL_RAMP_RATE_DOWN, prev_pedal_di + PEDAL_RAMP_RATE_UP))
self.prev_pedal_di = pedal_di
return pedal_di
def reset(self, a_init: float = 0.0, pedal_di_init: float = 0.0) -> None:
self.jerk_limiter.reset(a_init)
self.inner_pid.reset()
self.grade_estimator.reset()
self.a_ego_filter.x = 0.0
self.prev_a_ego_filtered = 0.0
self.prev_pedal_di = pedal_di_init
def _feedforward(self, a_cmd: float, v_ego: float) -> float:
zero_torque_di = get_zero_torque().get(v_ego)
max_pedal_value = float(interp(v_ego, PEDAL_BP, PEDAL_MAX_VALUES))
return float(clip(self.ff_model.get(a_cmd, v_ego, zero_torque_di), PEDAL_DI_MIN, max_pedal_value))
@@ -0,0 +1,87 @@
from opendbc.can import CANParser
from opendbc.car import Bus, structs
from opendbc.car.interfaces import RadarInterfaceBase
from opendbc.car.tesla.preap.nap_conf import nap_conf
from opendbc.car.tesla.values import CANBUS, DBC, CAR
_BOSCH_RADAR_STATUS_MSG = 769
_BOSCH_RADAR_POINT_A_BASE = 784
_BOSCH_RADAR_POINT_B_BASE = 785
_BOSCH_RADAR_POINT_STRIDE = 3
_BOSCH_RADAR_POINTS = 32
_BOSCH_TRIGGER_MSG = _BOSCH_RADAR_POINT_B_BASE + ((_BOSCH_RADAR_POINTS - 1) * _BOSCH_RADAR_POINT_STRIDE)
class RadarInterface(RadarInterfaceBase):
def __init__(self, CP):
super().__init__(CP)
self.radar_off_can = CP.radarUnavailable or CP.carFingerprint != CAR.TESLA_MODEL_S_PREAP
self.updated_messages: set[int] = set()
self.track_id = 0
self.radar_offset = float(nap_conf.radar_offset) if CP.carFingerprint == CAR.TESLA_MODEL_S_PREAP else 0.0
if self.radar_off_can:
self.rcp = None
else:
messages = [(_BOSCH_RADAR_STATUS_MSG, 8)]
for i in range(_BOSCH_RADAR_POINTS):
messages.append((_BOSCH_RADAR_POINT_A_BASE + (i * _BOSCH_RADAR_POINT_STRIDE), 8))
messages.append((_BOSCH_RADAR_POINT_B_BASE + (i * _BOSCH_RADAR_POINT_STRIDE), 8))
self.rcp = CANParser(DBC[CP.carFingerprint][Bus.radar], messages, CANBUS.radar)
self.trigger_msg = _BOSCH_TRIGGER_MSG
def update(self, can_strings):
if self.radar_off_can or self.rcp is None:
return super().update(None)
vls = self.rcp.update(can_strings)
self.updated_messages.update(vls)
if self.trigger_msg not in self.updated_messages:
return None
ret = structs.RadarData()
if not self.rcp.can_valid:
ret.errors.canError = True
radar_status = self.rcp.vl[_BOSCH_RADAR_STATUS_MSG]
ret.errors.radarFault = bool(radar_status["RADC_HWFail"])
ret.errors.radarUnavailableTemporary = False
current_points: set[int] = set()
for i in range(_BOSCH_RADAR_POINTS):
msg_a_id = _BOSCH_RADAR_POINT_A_BASE + (i * _BOSCH_RADAR_POINT_STRIDE)
msg_b_id = _BOSCH_RADAR_POINT_B_BASE + (i * _BOSCH_RADAR_POINT_STRIDE)
msg_a = self.rcp.vl[msg_a_id]
msg_b = self.rcp.vl[msg_b_id]
if msg_a["Index"] != msg_b["Index2"]:
continue
if not msg_a["Tracked"] or msg_a["LongDist"] <= 0.0 or msg_a["LongDist"] > 250.0 or msg_a["ProbExist"] < 50.0:
self.pts.pop(i, None)
continue
current_points.add(i)
if i not in self.pts:
self.pts[i] = structs.RadarData.RadarPoint()
self.pts[i].trackId = self.track_id
self.track_id += 1
point = self.pts[i]
point.dRel = msg_a["LongDist"]
point.yRel = msg_a["LatDist"] + self.radar_offset
point.vRel = msg_a["LongSpeed"]
point.aRel = msg_a["LongAccel"]
point.yvRel = msg_b["LatSpeed"]
point.measured = bool(msg_a["Meas"])
for point_id in list(self.pts.keys()):
if point_id not in current_points:
del self.pts[point_id]
ret.points = list(self.pts.values())
self.updated_messages.clear()
return ret
@@ -0,0 +1,435 @@
#pragma once
#include "opendbc/safety/declarations.h"
#if defined(STM32H7) || defined(STM32F4)
void can_send(CANPacket_t *to_push, uint8_t bus_number, bool skip_tx_hook);
void can_set_checksum(CANPacket_t *packet);
#endif
#define PREAP_GET_BYTES_04(msg) ((msg)->data[0] | ((msg)->data[1] << 8) | ((msg)->data[2] << 16) | ((msg)->data[3] << 24))
#define PREAP_GET_BYTES_48(msg) ((msg)->data[4] | ((msg)->data[5] << 8) | ((msg)->data[6] << 16) | ((msg)->data[7] << 24))
#define PREAP_WORD_TO_BYTES(dst8, src32) 0[dst8] = ((src32) & 0xFFU); 1[dst8] = (((src32) >> 8U) & 0xFFU); 2[dst8] = (((src32) >> 16U) & 0xFFU); 3[dst8] = (((src32) >> 24U) & 0xFFU)
#define PREAP_FLAG_ENABLE_PEDAL 1U
#define PREAP_FLAG_RADAR_EMULATION 2U
#define PREAP_FLAG_RADAR_BEHIND_NOSECONE 4U
#define PREAP_CANCEL_ECHO_WINDOW_US 600000U
static bool preap_enable_pedal = false;
static bool preap_radar_emulation = false;
static bool preap_radar_behind_nosecone = false;
static int preap_gear = 4;
static int preap_gear_prev = 4;
static bool preap_doors_open = false;
static uint32_t preap_last_stalk_engage_us = 0;
static int preap_radar_epas_type = 0;
static int preap_radar_position = 0;
static uint8_t tesla_preap_get_counter(const CANPacket_t *msg) {
if (msg->addr == 0x370U) {
return msg->data[6] & 0x0FU;
}
return 0U;
}
static uint32_t tesla_preap_get_checksum(const CANPacket_t *msg) {
if (msg->addr == 0x370U) {
return msg->data[7];
}
if (msg->addr == 0x488U) {
return msg->data[3];
}
return 0U;
}
static uint32_t tesla_preap_compute_checksum(const CANPacket_t *msg) {
int checksum_byte = -1;
if (msg->addr == 0x370U) {
checksum_byte = 7;
} else if (msg->addr == 0x488U) {
checksum_byte = 3;
}
if (checksum_byte == -1) {
return 0U;
}
uint8_t checksum = (uint8_t)(msg->addr & 0xFFU) + (uint8_t)((msg->addr >> 8) & 0xFFU);
const int len = GET_LEN(msg);
for (int i = 0; i < len; i++) {
if (i != checksum_byte) {
checksum += msg->data[i];
}
}
return checksum;
}
static const int preap_crc_lookup[256] = {
0x00, 0x1D, 0x3A, 0x27, 0x74, 0x69, 0x4E, 0x53, 0xE8, 0xF5, 0xD2, 0xCF, 0x9C, 0x81, 0xA6, 0xBB,
0xCD, 0xD0, 0xF7, 0xEA, 0xB9, 0xA4, 0x83, 0x9E, 0x25, 0x38, 0x1F, 0x02, 0x51, 0x4C, 0x6B, 0x76,
0x87, 0x9A, 0xBD, 0xA0, 0xF3, 0xEE, 0xC9, 0xD4, 0x6F, 0x72, 0x55, 0x48, 0x1B, 0x06, 0x21, 0x3C,
0x4A, 0x57, 0x70, 0x6D, 0x3E, 0x23, 0x04, 0x19, 0xA2, 0xBF, 0x98, 0x85, 0xD6, 0xCB, 0xEC, 0xF1,
0x13, 0x0E, 0x29, 0x34, 0x67, 0x7A, 0x5D, 0x40, 0xFB, 0xE6, 0xC1, 0xDC, 0x8F, 0x92, 0xB5, 0xA8,
0xDE, 0xC3, 0xE4, 0xF9, 0xAA, 0xB7, 0x90, 0x8D, 0x36, 0x2B, 0x0C, 0x11, 0x42, 0x5F, 0x78, 0x65,
0x94, 0x89, 0xAE, 0xB3, 0xE0, 0xFD, 0xDA, 0xC7, 0x7C, 0x61, 0x46, 0x5B, 0x08, 0x15, 0x32, 0x2F,
0x59, 0x44, 0x63, 0x7E, 0x2D, 0x30, 0x17, 0x0A, 0xB1, 0xAC, 0x8B, 0x96, 0xC5, 0xD8, 0xFF, 0xE2,
0x26, 0x3B, 0x1C, 0x01, 0x52, 0x4F, 0x68, 0x75, 0xCE, 0xD3, 0xF4, 0xE9, 0xBA, 0xA7, 0x80, 0x9D,
0xEB, 0xF6, 0xD1, 0xCC, 0x9F, 0x82, 0xA5, 0xB8, 0x03, 0x1E, 0x39, 0x24, 0x77, 0x6A, 0x4D, 0x50,
0xA1, 0xBC, 0x9B, 0x86, 0xD5, 0xC8, 0xEF, 0xF2, 0x49, 0x54, 0x73, 0x6E, 0x3D, 0x20, 0x07, 0x1A,
0x6C, 0x71, 0x56, 0x4B, 0x18, 0x05, 0x22, 0x3F, 0x84, 0x99, 0xBE, 0xA3, 0xF0, 0xED, 0xCA, 0xD7,
0x35, 0x28, 0x0F, 0x12, 0x41, 0x5C, 0x7B, 0x66, 0xDD, 0xC0, 0xE7, 0xFA, 0xA9, 0xB4, 0x93, 0x8E,
0xF8, 0xE5, 0xC2, 0xDF, 0x8C, 0x91, 0xB6, 0xAB, 0x10, 0x0D, 0x2A, 0x37, 0x64, 0x79, 0x5E, 0x43,
0xB2, 0xAF, 0x88, 0x95, 0xC6, 0xDB, 0xFC, 0xE1, 0x5A, 0x47, 0x60, 0x7D, 0x2E, 0x33, 0x14, 0x09,
0x7F, 0x62, 0x45, 0x58, 0x0B, 0x16, 0x31, 0x2C, 0x97, 0x8A, 0xAD, 0xB0, 0xE3, 0xFE, 0xD9, 0xC4
};
static int preap_compute_crc8(uint32_t lo, uint32_t hi, int msg_len) {
int crc = 0xFF;
for (int x = 0; x < msg_len; x++) {
const int value = (x <= 3) ? ((lo >> (x * 8)) & 0xFF) : ((hi >> ((x - 4) * 8)) & 0xFF);
crc = preap_crc_lookup[crc ^ value];
}
return crc ^ 0xFF;
}
static void preap_radar_readdr(const CANPacket_t *src, uint16_t new_addr) {
#if defined(STM32H7) || defined(STM32F4)
CANPacket_t pkt;
pkt.returned = 0U;
pkt.rejected = 0U;
pkt.extended = src->extended;
pkt.bus = 1;
pkt.addr = new_addr;
pkt.data_len_code = src->data_len_code;
for (int i = 0; i < GET_LEN(src); i++) {
pkt.data[i] = src->data[i];
}
can_set_checksum(&pkt);
can_send(&pkt, 1, true);
#else
(void)src;
(void)new_addr;
#endif
}
static uint8_t preap_byte_sum_checksum(const CANPacket_t *pkt) {
uint8_t checksum = (uint8_t)(pkt->addr & 0xFFU) + (uint8_t)((pkt->addr >> 8) & 0xFFU);
const int len = GET_LEN(pkt);
for (int i = 0; i < (len - 1); i++) {
checksum += pkt->data[i];
}
return checksum;
}
static void tesla_preap_gtw_emulation(const CANPacket_t *to_fwd) {
const int bus_num = to_fwd->bus;
const int addr = to_fwd->addr;
if (bus_num == 0 && preap_radar_emulation) {
switch (addr) {
case 0x45: preap_radar_readdr(to_fwd, 0x219); break;
case 0x108: preap_radar_readdr(to_fwd, 0x109); break;
case 0x145: preap_radar_readdr(to_fwd, 0x149); break;
case 0x20A: preap_radar_readdr(to_fwd, 0x159); break;
case 0x308: preap_radar_readdr(to_fwd, 0x209); break;
case 0x30A: preap_radar_readdr(to_fwd, 0x2D9); break;
case 0x405: preap_radar_readdr(to_fwd, 0x2B9); break;
default: break;
}
if (addr == 0x398) {
CANPacket_t pkt = {.returned = 0U, .rejected = 0U, .extended = to_fwd->extended,
.bus = 1, .addr = 0x2A9, .data_len_code = to_fwd->data_len_code};
uint32_t lo = PREAP_GET_BYTES_04(to_fwd);
uint32_t hi = PREAP_GET_BYTES_48(to_fwd);
lo = (lo & 0xFFFFF33F) | 0x100U | 0x440U;
hi = (hi & 0xCFFF0F0FU) | 0x10000000U | ((uint32_t)preap_radar_position << 4) | ((uint32_t)preap_radar_epas_type << 12);
PREAP_WORD_TO_BYTES(&pkt.data[0], lo);
PREAP_WORD_TO_BYTES(&pkt.data[4], hi);
pkt.data[7] = preap_byte_sum_checksum(&pkt);
#if defined(STM32H7) || defined(STM32F4)
can_set_checksum(&pkt);
can_send(&pkt, 1, true);
#endif
}
if (addr == 0x0EU) {
CANPacket_t pkt = {.returned = 0U, .rejected = 0U, .extended = to_fwd->extended,
.bus = 1, .addr = 0x199, .data_len_code = to_fwd->data_len_code};
uint32_t lo = PREAP_GET_BYTES_04(to_fwd);
uint32_t hi = PREAP_GET_BYTES_48(to_fwd);
if (((lo >> 16) & 0xFF3FU) == 0xFF3FU) {
lo = (lo & 0x00C0FFFFU) | (0x0020U << 16);
hi = (hi & 0x00FFFFF0U) | 0x00000004U;
const int crc = preap_compute_crc8(lo, hi, 7);
hi |= ((uint32_t)crc << 24);
}
PREAP_WORD_TO_BYTES(&pkt.data[0], lo);
PREAP_WORD_TO_BYTES(&pkt.data[4], hi);
#if defined(STM32H7) || defined(STM32F4)
can_set_checksum(&pkt);
can_send(&pkt, 1, true);
#endif
}
if (addr == 0x115U) {
preap_radar_readdr(to_fwd, 0x129);
const uint32_t hi_src = PREAP_GET_BYTES_48(to_fwd);
const int counter = ((hi_src & 0xF0U) >> 4) & 0x0F;
const uint32_t syn_lo = 0x000C0000U | ((uint32_t)counter << 28);
const int checksum = (0x38 + 0x0C + (counter << 4)) & 0xFF;
CANPacket_t pkt = {.returned = 0U, .rejected = 0U, .extended = 0,
.bus = 1, .addr = 0x1A9, .data_len_code = 5};
PREAP_WORD_TO_BYTES(&pkt.data[0], syn_lo);
PREAP_WORD_TO_BYTES(&pkt.data[4], (uint32_t)checksum);
#if defined(STM32H7) || defined(STM32F4)
can_set_checksum(&pkt);
can_send(&pkt, 1, true);
#endif
}
if (addr == 0x118U) {
preap_radar_readdr(to_fwd, 0x119);
const uint32_t lo = PREAP_GET_BYTES_04(to_fwd);
const int ws_counter = PREAP_GET_BYTES_48(to_fwd) & 0x0F;
const int raw_speed = (int)((0xFFF0000U & lo) >> 16);
int speed;
if (raw_speed == 0xFFF) {
speed = 0x1FFF;
} else {
const int mph_x100 = raw_speed * 5 - 2500;
const int kph_x100 = mph_x100 * 1609 / 1000;
speed = (kph_x100 < 0) ? 0 : ((kph_x100 / 4) & 0x1FFF);
}
const uint32_t ws_lo = (uint32_t)(speed | (speed << 13) | (speed << 26));
uint32_t ws_hi = (uint32_t)((speed >> 6) | (speed << 7) | (ws_counter << 20)) & 0x00FFFFFFU;
int ws_checksum = 0x76;
ws_checksum = (ws_checksum + (int)(ws_lo & 0xFF) + (int)((ws_lo >> 8) & 0xFF) + (int)((ws_lo >> 16) & 0xFF) + (int)((ws_lo >> 24) & 0xFF)) & 0xFF;
ws_checksum = (ws_checksum + (int)(ws_hi & 0xFF) + (int)((ws_hi >> 8) & 0xFF) + (int)((ws_hi >> 16) & 0xFF)) & 0xFF;
ws_hi |= ((uint32_t)ws_checksum << 24);
CANPacket_t pkt = {.returned = 0U, .rejected = 0U, .extended = 0,
.bus = 1, .addr = 0x169, .data_len_code = 8};
PREAP_WORD_TO_BYTES(&pkt.data[0], ws_lo);
PREAP_WORD_TO_BYTES(&pkt.data[4], ws_hi);
#if defined(STM32H7) || defined(STM32F4)
can_set_checksum(&pkt);
can_send(&pkt, 1, true);
#endif
}
}
}
static void tesla_preap_rx_hook(const CANPacket_t *msg) {
if (preap_enable_pedal && (msg->addr == 0x552U)) {
const int pedal_val = (msg->data[0] << 8) | msg->data[1];
gas_pressed = pedal_val > 650;
return;
}
if (msg->bus != 0U) {
return;
}
if (msg->addr == 0x370U) {
const int angle_meas_new = (((msg->data[4] & 0x3FU) << 8) | msg->data[5]) - 8192U;
update_sample(&angle_meas, angle_meas_new);
const int hands_on_level = msg->data[4] >> 6;
const int eac_status = msg->data[6] >> 5;
const int eac_error_code = msg->data[2] >> 4;
const bool epas_rejecting = (eac_status == 0) && (eac_error_code >= 6) && (eac_error_code <= 9);
steering_disengage = (hands_on_level >= 3) || epas_rejecting;
if (steering_disengage && !steering_disengage_prev) {
pcm_cruise_check(false);
}
}
if (msg->addr == 0x155U) {
const float speed = (((msg->data[5] << 8) | msg->data[6]) * 0.01f) * KPH_TO_MS;
UPDATE_VEHICLE_SPEED(speed);
vehicle_moving = speed > (0.5f * KPH_TO_MS);
}
if ((msg->addr == 0x108U) && !preap_enable_pedal) {
gas_pressed = msg->data[6] != 0U;
}
if (msg->addr == 0x20AU) {
brake_pressed = false;
}
if (msg->addr == 0x368U) {
const int cruise_state = (msg->data[1] >> 4) & 0x07U;
if (cruise_state == 3) {
vehicle_moving = false;
}
}
if (msg->addr == 0x118U) {
preap_gear = (msg->data[1] >> 4) & 0x07;
if ((preap_gear_prev == 4) && (preap_gear != 4)) {
controls_allowed = false;
}
preap_gear_prev = preap_gear;
}
if (msg->addr == 0x318U) {
const int d_fl = (msg->data[1] >> 4) & 0x03;
const int d_fr = (msg->data[1] >> 6) & 0x03;
const int d_rl = (msg->data[2] >> 6) & 0x03;
const int d_rr = (msg->data[3] >> 5) & 0x03;
const int d_ft = (msg->data[6] >> 2) & 0x03;
const int d_tr = (msg->data[5] >> 6) & 0x03;
preap_doors_open = (d_fl == 1) || (d_fr == 1) || (d_rl == 1) || (d_rr == 1) || (d_ft == 1) || (d_tr == 1);
if (preap_doors_open) {
controls_allowed = false;
}
}
if (msg->addr == 0x45U) {
const int lever = msg->data[0] & 0x3FU;
if (lever == 2) {
if ((preap_gear == 4) && !preap_doors_open) {
pcm_cruise_check(true);
preap_last_stalk_engage_us = microsecond_timer_get();
}
} else if (lever == 1) {
const uint32_t elapsed = microsecond_timer_get() - preap_last_stalk_engage_us;
if (elapsed > PREAP_CANCEL_ECHO_WINDOW_US) {
pcm_cruise_check(false);
}
}
}
}
static bool tesla_preap_tx_hook(const CANPacket_t *msg) {
const AngleSteeringLimits PREAP_STEERING_LIMITS = {
.max_angle = 3600,
.angle_deg_to_can = 10,
.frequency = 50U,
};
const AngleSteeringParams PREAP_STEERING_PARAMS = {
.slip_factor = -0.0005666f,
.steer_ratio = 15.f,
.wheelbase = 2.96f,
};
bool violation = false;
if (msg->addr == 0x488U) {
const int raw_angle_can = ((msg->data[0] & 0x7FU) << 8) | msg->data[1];
const int desired_angle = raw_angle_can - 16384;
const int steer_control_type = msg->data[2] >> 6;
const bool steer_control_enabled = steer_control_type == 1;
if (steer_angle_cmd_checks_vm(desired_angle, steer_control_enabled, PREAP_STEERING_LIMITS, PREAP_STEERING_PARAMS)) {
violation = true;
}
if ((steer_control_type != 0) && (steer_control_type != 1)) {
violation = true;
}
}
if (msg->addr == 0x214U) {
const int epas_control_type = msg->data[0] & 0x07U;
if (epas_control_type > 1) {
violation = true;
}
}
if (msg->addr == 0x2B9U) {
const int aeb_event = msg->data[2] & 0x03U;
if (aeb_event != 0) {
violation = true;
}
}
if (msg->addr == 0x551U) {
if (!preap_enable_pedal) {
violation = true;
} else {
const bool pedal_enable = (msg->data[4] & 0x80U) != 0U;
const int raw_gas_cmd = (msg->data[0] << 8) | msg->data[1];
if (pedal_enable) {
if (!get_longitudinal_allowed()) {
violation = true;
}
} else if (raw_gas_cmd > 500) {
violation = true;
}
}
}
return !violation;
}
static bool tesla_preap_fwd_hook(int bus_num, int addr) {
(void)bus_num;
(void)addr;
return false;
}
static safety_config tesla_preap_init(uint16_t param) {
preap_enable_pedal = GET_FLAG(param, PREAP_FLAG_ENABLE_PEDAL);
preap_radar_emulation = GET_FLAG(param, PREAP_FLAG_RADAR_EMULATION);
preap_radar_behind_nosecone = GET_FLAG(param, PREAP_FLAG_RADAR_BEHIND_NOSECONE);
preap_gear = 4;
preap_gear_prev = 4;
preap_doors_open = false;
preap_last_stalk_engage_us = 0;
preap_radar_epas_type = 0;
preap_radar_position = preap_radar_behind_nosecone ? 1 : 0;
static const CanMsg PREAP_TX_MSGS[] = {
{0x488, 0, 4, .check_relay = false, .disable_static_blocking = true},
{0x2B9, 0, 8, .check_relay = false, .disable_static_blocking = true},
{0x214, 0, 3, .check_relay = false, .disable_static_blocking = true},
{0x551, 0, 6, .check_relay = false, .disable_static_blocking = true},
{0x551, 2, 6, .check_relay = false, .disable_static_blocking = true},
{0x45, 0, 8, .check_relay = false, .disable_static_blocking = true},
};
static RxCheck preap_rx_checks[] = {
{.msg = {{0x370, 0, 8, 25U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x108, 0, 8, 100U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x118, 0, 6, 100U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x20A, 0, 8, 50U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x368, 0, 8, 10U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x318, 0, 8, 10U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x45, 0, 8, 10U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x155, 0, 8, 50U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
};
static RxCheck preap_rx_checks_with_pedal[] = {
{.msg = {{0x370, 0, 8, 25U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x108, 0, 8, 100U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x118, 0, 6, 100U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x20A, 0, 8, 50U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x368, 0, 8, 10U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x318, 0, 8, 10U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x45, 0, 8, 10U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x155, 0, 8, 50U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}, {0}}},
{.msg = {{0x552, 0, 6, 50U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true},
{0x552, 2, 6, 50U, .ignore_quality_flag = true, .ignore_checksum = true, .ignore_counter = true}, {0}}},
};
return preap_enable_pedal ? BUILD_SAFETY_CFG(preap_rx_checks_with_pedal, PREAP_TX_MSGS)
: BUILD_SAFETY_CFG(preap_rx_checks, PREAP_TX_MSGS);
}
const safety_hooks tesla_preap_hooks = {
.init = tesla_preap_init,
.rx = tesla_preap_rx_hook,
.rx_all = tesla_preap_gtw_emulation,
.tx = tesla_preap_tx_hook,
.fwd = tesla_preap_fwd_hook,
.get_checksum = tesla_preap_get_checksum,
.compute_checksum = tesla_preap_compute_checksum,
.get_counter = tesla_preap_get_counter,
.get_quality_flag_valid = NULL,
};
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
import unittest
from opendbc.car.structs import CarParams
from opendbc.safety.tests.libsafety import libsafety_py
import opendbc.safety.tests.common as common
class TestTeslaPreAPSafety(common.SafetyTestBase):
TX_MSGS = [[0x488, 0], [0x2B9, 0], [0x214, 0], [0x551, 0], [0x551, 2], [0x45, 0]]
def setUp(self):
self.safety = libsafety_py.libsafety
self._set_mode(0)
def _set_mode(self, param: int) -> None:
self.safety.set_safety_hooks(CarParams.SafetyModel.teslaPreap, param)
self.safety.init_tests()
@staticmethod
def _gear_msg(gear: int = 4):
dat = bytearray(6)
dat[1] = (gear & 0x7) << 4
return common.make_msg(0, 0x118, 6, dat)
@staticmethod
def _doors_msg(opened: bool):
dat = bytearray(8)
if opened:
dat[1] = 0x10
return common.make_msg(0, 0x318, 8, dat)
@staticmethod
def _stalk_msg(lever: int):
dat = bytearray(8)
dat[0] = lever & 0x3F
return common.make_msg(0, 0x45, 8, dat)
@staticmethod
def _steering_status_msg(hands_on_level: int = 0, eac_status: int = 1, eac_error_code: int = 0, angle_tenths: int = 0):
raw_angle = angle_tenths + 8192
dat = bytearray(8)
dat[2] = (eac_error_code & 0xF) << 4
dat[4] = ((hands_on_level & 0x3) << 6) | ((raw_angle >> 8) & 0x3F)
dat[5] = raw_angle & 0xFF
dat[6] = (eac_status & 0x7) << 5
return common.make_msg(0, 0x370, 8, dat)
@staticmethod
def _steer_cmd_msg(angle_tenths: int, steer_control_type: int = 1):
raw_angle = angle_tenths + 16384
dat = bytearray(4)
dat[0] = (raw_angle >> 8) & 0x7F
dat[1] = raw_angle & 0xFF
dat[2] = (steer_control_type & 0x3) << 6
return common.make_msg(0, 0x488, 4, dat)
@staticmethod
def _epas_control_msg(epas_control_type: int):
dat = bytearray(3)
dat[0] = epas_control_type & 0x7
return common.make_msg(0, 0x214, 3, dat)
@staticmethod
def _long_msg(aeb_event: int = 0):
dat = bytearray(8)
dat[2] = aeb_event & 0x3
return common.make_msg(0, 0x2B9, 8, dat)
@staticmethod
def _pedal_cmd_msg(raw_cmd: int = 700, enable: bool = True, bus: int = 0):
dat = bytearray(6)
dat[0] = (raw_cmd >> 8) & 0xFF
dat[1] = raw_cmd & 0xFF
dat[4] = 0x80 if enable else 0x00
return common.make_msg(bus, 0x551, 6, dat)
def _prime_can_engage(self):
self.assertTrue(self._rx(self._gear_msg(4)))
self.assertTrue(self._rx(self._doors_msg(False)))
def test_mode_param_round_trip(self):
self._set_mode(1 | 2 | 4)
self.assertEqual(self.safety.get_current_safety_mode(), 35)
self.assertEqual(self.safety.get_current_safety_param(), 1 | 2 | 4)
def test_stalk_engage_and_cancel(self):
self._prime_can_engage()
self.assertFalse(self.safety.get_controls_allowed())
self.assertTrue(self._rx(self._stalk_msg(2)))
self.assertTrue(self.safety.get_controls_allowed())
self.safety.set_timer(700000)
self.assertTrue(self._rx(self._stalk_msg(1)))
self.assertFalse(self.safety.get_controls_allowed())
def test_door_open_blocks_engagement(self):
self.assertTrue(self._rx(self._gear_msg(4)))
self.assertTrue(self._rx(self._doors_msg(True)))
self.assertTrue(self._rx(self._stalk_msg(2)))
self.assertFalse(self.safety.get_controls_allowed())
def test_steering_disengage_drops_controls(self):
self._prime_can_engage()
self.assertTrue(self._rx(self._stalk_msg(2)))
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self._rx(self._steering_status_msg(hands_on_level=3)))
self.assertFalse(self.safety.get_controls_allowed())
self.assertTrue(self.safety.get_steering_disengage_prev())
def test_steering_tx_limits(self):
self.safety.set_controls_allowed(True)
self.assertTrue(self._tx(self._steer_cmd_msg(0, 1)))
self.assertFalse(self._tx(self._steer_cmd_msg(0, 2)))
self.assertFalse(self._tx(self._steer_cmd_msg(4000, 1)))
self.assertTrue(self._tx(self._epas_control_msg(1)))
self.assertFalse(self._tx(self._epas_control_msg(2)))
def test_aeb_is_blocked(self):
self.safety.set_controls_allowed(True)
self.assertTrue(self._tx(self._long_msg(0)))
self.assertFalse(self._tx(self._long_msg(1)))
def test_pedal_tx_requires_pedal_flag_and_longitudinal_allowed(self):
self.safety.set_controls_allowed(True)
self.assertFalse(self._tx(self._pedal_cmd_msg()))
self._set_mode(1)
self.safety.set_controls_allowed(False)
self.assertFalse(self._tx(self._pedal_cmd_msg()))
self.safety.set_controls_allowed(True)
self.assertTrue(self._tx(self._pedal_cmd_msg()))
self.assertTrue(self._tx(self._pedal_cmd_msg(raw_cmd=0, enable=False)))
def test_forwarding_passthrough(self):
self.assertEqual(self.safety.safety_fwd_hook(0, 0x123), 2)
self.assertEqual(self.safety.safety_fwd_hook(2, 0x123), 0)
if __name__ == "__main__":
unittest.main()