Merge branch commaai/panda:master into branch sunnypilot/panda:master-new

This commit is contained in:
DevTekVE
2025-05-25 19:26:06 +02:00
committed by discountchubbs
72 changed files with 189 additions and 1517 deletions

View File

View File

@@ -1,43 +0,0 @@
#!/usr/bin/env python3
import time
from contextlib import contextmanager
from panda import Panda, PandaDFU
from panda.tests.hitl.helpers import get_random_can_messages
@contextmanager
def print_time(desc):
start = time.perf_counter()
yield
end = time.perf_counter()
print(f"{end - start:.2f}s - {desc}")
if __name__ == "__main__":
with print_time("Panda()"):
p = Panda()
with print_time("PandaDFU.list()"):
PandaDFU.list()
fxn = [
'reset',
'reconnect',
'up_to_date',
'health',
#'flash',
]
for f in fxn:
with print_time(f"Panda.{f}()"):
getattr(p, f)()
p.set_can_loopback(True)
for n in range(6):
msgs = get_random_can_messages(int(10**n))
with print_time(f"Panda.can_send_many() - {len(msgs)} msgs"):
p.can_send_many(msgs)
with print_time("Panda.can_recv()"):
m = p.can_recv()

View File

@@ -1,157 +0,0 @@
#!/usr/bin/env python3
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
import os
import time
import random
import argparse
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
counter = 0
nonzero_bus_errors = 0
zero_bus_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
raise Exception("Connect white/grey and black panda to run this test!")
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
black_panda = None
other_panda = None
# find out which one is black
if pandas[0].is_black() and not pandas[1].is_black():
black_panda = pandas[0]
other_panda = pandas[1]
elif not pandas[0].is_black() and pandas[1].is_black():
black_panda = pandas[1]
other_panda = pandas[0]
else:
raise Exception("Connect white/grey and black panda to run this test!")
# disable safety modes
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
while True:
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
counter += 1
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors)
# Toggle relay
black_panda.set_safety_mode(CarParams.SafetyModel.silent)
time.sleep(1)
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
time.sleep(1)
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
global nonzero_bus_errors, zero_bus_errors, content_errors
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
# set OBD on black panda
black_panda.set_obd(True if obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
_ = black_panda.can_recv() # can echo
cans_loop = other_panda.can_recv()
else:
_ = other_panda.can_recv() # can echo
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[1] != st):
content_errors += 1
print(" Loop on bus", str(loop[2]))
loop_buses.append(loop[2])
if len(cans_loop) == 0:
print(" No loop")
assert not os.getenv("NOASSERT")
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
if len(loop_buses) == 0:
zero_bus_errors += 1
else:
nonzero_bus_errors += 1
assert not os.getenv("NOASSERT")
else:
print(" TEST PASSED")
time.sleep(sleep_duration)
print("\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@@ -1,165 +0,0 @@
#!/usr/bin/env python3
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
import os
import time
import random
import argparse
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
counter = 0
nonzero_bus_errors = 0
zero_bus_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
raise Exception("Connect white/grey and black panda to run this test!")
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
black_panda = None
other_panda = None
# find out which one is black
if pandas[0].is_black() and not pandas[1].is_black():
black_panda = pandas[0]
other_panda = pandas[1]
elif not pandas[0].is_black() and pandas[1].is_black():
black_panda = pandas[1]
other_panda = pandas[0]
else:
raise Exception("Connect white/grey and black panda to run this test!")
# disable safety modes
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
start_time = time.time()
temp_start_time = start_time
while True:
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
counter += 1
runtime = time.time() - start_time
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors,
"Content errors:", content_errors, "Runtime: ", runtime)
if (time.time() - temp_start_time) > 3600 * 6:
# Toggle relay
black_panda.set_safety_mode(CarParams.SafetyModel.silent)
time.sleep(1)
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
time.sleep(1)
temp_start_time = time.time()
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
global nonzero_bus_errors, zero_bus_errors, content_errors
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
# set OBD on black panda
black_panda.set_obd(True if obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
_ = black_panda.can_recv() # cans echo
cans_loop = other_panda.can_recv()
else:
_ = other_panda.can_recv() # cans echo
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[1] != st):
content_errors += 1
print(" Loop on bus", str(loop[2]))
loop_buses.append(loop[2])
if len(cans_loop) == 0:
print(" No loop")
assert os.getenv("NOASSERT")
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
if len(loop_buses) == 0:
zero_bus_errors += 1
else:
nonzero_bus_errors += 1
assert os.getenv("NOASSERT")
else:
print(" TEST PASSED")
time.sleep(sleep_duration)
print("\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@@ -1,136 +0,0 @@
#!/usr/bin/env python3
# Relay test with loopback between black panda (+ harness and power) and white/grey panda
# Tests the relay switching multiple times / second by looking at the buses on which loop occurs.
import os
import time
import random
import argparse
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
counter = 0
open_errors = 0
closed_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter, open_errors, closed_errors
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
raise Exception("Connect white/grey and black panda to run this test!")
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
# find out which one is black
type0 = pandas[0].get_type()
type1 = pandas[1].get_type()
black_panda = None
other_panda = None
if type0 == "\x03" and type1 != "\x03":
black_panda = pandas[0]
other_panda = pandas[1]
elif type0 != "\x03" and type1 == "\x03":
black_panda = pandas[1]
other_panda = pandas[0]
else:
raise Exception("Connect white/grey and black panda to run this test!")
# disable safety modes
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
while True:
# Switch on relay
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
time.sleep(0.05)
if not test_buses(black_panda, other_panda, (0, False, [0])):
open_errors += 1
raise Exception("Open error")
# Switch off relay
black_panda.set_safety_mode(CarParams.SafetyModel.silent)
time.sleep(0.05)
if not test_buses(black_panda, other_panda, (0, False, [0, 2])):
closed_errors += 1
raise Exception("Close error")
counter += 1
print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors)
def test_buses(black_panda, other_panda, test_obj):
global content_errors
send_bus, obd, recv_buses = test_obj
black_panda.send_heartbeat()
other_panda.send_heartbeat()
# Set OBD on send panda
other_panda.set_obd(True if obd else None)
# clear and flush
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
other_panda.can_send(at, st, send_bus)
time.sleep(0.05)
# check for receive
_ = other_panda.can_recv() # can echo
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[1] != st):
content_errors += 1
loop_buses.append(loop[2])
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
return False
else:
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env python3
import os
import time
import threading
from typing import Any
from opendbc.car.structs import CarParams
from panda import Panda
JUNGLE = "JUNGLE" in os.environ
if JUNGLE:
from panda import PandaJungle
# The TX buffers on pandas is 0x100 in length.
NUM_MESSAGES_PER_BUS = 10000
def flood_tx(panda):
print('Sending!')
msg = b"\xaa" * 4
packet = [[0xaa, msg, 0], [0xaa, msg, 1], [0xaa, msg, 2]] * NUM_MESSAGES_PER_BUS
panda.can_send_many(packet, timeout=10000)
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")
if __name__ == "__main__":
serials = Panda.list()
if JUNGLE:
sender = Panda()
receiver = PandaJungle()
else:
if len(serials) != 2:
raise Exception("Connect two pandas to perform this test!")
sender = Panda(serials[0])
receiver = Panda(serials[1]) # type: ignore
receiver.set_safety_mode(CarParams.SafetyModel.allOutput)
sender.set_safety_mode(CarParams.SafetyModel.allOutput)
# Start transmisson
threading.Thread(target=flood_tx, args=(sender,)).start()
# Receive as much as we can in a few second time period
rx: list[Any] = []
old_len = 0
start_time = time.time()
while time.time() - start_time < 3 or len(rx) > old_len:
old_len = len(rx)
print(old_len)
rx.extend(receiver.can_recv())
print(f"Received {len(rx)} messages")

