mirror of
https://github.com/commaai/agnos-builder.git
synced 2026-06-08 11:04:51 +08:00
MDMA boot time profiling (#572)
* MDMA boot time profiling * lil cleanup * lil more * lil more
This commit is contained in:
151
scripts/mdma.py
151
scripts/mdma.py
@@ -3,13 +3,24 @@
|
||||
# dependencies = ["pyusb"]
|
||||
# ///
|
||||
import argparse
|
||||
import errno
|
||||
import fcntl
|
||||
import os
|
||||
import subprocess
|
||||
import re
|
||||
import select
|
||||
import sys
|
||||
import termios
|
||||
import time
|
||||
|
||||
import usb.core
|
||||
|
||||
SERIAL_DEV = "/dev/serial/by-id/usb-Microchip_Tech_USB2_Controller_Hub-if01"
|
||||
PROMPT_RE = re.compile(rb"(?:login:|[#\$])$")
|
||||
USB_RT_PORT = 0x23
|
||||
USB_REQ_CLEAR_FEATURE = 1
|
||||
USB_REQ_SET_FEATURE = 3
|
||||
USB_PORT_POWER = 8
|
||||
|
||||
|
||||
class Pins:
|
||||
HFC_VID = 0x0424
|
||||
@@ -24,29 +35,27 @@ class Pins:
|
||||
|
||||
|
||||
class Mdma:
|
||||
def __init__(self):
|
||||
self.aux_ports = [
|
||||
(self.hub_location(Pins.USB7002_VID, Pins.USB7002_PID), "1"),
|
||||
(self.hub_location(Pins.USB4002_VID, Pins.USB4002_PID), "1"),
|
||||
]
|
||||
self.dev = None
|
||||
"""
|
||||
MDMA: the mici debug and monitoring adapter
|
||||
|
||||
def hub_location(self, vid, pid):
|
||||
needle = f"[{vid:04x}:{pid:04x} "
|
||||
for line in subprocess.check_output(["uhubctl", "-S"], text=True).splitlines():
|
||||
if line.startswith("Current status for hub ") and needle in line:
|
||||
return line.split()[4]
|
||||
raise SystemExit(f"could not find hub {vid:04x}:{pid:04x}")
|
||||
an MDMA is your best friend for low level mici (aka comma four) development.
|
||||
- power the SOC on and off
|
||||
- force the SOC into QDL mode for un-brickability
|
||||
- read and write to the SOC's UART
|
||||
- and more!
|
||||
"""
|
||||
|
||||
def _dev(self):
|
||||
self.dev = self.dev or usb.core.find(idVendor=Pins.HFC_VID, idProduct=Pins.HFC_PID)
|
||||
return self.dev
|
||||
def hub(self, vid, pid):
|
||||
hub = usb.core.find(idVendor=vid, idProduct=pid)
|
||||
if hub is None:
|
||||
raise SystemExit(f"could not find hub {vid:04x}:{pid:04x}")
|
||||
return hub
|
||||
|
||||
def reg(self, addr, value=None):
|
||||
dev = self._dev()
|
||||
def reg(self, addr, value=None, size=4):
|
||||
dev = usb.core.find(idVendor=Pins.HFC_VID, idProduct=Pins.HFC_PID)
|
||||
if value is None:
|
||||
return int.from_bytes(bytes(dev.ctrl_transfer(0xC0, 0x04, addr & 0xFFFF, addr >> 16, 4)), "little")
|
||||
dev.ctrl_transfer(0x40, 0x03, addr & 0xFFFF, addr >> 16, value.to_bytes(4, "little"))
|
||||
return int.from_bytes(bytes(dev.ctrl_transfer(0xC0, 0x04, addr & 0xFFFF, addr >> 16, size)), "little")
|
||||
dev.ctrl_transfer(0x40, 0x03, addr & 0xFFFF, addr >> 16, value.to_bytes(size, "little"))
|
||||
|
||||
def gpio(self, bit, on):
|
||||
if on:
|
||||
@@ -56,42 +65,104 @@ class Mdma:
|
||||
self.reg(Pins.PIO96_OEN, self.reg(Pins.PIO96_OEN) | bit)
|
||||
|
||||
def aux(self, action):
|
||||
for hub, port in self.aux_ports:
|
||||
subprocess.run(["uhubctl", "-S", "-e", "-l", hub, "-p", port, "-a", action], check=True)
|
||||
request = USB_REQ_SET_FEATURE if action == "on" else USB_REQ_CLEAR_FEATURE
|
||||
for vid, pid in [(Pins.USB7002_VID, Pins.USB7002_PID), (Pins.USB4002_VID, Pins.USB4002_PID)]:
|
||||
try:
|
||||
self.hub(vid, pid).ctrl_transfer(USB_RT_PORT, request, USB_PORT_POWER, 1, None, timeout=1000)
|
||||
except usb.core.USBError: # try one more time
|
||||
self.hub(vid, pid).ctrl_transfer(USB_RT_PORT, request, USB_PORT_POWER, 1, None, timeout=1000)
|
||||
|
||||
def reboot(self):
|
||||
def power_off(self):
|
||||
self.aux("off")
|
||||
self.gpio(Pins.VIN_EN, False)
|
||||
|
||||
def reboot(self, qdl):
|
||||
self.aux("off")
|
||||
self.gpio(Pins.VIN_EN, False)
|
||||
time.sleep(0.1)
|
||||
self.gpio(Pins.VIN_EN, True)
|
||||
if qdl:
|
||||
# on comma 3X and comma four, aux powering
|
||||
# up first forces QDL mode on boot
|
||||
self.aux("on")
|
||||
else:
|
||||
self.gpio(Pins.VIN_EN, True)
|
||||
boot_time = time.monotonic()
|
||||
time.sleep(0.1)
|
||||
self.gpio(Pins.VIN_EN, True)
|
||||
self.aux("on")
|
||||
|
||||
def reboot_qdl(self):
|
||||
self.aux("on")
|
||||
self.gpio(Pins.VIN_EN, False)
|
||||
time.sleep(0.1)
|
||||
self.gpio(Pins.VIN_EN, True)
|
||||
return boot_time
|
||||
|
||||
def serial(self):
|
||||
os.execvp("screen", ["screen", "/dev/serial/by-id/usb-Microchip_Tech_USB2_Controller_Hub-if01", "115200"])
|
||||
os.execvp("screen", ["screen", SERIAL_DEV, "115200"])
|
||||
|
||||
def profile_boot(self):
|
||||
# device off for clean serial
|
||||
self.power_off()
|
||||
|
||||
# open the serial device
|
||||
try:
|
||||
fd = os.open(SERIAL_DEV, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EBUSY:
|
||||
raise SystemExit(f"{SERIAL_DEV} is busy; close the serial console first")
|
||||
raise
|
||||
|
||||
attrs = termios.tcgetattr(fd)
|
||||
attrs[0] = 0
|
||||
attrs[1] = 0
|
||||
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL
|
||||
attrs[3] = 0
|
||||
attrs[4] = termios.B115200
|
||||
attrs[5] = termios.B115200
|
||||
attrs[6][termios.VMIN] = 0
|
||||
attrs[6][termios.VTIME] = 0
|
||||
termios.tcsetattr(fd, termios.TCSANOW, attrs)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) & ~os.O_NONBLOCK)
|
||||
termios.tcflush(fd, termios.TCIFLUSH)
|
||||
while (data := os.read(fd, 4096)):
|
||||
time.sleep(0.1)
|
||||
|
||||
# boot!
|
||||
start = self.reboot(qdl=False)
|
||||
|
||||
# show serial console with timestamps until boot is done
|
||||
pending = b""
|
||||
while True:
|
||||
if not select.select([fd], [], [], 0.25)[0]:
|
||||
continue
|
||||
|
||||
data = os.read(fd, 4096)
|
||||
if not data:
|
||||
continue
|
||||
pending += data.replace(b"\r\n", b"\n")
|
||||
while b"\n" in pending:
|
||||
line, pending = pending.split(b"\n", 1)
|
||||
line = line.rstrip()
|
||||
print(f"[{time.monotonic() - start:8.3f}] {line.decode(errors='replace')}", flush=True)
|
||||
if PROMPT_RE.search(line):
|
||||
return
|
||||
if PROMPT_RE.search(pending.strip()):
|
||||
print(f"[{time.monotonic() - start:8.3f}] {pending.strip().decode(errors='replace')}", flush=True)
|
||||
return
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cmds = {
|
||||
"reboot": (lambda: Mdma().reboot(qdl=False), "reboot comma four into normal boot"),
|
||||
"reboot-qdl": (lambda: Mdma().reboot(qdl=True), "reboot comma four into QDL mode for flashing"),
|
||||
"serial": (lambda: Mdma().serial(), "open the MSM UART console with screen"),
|
||||
"serial": (lambda: Mdma().serial(), "open the MSM UART console with screen"),
|
||||
"profile-boot": (lambda: Mdma().profile_boot(), "reboot comma four and profile boot time"),
|
||||
}
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest="command", required=True)
|
||||
subparsers.add_parser("reboot", help="power cycle into normal boot")
|
||||
subparsers.add_parser("reboot-qdl", help="power cycle with AUX present to enter QDL")
|
||||
subparsers.add_parser("serial", help="open the MSM UART console with screen")
|
||||
for cmd, (_, hlp) in cmds.items():
|
||||
subparsers.add_parser(cmd, help=hlp)
|
||||
if len(sys.argv) == 1:
|
||||
parser.print_help()
|
||||
raise SystemExit(0)
|
||||
args = parser.parse_args()
|
||||
|
||||
mdma = Mdma()
|
||||
if args.command == "reboot":
|
||||
mdma.reboot()
|
||||
elif args.command == "reboot-qdl":
|
||||
mdma.reboot_qdl()
|
||||
elif args.command == "serial":
|
||||
mdma.serial()
|
||||
cmds[args.command][0]()
|
||||
|
||||
Reference in New Issue
Block a user