mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-23 12:12:07 +08:00
@@ -2,9 +2,9 @@ from cereal import car
|
||||
from common.numpy_fast import clip
|
||||
from opendbc.can.packer import CANPacker
|
||||
from selfdrive.car import apply_std_steer_angle_limits
|
||||
from selfdrive.car.ford.fordcan import create_acc_msg, create_acc_ui_msg, create_button_msg, create_lat_ctl_msg, \
|
||||
create_lat_ctl2_msg, create_lka_msg, create_lkas_ui_msg
|
||||
from selfdrive.car.ford.values import CANBUS, CANFD_CARS, CarControllerParams
|
||||
from selfdrive.car.ford.fordcan import CanBus, create_acc_msg, create_acc_ui_msg, create_button_msg, \
|
||||
create_lat_ctl_msg, create_lat_ctl2_msg, create_lka_msg, create_lkas_ui_msg
|
||||
from selfdrive.car.ford.values import CANFD_CARS, CarControllerParams
|
||||
|
||||
LongCtrlState = car.CarControl.Actuators.LongControlState
|
||||
VisualAlert = car.CarControl.HUDControl.VisualAlert
|
||||
@@ -27,6 +27,7 @@ class CarController:
|
||||
self.CP = CP
|
||||
self.VM = VM
|
||||
self.packer = CANPacker(dbc_name)
|
||||
self.CAN = CanBus(CP)
|
||||
self.frame = 0
|
||||
|
||||
self.apply_curvature_last = 0
|
||||
@@ -45,15 +46,15 @@ class CarController:
|
||||
|
||||
### acc buttons ###
|
||||
if CC.cruiseControl.cancel:
|
||||
can_sends.append(create_button_msg(self.packer, CS.buttons_stock_values, cancel=True))
|
||||
can_sends.append(create_button_msg(self.packer, CS.buttons_stock_values, cancel=True, bus=CANBUS.main))
|
||||
can_sends.append(create_button_msg(self.packer, self.CAN.camera, CS.buttons_stock_values, cancel=True))
|
||||
can_sends.append(create_button_msg(self.packer, self.CAN.main, CS.buttons_stock_values, cancel=True))
|
||||
elif CC.cruiseControl.resume and (self.frame % CarControllerParams.BUTTONS_STEP) == 0:
|
||||
can_sends.append(create_button_msg(self.packer, CS.buttons_stock_values, resume=True))
|
||||
can_sends.append(create_button_msg(self.packer, CS.buttons_stock_values, resume=True, bus=CANBUS.main))
|
||||
can_sends.append(create_button_msg(self.packer, self.CAN.camera, CS.buttons_stock_values, resume=True))
|
||||
can_sends.append(create_button_msg(self.packer, self.CAN.main, CS.buttons_stock_values, resume=True))
|
||||
# if stock lane centering isn't off, send a button press to toggle it off
|
||||
# the stock system checks for steering pressed, and eventually disengages cruise control
|
||||
elif CS.acc_tja_status_stock_values["Tja_D_Stat"] != 0 and (self.frame % CarControllerParams.ACC_UI_STEP) == 0:
|
||||
can_sends.append(create_button_msg(self.packer, CS.buttons_stock_values, tja_toggle=True))
|
||||
can_sends.append(create_button_msg(self.packer, self.CAN.camera, CS.buttons_stock_values, tja_toggle=True))
|
||||
|
||||
### lateral control ###
|
||||
# send steer msg at 20Hz
|
||||
@@ -71,13 +72,13 @@ class CarController:
|
||||
# TODO: extended mode
|
||||
mode = 1 if CC.latActive else 0
|
||||
counter = (self.frame // CarControllerParams.STEER_STEP) % 0xF
|
||||
can_sends.append(create_lat_ctl2_msg(self.packer, mode, 0., 0., -apply_curvature, 0., counter))
|
||||
can_sends.append(create_lat_ctl2_msg(self.packer, self.CAN, mode, 0., 0., -apply_curvature, 0., counter))
|
||||
else:
|
||||
can_sends.append(create_lat_ctl_msg(self.packer, CC.latActive, 0., 0., -apply_curvature, 0.))
|
||||
can_sends.append(create_lat_ctl_msg(self.packer, self.CAN, CC.latActive, 0., 0., -apply_curvature, 0.))
|
||||
|
||||
# send lka msg at 33Hz
|
||||
if (self.frame % CarControllerParams.LKA_STEP) == 0:
|
||||
can_sends.append(create_lka_msg(self.packer))
|
||||
can_sends.append(create_lka_msg(self.packer, self.CAN))
|
||||
|
||||
### longitudinal control ###
|
||||
# send acc msg at 50Hz
|
||||
@@ -89,16 +90,16 @@ class CarController:
|
||||
gas = CarControllerParams.INACTIVE_GAS
|
||||
|
||||
stopping = CC.actuators.longControlState == LongCtrlState.stopping
|
||||
can_sends.append(create_acc_msg(self.packer, CC.longActive, gas, accel, stopping))
|
||||
can_sends.append(create_acc_msg(self.packer, self.CAN, CC.longActive, gas, accel, stopping))
|
||||
|
||||
### ui ###
|
||||
send_ui = (self.main_on_last != main_on) or (self.lkas_enabled_last != CC.latActive) or (self.steer_alert_last != steer_alert)
|
||||
# send lkas ui msg at 1Hz or if ui state changes
|
||||
if (self.frame % CarControllerParams.LKAS_UI_STEP) == 0 or send_ui:
|
||||
can_sends.append(create_lkas_ui_msg(self.packer, main_on, CC.latActive, steer_alert, hud_control, CS.lkas_status_stock_values))
|
||||
can_sends.append(create_lkas_ui_msg(self.packer, self.CAN, main_on, CC.latActive, steer_alert, hud_control, CS.lkas_status_stock_values))
|
||||
# send acc ui msg at 5Hz or if ui state changes
|
||||
if (self.frame % CarControllerParams.ACC_UI_STEP) == 0 or send_ui:
|
||||
can_sends.append(create_acc_ui_msg(self.packer, self.CP, main_on, CC.latActive,
|
||||
can_sends.append(create_acc_ui_msg(self.packer, self.CAN, self.CP, main_on, CC.latActive,
|
||||
CS.out.cruiseState.standstill, hud_control,
|
||||
CS.acc_tja_status_stock_values))
|
||||
|
||||
|
||||
@@ -3,7 +3,8 @@ from common.conversions import Conversions as CV
|
||||
from opendbc.can.can_define import CANDefine
|
||||
from opendbc.can.parser import CANParser
|
||||
from selfdrive.car.interfaces import CarStateBase
|
||||
from selfdrive.car.ford.values import CANBUS, DBC, CarControllerParams
|
||||
from selfdrive.car.ford.fordcan import CanBus
|
||||
from selfdrive.car.ford.values import DBC, CarControllerParams
|
||||
|
||||
GearShifter = car.CarState.GearShifter
|
||||
TransmissionType = car.CarParams.TransmissionType
|
||||
@@ -211,7 +212,7 @@ class CarState(CarStateBase):
|
||||
("Side_Detect_R_Stat", 5),
|
||||
]
|
||||
|
||||
return CANParser(DBC[CP.carFingerprint]["pt"], signals, checks, CANBUS.main)
|
||||
return CANParser(DBC[CP.carFingerprint]["pt"], signals, checks, CanBus(CP).main)
|
||||
|
||||
@staticmethod
|
||||
def get_cam_can_parser(CP):
|
||||
@@ -268,4 +269,4 @@ class CarState(CarStateBase):
|
||||
("IPMA_Data", 1),
|
||||
]
|
||||
|
||||
return CANParser(DBC[CP.carFingerprint]["pt"], signals, checks, CANBUS.camera)
|
||||
return CANParser(DBC[CP.carFingerprint]["pt"], signals, checks, CanBus(CP).camera)
|
||||
|
||||
@@ -1,9 +1,26 @@
|
||||
from cereal import car
|
||||
from selfdrive.car.ford.values import CANBUS
|
||||
from selfdrive.car import CanBusBase
|
||||
|
||||
HUDControl = car.CarControl.HUDControl
|
||||
|
||||
|
||||
class CanBus(CanBusBase):
|
||||
def __init__(self, CP=None, fingerprint=None) -> None:
|
||||
super().__init__(CP, fingerprint)
|
||||
|
||||
@property
|
||||
def main(self) -> int:
|
||||
return self.offset
|
||||
|
||||
@property
|
||||
def radar(self):
|
||||
return self.offset + 1
|
||||
|
||||
@property
|
||||
def camera(self):
|
||||
return self.offset + 2
|
||||
|
||||
|
||||
def calculate_lat_ctl2_checksum(mode: int, counter: int, dat: bytearray):
|
||||
curvature = (dat[2] << 3) | ((dat[3]) >> 5)
|
||||
curvature_rate = (dat[6] << 3) | ((dat[7]) >> 5)
|
||||
@@ -17,7 +34,7 @@ def calculate_lat_ctl2_checksum(mode: int, counter: int, dat: bytearray):
|
||||
return 0xFF - (checksum & 0xFF)
|
||||
|
||||
|
||||
def create_lka_msg(packer):
|
||||
def create_lka_msg(packer, CAN: CanBus):
|
||||
"""
|
||||
Creates an empty CAN message for the Ford LKA Command.
|
||||
|
||||
@@ -26,10 +43,10 @@ def create_lka_msg(packer):
|
||||
Frequency is 33Hz.
|
||||
"""
|
||||
|
||||
return packer.make_can_msg("Lane_Assist_Data1", CANBUS.main, {})
|
||||
return packer.make_can_msg("Lane_Assist_Data1", CAN.main, {})
|
||||
|
||||
|
||||
def create_lat_ctl_msg(packer, lat_active: bool, path_offset: float, path_angle: float, curvature: float,
|
||||
def create_lat_ctl_msg(packer, CAN: CanBus, lat_active: bool, path_offset: float, path_angle: float, curvature: float,
|
||||
curvature_rate: float):
|
||||
"""
|
||||
Creates a CAN message for the Ford TJA/LCA Command.
|
||||
@@ -66,10 +83,10 @@ def create_lat_ctl_msg(packer, lat_active: bool, path_offset: float, path_angle:
|
||||
"LatCtlCurv_NoRate_Actl": curvature_rate, # Curvature rate [-0.001024|0.00102375] 1/meter^2
|
||||
"LatCtlCurv_No_Actl": curvature, # Curvature [-0.02|0.02094] 1/meter
|
||||
}
|
||||
return packer.make_can_msg("LateralMotionControl", CANBUS.main, values)
|
||||
return packer.make_can_msg("LateralMotionControl", CAN.main, values)
|
||||
|
||||
|
||||
def create_lat_ctl2_msg(packer, mode: int, path_offset: float, path_angle: float, curvature: float,
|
||||
def create_lat_ctl2_msg(packer, CAN: CanBus, mode: int, path_offset: float, path_angle: float, curvature: float,
|
||||
curvature_rate: float, counter: int):
|
||||
"""
|
||||
Create a CAN message for the new Ford Lane Centering command.
|
||||
@@ -95,13 +112,13 @@ def create_lat_ctl2_msg(packer, mode: int, path_offset: float, path_angle: float
|
||||
}
|
||||
|
||||
# calculate checksum
|
||||
dat = packer.make_can_msg("LateralMotionControl2", CANBUS.main, values)[2]
|
||||
dat = packer.make_can_msg("LateralMotionControl2", 0, values)[2]
|
||||
values["LatCtlPath_No_Cs"] = calculate_lat_ctl2_checksum(mode, counter, dat)
|
||||
|
||||
return packer.make_can_msg("LateralMotionControl2", CANBUS.main, values)
|
||||
return packer.make_can_msg("LateralMotionControl2", CAN.main, values)
|
||||
|
||||
|
||||
def create_acc_msg(packer, long_active: bool, gas: float, accel: float, stopping: bool):
|
||||
def create_acc_msg(packer, CAN: CanBus, long_active: bool, gas: float, accel: float, stopping: bool):
|
||||
"""
|
||||
Creates a CAN message for the Ford ACC Command.
|
||||
|
||||
@@ -122,10 +139,10 @@ def create_acc_msg(packer, long_active: bool, gas: float, accel: float, stopping
|
||||
"AccBrkDecel_B_Rq": 1 if decel else 0, # Deceleration request: 0=Inactive, 1=Active
|
||||
"AccStopStat_B_Rq": 1 if stopping else 0,
|
||||
}
|
||||
return packer.make_can_msg("ACCDATA", CANBUS.main, values)
|
||||
return packer.make_can_msg("ACCDATA", CAN.main, values)
|
||||
|
||||
|
||||
def create_acc_ui_msg(packer, CP, main_on: bool, enabled: bool, standstill: bool, hud_control,
|
||||
def create_acc_ui_msg(packer, CAN: CanBus, CP, main_on: bool, enabled: bool, standstill: bool, hud_control,
|
||||
stock_values: dict):
|
||||
"""
|
||||
Creates a CAN message for the Ford IPC adaptive cruise, forward collision warning and traffic jam
|
||||
@@ -197,10 +214,11 @@ def create_acc_ui_msg(packer, CP, main_on: bool, enabled: bool, standstill: bool
|
||||
"AccTGap_D_Dsply": 4, # Fixed time gap in UI
|
||||
})
|
||||
|
||||
return packer.make_can_msg("ACCDATA_3", CANBUS.main, values)
|
||||
return packer.make_can_msg("ACCDATA_3", CAN.main, values)
|
||||
|
||||
|
||||
def create_lkas_ui_msg(packer, main_on: bool, enabled: bool, steer_alert: bool, hud_control, stock_values: dict):
|
||||
def create_lkas_ui_msg(packer, CAN: CanBus, main_on: bool, enabled: bool, steer_alert: bool, hud_control,
|
||||
stock_values: dict):
|
||||
"""
|
||||
Creates a CAN message for the Ford IPC IPMA/LKAS status.
|
||||
|
||||
@@ -263,11 +281,10 @@ def create_lkas_ui_msg(packer, main_on: bool, enabled: bool, steer_alert: bool,
|
||||
"LaActvStats_D_Dsply": lines, # LKAS status (lines) [0|31]
|
||||
"LaHandsOff_D_Dsply": hands_on_wheel_dsply, # 0=HandsOn, 1=Level1 (w/o chime), 2=Level2 (w/ chime), 3=Suppressed
|
||||
})
|
||||
return packer.make_can_msg("IPMA_Data", CANBUS.main, values)
|
||||
return packer.make_can_msg("IPMA_Data", CAN.main, values)
|
||||
|
||||
|
||||
def create_button_msg(packer, stock_values: dict, cancel=False, resume=False, tja_toggle=False,
|
||||
bus: int = CANBUS.camera):
|
||||
def create_button_msg(packer, bus: int, stock_values: dict, cancel=False, resume=False, tja_toggle=False):
|
||||
"""
|
||||
Creates a CAN message for the Ford SCCM buttons/switches.
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ from cereal import car
|
||||
from panda import Panda
|
||||
from common.conversions import Conversions as CV
|
||||
from selfdrive.car import STD_CARGO_KG, get_safety_config
|
||||
from selfdrive.car.ford.fordcan import CanBus
|
||||
from selfdrive.car.ford.values import CAR, Ecu
|
||||
from selfdrive.car.interfaces import CarInterfaceBase
|
||||
|
||||
@@ -14,7 +15,6 @@ class CarInterface(CarInterfaceBase):
|
||||
@staticmethod
|
||||
def _get_params(ret, candidate, fingerprint, car_fw, experimental_long, docs):
|
||||
ret.carName = "ford"
|
||||
ret.safetyConfigs = [get_safety_config(car.CarParams.SafetyModel.ford)]
|
||||
|
||||
# These cars are dashcam only for lack of test coverage.
|
||||
# Once a user confirms each car works and a test route is
|
||||
@@ -26,9 +26,15 @@ class CarInterface(CarInterfaceBase):
|
||||
ret.steerActuatorDelay = 0.2
|
||||
ret.steerLimitTimer = 1.0
|
||||
|
||||
CAN = CanBus(fingerprint=fingerprint)
|
||||
cfgs = [get_safety_config(car.CarParams.SafetyModel.ford)]
|
||||
if CAN.main >= 4:
|
||||
cfgs.insert(0, get_safety_config(car.CarParams.SafetyModel.noOutput))
|
||||
ret.safetyConfigs = cfgs
|
||||
|
||||
ret.experimentalLongitudinalAvailable = True
|
||||
if experimental_long:
|
||||
ret.safetyConfigs[0].safetyParam |= Panda.FLAG_FORD_LONG_CONTROL
|
||||
ret.safetyConfigs[-1].safetyParam |= Panda.FLAG_FORD_LONG_CONTROL
|
||||
ret.openpilotLongitudinalControl = True
|
||||
|
||||
if candidate == CAR.BRONCO_SPORT_MK1:
|
||||
@@ -61,7 +67,7 @@ class CarInterface(CarInterfaceBase):
|
||||
|
||||
# Auto Transmission: 0x732 ECU or Gear_Shift_by_Wire_FD1
|
||||
found_ecus = [fw.ecu for fw in car_fw]
|
||||
if Ecu.shiftByWire in found_ecus or 0x5A in fingerprint[0] or docs:
|
||||
if Ecu.shiftByWire in found_ecus or 0x5A in fingerprint[CAN.main] or docs:
|
||||
ret.transmissionType = TransmissionType.automatic
|
||||
else:
|
||||
ret.transmissionType = TransmissionType.manual
|
||||
@@ -69,7 +75,7 @@ class CarInterface(CarInterfaceBase):
|
||||
|
||||
# BSM: Side_Detect_L_Stat, Side_Detect_R_Stat
|
||||
# TODO: detect bsm in car_fw?
|
||||
ret.enableBsm = 0x3A6 in fingerprint[0] and 0x3A7 in fingerprint[0]
|
||||
ret.enableBsm = 0x3A6 in fingerprint[CAN.main] and 0x3A7 in fingerprint[CAN.main]
|
||||
|
||||
# LCA can steer down to zero
|
||||
ret.minSteerSpeed = 0.
|
||||
|
||||
@@ -3,7 +3,8 @@ from math import cos, sin
|
||||
from cereal import car
|
||||
from opendbc.can.parser import CANParser
|
||||
from common.conversions import Conversions as CV
|
||||
from selfdrive.car.ford.values import CANBUS, DBC, RADAR
|
||||
from selfdrive.car.ford.fordcan import CanBus
|
||||
from selfdrive.car.ford.values import DBC, RADAR
|
||||
from selfdrive.car.interfaces import RadarInterfaceBase
|
||||
|
||||
DELPHI_ESR_RADAR_MSGS = list(range(0x500, 0x540))
|
||||
@@ -12,16 +13,16 @@ DELPHI_MRR_RADAR_START_ADDR = 0x120
|
||||
DELPHI_MRR_RADAR_MSG_COUNT = 64
|
||||
|
||||
|
||||
def _create_delphi_esr_radar_can_parser():
|
||||
def _create_delphi_esr_radar_can_parser(CP) -> CANParser:
|
||||
msg_n = len(DELPHI_ESR_RADAR_MSGS)
|
||||
signals = list(zip(['X_Rel'] * msg_n + ['Angle'] * msg_n + ['V_Rel'] * msg_n,
|
||||
DELPHI_ESR_RADAR_MSGS * 3))
|
||||
checks = list(zip(DELPHI_ESR_RADAR_MSGS, [20] * msg_n))
|
||||
|
||||
return CANParser(RADAR.DELPHI_ESR, signals, checks, CANBUS.radar)
|
||||
return CANParser(RADAR.DELPHI_ESR, signals, checks, CanBus(CP).radar)
|
||||
|
||||
|
||||
def _create_delphi_mrr_radar_can_parser():
|
||||
def _create_delphi_mrr_radar_can_parser(CP) -> CANParser:
|
||||
signals = []
|
||||
checks = []
|
||||
|
||||
@@ -37,7 +38,7 @@ def _create_delphi_mrr_radar_can_parser():
|
||||
]
|
||||
checks += [(msg, 20)]
|
||||
|
||||
return CANParser(RADAR.DELPHI_MRR, signals, checks, CANBUS.radar)
|
||||
return CANParser(RADAR.DELPHI_MRR, signals, checks, CanBus(CP).radar)
|
||||
|
||||
|
||||
class RadarInterface(RadarInterfaceBase):
|
||||
@@ -50,11 +51,11 @@ class RadarInterface(RadarInterfaceBase):
|
||||
if self.radar is None or CP.radarUnavailable:
|
||||
self.rcp = None
|
||||
elif self.radar == RADAR.DELPHI_ESR:
|
||||
self.rcp = _create_delphi_esr_radar_can_parser()
|
||||
self.rcp = _create_delphi_esr_radar_can_parser(CP)
|
||||
self.trigger_msg = DELPHI_ESR_RADAR_MSGS[-1]
|
||||
self.valid_cnt = {key: 0 for key in DELPHI_ESR_RADAR_MSGS}
|
||||
elif self.radar == RADAR.DELPHI_MRR:
|
||||
self.rcp = _create_delphi_mrr_radar_can_parser()
|
||||
self.rcp = _create_delphi_mrr_radar_can_parser(CP)
|
||||
self.trigger_msg = DELPHI_MRR_RADAR_START_ADDR + DELPHI_MRR_RADAR_MSG_COUNT - 1
|
||||
else:
|
||||
raise ValueError(f"Unsupported radar: {self.radar}")
|
||||
|
||||
@@ -38,12 +38,6 @@ class CarControllerParams:
|
||||
pass
|
||||
|
||||
|
||||
class CANBUS:
|
||||
main = 0
|
||||
radar = 1
|
||||
camera = 2
|
||||
|
||||
|
||||
class CAR:
|
||||
BRONCO_SPORT_MK1 = "FORD BRONCO SPORT 1ST GEN"
|
||||
ESCAPE_MK4 = "FORD ESCAPE 4TH GEN"
|
||||
|
||||
Reference in New Issue
Block a user