View File

@@ -1,29 +0,0 @@
#!/usr/bin/env python3
import time
import re
from panda import Panda
RED = '\033[91m'
GREEN = '\033[92m'
def colorize_errors(value):
if isinstance(value, str):
if re.search(r'(?i)No error', value):
return f'{GREEN}{value}\033[0m'
elif re.search(r'(?i)(?<!No error\s)(err|error)', value):
return f'{RED}{value}\033[0m'
return str(value)
if __name__ == "__main__":
panda = Panda()
while True:
print(chr(27) + "[2J") # clear screen
print("Connected to " + ("internal panda" if panda.is_internal() else "External panda") + f" id: {panda.get_serial()[0]}: {panda.get_version()}")
for bus in range(3):
print(f"\nBus {bus}:")
health = panda.can_health(bus)
for key, value in health.items():
print(f"{key}: {colorize_errors(value)} ", end=" ")
print()
time.sleep(1)

View File

@@ -1,46 +0,0 @@
#!/usr/bin/env python3
import os
import time
from collections import defaultdict
import binascii
from opendbc.car.structs import CarParams
from panda import Panda
# fake
def sec_since_boot():
return time.time()
def can_printer():
p = Panda()
print(f"Connected to id: {p.get_serial()[0]}: {p.get_version()}")
time.sleep(1)
p.can_clear(0xFFFF)
p.set_safety_mode(CarParams.SafetyModel.allOutput)
start = sec_since_boot()
lp = sec_since_boot()
all_msgs = defaultdict(list)
canbus = os.getenv("CAN")
if canbus == "3":
p.set_obd(True)
canbus = "1"
while True:
can_recv = p.can_recv()
for addr, dat, bus in can_recv:
if canbus is None or str(bus) == canbus:
all_msgs[(addr, bus)].append((dat))
if sec_since_boot() - lp > 0.1:
dd = chr(27) + "[2J"
dd += "%5.2f\n" % (sec_since_boot() - start)
for (addr, bus), dat_log in sorted(all_msgs.items()):
dd += "%d: %s(%6d): %s\n" % (bus, "%04X(%4d)" % (addr, addr), len(dat_log), binascii.hexlify(dat_log[-1]).decode())
print(dd)
lp = sec_since_boot()
if __name__ == "__main__":
can_printer()

View File

@@ -1,153 +0,0 @@
#!/usr/bin/env python3
import os
import time
import random
from collections import defaultdict
from opendbc.car.structs import CarParams
from panda import Panda, calculate_checksum, DLC_TO_LEN
from panda import PandaJungle
from panda.tests.hitl.helpers import time_many_sends
H7_HW_TYPES = [Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2]
JUNGLE_SERIAL = os.getenv("JUNGLE")
H7_PANDAS_EXCLUDE = [] # type: ignore
if os.getenv("H7_PANDAS_EXCLUDE"):
H7_PANDAS_EXCLUDE = os.getenv("H7_PANDAS_EXCLUDE").strip().split(" ") # type: ignore
def panda_reset():
panda_serials = []
panda_jungle = PandaJungle(JUNGLE_SERIAL)
panda_jungle.set_can_silent(True)
panda_jungle.set_panda_power(False)
time.sleep(1)
panda_jungle.set_panda_power(True)
time.sleep(4)
for serial in Panda.list():
if serial not in H7_PANDAS_EXCLUDE:
with Panda(serial=serial) as p:
if p.get_type() in H7_HW_TYPES:
p.reset()
panda_serials.append(serial)
print("test pandas", panda_serials)
assert len(panda_serials) == 2, "Two H7 pandas required"
return panda_serials
def panda_init(serial, enable_canfd=False, enable_non_iso=False):
p = Panda(serial=serial)
p.set_power_save(False)
for bus in range(3):
p.set_can_speed_kbps(0, 500)
if enable_canfd:
p.set_can_data_speed_kbps(bus, 2000)
if enable_non_iso:
p.set_canfd_non_iso(bus, True)
p.set_safety_mode(CarParams.SafetyModel.allOutput)
return p
def test_canfd_throughput(p, p_recv=None):
two_pandas = p_recv is not None
p.set_safety_mode(CarParams.SafetyModel.allOutput)
if two_pandas:
p_recv.set_safety_mode(CarParams.SafetyModel.allOutput)
# enable output mode
else:
p.set_can_loopback(True)
tests = [
[500, 1000, 2000], # speeds
[93, 87, 78], # saturation thresholds
]
for i in range(len(tests[0])):
# set bus 0 data speed to speed
p.set_can_data_speed_kbps(0, tests[0][i])
if p_recv is not None:
p_recv.set_can_data_speed_kbps(0, tests[0][i])
time.sleep(0.05)
comp_kbps = time_many_sends(p, 0, p_recv=p_recv, msg_count=400, two_pandas=two_pandas, msg_len=64)
# bit count from https://en.wikipedia.org/wiki/CAN_bus
saturation_pct = (comp_kbps / tests[0][i]) * 100.0
assert saturation_pct > tests[1][i]
assert saturation_pct < 100
def canfd_test(p_send, p_recv):
for n in range(100):
sent_msgs = defaultdict(set)
to_send = []
for _ in range(200):
bus = random.randrange(3)
for dlc in range(len(DLC_TO_LEN)):
address = random.randrange(1, 1<<29)
data = bytearray(random.getrandbits(8) for _ in range(DLC_TO_LEN[dlc]))
if len(data) >= 2:
data[0] = calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8"))
to_send.append([address, data, bus])
sent_msgs[bus].add((address, bytes(data)))
p_send.can_send_many(to_send, timeout=0)
start_time = time.monotonic()
while (time.monotonic() - start_time < 1) and any(len(x) > 0 for x in sent_msgs.values()):
incoming = p_recv.can_recv()
for msg in incoming:
address, data, bus = msg
if len(data) >= 2:
assert calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8")) == data[0]
k = (address, bytes(data))
assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
sent_msgs[bus].discard(k)
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {n}: bus {bus} missing {len(sent_msgs[bus])} messages"
def setup_test(enable_non_iso=False):
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=False, enable_non_iso=enable_non_iso)
p_recv = panda_init(panda_serials[1], enable_canfd=True, enable_non_iso=enable_non_iso)
# Check that sending panda CAN FD and BRS are turned off
for bus in range(3):
health = p_send.can_health(bus)
assert not health["canfd_enabled"]
assert not health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
# Receiving panda sends dummy CAN FD message that should enable CAN FD on sender side
for bus in range(3):
p_recv.can_send(0x200, b"dummymessage", bus)
p_recv.can_recv()
p_send.can_recv()
# Check if all tested buses on sending panda have swithed to CAN FD with BRS
for bus in range(3):
health = p_send.can_health(bus)
assert health["canfd_enabled"]
assert health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
return p_send, p_recv
def main():
print("[TEST CAN-FD]")
p_send, p_recv = setup_test()
canfd_test(p_send, p_recv)
print("[TEST CAN-FD non-ISO]")
p_send, p_recv = setup_test(enable_non_iso=True)
canfd_test(p_send, p_recv)
print("[TEST CAN-FD THROUGHPUT]")
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=True)
p_recv = panda_init(panda_serials[1], enable_canfd=True)
test_canfd_throughput(p_send, p_recv)
if __name__ == "__main__":
main()

