Subaru: Add 2025 Crosstrek angle steering support

Co-Authored-By: Jacob Waller <jacobwaller@users.noreply.github.com>
This commit is contained in:
lukasloetkolben
2026-03-28 12:35:17 -07:00
parent 90a9e4b5bc
commit 42f439fca4
7 changed files with 298 additions and 35 deletions

View File

@@ -1,10 +1,11 @@
import numpy as np
from opendbc.can import CANPacker
from opendbc.car import Bus, make_tester_present_msg
from opendbc.car.lateral import apply_driver_steer_torque_limits, common_fault_avoidance
from opendbc.car.lateral import apply_driver_steer_torque_limits, apply_steer_angle_limits_vm, common_fault_avoidance
from opendbc.car.interfaces import CarControllerBase
from opendbc.car.subaru import subarucan
from opendbc.car.subaru.values import DBC, GLOBAL_ES_ADDR, CanBus, CarControllerParams, SubaruFlags
from opendbc.car.vehicle_model import VehicleModel
# FIXME: These limits aren't exact. The real limit is more than likely over a larger time period and
# involves the total steering angle change rather than rate, but these limits work well for now
@@ -12,10 +13,17 @@ MAX_STEER_RATE = 25 # deg/s
MAX_STEER_RATE_FRAMES = 7 # tx control frames needed before torque can be cut
def get_safety_CP():
# Use the Ascent for lateral limiting to match safety (most restrictive slip factor)
from opendbc.car.subaru.interface import CarInterface
return CarInterface.get_non_essential_params("SUBARU_ASCENT")
class CarController(CarControllerBase):
def __init__(self, dbc_names, CP):
super().__init__(dbc_names, CP)
self.apply_torque_last = 0
self.apply_angle_last = 0
self.cruise_button_prev = 0
self.steer_rate_counter = 0
@@ -23,6 +31,9 @@ class CarController(CarControllerBase):
self.p = CarControllerParams(CP)
self.packer = CANPacker(DBC[CP.carFingerprint][Bus.pt])
if CP.flags & SubaruFlags.LKAS_ANGLE:
self.VM = VehicleModel(get_safety_CP())
def update(self, CC, CS, now_nanos):
actuators = CC.actuators
hud_control = CC.hudControl
@@ -32,30 +43,34 @@ class CarController(CarControllerBase):
# *** steering ***
if (self.frame % self.p.STEER_STEP) == 0:
apply_torque = int(round(actuators.torque * self.p.STEER_MAX))
# limits due to driver torque
new_torque = int(round(apply_torque))
apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.p)
if not CC.latActive:
apply_torque = 0
if self.CP.flags & SubaruFlags.PREGLOBAL:
can_sends.append(subarucan.create_preglobal_steering_control(self.packer, self.frame // self.p.STEER_STEP, apply_torque, CC.latActive))
if self.CP.flags & SubaruFlags.LKAS_ANGLE:
self.apply_angle_last = apply_steer_angle_limits_vm(actuators.steeringAngleDeg, self.apply_angle_last, CS.out.vEgoRaw,
CS.out.steeringAngleDeg, CC.latActive, CarControllerParams, self.VM)
can_sends.append(subarucan.create_steering_control_angle(self.packer, self.apply_angle_last, CC.latActive))
else:
apply_steer_req = CC.latActive
apply_torque = int(round(actuators.torque * self.p.STEER_MAX))
if self.CP.flags & SubaruFlags.STEER_RATE_LIMITED:
# Steering rate fault prevention
self.steer_rate_counter, apply_steer_req = \
common_fault_avoidance(abs(CS.out.steeringRateDeg) > MAX_STEER_RATE, apply_steer_req,
self.steer_rate_counter, MAX_STEER_RATE_FRAMES)
# limits due to driver torque
new_torque = int(round(apply_torque))
apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.p)
can_sends.append(subarucan.create_steering_control(self.packer, apply_torque, apply_steer_req))
if not CC.latActive:
apply_torque = 0
self.apply_torque_last = apply_torque
if self.CP.flags & SubaruFlags.PREGLOBAL:
can_sends.append(subarucan.create_preglobal_steering_control(self.packer, self.frame // self.p.STEER_STEP, apply_torque, CC.latActive))
else:
apply_steer_req = CC.latActive
if self.CP.flags & SubaruFlags.STEER_RATE_LIMITED:
# Steering rate fault prevention
self.steer_rate_counter, apply_steer_req = \
common_fault_avoidance(abs(CS.out.steeringRateDeg) > MAX_STEER_RATE, apply_steer_req,
self.steer_rate_counter, MAX_STEER_RATE_FRAMES)
can_sends.append(subarucan.create_steering_control(self.packer, apply_torque, apply_steer_req))
self.apply_torque_last = apply_torque
# *** longitudinal ***
@@ -137,8 +152,11 @@ class CarController(CarControllerBase):
can_sends.append(subarucan.create_es_static_2(self.packer))
new_actuators = actuators.as_builder()
new_actuators.torque = self.apply_torque_last / self.p.STEER_MAX
new_actuators.torqueOutputCan = self.apply_torque_last
if self.CP.flags & SubaruFlags.LKAS_ANGLE:
new_actuators.steeringAngleDeg = self.apply_angle_last
else:
new_actuators.torque = self.apply_torque_last / self.p.STEER_MAX
new_actuators.torqueOutputCan = self.apply_torque_last
self.frame += 1
return new_actuators, can_sends

View File

@@ -86,10 +86,6 @@ class CarState(CarStateBase):
if self.CP.flags & (SubaruFlags.HYBRID | SubaruFlags.LKAS_ANGLE):
# ES_DashStatus->Cruise_Activated_Dash is likely intended for the dash display only, as it falls
# during user gas override and at standstill. ES_Status is missing on hybrid, so we use ES_Brake instead
# TODO: ES_Brake->Cruise_Activated has been seen staying high when Crosstrek 2025 angle LKAS user pressed
# brake while engaged at a stop. ES_Status and ES_DashStatus->Signal7 correctly fell, but is either missing or
# always zero on hybrids. Probably need to split angle & hybrid. 0x27 and 0x225 on hybrids may work for them.
ret.cruiseState.enabled = cp_es_brake.vl["ES_Brake"]['Cruise_Activated'] != 0
ret.cruiseState.available = cp_cam.vl["ES_DashStatus"]['Cruise_On'] != 0
else:

View File

@@ -18,7 +18,10 @@ class CarInterface(CarInterfaceBase):
# - replacement for ES_Distance so we can cancel the cruise control
# - to find the Cruise_Activated bit from the car
# - proper panda safety setup (use the correct cruise_activated bit, throttle from Throttle_Hybrid, etc)
ret.dashcamOnly = bool(ret.flags & (SubaruFlags.PREGLOBAL | SubaruFlags.LKAS_ANGLE | SubaruFlags.HYBRID))
ret.dashcamOnly = bool(ret.flags & (SubaruFlags.PREGLOBAL | SubaruFlags.HYBRID))
# LKAS_ANGLE cars are dashcam-only until validated
if ret.flags & SubaruFlags.LKAS_ANGLE and candidate != CAR.SUBARU_CROSSTREK_2025:
ret.dashcamOnly = True
ret.autoResumeSng = False
# Detect infotainment message sent from the camera
@@ -33,6 +36,8 @@ class CarInterface(CarInterfaceBase):
ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.subaru)]
if ret.flags & SubaruFlags.GLOBAL_GEN2:
ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.GEN2.value
if ret.flags & SubaruFlags.LKAS_ANGLE:
ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.LKAS_ANGLE.value
ret.steerLimitTimer = 0.4
ret.steerActuatorDelay = 0.1

View File

@@ -1,7 +1,8 @@
from dataclasses import dataclass, field
from enum import Enum, IntFlag
from opendbc.car import Bus, CarSpecs, DbcDict, PlatformConfig, Platforms, uds
from opendbc.car import ACCELERATION_DUE_TO_GRAVITY, Bus, CarSpecs, DbcDict, PlatformConfig, Platforms, uds
from opendbc.car.lateral import AngleSteeringLimits, ISO_LATERAL_ACCEL
from opendbc.car.structs import CarParams
from opendbc.car.docs_definitions import CarFootnote, CarHarness, CarDocs, CarParts, Column
from opendbc.car.fw_query_definitions import FwQueryConfig, Request, StdQueries, p16
@@ -9,9 +10,23 @@ from opendbc.car.fw_query_definitions import FwQueryConfig, Request, StdQueries,
Ecu = CarParams.Ecu
# Add extra tolerance for average banked road since safety doesn't have the roll
AVERAGE_ROAD_ROLL = 0.06 # ~3.4 degrees, 6% superelevation. higher actual roll lowers lateral acceleration
class CarControllerParams:
ANGLE_LIMITS: AngleSteeringLimits = AngleSteeringLimits(
190, # deg, EPS faults above ~200
([], []),
([], []),
MAX_LATERAL_ACCEL=ISO_LATERAL_ACCEL + (ACCELERATION_DUE_TO_GRAVITY * AVERAGE_ROAD_ROLL), # ~3.6 m/s^2
MAX_LATERAL_JERK=3.0 + (ACCELERATION_DUE_TO_GRAVITY * AVERAGE_ROAD_ROLL), # ~3.6 m/s^3
MAX_ANGLE_RATE=5, # deg/frame, comfort rate limit
)
STEER_STEP = 2 # how often we update the steer cmd
def __init__(self, CP):
self.STEER_STEP = 2 # how often we update the steer cmd
self.STEER_DELTA_UP = 50 # torque increase per refresh, 0.8s to max
self.STEER_DELTA_DOWN = 70 # torque decrease per refresh
self.STEER_DRIVER_ALLOWANCE = 60 # allowed driver torque before start limiting
@@ -57,6 +72,7 @@ class SubaruSafetyFlags(IntFlag):
GEN2 = 1
LONG = 2
PREGLOBAL_REVERSED_DRIVER_TORQUE = 4
LKAS_ANGLE = 8
class SubaruFlags(IntFlag):
@@ -211,6 +227,11 @@ class CAR(Platforms):
SUBARU_ASCENT.specs,
flags=SubaruFlags.LKAS_ANGLE,
)
SUBARU_CROSSTREK_2025 = SubaruGen2PlatformConfig(
[SubaruCarDocs("Subaru Crosstrek 2025", "All", car_parts=CarParts.common([CarHarness.subaru_d]))],
CarSpecs(mass=1529, wheelbase=2.67, steerRatio=17),
flags=SubaruFlags.LKAS_ANGLE,
)
SUBARU_VERSION_REQUEST = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER]) + \

