Compare commits

..

5 Commits

Author SHA1 Message Date
Jason Wen
813ef1ee65 cleanup 2026-03-04 23:23:49 -05:00
Jason Wen
3749b49186 Merge branch 'master' into fix_rivian_long 2026-03-04 23:15:19 -05:00
Jason Wen
6bea70ac86 pandad: gate unsupported pandas before flashing (#1754) 2026-03-04 23:15:10 -05:00
lukasloetkolben
25b6f84bb0 lets do it like that 2026-03-03 15:42:21 +01:00
lukasloetkolben
73092daefc fixed missing internal panda 2026-03-03 15:25:48 +01:00
4 changed files with 36 additions and 290 deletions

View File

@@ -32,8 +32,7 @@ def flash_panda(panda_serial: str) -> Panda:
raise
# skip flashing if the detected panda is not supported
supported_panda = check_panda_support(panda)
if not supported_panda:
if panda.get_type() not in Panda.SUPPORTED_DEVICES:
cloudlog.warning(f"Panda {panda_serial} is not supported (hw_type: {panda.get_type()}), skipping flash...")
return panda
@@ -69,10 +68,19 @@ def flash_panda(panda_serial: str) -> Panda:
return panda
def check_panda_support(panda) -> bool:
hw_type = panda.get_type()
if hw_type in Panda.SUPPORTED_DEVICES:
return True
def check_panda_support(panda_serials: list[str]) -> bool:
unsupported = []
for serial in panda_serials:
panda = Panda(serial)
hw_type = panda.get_type()
panda.close()
if hw_type in Panda.SUPPORTED_DEVICES:
return True
unsupported.append((serial, hw_type))
for serial, hw_type in unsupported:
cloudlog.warning(f"Panda {serial} is not supported (hw_type: {hw_type}), skipping...")
return False
@@ -126,13 +134,17 @@ def main() -> None:
cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")
# custom flasher for xnor's Rivian Longitudinal Upgrade Kit
flash_rivian_long(panda_serials)
# skip flashing and health check if no supported panda is detected
if not check_panda_support(panda_serials):
continue
# Flash the first panda
panda_serial = panda_serials[0]
panda = flash_panda(panda_serial)
# flash Rivian longitudinal upgrade panda
flash_rivian_long(panda)
# Ensure internal panda is present if expected
if HARDWARE.has_internal_panda() and not panda.is_internal():
cloudlog.error("Internal panda is missing, trying again")
@@ -143,12 +155,6 @@ def main() -> None:
# log panda fw version
params.put("PandaSignatures", panda.get_signature())
# skip health check if the detected panda is not supported
supported_panda = check_panda_support(panda)
if not supported_panda:
cloudlog.warning(f"Panda {panda.get_usb_serial()} is not supported (hw_type: {panda.get_type()}), skipping health check...")
continue
# check health for lost heartbeat
health = panda.health()
if health["heartbeat_lost"]:

View File

@@ -20,7 +20,6 @@ from openpilot.system.ui.widgets.list_view import button_item
from openpilot.system.ui.sunnypilot.widgets.html_render import HtmlModalSP
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.external_storage import external_storage_item
PREBUILT_PATH = os.path.join(Paths.comma_home(), "prebuilt") if PC else "/data/openpilot/prebuilt"
@@ -53,11 +52,7 @@ class DeveloperLayoutSP(DeveloperLayout):
self.error_log_btn = button_item(tr("Error Log"), tr("VIEW"), tr("View the error log for sunnypilot crashes."), callback=self._on_error_log_clicked)
self.external_storage = external_storage_item(tr("External Storage"), description=tr("Extend your comma device's storage by inserting a USB drive " +
"into the aux port."))
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle,
self.external_storage, self.error_log_btn,]
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle, self.error_log_btn,]
@staticmethod
def _on_prebuilt_toggled(state):

View File