View File

@@ -1,97 +0,0 @@
#!/usr/bin/env python3
import subprocess
from collections import defaultdict
def check_space(file, mcu):
MCUS = {
"H7": {
".flash": 1024*1024, # FLASH
".dtcmram": 128*1024, # DTCMRAM
".itcmram": 64*1024, # ITCMRAM
".axisram": 320*1024, # AXI SRAM
".sram12": 32*1024, # SRAM1(16kb) + SRAM2(16kb)
".sram4": 16*1024, # SRAM4
".backup_sram": 4*1024, # SRAM4
},
"F4": {
".flash": 1024*1024, # FLASH
".dtcmram": 256*1024, # RAM
".ram_d1": 64*1024, # RAM2
},
}
IGNORE_LIST = [
".ARM.attributes",
".comment",
".debug_line",
".debug_info",
".debug_abbrev",
".debug_aranges",
".debug_str",
".debug_ranges",
".debug_loc",
".debug_frame",
".debug_line_str",
".debug_rnglists",
".debug_loclists",
]
FLASH = [
".isr_vector",
".text",
".rodata",
".data"
]
RAM = [
".data",
".bss",
"._user_heap_stack" # _user_heap_stack considered free?
]
result = {}
calcs = defaultdict(int)
output = str(subprocess.check_output(f"arm-none-eabi-size -x --format=sysv {file}", shell=True), 'utf-8')
for row in output.split('\n'):
pop = False
line = row.split()
if len(line) == 3 and line[0].startswith('.'):
if line[0] in IGNORE_LIST:
continue
result[line[0]] = [line[1], line[2]]
if line[0] in FLASH:
calcs[".flash"] += int(line[1], 16)
pop = True
if line[0] in RAM:
calcs[".dtcmram"] += int(line[1], 16)
pop = True
if pop:
result.pop(line[0])
if len(result):
for line in result:
calcs[line] += int(result[line][0], 16)
print(f"=======SUMMARY FOR {mcu} FILE {file}=======")
for line in calcs:
if line in MCUS[mcu]:
used_percent = (100 - (MCUS[mcu][line] - calcs[line]) / MCUS[mcu][line] * 100)
print(f"SECTION: {line} size: {MCUS[mcu][line]} USED: {calcs[line]}({used_percent:.2f}%) FREE: {MCUS[mcu][line] - calcs[line]}")
else:
print(line, calcs[line])
print()
if __name__ == "__main__":
# red panda
check_space("../board/obj/bootstub.panda_h7.elf", "H7")
check_space("../board/obj/panda_h7.elf", "H7")
# black panda
check_space("../board/obj/bootstub.panda.elf", "F4")
check_space("../board/obj/panda.elf", "F4")
# jungle v1
check_space("../board/jungle/obj/bootstub.panda_jungle.elf", "F4")
check_space("../board/jungle/obj/panda_jungle.elf", "F4")
# jungle v2
check_space("../board/jungle/obj/bootstub.panda_jungle_h7.elf", "H7")
check_space("../board/jungle/obj/panda_jungle_h7.elf", "H7")

View File

@@ -1,21 +0,0 @@
#!/usr/bin/env bash
set -e
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
OP_ROOT="$DIR/../../"
PANDA_ROOT="$DIR/../"
if [ -z "$BUILD" ]; then
docker pull docker.io/commaai/panda:latest
else
docker build --cache-from docker.io/commaai/panda:latest -t docker.io/commaai/panda:latest -f $PANDA_ROOT/Dockerfile $PANDA_ROOT
fi
docker run \
-it \
--rm \
--volume $OP_ROOT:$OP_ROOT \
--workdir $PWD \
--env PYTHONPATH=$OP_ROOT \
docker.io/commaai/panda:latest \
/bin/bash

View File

