diff --git a/opendbc/car/subaru/carcontroller.py b/opendbc/car/subaru/carcontroller.py index 3b9abf591..31b80843e 100644 --- a/opendbc/car/subaru/carcontroller.py +++ b/opendbc/car/subaru/carcontroller.py @@ -1,10 +1,11 @@ import numpy as np from opendbc.can import CANPacker from opendbc.car import Bus, make_tester_present_msg -from opendbc.car.lateral import apply_driver_steer_torque_limits, common_fault_avoidance +from opendbc.car.lateral import apply_driver_steer_torque_limits, apply_steer_angle_limits_vm, common_fault_avoidance from opendbc.car.interfaces import CarControllerBase from opendbc.car.subaru import subarucan from opendbc.car.subaru.values import DBC, GLOBAL_ES_ADDR, CanBus, CarControllerParams, SubaruFlags +from opendbc.car.vehicle_model import VehicleModel # FIXME: These limits aren't exact. The real limit is more than likely over a larger time period and # involves the total steering angle change rather than rate, but these limits work well for now @@ -12,10 +13,17 @@ MAX_STEER_RATE = 25 # deg/s MAX_STEER_RATE_FRAMES = 7 # tx control frames needed before torque can be cut +def get_safety_CP(): + # Use the Ascent for lateral limiting to match safety (most restrictive slip factor) + from opendbc.car.subaru.interface import CarInterface + return CarInterface.get_non_essential_params("SUBARU_ASCENT") + + class CarController(CarControllerBase): def __init__(self, dbc_names, CP): super().__init__(dbc_names, CP) self.apply_torque_last = 0 + self.apply_angle_last = 0 self.cruise_button_prev = 0 self.steer_rate_counter = 0 @@ -23,6 +31,9 @@ class CarController(CarControllerBase): self.p = CarControllerParams(CP) self.packer = CANPacker(DBC[CP.carFingerprint][Bus.pt]) + if CP.flags & SubaruFlags.LKAS_ANGLE: + self.VM = VehicleModel(get_safety_CP()) + def update(self, CC, CS, now_nanos): actuators = CC.actuators hud_control = CC.hudControl @@ -32,30 +43,34 @@ class CarController(CarControllerBase): # *** steering *** if (self.frame % self.p.STEER_STEP) == 0: - apply_torque = int(round(actuators.torque * self.p.STEER_MAX)) - - # limits due to driver torque - - new_torque = int(round(apply_torque)) - apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.p) - - if not CC.latActive: - apply_torque = 0 - - if self.CP.flags & SubaruFlags.PREGLOBAL: - can_sends.append(subarucan.create_preglobal_steering_control(self.packer, self.frame // self.p.STEER_STEP, apply_torque, CC.latActive)) + if self.CP.flags & SubaruFlags.LKAS_ANGLE: + self.apply_angle_last = apply_steer_angle_limits_vm(actuators.steeringAngleDeg, self.apply_angle_last, CS.out.vEgoRaw, + CS.out.steeringAngleDeg, CC.latActive, CarControllerParams, self.VM) + can_sends.append(subarucan.create_steering_control_angle(self.packer, self.apply_angle_last, CC.latActive)) else: - apply_steer_req = CC.latActive + apply_torque = int(round(actuators.torque * self.p.STEER_MAX)) - if self.CP.flags & SubaruFlags.STEER_RATE_LIMITED: - # Steering rate fault prevention - self.steer_rate_counter, apply_steer_req = \ - common_fault_avoidance(abs(CS.out.steeringRateDeg) > MAX_STEER_RATE, apply_steer_req, - self.steer_rate_counter, MAX_STEER_RATE_FRAMES) + # limits due to driver torque + new_torque = int(round(apply_torque)) + apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.p) - can_sends.append(subarucan.create_steering_control(self.packer, apply_torque, apply_steer_req)) + if not CC.latActive: + apply_torque = 0 - self.apply_torque_last = apply_torque + if self.CP.flags & SubaruFlags.PREGLOBAL: + can_sends.append(subarucan.create_preglobal_steering_control(self.packer, self.frame // self.p.STEER_STEP, apply_torque, CC.latActive)) + else: + apply_steer_req = CC.latActive + + if self.CP.flags & SubaruFlags.STEER_RATE_LIMITED: + # Steering rate fault prevention + self.steer_rate_counter, apply_steer_req = \ + common_fault_avoidance(abs(CS.out.steeringRateDeg) > MAX_STEER_RATE, apply_steer_req, + self.steer_rate_counter, MAX_STEER_RATE_FRAMES) + + can_sends.append(subarucan.create_steering_control(self.packer, apply_torque, apply_steer_req)) + + self.apply_torque_last = apply_torque # *** longitudinal *** @@ -137,8 +152,11 @@ class CarController(CarControllerBase): can_sends.append(subarucan.create_es_static_2(self.packer)) new_actuators = actuators.as_builder() - new_actuators.torque = self.apply_torque_last / self.p.STEER_MAX - new_actuators.torqueOutputCan = self.apply_torque_last + if self.CP.flags & SubaruFlags.LKAS_ANGLE: + new_actuators.steeringAngleDeg = self.apply_angle_last + else: + new_actuators.torque = self.apply_torque_last / self.p.STEER_MAX + new_actuators.torqueOutputCan = self.apply_torque_last self.frame += 1 return new_actuators, can_sends diff --git a/opendbc/car/subaru/carstate.py b/opendbc/car/subaru/carstate.py index 4d242f835..adb4f7b65 100644 --- a/opendbc/car/subaru/carstate.py +++ b/opendbc/car/subaru/carstate.py @@ -86,10 +86,6 @@ class CarState(CarStateBase): if self.CP.flags & (SubaruFlags.HYBRID | SubaruFlags.LKAS_ANGLE): # ES_DashStatus->Cruise_Activated_Dash is likely intended for the dash display only, as it falls # during user gas override and at standstill. ES_Status is missing on hybrid, so we use ES_Brake instead - - # TODO: ES_Brake->Cruise_Activated has been seen staying high when Crosstrek 2025 angle LKAS user pressed - # brake while engaged at a stop. ES_Status and ES_DashStatus->Signal7 correctly fell, but is either missing or - # always zero on hybrids. Probably need to split angle & hybrid. 0x27 and 0x225 on hybrids may work for them. ret.cruiseState.enabled = cp_es_brake.vl["ES_Brake"]['Cruise_Activated'] != 0 ret.cruiseState.available = cp_cam.vl["ES_DashStatus"]['Cruise_On'] != 0 else: diff --git a/opendbc/car/subaru/interface.py b/opendbc/car/subaru/interface.py index 5b16f17d2..1568050ea 100644 --- a/opendbc/car/subaru/interface.py +++ b/opendbc/car/subaru/interface.py @@ -18,7 +18,10 @@ class CarInterface(CarInterfaceBase): # - replacement for ES_Distance so we can cancel the cruise control # - to find the Cruise_Activated bit from the car # - proper panda safety setup (use the correct cruise_activated bit, throttle from Throttle_Hybrid, etc) - ret.dashcamOnly = bool(ret.flags & (SubaruFlags.PREGLOBAL | SubaruFlags.LKAS_ANGLE | SubaruFlags.HYBRID)) + ret.dashcamOnly = bool(ret.flags & (SubaruFlags.PREGLOBAL | SubaruFlags.HYBRID)) + # LKAS_ANGLE cars are dashcam-only until validated + if ret.flags & SubaruFlags.LKAS_ANGLE and candidate != CAR.SUBARU_CROSSTREK_2025: + ret.dashcamOnly = True ret.autoResumeSng = False # Detect infotainment message sent from the camera @@ -33,6 +36,8 @@ class CarInterface(CarInterfaceBase): ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.subaru)] if ret.flags & SubaruFlags.GLOBAL_GEN2: ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.GEN2.value + if ret.flags & SubaruFlags.LKAS_ANGLE: + ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.LKAS_ANGLE.value ret.steerLimitTimer = 0.4 ret.steerActuatorDelay = 0.1 diff --git a/opendbc/car/subaru/values.py b/opendbc/car/subaru/values.py index a702d3943..3eda0329e 100644 --- a/opendbc/car/subaru/values.py +++ b/opendbc/car/subaru/values.py @@ -1,7 +1,8 @@ from dataclasses import dataclass, field from enum import Enum, IntFlag -from opendbc.car import Bus, CarSpecs, DbcDict, PlatformConfig, Platforms, uds +from opendbc.car import ACCELERATION_DUE_TO_GRAVITY, Bus, CarSpecs, DbcDict, PlatformConfig, Platforms, uds +from opendbc.car.lateral import AngleSteeringLimits, ISO_LATERAL_ACCEL from opendbc.car.structs import CarParams from opendbc.car.docs_definitions import CarFootnote, CarHarness, CarDocs, CarParts, Column from opendbc.car.fw_query_definitions import FwQueryConfig, Request, StdQueries, p16 @@ -9,9 +10,23 @@ from opendbc.car.fw_query_definitions import FwQueryConfig, Request, StdQueries, Ecu = CarParams.Ecu +# Add extra tolerance for average banked road since safety doesn't have the roll +AVERAGE_ROAD_ROLL = 0.06 # ~3.4 degrees, 6% superelevation. higher actual roll lowers lateral acceleration + + class CarControllerParams: + ANGLE_LIMITS: AngleSteeringLimits = AngleSteeringLimits( + 190, # deg, EPS faults above ~200 + ([], []), + ([], []), + MAX_LATERAL_ACCEL=ISO_LATERAL_ACCEL + (ACCELERATION_DUE_TO_GRAVITY * AVERAGE_ROAD_ROLL), # ~3.6 m/s^2 + MAX_LATERAL_JERK=3.0 + (ACCELERATION_DUE_TO_GRAVITY * AVERAGE_ROAD_ROLL), # ~3.6 m/s^3 + MAX_ANGLE_RATE=5, # deg/frame, comfort rate limit + ) + + STEER_STEP = 2 # how often we update the steer cmd + def __init__(self, CP): - self.STEER_STEP = 2 # how often we update the steer cmd self.STEER_DELTA_UP = 50 # torque increase per refresh, 0.8s to max self.STEER_DELTA_DOWN = 70 # torque decrease per refresh self.STEER_DRIVER_ALLOWANCE = 60 # allowed driver torque before start limiting @@ -57,6 +72,7 @@ class SubaruSafetyFlags(IntFlag): GEN2 = 1 LONG = 2 PREGLOBAL_REVERSED_DRIVER_TORQUE = 4 + LKAS_ANGLE = 8 class SubaruFlags(IntFlag): @@ -211,6 +227,11 @@ class CAR(Platforms): SUBARU_ASCENT.specs, flags=SubaruFlags.LKAS_ANGLE, ) + SUBARU_CROSSTREK_2025 = SubaruGen2PlatformConfig( + [SubaruCarDocs("Subaru Crosstrek 2025", "All", car_parts=CarParts.common([CarHarness.subaru_d]))], + CarSpecs(mass=1529, wheelbase=2.67, steerRatio=17), + flags=SubaruFlags.LKAS_ANGLE, + ) SUBARU_VERSION_REQUEST = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER]) + \ diff --git a/opendbc/car/torque_data/override.toml b/opendbc/car/torque_data/override.toml index a7e4c1242..4b8791401 100644 --- a/opendbc/car/torque_data/override.toml +++ b/opendbc/car/torque_data/override.toml @@ -14,6 +14,7 @@ legend = ["LAT_ACCEL_FACTOR", "MAX_LAT_ACCEL_MEASURED", "FRICTION"] "SUBARU_FORESTER_2022" = [nan, 3.0, nan] "SUBARU_OUTBACK_2023" = [nan, 3.0, nan] "SUBARU_ASCENT_2023" = [nan, 3.0, nan] +"SUBARU_CROSSTREK_2025" = [nan, 3.0, nan] # Toyota LTA also has torque "TOYOTA_RAV4_TSS2_2023" = [nan, 3.0, nan] diff --git a/opendbc/safety/modes/subaru.h b/opendbc/safety/modes/subaru.h index a27b6e590..c7f88faa1 100644 --- a/opendbc/safety/modes/subaru.h +++ b/opendbc/safety/modes/subaru.h @@ -23,9 +23,11 @@ #define MSG_SUBARU_CruiseControl 0x240U #define MSG_SUBARU_Throttle 0x40U #define MSG_SUBARU_Steering_Torque 0x119U +#define MSG_SUBARU_Steering_2 0x11aU #define MSG_SUBARU_Wheel_Speeds 0x13aU #define MSG_SUBARU_ES_LKAS 0x122U +#define MSG_SUBARU_ES_LKAS_ANGLE 0x124U #define MSG_SUBARU_ES_Brake 0x220U #define MSG_SUBARU_ES_Distance 0x221U #define MSG_SUBARU_ES_Status 0x222U @@ -70,8 +72,17 @@ {.msg = {{MSG_SUBARU_Brake_Status, alt_bus, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ {.msg = {{MSG_SUBARU_CruiseControl, alt_bus, 8, 20U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ +#define SUBARU_LKAS_ANGLE_RX_CHECKS(alt_bus) \ + {.msg = {{MSG_SUBARU_Throttle, SUBARU_MAIN_BUS, 8, 100U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ + {.msg = {{MSG_SUBARU_Steering_Torque, SUBARU_MAIN_BUS, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ + {.msg = {{MSG_SUBARU_Wheel_Speeds, alt_bus, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ + {.msg = {{MSG_SUBARU_Brake_Status, alt_bus, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ + {.msg = {{MSG_SUBARU_ES_Brake, alt_bus, 8, 20U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ + {.msg = {{MSG_SUBARU_Steering_2, SUBARU_MAIN_BUS, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \ + static bool subaru_gen2 = false; static bool subaru_longitudinal = false; +static bool subaru_lkas_angle = false; static uint32_t subaru_get_checksum(const CANPacket_t *msg) { return (uint8_t)msg->data[0]; @@ -100,8 +111,19 @@ static void subaru_rx_hook(const CANPacket_t *msg) { update_sample(&torque_driver, torque_driver_new); } + // Steering angle measurement for LKAS_ANGLE cars + if (subaru_lkas_angle && (msg->addr == MSG_SUBARU_Steering_2) && (msg->bus == SUBARU_MAIN_BUS)) { + int angle_meas_new = GET_BYTES(msg, 3, 3) & 0x1FFFFU; + angle_meas_new = -1 * to_signed(angle_meas_new, 17); + update_sample(&angle_meas, angle_meas_new); + } + // enter controls on rising edge of ACC, exit controls on ACC off - if ((msg->addr == MSG_SUBARU_CruiseControl) && (msg->bus == alt_main_bus)) { + if (subaru_lkas_angle && (msg->addr == MSG_SUBARU_ES_Brake) && (msg->bus == alt_main_bus)) { + bool cruise_engaged = (msg->data[4] >> 7) & 1U; + pcm_cruise_check(cruise_engaged); + } + if (!subaru_lkas_angle && (msg->addr == MSG_SUBARU_CruiseControl) && (msg->bus == alt_main_bus)) { bool cruise_engaged = (msg->data[5] >> 1) & 1U; pcm_cruise_check(cruise_engaged); } @@ -144,7 +166,29 @@ static bool subaru_tx_hook(const CANPacket_t *msg) { bool tx = true; bool violation = false; - // steer cmd checks + // angle steer cmd checks + if (msg->addr == MSG_SUBARU_ES_LKAS_ANGLE) { + // Based on SUBARU_ASCENT (most restrictive slip factor) + const AngleSteeringLimits SUBARU_ANGLE_STEERING_LIMITS = { + .max_angle = 190 * 100, + .angle_deg_to_can = 100., + .frequency = 50U, + }; + + const AngleSteeringParams SUBARU_STEERING_PARAMS = { + .slip_factor = -0.000580374471400815, + .steer_ratio = 13.5, + .wheelbase = 2.89, + }; + + int desired_angle = GET_BYTES(msg, 5, 3) & 0x1FFFFU; + desired_angle = -1 * to_signed(desired_angle, 17); + bool lkas_request = GET_BIT(msg, 12U); + + violation |= steer_angle_cmd_checks_vm(desired_angle, lkas_request, SUBARU_ANGLE_STEERING_LIMITS, SUBARU_STEERING_PARAMS); + } + + // torque steer cmd checks if (msg->addr == MSG_SUBARU_ES_LKAS) { int desired_torque = ((GET_BYTES(msg, 0, 4) >> 16) & 0x1FFFU); desired_torque = -1 * to_signed(desired_torque, 13); @@ -220,6 +264,16 @@ static safety_config subaru_init(uint16_t param) { SUBARU_GEN2_LONG_ADDITIONAL_TX_MSGS() }; + static const CanMsg SUBARU_LKAS_ANGLE_TX_MSGS[] = { + SUBARU_BASE_TX_MSGS(SUBARU_MAIN_BUS, MSG_SUBARU_ES_LKAS_ANGLE) + SUBARU_COMMON_TX_MSGS(SUBARU_MAIN_BUS) + }; + + static const CanMsg SUBARU_LKAS_ANGLE_GEN2_TX_MSGS[] = { + SUBARU_BASE_TX_MSGS(SUBARU_ALT_BUS, MSG_SUBARU_ES_LKAS_ANGLE) + SUBARU_COMMON_TX_MSGS(SUBARU_ALT_BUS) + }; + static RxCheck subaru_rx_checks[] = { SUBARU_COMMON_RX_CHECKS(SUBARU_MAIN_BUS) }; @@ -228,9 +282,19 @@ static safety_config subaru_init(uint16_t param) { SUBARU_COMMON_RX_CHECKS(SUBARU_ALT_BUS) }; + static RxCheck subaru_lkas_angle_rx_checks[] = { + SUBARU_LKAS_ANGLE_RX_CHECKS(SUBARU_MAIN_BUS) + }; + + static RxCheck subaru_lkas_angle_gen2_rx_checks[] = { + SUBARU_LKAS_ANGLE_RX_CHECKS(SUBARU_ALT_BUS) + }; + const uint16_t SUBARU_PARAM_GEN2 = 1; + const uint16_t SUBARU_PARAM_LKAS_ANGLE = 8; subaru_gen2 = GET_FLAG(param, SUBARU_PARAM_GEN2); + subaru_lkas_angle = GET_FLAG(param, SUBARU_PARAM_LKAS_ANGLE); #ifdef ALLOW_DEBUG const uint16_t SUBARU_PARAM_LONGITUDINAL = 2; @@ -238,7 +302,10 @@ static safety_config subaru_init(uint16_t param) { #endif safety_config ret; - if (subaru_gen2) { + if (subaru_lkas_angle) { + ret = subaru_gen2 ? BUILD_SAFETY_CFG(subaru_lkas_angle_gen2_rx_checks, SUBARU_LKAS_ANGLE_GEN2_TX_MSGS) : \ + BUILD_SAFETY_CFG(subaru_lkas_angle_rx_checks, SUBARU_LKAS_ANGLE_TX_MSGS); + } else if (subaru_gen2) { ret = subaru_longitudinal ? BUILD_SAFETY_CFG(subaru_gen2_rx_checks, SUBARU_GEN2_LONG_TX_MSGS) : \ BUILD_SAFETY_CFG(subaru_gen2_rx_checks, SUBARU_GEN2_TX_MSGS); } else { diff --git a/opendbc/safety/tests/test_subaru.py b/opendbc/safety/tests/test_subaru.py index a882f0722..fdb7911a3 100755 --- a/opendbc/safety/tests/test_subaru.py +++ b/opendbc/safety/tests/test_subaru.py @@ -1,13 +1,16 @@ #!/usr/bin/env python3 import enum import unittest +import numpy as np + +from functools import partial from opendbc.car.subaru.values import SubaruSafetyFlags from opendbc.car.structs import CarParams +from opendbc.car.vehicle_model import VehicleModel from opendbc.safety.tests.libsafety import libsafety_py import opendbc.safety.tests.common as common -from opendbc.safety.tests.common import CANPackerSafety -from functools import partial +from opendbc.safety.tests.common import CANPackerSafety, round_speed, away_round class SubaruMsg(enum.IntEnum): @@ -233,5 +236,157 @@ class TestSubaruGen2LongitudinalSafety(TestSubaruLongitudinalSafetyBase, TestSub self.assertFalse(self._tx(self._es_uds_msg(msg))) +class TestSubaruAngleSafetyBase(TestSubaruSafetyBase, common.AngleSteeringSafetyTest): + STEER_ANGLE_MAX = 190 + DEG_TO_CAN = 100 + + # VM-based limits, not breakpoint-based + ANGLE_RATE_BP = None + ANGLE_RATE_UP = None + ANGLE_RATE_DOWN = None + + LATERAL_FREQUENCY = 50 + + cnt_angle_cmd = 0 + + def setUp(self): + self.__class__.cnt_angle_cmd = 0 + super().setUp() + from opendbc.car.subaru.carcontroller import get_safety_CP + self.VM = VehicleModel(get_safety_CP()) + + def _speed_msg(self, speed): + # speed is in m/s for angle tests, convert to kph for DBC + speed_kph = speed * 3.6 + values = {s: speed_kph for s in ["FR", "FL", "RR", "RL"]} + return self.packer.make_can_msg_safety("Wheel_Speeds", self.ALT_MAIN_BUS, values) + + def _angle_cmd_msg(self, angle, enabled, increment_timer=True): + values = {"LKAS_Output": angle, "LKAS_Request": enabled, "SET_3": 3} + if increment_timer: + self.safety.set_timer(self.cnt_angle_cmd * int(1e6 / self.LATERAL_FREQUENCY)) + self.__class__.cnt_angle_cmd += 1 + return self.packer.make_can_msg_safety("ES_LKAS_ANGLE", SUBARU_MAIN_BUS, values) + + def _angle_meas_msg(self, angle): + values = {"Steering_Angle": angle} + return self.packer.make_can_msg_safety("Steering_2", SUBARU_MAIN_BUS, values) + + def _pcm_status_msg(self, enable): + values = {"Cruise_Activated": enable} + return self.packer.make_can_msg_safety("ES_Brake", self.ALT_MAIN_BUS, values) + + def test_angle_cmd_when_enabled(self): + # VM-based limits are tested below + pass + + def _find_max_allowed_angle_can(self, sign): + """Binary search for the exact max angle CAN value the safety allows.""" + lo, hi = 0, self.STEER_ANGLE_MAX * self.DEG_TO_CAN + 10 + while lo < hi: + mid = (lo + hi + 1) // 2 + self.safety.set_desired_angle_last(mid * sign) + if self._tx(self._angle_cmd_msg(mid / self.DEG_TO_CAN * sign, True)): + lo = mid + else: + hi = mid - 1 + return lo + + def test_lateral_accel_limit(self): + for speed in np.linspace(0, 40, 100): + speed = max(speed, 1) + # match Wheel_Speeds rounding on CAN (factor 0.057 kph) + speed = round_speed(away_round(speed * 3.6 / 0.057) * 0.057 / 3.6) + for sign in (-1, 1): + self.safety.set_controls_allowed(True) + self._reset_speed_measurement(speed + 1) + self._tx(self._angle_cmd_msg(0, True)) + + # find the exact max angle CAN the safety allows + max_angle_can = self._find_max_allowed_angle_can(sign) + + # at limit + self.safety.set_desired_angle_last(max_angle_can * sign) + self.assertTrue(self._tx(self._angle_cmd_msg(max_angle_can / self.DEG_TO_CAN * sign, True))) + + # 1 unit above limit + above_limit_can = max_angle_can + 1 + self.safety.set_desired_angle_last(above_limit_can * sign) + self._tx(self._angle_cmd_msg(above_limit_can / self.DEG_TO_CAN * sign, True)) + + # at low speeds max angle is above STEER_ANGLE_MAX, so adding 1 has no effect + should_tx = max_angle_can >= self.STEER_ANGLE_MAX * self.DEG_TO_CAN + self.assertEqual(should_tx, self._tx(self._angle_cmd_msg(above_limit_can / self.DEG_TO_CAN * sign, True))) + + def _find_max_allowed_delta_can(self, sign): + """Binary search for the exact max angle delta CAN value the safety allows from angle 0.""" + lo, hi = 0, self.STEER_ANGLE_MAX * self.DEG_TO_CAN + while lo < hi: + mid = (lo + hi + 1) // 2 + # reset to angle 0 + self.safety.set_desired_angle_last(0) + if self._tx(self._angle_cmd_msg(mid / self.DEG_TO_CAN * sign, True)): + lo = mid + else: + hi = mid - 1 + return lo + + def test_lateral_jerk_limit(self): + for speed in np.linspace(0, 40, 100): + speed = max(speed, 1) + # match Wheel_Speeds rounding on CAN + speed = round_speed(away_round(speed * 3.6 / 0.057) * 0.057 / 3.6) + for sign in (-1, 1): + self.safety.set_controls_allowed(True) + self._reset_speed_measurement(speed + 1) + self._tx(self._angle_cmd_msg(0, True)) + + # find the exact max delta CAN the safety allows from angle 0 + max_delta_can = self._find_max_allowed_delta_can(sign) + + # Stay within limits + # Up + self.safety.set_desired_angle_last(0) + self.assertTrue(self._tx(self._angle_cmd_msg(max_delta_can / self.DEG_TO_CAN * sign, True))) + + # Don't change + self.assertTrue(self._tx(self._angle_cmd_msg(max_delta_can / self.DEG_TO_CAN * sign, True))) + + # Down + self.assertTrue(self._tx(self._angle_cmd_msg(0, True))) + + # Inject too high rates + # Up + above_delta_can = max_delta_can + 1 + self.assertFalse(self._tx(self._angle_cmd_msg(above_delta_can / self.DEG_TO_CAN * sign, True))) + + # Don't change + self.safety.set_desired_angle_last(round(above_delta_can * sign)) + self.assertTrue(self._tx(self._angle_cmd_msg(above_delta_can / self.DEG_TO_CAN * sign, True))) + + # Down + self.assertFalse(self._tx(self._angle_cmd_msg(0, True))) + + # Recover + self.assertTrue(self._tx(self._angle_cmd_msg(0, True))) + + +class TestSubaruGen1AngleStockLongitudinalSafety(TestSubaruStockLongitudinalSafetyBase, TestSubaruAngleSafetyBase): + FLAGS = SubaruSafetyFlags.LKAS_ANGLE + TX_MSGS = lkas_tx_msgs(SUBARU_MAIN_BUS, SubaruMsg.ES_LKAS_ANGLE) + RELAY_MALFUNCTION_ADDRS = {SUBARU_MAIN_BUS: (SubaruMsg.ES_LKAS_ANGLE, SubaruMsg.ES_DashStatus, SubaruMsg.ES_LKAS_State, + SubaruMsg.ES_Infotainment)} + FWD_BLACKLISTED_ADDRS = fwd_blacklisted_addr(SubaruMsg.ES_LKAS_ANGLE) + + +class TestSubaruGen2AngleStockLongitudinalSafety(TestSubaruStockLongitudinalSafetyBase, TestSubaruAngleSafetyBase): + ALT_MAIN_BUS = SUBARU_ALT_BUS + FLAGS = SubaruSafetyFlags.GEN2 | SubaruSafetyFlags.LKAS_ANGLE + TX_MSGS = lkas_tx_msgs(SUBARU_ALT_BUS, SubaruMsg.ES_LKAS_ANGLE) + RELAY_MALFUNCTION_ADDRS = {SUBARU_MAIN_BUS: (SubaruMsg.ES_LKAS_ANGLE, SubaruMsg.ES_DashStatus, SubaruMsg.ES_LKAS_State, + SubaruMsg.ES_Infotainment)} + FWD_BLACKLISTED_ADDRS = fwd_blacklisted_addr(SubaruMsg.ES_LKAS_ANGLE) + + if __name__ == "__main__": unittest.main()