@@ -1,261 +0,0 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
import threading
import subprocess
import copy
from enum import Enum
from collections.abc import Callable
from openpilot.common.params import Params
from openpilot.system.hardware import PC
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.multilang import tr, tr_noop
from openpilot.system.ui.widgets import DialogResult
from openpilot.system.ui.widgets.button import Button, ButtonStyle
from openpilot.system.ui.widgets.confirm_dialog import alert_dialog, ConfirmDialog
from openpilot.system.ui.widgets.list_view import (
ItemAction,
ListItem,
BUTTON_HEIGHT,
BUTTON_BORDER_RADIUS,
BUTTON_FONT_SIZE,
BUTTON_WIDTH,
)
VALUE_FONT_SIZE = 48
class ExternalStorageState(Enum):
DISABLED = tr_noop("DISABLED")
LOADING = tr_noop("LOADING")
CHECK = tr_noop("CHECK")
MOUNT = tr_noop("MOUNT")
UNMOUNT = tr_noop("UNMOUNT")
FORMAT = tr_noop("FORMAT")
class ExternalStorageAction(ItemAction):
MAX_WIDTH = 500
def __init__(self):
super().__init__(self.MAX_WIDTH, True)
self._params = Params()
self._error_message = ""
self._text_font = gui_app.font(FontWeight.NORMAL)
self._button = Button(
"",
click_callback=self._handle_button_click,
button_style=ButtonStyle.LIST_ACTION,
border_radius=BUTTON_BORDER_RADIUS,
font_size=BUTTON_FONT_SIZE,
)
self._value_text = ""
self._formatting = False
self._refresh_pending = False
self._state = ExternalStorageState.CHECK
self._refresh_state()
self.refresh()
def set_touch_valid_callback(self, callback):
def wrapped():
if self._state == ExternalStorageState.DISABLED:
return False
return callback()
super().set_touch_valid_callback(wrapped)
self._button.set_touch_valid_callback(wrapped)
def _run(self, cmd: str) -> bool:
return subprocess.call(["sh", "-c", cmd]) == 0
def _run_output(self, cmd: str) -> str:
try:
out = subprocess.check_output(["sh", "-c", cmd], universal_newlines=True)
return out.strip()
except Exception:
return ""
def _render(self, rect: rl.Rectangle) -> bool:
if self._error_message:
msg = copy.copy(self._error_message)
gui_app.set_modal_overlay(alert_dialog(msg))
self._error_message = ""
if self._value_text:
text_size = measure_text_cached(self._text_font, self._value_text, VALUE_FONT_SIZE)
rl.draw_text_ex(
self._text_font,
self._value_text,
(rect.x + rect.width - BUTTON_WIDTH - text_size.x - 30,
rect.y + (rect.height - text_size.y) / 2),
VALUE_FONT_SIZE,
1.0,
rl.Color(170, 170, 170, 255),
)
button_rect = rl.Rectangle(
rect.x + rect.width - BUTTON_WIDTH,
rect.y + (rect.height - BUTTON_HEIGHT) / 2,
BUTTON_WIDTH,
BUTTON_HEIGHT
)
self._button.set_rect(button_rect)
self._button.set_text(tr(self._state.value))
self._button.set_enabled(self._state not in (ExternalStorageState.LOADING,
ExternalStorageState.DISABLED))
self._button.render(button_rect)
return False
def _refresh_state(self):
if PC:
self._state = ExternalStorageState.DISABLED
self._button.set_enabled(False)
self._value_text = ""
def debounced_refresh(self):
if self._refresh_pending:
return
self._refresh_pending = True
def _timer():
import time
time.sleep(0.25)
self._refresh_pending = False
self.refresh()
threading.Thread(target=_timer, daemon=True).start()
def refresh(self):
def _work():
is_mounted = self._run("findmnt -n /mnt/external_realdata")
has_drive = self._run("lsblk -f /dev/sdg")
has_fs = self._run("lsblk -f /dev/sdg1 | grep -q ext4")
has_label = self._run("blkid /dev/sdg1 | grep -q 'LABEL=\"openpilot\"'")
info = ""
if is_mounted and has_label:
info = self._run_output(
"df -h /mnt/external_realdata | awk 'NR==2 {print $3 \"/\" $2}'"
)
def apply():
if self._formatting:
self._value_text = tr("formatting")
self._state = ExternalStorageState.FORMAT
return
if not has_drive:
self._value_text = tr("insert drive")
self._state = ExternalStorageState.CHECK
elif not has_fs or not has_label:
self._value_text = tr("needs format")
self._state = ExternalStorageState.FORMAT
elif is_mounted:
self._value_text = info
self._state = ExternalStorageState.UNMOUNT
else:
self._value_text = tr("drive detected")
self._state = ExternalStorageState.MOUNT
apply()
threading.Thread(target=_work, daemon=True).start()
def _handle_button_click(self):
st = self._state
if st == ExternalStorageState.DISABLED:
return
if st in (ExternalStorageState.CHECK, ExternalStorageState.MOUNT):
self.mount_storage()
elif st == ExternalStorageState.UNMOUNT:
self.unmount_storage()
elif st == ExternalStorageState.FORMAT:
dialog = ConfirmDialog(
tr("Are you sure you want to format this drive? This will erase all data."),
confirm_text=tr("Format"),
cancel_text=tr("Cancel"),
)
gui_app.set_modal_overlay(dialog, callback=self._confirm_format)
def _confirm_format(self, result: DialogResult):
if result == DialogResult.CONFIRM:
self.format_storage()
def mount_storage(self):
self._value_text = tr("mounting")
self._state = ExternalStorageState.LOADING
def _work():
cmd = """
sudo mount -o remount,rw / &&
sudo mkdir -p /mnt/external_realdata &&
(grep -q '/dev/sdg1 /mnt/external_realdata' /etc/fstab ||
echo '/dev/sdg1 /mnt/external_realdata ext4 defaults,nofail 0 2' >> /etc/fstab) &&
sudo systemctl daemon-reexec &&
sudo mount /mnt/external_realdata &&
sudo chown -R comma:comma /mnt/external_realdata &&
sudo chmod -R 775 /mnt/external_realdata &&
sudo mount -o remount,ro /
"""
subprocess.call(["sh", "-c", cmd])
self.debounced_refresh()
threading.Thread(target=_work, daemon=True).start()
def unmount_storage(self):
self._value_text = tr("unmounting")
self._state = ExternalStorageState.LOADING
def _work():
subprocess.call(["sh", "-c", "sudo umount /mnt/external_realdata"])
self.debounced_refresh()
threading.Thread(target=_work, daemon=True).start()
def format_storage(self):
self._formatting = True
self._value_text = tr("formatting")
self._state = ExternalStorageState.LOADING
def _work():
cmd = """
sudo wipefs -a /dev/sdg &&
sudo parted -s /dev/sdg mklabel gpt mkpart primary ext4 0% 100% &&
sudo mkfs.ext4 -F -L openpilot /dev/sdg1
"""
exitcode = subprocess.call(["sh", "-c", cmd])
def apply():
self._formatting = False
if exitcode == 0:
self.mount_storage()
else:
self._value_text = tr("needs format")
self._state = ExternalStorageState.FORMAT
apply()
threading.Thread(target=_work, daemon=True).start()
def external_storage_item(title: str | Callable[[], str], description: str | Callable[[], str]) -> ListItem:
return ListItem(
title=title,
description=description,
action_item=ExternalStorageAction()
)