@@ -1,62 +0,0 @@
#!/usr/bin/env python3
import os
import sys
import time
import select
import codecs
from panda import Panda
setcolor = ["\033[1;32;40m", "\033[1;31;40m"]
unsetcolor = "\033[00m"
port_number = int(os.getenv("PORT", "0"))
claim = os.getenv("CLAIM") is not None
no_color = os.getenv("NO_COLOR") is not None
no_reconnect = os.getenv("NO_RECONNECT") is not None
if __name__ == "__main__":
while True:
try:
serials = Panda.list()
if os.getenv("SERIAL"):
serials = [x for x in serials if x == os.getenv("SERIAL")]
pandas = [Panda(x, claim=claim) for x in serials]
decoders = [codecs.getincrementaldecoder('utf-8')() for _ in pandas]
if not len(pandas):
print("no pandas found")
if no_reconnect:
sys.exit(0)
time.sleep(1)
continue
if os.getenv("BAUD") is not None:
for panda in pandas:
panda.set_uart_baud(port_number, int(os.getenv("BAUD"))) # type: ignore
while True:
for i, panda in enumerate(pandas):
while True:
ret = panda.serial_read(port_number)
if len(ret) > 0:
decoded = decoders[i].decode(ret)
if no_color:
sys.stdout.write(decoded)
else:
sys.stdout.write(setcolor[i] + decoded + unsetcolor)
sys.stdout.flush()
else:
break
if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
ln = sys.stdin.readline()
if claim:
panda.serial_write(port_number, ln)
time.sleep(0.01)
except KeyboardInterrupt:
break
except Exception:
print("panda disconnected!")
time.sleep(0.5)

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env python3
import matplotlib.pyplot as plt # pylint: disable=import-error
HASHING_PRIME = 23
REGISTER_MAP_SIZE = 0x3FF
BYTES_PER_REG = 4
# From ST32F413 datasheet
REGISTER_ADDRESS_REGIONS = [
(0x40000000, 0x40007FFF),
(0x40010000, 0x400107FF),
(0x40011000, 0x400123FF),
(0x40012C00, 0x40014BFF),
(0x40015000, 0x400153FF),
(0x40015800, 0x40015BFF),
(0x40016000, 0x400167FF),
(0x40020000, 0x40021FFF),
(0x40023000, 0x400233FF),
(0x40023800, 0x40023FFF),
(0x40026000, 0x400267FF),
(0x50000000, 0x5003FFFF),
(0x50060000, 0x500603FF),
(0x50060800, 0x50060BFF),
(0x50060800, 0x50060BFF),
(0xE0000000, 0xE00FFFFF)
]
def _hash(reg_addr):
return (((reg_addr >> 16) ^ ((((reg_addr + 1) & 0xFFFF) * HASHING_PRIME) & 0xFFFF)) & REGISTER_MAP_SIZE)
# Calculate hash for each address
hashes = []
double_hashes = []
for (start_addr, stop_addr) in REGISTER_ADDRESS_REGIONS:
for addr in range(start_addr, stop_addr + 1, BYTES_PER_REG):
h = _hash(addr)
hashes.append(h)
double_hashes.append(_hash(h))
# Make histograms
plt.subplot(2, 1, 1)
plt.hist(hashes, bins=REGISTER_MAP_SIZE)
plt.title("Number of collisions per _hash")
plt.xlabel("Address")
plt.subplot(2, 1, 2)
plt.hist(double_hashes, bins=REGISTER_MAP_SIZE)
plt.title("Number of collisions per double _hash")
plt.xlabel("Address")
plt.show()

View File

@@ -1,17 +0,0 @@
#!/usr/bin/env python3
from opendbc.car.structs import CarParams
from panda import Panda
# This script is intended to be used in conjunction with the echo_loopback_test.py test script from panda jungle.
# It sends a reversed response back for every message received containing b"test".
if __name__ == "__main__":
p = Panda()
p.set_safety_mode(CarParams.SafetyModel.allOutput)
p.set_power_save(False)
while True:
incoming = p.can_recv()
for message in incoming:
address, data, bus = message
if b'test' in data:
p.can_send(address, data[::-1], bus)

View File

