This commit is contained in:
1okko
2026-05-10 07:59:33 +08:00
parent 54e6124925
commit 5c0feefabd
482 changed files with 125673 additions and 6849 deletions
+121 -70
View File
@@ -1,34 +1,23 @@
# python library to interface with panda
import os
import re
import sys
import time
import usb1
import struct
import hashlib
import binascii
import ctypes
from functools import wraps, partial
from itertools import accumulate
import opendbc
from opendbc.car.structs import CarParams
from .base import BaseHandle
from .constants import BASEDIR, FW_PATH, McuType, compute_version_hash
from .constants import FW_PATH, McuType
from .dfu import PandaDFU
from .spi import PandaSpiHandle, PandaSpiException, PandaProtocolMismatch
from .usb import PandaUsbHandle
from .utils import logger
# load libusb from pip package
try:
import libusb_package
usb1._libusb1.loadLibrary(ctypes.CDLL(str(libusb_package.get_library_path())))
except ImportError:
# TODO: remove this on next AGNOS update
pass
__version__ = '0.0.10'
CANPACKET_HEAD_SIZE = 0x6
@@ -43,22 +32,16 @@ def calculate_checksum(data):
res ^= b
return res
def _parse_c_struct(path, name):
type_to_format = {"uint8_t": "B", "uint16_t": "H", "uint32_t": "I", "float": "f"}
with open(path) as f:
lines = [l.strip() for l in f.read().split(f"struct __attribute__((packed)) {name} {{", 1)[1].split("};", 1)[0].splitlines() if l.strip()]
fields = [re.fullmatch(rf"({'|'.join(type_to_format)})\s+\w+;", l) for l in lines]
if not all(fields):
raise ValueError(f"unsupported {name} layout in {path}")
return struct.Struct("<" + "".join(type_to_format[m[1]] for m in fields))
def pack_can_buffer(arr, chunk=False, fd=False):
snds = [bytearray(), ]
def pack_can_buffer(arr, fd=False):
snds = [b'']
for address, dat, bus in arr:
assert len(dat) in LEN_TO_DLC
#logger.debug(" W 0x%x: 0x%s", address, dat.hex())
extended = 1 if address >= 0x800 else 0
data_len_code = LEN_TO_DLC[len(dat)]
header = bytearray(CANPACKET_HEAD_SIZE)
word_4b = (address << 3) | (extended << 2)
word_4b = address << 3 | extended << 2
header[0] = (data_len_code << 4) | (bus << 1) | int(fd)
header[1] = word_4b & 0xFF
header[2] = (word_4b >> 8) & 0xFF
@@ -66,10 +49,9 @@ def pack_can_buffer(arr, chunk=False, fd=False):
header[4] = (word_4b >> 24) & 0xFF
header[5] = calculate_checksum(header[:5] + dat)
snds[-1].extend(header)
snds[-1].extend(dat)
if chunk and len(snds[-1]) > 256:
snds.append(bytearray())
snds[-1] += header + dat
if len(snds[-1]) > 256: # Limit chunks to 256 bytes
snds.append(b'')
return snds
@@ -115,6 +97,7 @@ def ensure_version(desc, lib_field, panda_field, fn):
return fn(self, *args, **kwargs)
return wrapper
ensure_can_packet_version = partial(ensure_version, "CAN", "CAN_PACKET_VERSION", "can_version")
ensure_can_health_packet_version = partial(ensure_version, "CAN health", "CAN_HEALTH_PACKET_VERSION", "can_health_version")
ensure_health_packet_version = partial(ensure_version, "health", "HEALTH_PACKET_VERSION", "health_version")
@@ -122,6 +105,9 @@ ensure_health_packet_version = partial(ensure_version, "health", "HEALTH_PACKET_
class Panda:
SERIAL_DEBUG = 0
SERIAL_ESP = 1
SERIAL_LIN1 = 2
SERIAL_LIN2 = 3
SERIAL_SOM_DEBUG = 4
USB_VIDS = (0xbbaa, 0x3801) # 0x3801 is comma's registered VID
@@ -129,22 +115,36 @@ class Panda:
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
# from https://github.com/commaai/openpilot/blob/103b4df18cbc38f4129555ab8b15824d1a672bdf/cereal/log.capnp#L648
HW_TYPE_UNKNOWN = b'\x00'
HW_TYPE_WHITE_PANDA = b'\x01'
HW_TYPE_GREY_PANDA = b'\x02'
HW_TYPE_BLACK_PANDA = b'\x03'
HW_TYPE_PEDAL = b'\x04'
HW_TYPE_UNO = b'\x05'
HW_TYPE_DOS = b'\x06'
HW_TYPE_RED_PANDA = b'\x07'
HW_TYPE_RED_PANDA_V2 = b'\x08'
HW_TYPE_TRES = b'\x09'
HW_TYPE_CUATRO = b'\x0a'
HW_TYPE_BODY = b'\xb1'
CAN_PACKET_VERSION = compute_version_hash(os.path.join(opendbc.INCLUDE_PATH, "opendbc/safety/can.h"))
HEALTH_PACKET_VERSION = compute_version_hash(os.path.join(BASEDIR, "board/health.h"))
HEALTH_STRUCT = _parse_c_struct(os.path.join(BASEDIR, "board/health.h"), "health_t")
CAN_PACKET_VERSION = 4
HEALTH_PACKET_VERSION = 17
CAN_HEALTH_PACKET_VERSION = 5
HEALTH_STRUCT = struct.Struct("<IIIIIIIIBBBBBHBBBHfBBHHHB")
CAN_HEALTH_STRUCT = struct.Struct("<BIBBBBBBBBIIIIIIIHHBBBIIII")
H7_DEVICES = [HW_TYPE_RED_PANDA, HW_TYPE_TRES, HW_TYPE_CUATRO, HW_TYPE_BODY]
SUPPORTED_DEVICES = H7_DEVICES
F4_DEVICES = [HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS]
H7_DEVICES = [HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES, HW_TYPE_CUATRO]
INTERNAL_DEVICES = (HW_TYPE_TRES, HW_TYPE_CUATRO)
INTERNAL_DEVICES = (HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_TRES, HW_TYPE_CUATRO)
HAS_OBD = (HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES, HW_TYPE_CUATRO)
MAX_FAN_RPMs = {
HW_TYPE_UNO: 5100,
HW_TYPE_DOS: 6500,
HW_TYPE_TRES: 6600,
HW_TYPE_CUATRO: 12500,
}
HARNESS_STATUS_NC = 0
HARNESS_STATUS_NORMAL = 1
@@ -159,10 +159,11 @@ class Panda:
self._can_speed_kbps = can_speed_kbps
if cli and serial is None:
self._connect_serial = self._cli_select_panda()
self._connect_serial = self._cli_select_panda()
else:
self._connect_serial = serial
self._connect_serial = serial
# connect and set mcu type
self.connect(claim)
def _cli_select_panda(self):
@@ -216,10 +217,31 @@ class Panda:
if self._handle is None:
raise Exception("failed to connect to panda")
# Some fallback logic to determine panda and MCU type for old bootstubs,
# since we now support multiple MCUs and need to know which fw to flash.
# Three cases to consider:
# A) oldest bootstubs don't have any way to distinguish
# MCU or panda type
# B) slightly newer (~2 weeks after first C3's built) bootstubs
# have the panda type set in the USB bcdDevice
# C) latest bootstubs also implement the endpoint for panda type
self._bcd_hw_type = None
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
# rick - UNO to DOS for lite
if ret == bytearray(b'\x05'):
ret = bytearray(b'\x06')
missing_hw_type_endpoint = self.bootstub and ret.startswith(b'\xff\x00\xc1\x3e\xde\xad\xd0\x0d')
if missing_hw_type_endpoint and bcd is not None:
self._bcd_hw_type = bcd
# For case A, we assume F4 MCU type, since all H7 pandas should be case B at worst
self._assume_f4_mcu = (self._bcd_hw_type is None) and missing_hw_type_endpoint
self._serial = serial
self._connect_serial = serial
self._handle_open = True
self.health_version, self.can_version = self.get_packets_versions()
self._mcu_type = self.get_mcu_type()
self.health_version, self.can_version, self.can_health_version = self.get_packets_versions()
logger.debug("connected")
# disable openpilot's heartbeat checks
@@ -292,7 +314,7 @@ class Panda:
handle = device.open()
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
handle.setAutoDetachKernelDriver(True)
if claim or sys.platform == "darwin":
if claim:
handle.claimInterface(0)
# handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
@@ -348,9 +370,6 @@ class Panda:
return []
def reset(self, enter_bootstub=False, enter_bootloader=False, reconnect=True):
if enter_bootstub or enter_bootloader:
assert (hw_type := self.get_type()) in self.SUPPORTED_DEVICES, f"Unknown HW: {hw_type}"
# no response is expected since it resets right away
timeout = 5000 if isinstance(self._handle, PandaSpiHandle) else 15000
try:
@@ -430,14 +449,12 @@ class Panda:
pass
def flash(self, fn=None, code=None, reconnect=True):
assert (hw_type := self.get_type()) in self.SUPPORTED_DEVICES, f"Unknown HW: {hw_type}"
if self.up_to_date(fn=fn):
logger.info("flash: already up to date")
return
if not fn:
fn = os.path.join(FW_PATH, McuType.H7.config.app_fn)
fn = os.path.join(FW_PATH, self._mcu_type.config.app_fn)
assert os.path.isfile(fn)
logger.debug("flash: main version is %s", self.get_version())
if not self.bootstub:
@@ -452,7 +469,7 @@ class Panda:
logger.debug("flash: bootstub version is %s", self.get_version())
# do flash
Panda.flash_static(self._handle, code, mcu_type=McuType.H7)
Panda.flash_static(self._handle, code, mcu_type=self._mcu_type)
# reconnect
if reconnect:
@@ -488,22 +505,22 @@ class Panda:
dfu_list = PandaDFU.list()
return True
@classmethod
def wait_for_panda(cls, serial: str | None, timeout: int) -> bool:
@staticmethod
def wait_for_panda(serial: str | None, timeout: int) -> bool:
t_start = time.monotonic()
serials = cls.list()
serials = Panda.list()
while (serial is None and len(serials) == 0) or (serial is not None and serial not in serials):
logger.debug("waiting for panda...")
time.sleep(0.1)
if timeout is not None and (time.monotonic() - t_start) > timeout:
return False
serials = cls.list()
serials = Panda.list()
return True
def up_to_date(self, fn=None) -> bool:
current = self.get_signature()
if fn is None:
fn = os.path.join(FW_PATH, McuType.H7.config.app_fn)
fn = os.path.join(FW_PATH, self.get_mcu_type().config.app_fn)
expected = Panda.get_signature_from_firmware(fn)
return (current == expected)
@@ -542,12 +559,9 @@ class Panda:
"sbu1_voltage_mV": a[22],
"sbu2_voltage_mV": a[23],
"som_reset_triggered": a[24],
"sound_output_level": a[25],
"controls_allowed_lateral": a[26],
"controls_allowed_longitudinal": a[27],
}
@ensure_health_packet_version
@ensure_can_health_packet_version
def can_health(self, can_number):
LEC_ERROR_CODE = {
0: "No error",
@@ -607,13 +621,41 @@ class Panda:
return bytes(part_1 + part_2)
def get_type(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
# rick - UNO to DOS for lite
if ret == bytearray(b'\x05'):
ret = bytearray(b'\x06')
# old bootstubs don't implement this endpoint, see comment in Panda.device
if self._bcd_hw_type is not None and (ret is None or len(ret) != 1):
ret = self._bcd_hw_type
return ret
# Returns tuple with health packet version and CAN packet/USB packet version
def get_packets_versions(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xdd, 0, 0, 8)
if dat and len(dat) == 8:
return struct.unpack("<II", dat)
return (0, 0)
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xdd, 0, 0, 3)
if dat and len(dat) == 3:
a = struct.unpack("BBB", dat)
return (a[0], a[1], a[2])
else:
return (0, 0, 0)
def get_mcu_type(self) -> McuType:
hw_type = self.get_type()
if hw_type in Panda.F4_DEVICES:
return McuType.F4
elif hw_type in Panda.H7_DEVICES:
return McuType.H7
else:
# have to assume F4, see comment in Panda.connect
if self._assume_f4_mcu:
return McuType.F4
raise ValueError(f"unknown HW type: {hw_type}")
def has_obd(self):
return self.get_type() in Panda.HAS_OBD
def is_internal(self):
return self.get_type() in Panda.INTERNAL_DEVICES
@@ -635,7 +677,7 @@ class Panda:
return self._serial
def get_dfu_serial(self):
return PandaDFU.st_serial_to_dfu_serial(self._serial, McuType.H7)
return PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type)
def get_uid(self):
"""
@@ -653,15 +695,12 @@ class Panda:
# ******************* configuration *******************
def set_alternative_experience(self, alternative_experience, safety_param_sp=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdf, int(alternative_experience), int(safety_param_sp), b'')
def set_alternative_experience(self, alternative_experience):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdf, int(alternative_experience), 0, b'')
def set_power_save(self, power_save_enabled=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe7, int(power_save_enabled), 0, b'')
def enter_stop_mode(self):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb5, 0, 0, b'', expect_disconnect=True)
def set_safety_mode(self, mode=CarParams.SafetyModel.silent, param=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdc, mode, param, b'')
@@ -710,7 +749,7 @@ class Panda:
@ensure_can_packet_version
def can_send_many(self, arr, *, fd=False, timeout=CAN_SEND_TIMEOUT_MS):
snds = pack_can_buffer(arr, chunk=(not self.spi), fd=fd)
snds = pack_can_buffer(arr, fd=fd)
for tx in snds:
while len(tx) > 0:
bs = self._handle.bulkWrite(3, tx, timeout=timeout)
@@ -762,8 +801,18 @@ class Panda:
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i + 0x20])
return ret
def send_heartbeat(self, engaged=True, engaged_mads=True):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf3, engaged, engaged_mads, b'')
def serial_clear(self, port_number):
"""Clears all messages (tx and rx) from the specified internal uart
ringbuffer as though it were drained.
Args:
port_number (int): port number of the uart to clear.
"""
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf2, port_number, 0, b'')
def send_heartbeat(self, engaged=True):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf3, engaged, 0, b'')
# disable heartbeat checks for use outside of openpilot
# sending a heartbeat will reenable the checks
@@ -793,6 +842,8 @@ class Panda:
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf6, int(enabled), 0, b'')
# ****************** Debug *****************
def set_green_led(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')
# arr: timer period
# ccrN: channel N pulse length