View File

@@ -74,7 +74,7 @@ def _flash_panda(panda: Panda) -> None:
panda.reconnect()
def flash_rivian_long(panda: Panda) -> None:
def flash_rivian_long(panda_serials: list[str]) -> None:
if not os.path.isfile(FW_PATH):
cloudlog.error(f"Rivian longitudinal upgrade firmware not found at {FW_PATH}")
return
@@ -83,13 +83,19 @@ def flash_rivian_long(panda: Panda) -> None:
cloudlog.info("Not a Rivian, skipping longitudinal upgrade...")
return
# only flash external black pandas (HW_TYPE_BLACK = 0x03)
if panda.get_type() == b'\x03' and not panda.is_internal():
try:
_flash_panda(panda)
except Exception:
cloudlog.exception(f"Failed to flash F4 panda {panda.get_usb_serial()}")
for serial in panda_serials:
panda = Panda(serial)
# only flash external black pandas (HW_TYPE_BLACK = 0x03)
if panda.get_type() == b'\x03' and not panda.is_internal():
try:
_flash_panda(panda)
cloudlog.info(f"Successfully flashed xnor's Rivian Longitudinal Upgrade Kit: {serial}")
except Exception:
cloudlog.exception(f"Failed to flash xnor's Rivian Longitudinal Upgrade Kit: {serial}")
panda.close()
return
if __name__ == '__main__':
flash_rivian_long(Panda())
flash_rivian_long(Panda.list())