@@ -1,233 +0,0 @@
#!/usr/bin/env python3
"""Used to Reverse/Test ELM protocol auto detect and OBD message response without a car."""
import os
import sys
import struct
import binascii
import time
import threading
from collections import deque
from opendbc.car.structs import CarParams
from panda import Panda
def lin_checksum(dat):
return sum(dat) % 0x100
class ELMCarSimulator():
def __init__(self, sn, silent=False, can_kbaud=500,
can=True, can11b=True, can29b=True,
lin=True):
self.__p = Panda(sn if sn else Panda.list()[0])
self.__on = True
self.__stop = False
self.__silent = silent
self.__lin_timer = None
self.__lin_active = False
self.__lin_enable = lin
self.__lin_monitor_thread = threading.Thread(target=self.__lin_monitor)
self.__can_multipart_data = None
self.__can_kbaud = can_kbaud
self.__can_extra_noise_msgs = deque()
self.__can_enable = can
self.__can11b = can11b
self.__can29b = can29b
self.__can_monitor_thread = threading.Thread(target=self.__can_monitor)
@property
def panda(self):
return self.__p
def stop(self):
if self.__lin_timer:
self.__lin_timer.cancel()
self.__lin_timeout_handler()
self.__stop = True
def join(self):
if self.__lin_monitor_thread.is_alive():
self.__lin_monitor_thread.join()
if self.__can_monitor_thread.is_alive():
self.__can_monitor_thread.join()
if self.__p:
print("closing handle")
self.__p.close()
def set_enable(self, on):
self.__on = on
def start(self):
self.panda.set_safety_mode(CarParams.SafetyModel.allOutput)
if self.__lin_enable:
self.__lin_monitor_thread.start()
if self.__can_enable:
self.__can_monitor_thread.start()
#########################
# CAN related functions #
#########################
def __can_monitor(self):
print("STARTING CAN THREAD")
self.panda.set_can_speed_kbps(0, self.__can_kbaud)
self.panda.can_recv() # Toss whatever was already there
while not self.__stop:
for address, data, src in self.panda.can_recv():
if self.__on and src == 0 and len(data) == 8 and data[0] >= 2:
if not self.__silent:
print("Processing CAN message", src, hex(address), binascii.hexlify(data))
self.__can_process_msg(data[1], data[2], address, data, src)
elif not self.__silent:
print("Rejecting CAN message", src, hex(address), binascii.hexlify(data))
def can_mode_11b(self):
self.__can11b = True
self.__can29b = False
def can_mode_29b(self):
self.__can11b = False
self.__can29b = True
def can_mode_11b_29b(self):
self.__can11b = True
self.__can29b = True
def change_can_baud(self, kbaud):
self.__can_kbaud = kbaud
self.panda.set_can_speed_kbps(0, self.__can_kbaud)
def can_add_extra_noise(self, noise_msg, addr=None):
self.__can_extra_noise_msgs.append((addr, noise_msg))
def _can_send(self, addr, msg):
if not self.__silent:
print(" CAN Reply (%x)" % addr, binascii.hexlify(msg))
self.panda.can_send(addr, msg + b'\x00' * (8 - len(msg)), 0)
if self.__can_extra_noise_msgs:
noise = self.__can_extra_noise_msgs.popleft()
self.panda.can_send(noise[0] if noise[0] is not None else addr,
noise[1] + b'\x00' * (8 - len(noise[1])), 0)
def _can_addr_matches(self, addr):
if self.__can11b and (addr == 0x7DF or (addr & 0x7F8) == 0x7E0):
return True
if self.__can29b and (addr == 0x18db33f1 or (addr & 0x1FFF00FF) == 0x18da00f1):
return True
return False
def __can_process_msg(self, mode, pid, address, data, src):
if not self.__silent:
print("CAN MSG", binascii.hexlify(data[1:1 + data[0]]),
"Addr:", hex(address), "Mode:", hex(mode)[2:].zfill(2),
"PID:", hex(pid)[2:].zfill(2), "canLen:", len(data),
binascii.hexlify(data))
if self._can_addr_matches(address) and len(data) == 8:
outmsg = None
if data[:3] == b'\x30\x00\x00' and len(self.__can_multipart_data):
if not self.__silent:
print("Request for more data")
outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
msgnum = 1
while(self.__can_multipart_data):
datalen = min(7, len(self.__can_multipart_data))
msgpiece = struct.pack("B", 0x20 | msgnum) + self.__can_multipart_data[:datalen]
self._can_send(outaddr, msgpiece)
self.__can_multipart_data = self.__can_multipart_data[7:]
msgnum = (msgnum + 1) % 0x10
time.sleep(0.01)
else:
outmsg = self._process_obd(mode, pid)
if outmsg:
outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
if len(outmsg) <= 5:
self._can_send(outaddr,
struct.pack("BBB", len(outmsg) + 2, 0x40 | data[1], pid) + outmsg)
else:
first_msg_len = min(3, len(outmsg) % 7)
payload_len = len(outmsg) + 3
msgpiece = struct.pack("BBBBB", 0x10 | ((payload_len >> 8) & 0xF),
payload_len & 0xFF,
0x40 | data[1], pid, 1) + outmsg[:first_msg_len]
self._can_send(outaddr, msgpiece)
self.__can_multipart_data = outmsg[first_msg_len:]
#########################
# General OBD functions #
#########################
def _process_obd(self, mode, pid):
if mode == 0x01: # Mode: Show current data
if pid == 0x00: # List supported things
return b"\xff\xff\xff\xfe" # b"\xBE\x1F\xB8\x10" #Bitfield, random features
elif pid == 0x01: # Monitor Status since DTC cleared
return b"\x00\x00\x00\x00" # Bitfield, random features
elif pid == 0x04: # Calculated engine load
return b"\x2f"
elif pid == 0x05: # Engine coolant temperature
return b"\x3c"
elif pid == 0x0B: # Intake manifold absolute pressure
return b"\x90"
elif pid == 0x0C: # Engine RPM
return b"\x1A\xF8"
elif pid == 0x0D: # Vehicle Speed
return b"\x53"
elif pid == 0x10: # MAF air flow rate
return b"\x01\xA0"
elif pid == 0x11: # Throttle Position
return b"\x90"
elif pid == 0x33: # Absolute Barometric Pressure
return b"\x90"
elif mode == 0x09: # Mode: Request vehicle information
if pid == 0x02: # Show VIN
return b"1D4GP00R55B123456"
if pid == 0xFC: # test long multi message. Ligned up for LIN responses
return b''.join(struct.pack(">BBH", 0xAA, 0xAA, num + 1) for num in range(80))
if pid == 0xFD: # test long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
return b'\xAA\xAA\xAA' + b''.join(parts)
if pid == 0xFE: # test very long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584))
return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
if pid == 0xFF:
return b'\xAA\x00\x00' + \
b"".join((b'\xAA' * 5) + struct.pack(">H", num + 1) for num in range(584))
#return b"\xAA"*100#(0xFFF-3)
if __name__ == "__main__":
serial = os.getenv("SERIAL") if os.getenv("SERIAL") else None
kbaud = int(os.getenv("CANKBAUD")) if os.getenv("CANKBAUD") else 500 # type: ignore
bitwidth = int(os.getenv("CANBITWIDTH")) if os.getenv("CANBITWIDTH") else 0 # type: ignore
canenable = bool(int(os.getenv("CANENABLE"))) if os.getenv("CANENABLE") else True # type: ignore
linenable = bool(int(os.getenv("LINENABLE"))) if os.getenv("LINENABLE") else True # type: ignore
sim = ELMCarSimulator(serial, can_kbaud=kbaud, can=canenable, lin=linenable)
if(bitwidth == 0):
sim.can_mode_11b_29b()
if(bitwidth == 11):
sim.can_mode_11b()
if(bitwidth == 29):
sim.can_mode_29b()
import signal
def signal_handler(signal, frame):
print('\nShutting down simulator')
sim.stop()
sim.join()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
sim.start()
signal.pause()

View File

@@ -1,44 +0,0 @@
#!/usr/bin/env python3
import socket
import threading
import select
class Reader(threading.Thread):
def __init__(self, s, *args, **kwargs):
super().__init__(*args, **kwargs)
self._s = s
self.__stop = False
def stop(self):
self.__stop = True
def run(self):
while not self.__stop:
s.recv(1000)
def read_or_fail(s):
ready = select.select([s], [], [], 4)
assert ready[0], "Socket did not receive data within the timeout duration."
return s.recv(1000)
def send_msg(s, msg):
s.send(msg)
res = b''
while not res.endswith(">"):
res += read_or_fail(s)
return res
if __name__ == "__main__":
s = socket.create_connection(("192.168.0.10", 35000))
send_msg(s, b"ATZ\r")
send_msg(s, b"ATL1\r")
print(send_msg(s, b"ATE0\r"))
print(send_msg(s, b"ATS0\r"))
print(send_msg(s, b"ATSP6\r"))
print("\nLOOP\n")
while True:
print(send_msg(s, b"0100\r"))
print(send_msg(s, b"010d\r"))

View File

@@ -1,14 +0,0 @@
#!/usr/bin/env python
import time
from panda import Panda
if __name__ == "__main__":
p = Panda()
power = 0
while True:
p.set_fan_power(power)
time.sleep(5)
print("Power: ", power, "RPM:", str(p.get_fan_rpm()), "Expected:", int(6500 * power / 100))
power += 10
power %= 110

View File