View File

@@ -14,6 +14,7 @@ legend = ["LAT_ACCEL_FACTOR", "MAX_LAT_ACCEL_MEASURED", "FRICTION"]
"SUBARU_FORESTER_2022" = [nan, 3.0, nan]
"SUBARU_OUTBACK_2023" = [nan, 3.0, nan]
"SUBARU_ASCENT_2023" = [nan, 3.0, nan]
"SUBARU_CROSSTREK_2025" = [nan, 3.0, nan]
# Toyota LTA also has torque
"TOYOTA_RAV4_TSS2_2023" = [nan, 3.0, nan]

View File

@@ -23,9 +23,11 @@
#define MSG_SUBARU_CruiseControl 0x240U
#define MSG_SUBARU_Throttle 0x40U
#define MSG_SUBARU_Steering_Torque 0x119U
#define MSG_SUBARU_Steering_2 0x11aU
#define MSG_SUBARU_Wheel_Speeds 0x13aU
#define MSG_SUBARU_ES_LKAS 0x122U
#define MSG_SUBARU_ES_LKAS_ANGLE 0x124U
#define MSG_SUBARU_ES_Brake 0x220U
#define MSG_SUBARU_ES_Distance 0x221U
#define MSG_SUBARU_ES_Status 0x222U
@@ -70,8 +72,17 @@
{.msg = {{MSG_SUBARU_Brake_Status, alt_bus, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
{.msg = {{MSG_SUBARU_CruiseControl, alt_bus, 8, 20U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
#define SUBARU_LKAS_ANGLE_RX_CHECKS(alt_bus) \
{.msg = {{MSG_SUBARU_Throttle, SUBARU_MAIN_BUS, 8, 100U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
{.msg = {{MSG_SUBARU_Steering_Torque, SUBARU_MAIN_BUS, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
{.msg = {{MSG_SUBARU_Wheel_Speeds, alt_bus, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
{.msg = {{MSG_SUBARU_Brake_Status, alt_bus, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
{.msg = {{MSG_SUBARU_ES_Brake, alt_bus, 8, 20U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
{.msg = {{MSG_SUBARU_Steering_2, SUBARU_MAIN_BUS, 8, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, \
static bool subaru_gen2 = false;
static bool subaru_longitudinal = false;
static bool subaru_lkas_angle = false;
static uint32_t subaru_get_checksum(const CANPacket_t *msg) {
return (uint8_t)msg->data[0];
@@ -100,8 +111,19 @@ static void subaru_rx_hook(const CANPacket_t *msg) {
update_sample(&torque_driver, torque_driver_new);
}
// Steering angle measurement for LKAS_ANGLE cars
if (subaru_lkas_angle && (msg->addr == MSG_SUBARU_Steering_2) && (msg->bus == SUBARU_MAIN_BUS)) {
int angle_meas_new = GET_BYTES(msg, 3, 3) & 0x1FFFFU;
angle_meas_new = -1 * to_signed(angle_meas_new, 17);
update_sample(&angle_meas, angle_meas_new);
}
// enter controls on rising edge of ACC, exit controls on ACC off
if ((msg->addr == MSG_SUBARU_CruiseControl) && (msg->bus == alt_main_bus)) {
if (subaru_lkas_angle && (msg->addr == MSG_SUBARU_ES_Brake) && (msg->bus == alt_main_bus)) {
bool cruise_engaged = (msg->data[4] >> 7) & 1U;
pcm_cruise_check(cruise_engaged);
}
if (!subaru_lkas_angle && (msg->addr == MSG_SUBARU_CruiseControl) && (msg->bus == alt_main_bus)) {
bool cruise_engaged = (msg->data[5] >> 1) & 1U;
pcm_cruise_check(cruise_engaged);
}
@@ -144,7 +166,29 @@ static bool subaru_tx_hook(const CANPacket_t *msg) {
bool tx = true;
bool violation = false;
// steer cmd checks
// angle steer cmd checks
if (msg->addr == MSG_SUBARU_ES_LKAS_ANGLE) {
// Based on SUBARU_ASCENT (most restrictive slip factor)
const AngleSteeringLimits SUBARU_ANGLE_STEERING_LIMITS = {
.max_angle = 190 * 100,
.angle_deg_to_can = 100.,
.frequency = 50U,
};
const AngleSteeringParams SUBARU_STEERING_PARAMS = {
.slip_factor = -0.000580374471400815,
.steer_ratio = 13.5,
.wheelbase = 2.89,
};
int desired_angle = GET_BYTES(msg, 5, 3) & 0x1FFFFU;
desired_angle = -1 * to_signed(desired_angle, 17);
bool lkas_request = GET_BIT(msg, 12U);
violation |= steer_angle_cmd_checks_vm(desired_angle, lkas_request, SUBARU_ANGLE_STEERING_LIMITS, SUBARU_STEERING_PARAMS);
}
// torque steer cmd checks
if (msg->addr == MSG_SUBARU_ES_LKAS) {
int desired_torque = ((GET_BYTES(msg, 0, 4) >> 16) & 0x1FFFU);
desired_torque = -1 * to_signed(desired_torque, 13);
@@ -220,6 +264,16 @@ static safety_config subaru_init(uint16_t param) {
SUBARU_GEN2_LONG_ADDITIONAL_TX_MSGS()
};
static const CanMsg SUBARU_LKAS_ANGLE_TX_MSGS[] = {
SUBARU_BASE_TX_MSGS(SUBARU_MAIN_BUS, MSG_SUBARU_ES_LKAS_ANGLE)
SUBARU_COMMON_TX_MSGS(SUBARU_MAIN_BUS)
};
static const CanMsg SUBARU_LKAS_ANGLE_GEN2_TX_MSGS[] = {
SUBARU_BASE_TX_MSGS(SUBARU_ALT_BUS, MSG_SUBARU_ES_LKAS_ANGLE)
SUBARU_COMMON_TX_MSGS(SUBARU_ALT_BUS)
};
static RxCheck subaru_rx_checks[] = {
SUBARU_COMMON_RX_CHECKS(SUBARU_MAIN_BUS)
};
@@ -228,9 +282,19 @@ static safety_config subaru_init(uint16_t param) {
SUBARU_COMMON_RX_CHECKS(SUBARU_ALT_BUS)
};
static RxCheck subaru_lkas_angle_rx_checks[] = {
SUBARU_LKAS_ANGLE_RX_CHECKS(SUBARU_MAIN_BUS)
};
static RxCheck subaru_lkas_angle_gen2_rx_checks[] = {
SUBARU_LKAS_ANGLE_RX_CHECKS(SUBARU_ALT_BUS)
};
const uint16_t SUBARU_PARAM_GEN2 = 1;
const uint16_t SUBARU_PARAM_LKAS_ANGLE = 8;
subaru_gen2 = GET_FLAG(param, SUBARU_PARAM_GEN2);
subaru_lkas_angle = GET_FLAG(param, SUBARU_PARAM_LKAS_ANGLE);
#ifdef ALLOW_DEBUG
const uint16_t SUBARU_PARAM_LONGITUDINAL = 2;
@@ -238,7 +302,10 @@ static safety_config subaru_init(uint16_t param) {
#endif
safety_config ret;
if (subaru_gen2) {
if (subaru_lkas_angle) {
ret = subaru_gen2 ? BUILD_SAFETY_CFG(subaru_lkas_angle_gen2_rx_checks, SUBARU_LKAS_ANGLE_GEN2_TX_MSGS) : \
BUILD_SAFETY_CFG(subaru_lkas_angle_rx_checks, SUBARU_LKAS_ANGLE_TX_MSGS);
} else if (subaru_gen2) {
ret = subaru_longitudinal ? BUILD_SAFETY_CFG(subaru_gen2_rx_checks, SUBARU_GEN2_LONG_TX_MSGS) : \
BUILD_SAFETY_CFG(subaru_gen2_rx_checks, SUBARU_GEN2_TX_MSGS);
} else {

View File

@@ -1,13 +1,16 @@
#!/usr/bin/env python3
import enum
import unittest
import numpy as np
from functools import partial
from opendbc.car.subaru.values import SubaruSafetyFlags
from opendbc.car.structs import CarParams
from opendbc.car.vehicle_model import VehicleModel
from opendbc.safety.tests.libsafety import libsafety_py
import opendbc.safety.tests.common as common
from opendbc.safety.tests.common import CANPackerSafety
from functools import partial
from opendbc.safety.tests.common import CANPackerSafety, round_speed, away_round
class SubaruMsg(enum.IntEnum):
@@ -233,5 +236,157 @@ class TestSubaruGen2LongitudinalSafety(TestSubaruLongitudinalSafetyBase, TestSub
self.assertFalse(self._tx(self._es_uds_msg(msg)))
class TestSubaruAngleSafetyBase(TestSubaruSafetyBase, common.AngleSteeringSafetyTest):
STEER_ANGLE_MAX = 190
DEG_TO_CAN = 100
# VM-based limits, not breakpoint-based
ANGLE_RATE_BP = None
ANGLE_RATE_UP = None
ANGLE_RATE_DOWN = None
LATERAL_FREQUENCY = 50
cnt_angle_cmd = 0
def setUp(self):
self.__class__.cnt_angle_cmd = 0
super().setUp()
from opendbc.car.subaru.carcontroller import get_safety_CP
self.VM = VehicleModel(get_safety_CP())
def _speed_msg(self, speed):
# speed is in m/s for angle tests, convert to kph for DBC
speed_kph = speed * 3.6
values = {s: speed_kph for s in ["FR", "FL", "RR", "RL"]}
return self.packer.make_can_msg_safety("Wheel_Speeds", self.ALT_MAIN_BUS, values)
def _angle_cmd_msg(self, angle, enabled, increment_timer=True):
values = {"LKAS_Output": angle, "LKAS_Request": enabled, "SET_3": 3}
if increment_timer:
self.safety.set_timer(self.cnt_angle_cmd * int(1e6 / self.LATERAL_FREQUENCY))
self.__class__.cnt_angle_cmd += 1
return self.packer.make_can_msg_safety("ES_LKAS_ANGLE", SUBARU_MAIN_BUS, values)
def _angle_meas_msg(self, angle):
values = {"Steering_Angle": angle}
return self.packer.make_can_msg_safety("Steering_2", SUBARU_MAIN_BUS, values)
def _pcm_status_msg(self, enable):
values = {"Cruise_Activated": enable}
return self.packer.make_can_msg_safety("ES_Brake", self.ALT_MAIN_BUS, values)
def test_angle_cmd_when_enabled(self):
# VM-based limits are tested below
pass
def _find_max_allowed_angle_can(self, sign):
"""Binary search for the exact max angle CAN value the safety allows."""
lo, hi = 0, self.STEER_ANGLE_MAX * self.DEG_TO_CAN + 10
while lo < hi:
mid = (lo + hi + 1) // 2
self.safety.set_desired_angle_last(mid * sign)
if self._tx(self._angle_cmd_msg(mid / self.DEG_TO_CAN * sign, True)):
lo = mid
else:
hi = mid - 1
return lo
def test_lateral_accel_limit(self):
for speed in np.linspace(0, 40, 100):
speed = max(speed, 1)
# match Wheel_Speeds rounding on CAN (factor 0.057 kph)
speed = round_speed(away_round(speed * 3.6 / 0.057) * 0.057 / 3.6)
for sign in (-1, 1):
self.safety.set_controls_allowed(True)
self._reset_speed_measurement(speed + 1)
self._tx(self._angle_cmd_msg(0, True))
# find the exact max angle CAN the safety allows
max_angle_can = self._find_max_allowed_angle_can(sign)
# at limit
self.safety.set_desired_angle_last(max_angle_can * sign)
self.assertTrue(self._tx(self._angle_cmd_msg(max_angle_can / self.DEG_TO_CAN * sign, True)))
# 1 unit above limit
above_limit_can = max_angle_can + 1
self.safety.set_desired_angle_last(above_limit_can * sign)
self._tx(self._angle_cmd_msg(above_limit_can / self.DEG_TO_CAN * sign, True))
# at low speeds max angle is above STEER_ANGLE_MAX, so adding 1 has no effect
should_tx = max_angle_can >= self.STEER_ANGLE_MAX * self.DEG_TO_CAN
self.assertEqual(should_tx, self._tx(self._angle_cmd_msg(above_limit_can / self.DEG_TO_CAN * sign, True)))
def _find_max_allowed_delta_can(self, sign):
"""Binary search for the exact max angle delta CAN value the safety allows from angle 0."""
lo, hi = 0, self.STEER_ANGLE_MAX * self.DEG_TO_CAN
while lo < hi:
mid = (lo + hi + 1) // 2
# reset to angle 0
self.safety.set_desired_angle_last(0)
if self._tx(self._angle_cmd_msg(mid / self.DEG_TO_CAN * sign, True)):
lo = mid
else:
hi = mid - 1
return lo
def test_lateral_jerk_limit(self):
for speed in np.linspace(0, 40, 100):
speed = max(speed, 1)
# match Wheel_Speeds rounding on CAN
speed = round_speed(away_round(speed * 3.6 / 0.057) * 0.057 / 3.6)
for sign in (-1, 1):
self.safety.set_controls_allowed(True)
self._reset_speed_measurement(speed + 1)
self._tx(self._angle_cmd_msg(0, True))
# find the exact max delta CAN the safety allows from angle 0
max_delta_can = self._find_max_allowed_delta_can(sign)
# Stay within limits
# Up
self.safety.set_desired_angle_last(0)
self.assertTrue(self._tx(self._angle_cmd_msg(max_delta_can / self.DEG_TO_CAN * sign, True)))
# Don't change
self.assertTrue(self._tx(self._angle_cmd_msg(max_delta_can / self.DEG_TO_CAN * sign, True)))
# Down
self.assertTrue(self._tx(self._angle_cmd_msg(0, True)))
# Inject too high rates
# Up
above_delta_can = max_delta_can + 1
self.assertFalse(self._tx(self._angle_cmd_msg(above_delta_can / self.DEG_TO_CAN * sign, True)))
# Don't change
self.safety.set_desired_angle_last(round(above_delta_can * sign))
self.assertTrue(self._tx(self._angle_cmd_msg(above_delta_can / self.DEG_TO_CAN * sign, True)))
# Down
self.assertFalse(self._tx(self._angle_cmd_msg(0, True)))
# Recover
self.assertTrue(self._tx(self._angle_cmd_msg(0, True)))
class TestSubaruGen1AngleStockLongitudinalSafety(TestSubaruStockLongitudinalSafetyBase, TestSubaruAngleSafetyBase):
FLAGS = SubaruSafetyFlags.LKAS_ANGLE
TX_MSGS = lkas_tx_msgs(SUBARU_MAIN_BUS, SubaruMsg.ES_LKAS_ANGLE)
RELAY_MALFUNCTION_ADDRS = {SUBARU_MAIN_BUS: (SubaruMsg.ES_LKAS_ANGLE, SubaruMsg.ES_DashStatus, SubaruMsg.ES_LKAS_State,
SubaruMsg.ES_Infotainment)}
FWD_BLACKLISTED_ADDRS = fwd_blacklisted_addr(SubaruMsg.ES_LKAS_ANGLE)
class TestSubaruGen2AngleStockLongitudinalSafety(TestSubaruStockLongitudinalSafetyBase, TestSubaruAngleSafetyBase):
ALT_MAIN_BUS = SUBARU_ALT_BUS
FLAGS = SubaruSafetyFlags.GEN2 | SubaruSafetyFlags.LKAS_ANGLE
TX_MSGS = lkas_tx_msgs(SUBARU_ALT_BUS, SubaruMsg.ES_LKAS_ANGLE)
RELAY_MALFUNCTION_ADDRS = {SUBARU_MAIN_BUS: (SubaruMsg.ES_LKAS_ANGLE, SubaruMsg.ES_DashStatus, SubaruMsg.ES_LKAS_State,
SubaruMsg.ES_Infotainment)}
FWD_BLACKLISTED_ADDRS = fwd_blacklisted_addr(SubaruMsg.ES_LKAS_ANGLE)
if __name__ == "__main__":
unittest.main()