@@ -1,88 +0,0 @@
#!/usr/bin/env python3
import json
import time
import threading
from panda import Panda
def drain_serial(p):
ret = []
while True:
d = p.serial_read(0)
if len(d) == 0:
break
ret.append(d)
return ret
fan_cmd = 0.
def logger(event):
# requires a build with DEBUG_FAN
with Panda(claim=False) as p, open('/tmp/fan_log', 'w') as f:
power = None
target_rpm = None
stall_count = None
rpm_fast = None
t = time.monotonic()
drain_serial(p)
while not event.is_set():
p.set_fan_power(fan_cmd)
for l in drain_serial(p)[::-1]:
ns = l.decode('utf8').strip().split(' ')
if len(ns) == 4:
target_rpm, rpm_fast, power, stall_count = (int(n, 16) for n in ns)
break
dat = {
't': time.monotonic() - t,
'cmd_power': fan_cmd,
'pwm_power': power,
'target_rpm': target_rpm,
'rpm_fast': rpm_fast,
'rpm': p.get_fan_rpm(),
'stall_counter': stall_count,
'total_stall_count': p.health()['fan_stall_count'],
}
f.write(json.dumps(dat) + '\n')
time.sleep(1/16.)
p.set_fan_power(0)
def get_overshoot_rpm(p, power):
global fan_cmd
# make sure the fan is stopped completely
fan_cmd = 0.
while p.get_fan_rpm() > 100:
time.sleep(0.1)
time.sleep(3)
# set it to 30% power to mimic going onroad
fan_cmd = power
max_rpm = 0
max_power = 0
for _ in range(70):
max_rpm = max(max_rpm, p.get_fan_rpm())
max_power = max(max_power, p.health()['fan_power'])
time.sleep(0.1)
# tolerate 10% overshoot
expected_rpm = Panda.MAX_FAN_RPMs[bytes(p.get_type())] * power / 100
overshoot = (max_rpm / expected_rpm) - 1
return overshoot, max_rpm, max_power
if __name__ == "__main__":
event = threading.Event()
threading.Thread(target=logger, args=(event, )).start()
try:
p = Panda()
for power in range(10, 101, 10):
overshoot, max_rpm, max_power = get_overshoot_rpm(p, power)
print(f"Fan power {power}%: overshoot {overshoot:.2%}, Max RPM {max_rpm}, Max power {max_power}%")
finally:
event.set()

View File

@@ -1,7 +0,0 @@
#!/usr/bin/env python3
from panda import Panda
if __name__ == "__main__":
for p in Panda.list():
pp = Panda(p)
print(f"{pp.get_serial()[0]}: {pp.get_version()}")

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python3
import time
from panda import Panda
if __name__ == "__main__":
i = 0
pi = 0
panda = Panda()
while True:
st = time.monotonic()
while time.monotonic() - st < 1:
panda.health()
i += 1
print(i, panda.health(), "\n")
print(f"Speed: {i - pi}Hz")
pi = i

View File

@@ -38,10 +38,10 @@ def test_hw_type(p):
def test_heartbeat(p, panda_jungle):
panda_jungle.set_ignition(True)
# TODO: add more cases here once the tests aren't super slow
p.set_safety_mode(mode=CarParams.SafetyModel.hyundai, param=HyundaiSafetyFlags.FLAG_HYUNDAI_LONG)
p.set_safety_mode(mode=CarParams.SafetyModel.hyundai, param=HyundaiSafetyFlags.LONG)
p.send_heartbeat()
assert p.health()['safety_mode'] == CarParams.SafetyModel.hyundai
assert p.health()['safety_param'] == HyundaiSafetyFlags.FLAG_HYUNDAI_LONG
assert p.health()['safety_param'] == HyundaiSafetyFlags.LONG
# shouldn't do anything once we're in a car safety mode
p.set_heartbeat_disabled()

View File

@@ -1,14 +0,0 @@
#!/usr/bin/env python3
import time
from panda import Panda
power = 0
if __name__ == "__main__":
p = Panda()
while True:
p.set_ir_power(power)
print("Power: ", str(power))
time.sleep(1)
power += 10
power %= 100

View File

@@ -1,10 +1,15 @@
import opendbc
import platform
CC = 'gcc'
system = platform.system()
if system == 'Darwin':
# gcc installed by homebrew has version suffix (e.g. gcc-12) in order to be
# distinguishable from system one - which acts as a symlink to clang
mac_ver = platform.mac_ver()
# gcc installed by homebrew has version suffix (e.g. gcc-12) in order to be
# distinguishable from system one - which acts as a symlink to clang
# clang works on macOS 15 and greater but has issues on earlier macOS versions.
# see: https://github.com/commaai/openpilot/issues/35093
if system == 'Darwin' and mac_ver[0] and mac_ver[0] < '15':
CC += '-13'
env = Environment(
@@ -16,23 +21,11 @@ env = Environment(
'-Wfatal-errors',
'-Wno-pointer-to-int-cast',
],
CPPPATH=[".", "../../board/", "../../../opendbc/safety/"],
CPPPATH=[".", "../../board/", opendbc.INCLUDE_PATH],
)
if system == "Darwin":
env.PrependENVPath('PATH', '/opt/homebrew/bin')
if GetOption('mutation'):
env['CC'] = 'clang-17'
flags = [
'-fprofile-instr-generate',
'-fcoverage-mapping',
'-fpass-plugin=/usr/lib/mull-ir-frontend-17',
'-g',
'-grecord-command-line',
]
env['CFLAGS'] += flags
env['LINKFLAGS'] += flags
if GetOption('ubsan'):
flags = [
"-fsanitize=undefined",
@@ -43,12 +36,3 @@ if GetOption('ubsan'):
panda = env.SharedObject("panda.os", "panda.c")
libpanda = env.SharedLibrary("libpanda.so", [panda])
if GetOption('coverage'):
env.Append(
CFLAGS=["-fprofile-arcs", "-ftest-coverage", "-fprofile-abs-path",],
LIBS=["gcov"],
)
# GCC note file is generated by compiler, ensure we build it, and allow scons to clean it up
AlwaysBuild(panda)
env.SideEffect("panda.gcno", panda)

View File

@@ -15,7 +15,7 @@ void can_tx_comms_resume_spi(void) { };
#include "faults.h"
#include "libc.h"
#include "boards/board_declarations.h"
#include "safety.h"
#include "opendbc/safety/safety.h"
#include "main_definitions.h"
#include "drivers/can_common.h"

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env python3
import os
import time
import random
import argparse
from itertools import permutations
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
def run_test(sleep_duration):
pandas = Panda.list()
print(pandas)
if len(pandas) < 2:
raise Exception("Minimum two pandas are needed for test")
run_test_w_pandas(pandas, sleep_duration)
def run_test_w_pandas(pandas, sleep_duration):
h = [Panda(x) for x in pandas]
print("H", h)
for hh in h:
hh.set_safety_mode(CarParams.SafetyModel.allOutput)
# test both directions
for ho in permutations(list(range(len(h))), r=2):
print("***************** TESTING", ho)
panda0, panda1 = h[ho[0]], h[ho[1]]
# **** test health packet ****
print("health", ho[0], h[ho[0]].health())
# **** test can line loopback ****
for bus, obd in [(0, False), (1, False), (2, False), (1, True), (2, True)]:
print("\ntest can", bus)
# flush
cans_echo = panda0.can_recv()
cans_loop = panda1.can_recv()
panda0.set_obd(None)
panda1.set_obd(None)
if obd is True:
panda0.set_obd(bus)
panda1.set_obd(bus)
bus = 3
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
panda0.can_send(at, st, bus)
time.sleep(0.1)
# check for receive
cans_echo = panda0.can_recv()
cans_loop = panda1.can_recv()
print("Bus", bus, "echo", cans_echo, "loop", cans_loop)
assert len(cans_echo) == 1
assert len(cans_loop) == 1
assert cans_echo[0][0] == at
assert cans_loop[0][0] == at
assert cans_echo[0][1] == st
assert cans_loop[0][1] == st
assert cans_echo[0][2] == 0x80 | bus
if cans_loop[0][2] != bus:
print("EXPECTED %d GOT %d" % (bus, cans_loop[0][2]))
assert cans_loop[0][2] == bus
print("CAN pass", bus, ho)
time.sleep(sleep_duration)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@@ -1,71 +0,0 @@
#!/usr/bin/env python3
import os
import usb1
import time
import struct
import itertools
import threading
from typing import Any
from opendbc.car.structs import CarParams
from panda import Panda
JUNGLE = "JUNGLE" in os.environ
if JUNGLE:
from panda import PandaJungle
# Generate unique messages
NUM_MESSAGES_PER_BUS = 10000
messages = [bytes(struct.pack("Q", i)) for i in range(NUM_MESSAGES_PER_BUS)]
tx_messages = list(itertools.chain.from_iterable([[0xaa, msg, 0], [0xaa, msg, 1], [0xaa, msg, 2]] for msg in messages))
def flood_tx(panda):
print('Sending!')
transferred = 0
while True:
try:
print(f"Sending block {transferred}-{len(tx_messages)}: ", end="")
panda.can_send_many(tx_messages[transferred:], timeout=10)
print("OK")
break
except usb1.USBErrorTimeout as e:
transferred += (e.transferred // 16)
print("timeout, transferred: ", transferred)
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")
if __name__ == "__main__":
serials = Panda.list()
receiver: Panda | PandaJungle
if JUNGLE:
sender = Panda()
receiver = PandaJungle()
else:
if len(serials) != 2:
raise Exception("Connect two pandas to perform this test!")
sender = Panda(serials[0])
receiver = Panda(serials[1])
receiver.set_safety_mode(CarParams.SafetyModel.allOutput)
sender.set_safety_mode(CarParams.SafetyModel.allOutput)
# Start transmisson
threading.Thread(target=flood_tx, args=(sender,)).start()
# Receive as much as we can, and stop when there hasn't been anything for a second
rx: list[Any] = []
old_len = 0
last_change = time.monotonic()
while time.monotonic() - last_change < 1:
if old_len < len(rx):
last_change = time.monotonic()
old_len = len(rx)
rx.extend(receiver.can_recv())
print(f"Received {len(rx)} messages")
# Check if we received everything
for bus in range(3):
received_msgs = {bytes(m[1]) for m in filter(lambda m, b=bus: m[2] == b, rx)} # type: ignore
dropped_msgs = set(messages).difference(received_msgs)
print(f"Bus {bus} dropped msgs: {len(list(dropped_msgs))} / {len(messages)}")

View File

@@ -3,6 +3,7 @@ set -e
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
PANDA_DIR=$(realpath $DIR/../../)
OPENDBC_ROOT=$(python3 -c "import opendbc; print(opendbc.INCLUDE_PATH)")
GREEN="\e[1;32m"
YELLOW="\e[1;33m"
@@ -47,7 +48,7 @@ cppcheck() {
$CPPCHECK_DIR/cppcheck --inline-suppr -I $PANDA_DIR/board/ \
-I "$(arm-none-eabi-gcc -print-file-name=include)" \
-I $PANDA_DIR/board/stm32f4/inc/ -I $PANDA_DIR/board/stm32h7/inc/ \
-I $PANDA_DIR/../opendbc/safety/ \
-I $OPENDBC_ROOT \
--suppressions-list=$DIR/suppressions.txt --suppress=*:*inc/* \
--suppress=*:*include/* --error-exitcode=2 --check-level=exhaustive --safety \
--platform=arm32-wchar_t4 $COMMON_DEFINES --checkers-report=$CHECKLIST.tmp \

View File

@@ -64,11 +64,13 @@ assert len(files) > 70, all(d in files for d in ('board/main.c', 'board/stm32f4/
for p in patterns:
mutations.append((random.choice(files), p, True))
# TODO: remove sampling once test_misra.sh is faster
mutations = random.sample(mutations, 2)
@pytest.mark.parametrize("fn, patch, should_fail", mutations)
def test_misra_mutation(fn, patch, should_fail):
with tempfile.TemporaryDirectory() as tmp:
shutil.copytree(ROOT, tmp + "/panda", dirs_exist_ok=True)
shutil.copytree(ROOT + "../opendbc", tmp + "/opendbc", dirs_exist_ok=True)
# apply patch
if fn is not None:

View File

@@ -1,32 +0,0 @@
#!/usr/bin/env python3
from panda import Panda, PandaDFU
if __name__ == "__main__":
try:
from openpilot.system.hardware import HARDWARE
HARDWARE.recover_internal_panda()
Panda.wait_for_dfu(None, 5)
except Exception:
pass
p = PandaDFU(None)
cfg = p.get_mcu_type().config
def readmem(addr, length, fn):
print(f"reading {hex(addr)} {hex(length)} bytes to {fn}")
max_size = 255
with open(fn, "wb") as f:
to_read = length
while to_read > 0:
l = min(to_read, max_size)
dat = p._handle.read(addr, l)
assert len(dat) == l
f.write(dat)
to_read -= len(dat)
addr += len(dat)
addr = cfg.bootstub_address
for i, sector_size in enumerate(cfg.sector_sizes):
readmem(addr, sector_size, f"sector_{i}.bin")
addr += sector_size

View File

@@ -1,6 +0,0 @@
#!/usr/bin/env bash
rm -f /tmp/dump_bootstub
rm -f /tmp/dump_main
dfu-util -a 0 -s 0x08000000 -U /tmp/dump_bootstub
dfu-util -a 0 -s 0x08004000 -U /tmp/dump_main

View File

@@ -1,34 +0,0 @@
#!/usr/bin/env python3
# type: ignore
from panda import Panda
from hexdump import hexdump
DEBUG = False
if __name__ == "__main__":
p = Panda()
length = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, 1)
print('Microsoft OS String Descriptor')
dat = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, length[0])
if DEBUG:
print(f'LEN: {hex(length[0])}')
hexdump("".join(map(chr, dat)))
ms_vendor_code = dat[16]
if DEBUG:
print(f'MS_VENDOR_CODE: {hex(length[0])}')
print('\nMicrosoft Compatible ID Feature Descriptor')
length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, 1)
if DEBUG:
print(f'LEN: {hex(length[0])}')
dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, length[0])
hexdump("".join(map(chr, dat)))
print('\nMicrosoft Extended Properties Feature Descriptor')
length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, 1)
if DEBUG:
print(f'LEN: {hex(length[0])}')
dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, length[0])
hexdump("".join(map(chr, dat)))

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env python3
import time
from panda import Panda, PandaDFU
class GPIO:
STM_RST_N = 124
STM_BOOT0 = 134
HUB_RST_N = 30
def gpio_init(pin, output):
with open(f"/sys/class/gpio/gpio{pin}/direction", 'wb') as f:
f.write(b"out" if output else b"in")
def gpio_set(pin, high):
with open(f"/sys/class/gpio/gpio{pin}/value", 'wb') as f:
f.write(b"1" if high else b"0")
if __name__ == "__main__":
for pin in (GPIO.STM_RST_N, GPIO.STM_BOOT0, GPIO.HUB_RST_N):
gpio_init(pin, True)
# reset USB hub
gpio_set(GPIO.HUB_RST_N, 0)
time.sleep(0.5)
gpio_set(GPIO.HUB_RST_N, 1)
# flash bootstub
print("resetting into DFU")
gpio_set(GPIO.STM_RST_N, 1)
gpio_set(GPIO.STM_BOOT0, 1)
time.sleep(1)
gpio_set(GPIO.STM_RST_N, 0)
gpio_set(GPIO.STM_BOOT0, 0)
time.sleep(1)
print("flashing bootstub")
PandaDFU(None).recover()
gpio_set(GPIO.STM_RST_N, 1)
time.sleep(0.5)
gpio_set(GPIO.STM_RST_N, 0)
time.sleep(1)
print("flashing app")
p = Panda()
assert p.bootstub
p.flash()

View File

@@ -1,17 +0,0 @@
#!/usr/bin/env python
import time
from opendbc.car.structs import CarParams
from panda import Panda
p = Panda()
while True:
p.set_safety_mode(CarParams.SafetyModel.toyota)
p.send_heartbeat()
print("ON")
time.sleep(1)
p.set_safety_mode(CarParams.SafetyModel.noOutput)
p.send_heartbeat()
print("OFF")
time.sleep(1)

View File

@@ -1,27 +0,0 @@
#!/usr/bin/env python3
from panda import Panda, PandaDFU, STBootloaderSPIHandle
if __name__ == "__main__":
try:
from openpilot.system.hardware import HARDWARE
HARDWARE.recover_internal_panda()
Panda.wait_for_dfu(None, 5)
except Exception:
pass
p = PandaDFU(None)
assert isinstance(p._handle, STBootloaderSPIHandle)
cfg = p.get_mcu_type().config
print("restoring from backup...")
addr = cfg.bootstub_address
for i, sector_size in enumerate(cfg.sector_sizes):
print(f"- sector #{i}")
p._handle.erase_sector(i)
with open(f"sector_{i}.bin", "rb") as f:
dat = f.read()
assert len(dat) == sector_size
p._handle.program(addr, dat)
addr += len(dat)
p.reset()

View File

@@ -1,7 +0,0 @@
#!/usr/bin/env bash
set -e
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
cd $DIR
PYTHONUNBUFFERED=1 NO_COLOR=1 CLAIM=1 PORT=4 ./debug_console.py

View File

@@ -1,21 +0,0 @@
#!/usr/bin/env python3
import os
import random
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
if __name__ == "__main__":
p = Panda()
p.set_safety_mode(CarParams.SafetyModel.allOutput)
print("Spamming all buses...")
while True:
at = random.randint(1, 2000)
st = get_test_string()[0:8]
bus = random.randint(0, 2)
p.can_send(at, st, bus)
# print("Sent message on bus: ", bus)

View File

@@ -1,33 +0,0 @@
#!/usr/bin/env python3
import struct
import time
from opendbc.car.structs import CarParams
from panda import Panda
if __name__ == "__main__":
p = Panda()
print(p.get_serial())
print(p.health())
t1 = time.time()
for _ in range(100):
p.get_serial()
t2 = time.time()
print("100 requests took %.2f ms" % ((t2 - t1) * 1000))
p.set_safety_mode(CarParams.SafetyModel.allOutput)
a = 0
while True:
# flood
msg = b"\xaa" * 4 + struct.pack("I", a)
p.can_send(0xaa, msg, 0)
p.can_send(0xaa, msg, 1)
p.can_send(0xaa, msg, 4)
time.sleep(0.01)
dat = p.can_recv()
if len(dat) > 0:
print(dat)
a += 1

View File

@@ -1,34 +0,0 @@
/*
gcc -DTEST_RSA test_rsa.c ../crypto/rsa.c ../crypto/sha.c && ./a.out
*/
#include <stdio.h>
#include <stdlib.h>
#define MAX_LEN 0x40000
char buf[MAX_LEN];
#include "../crypto/sha.h"
#include "../crypto/rsa.h"
#include "../obj/cert.h"
int main() {
FILE *f = fopen("../obj/panda.bin", "rb");
int tlen = fread(buf, 1, MAX_LEN, f);
fclose(f);
printf("read %d\n", tlen);
uint32_t *_app_start = (uint32_t *)buf;
int len = _app_start[0];
char digest[SHA_DIGEST_SIZE];
SHA_hash(&_app_start[1], len-4, digest);
printf("SHA hash done\n");
if (!RSA_verify(&rsa_key, ((void*)&_app_start[0]) + len, RSANUMBYTES, digest, SHA_DIGEST_SIZE)) {
printf("RSA fail\n");
} else {
printf("RSA match!!!\n");
}
return 0;
}

View File

@@ -1,8 +0,0 @@
#!/usr/bin/env bash
set -e
# Loops over all HW_TYPEs, see board/boards/board_declarations.h
for hw_type in {0..7}; do
echo "Testing HW_TYPE: $hw_type"
HW_TYPE=$hw_type python3 -m unittest discover .
done