Compare commits

..

3 Commits

Author SHA1 Message Date
Jason Wen 8ee25cf540 no unittest oops 2026-07-01 20:26:43 -04:00
Jason Wen f08d34612e lateral mismatch 2026-07-01 19:49:05 -04:00
Jason Wen 830ae768ad tests 2026-07-01 19:49:05 -04:00
4 changed files with 244 additions and 268 deletions
@@ -20,7 +20,6 @@ from openpilot.system.ui.widgets.list_view import button_item
from openpilot.system.ui.sunnypilot.widgets.html_render import HtmlModalSP
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.external_storage import external_storage_item
PREBUILT_PATH = os.path.join(Paths.comma_home(), "prebuilt") if PC else "/data/openpilot/prebuilt"
@@ -53,11 +52,7 @@ class DeveloperLayoutSP(DeveloperLayout):
self.error_log_btn = button_item(tr("Error Log"), tr("VIEW"), tr("View the error log for sunnypilot crashes."), callback=self._on_error_log_clicked)
self.external_storage = external_storage_item(tr("External Storage"), description=tr("Extend your comma device's storage by inserting a USB drive " +
"into the aux port."))
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle,
self.external_storage, self.error_log_btn,]
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle, self.error_log_btn,]
@staticmethod
def _on_prebuilt_toggled(state):
@@ -1,261 +0,0 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
import threading
import subprocess
import copy
from enum import Enum
from collections.abc import Callable
from openpilot.common.params import Params
from openpilot.system.hardware import PC
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.multilang import tr, tr_noop
from openpilot.system.ui.widgets import DialogResult
from openpilot.system.ui.widgets.button import Button, ButtonStyle
from openpilot.system.ui.widgets.confirm_dialog import alert_dialog, ConfirmDialog
from openpilot.system.ui.widgets.list_view import (
ItemAction,
ListItem,
BUTTON_HEIGHT,
BUTTON_BORDER_RADIUS,
BUTTON_FONT_SIZE,
BUTTON_WIDTH,
)
VALUE_FONT_SIZE = 48
class ExternalStorageState(Enum):
DISABLED = tr_noop("DISABLED")
LOADING = tr_noop("LOADING")
CHECK = tr_noop("CHECK")
MOUNT = tr_noop("MOUNT")
UNMOUNT = tr_noop("UNMOUNT")
FORMAT = tr_noop("FORMAT")
class ExternalStorageAction(ItemAction):
MAX_WIDTH = 500
def __init__(self):
super().__init__(self.MAX_WIDTH, True)
self._params = Params()
self._error_message = ""
self._text_font = gui_app.font(FontWeight.NORMAL)
self._button = Button(
"",
click_callback=self._handle_button_click,
button_style=ButtonStyle.LIST_ACTION,
border_radius=BUTTON_BORDER_RADIUS,
font_size=BUTTON_FONT_SIZE,
)
self._value_text = ""
self._formatting = False
self._refresh_pending = False
self._state = ExternalStorageState.CHECK
self._refresh_state()
self.refresh()
def set_touch_valid_callback(self, callback):
def wrapped():
if self._state == ExternalStorageState.DISABLED:
return False
return callback()
super().set_touch_valid_callback(wrapped)
self._button.set_touch_valid_callback(wrapped)
def _run(self, cmd: str) -> bool:
return subprocess.call(["sh", "-c", cmd]) == 0
def _run_output(self, cmd: str) -> str:
try:
out = subprocess.check_output(["sh", "-c", cmd], universal_newlines=True)
return out.strip()
except Exception:
return ""
def _render(self, rect: rl.Rectangle) -> bool:
if self._error_message:
msg = copy.copy(self._error_message)
gui_app.set_modal_overlay(alert_dialog(msg))
self._error_message = ""
if self._value_text:
text_size = measure_text_cached(self._text_font, self._value_text, VALUE_FONT_SIZE)
rl.draw_text_ex(
self._text_font,
self._value_text,
(rect.x + rect.width - BUTTON_WIDTH - text_size.x - 30,
rect.y + (rect.height - text_size.y) / 2),
VALUE_FONT_SIZE,
1.0,
rl.Color(170, 170, 170, 255),
)
button_rect = rl.Rectangle(
rect.x + rect.width - BUTTON_WIDTH,
rect.y + (rect.height - BUTTON_HEIGHT) / 2,
BUTTON_WIDTH,
BUTTON_HEIGHT
)
self._button.set_rect(button_rect)
self._button.set_text(tr(self._state.value))
self._button.set_enabled(self._state not in (ExternalStorageState.LOADING,
ExternalStorageState.DISABLED))
self._button.render(button_rect)
return False
def _refresh_state(self):
if PC:
self._state = ExternalStorageState.DISABLED
self._button.set_enabled(False)
self._value_text = ""
def debounced_refresh(self):
if self._refresh_pending:
return
self._refresh_pending = True
def _timer():
import time
time.sleep(0.25)
self._refresh_pending = False
self.refresh()
threading.Thread(target=_timer, daemon=True).start()
def refresh(self):
def _work():
is_mounted = self._run("findmnt -n /mnt/external_realdata")
has_drive = self._run("lsblk -f /dev/sdg")
has_fs = self._run("lsblk -f /dev/sdg1 | grep -q ext4")
has_label = self._run("blkid /dev/sdg1 | grep -q 'LABEL=\"openpilot\"'")
info = ""
if is_mounted and has_label:
info = self._run_output(
"df -h /mnt/external_realdata | awk 'NR==2 {print $3 \"/\" $2}'"
)
def apply():
if self._formatting:
self._value_text = tr("formatting")
self._state = ExternalStorageState.FORMAT
return
if not has_drive:
self._value_text = tr("insert drive")
self._state = ExternalStorageState.CHECK
elif not has_fs or not has_label:
self._value_text = tr("needs format")
self._state = ExternalStorageState.FORMAT
elif is_mounted:
self._value_text = info
self._state = ExternalStorageState.UNMOUNT
else:
self._value_text = tr("drive detected")
self._state = ExternalStorageState.MOUNT
apply()
threading.Thread(target=_work, daemon=True).start()
def _handle_button_click(self):
st = self._state
if st == ExternalStorageState.DISABLED:
return
if st in (ExternalStorageState.CHECK, ExternalStorageState.MOUNT):
self.mount_storage()
elif st == ExternalStorageState.UNMOUNT:
self.unmount_storage()
elif st == ExternalStorageState.FORMAT:
dialog = ConfirmDialog(
tr("Are you sure you want to format this drive? This will erase all data."),
confirm_text=tr("Format"),
cancel_text=tr("Cancel"),
)
gui_app.set_modal_overlay(dialog, callback=self._confirm_format)
def _confirm_format(self, result: DialogResult):
if result == DialogResult.CONFIRM:
self.format_storage()
def mount_storage(self):
self._value_text = tr("mounting")
self._state = ExternalStorageState.LOADING
def _work():
cmd = """
sudo mount -o remount,rw / &&
sudo mkdir -p /mnt/external_realdata &&
(grep -q '/dev/sdg1 /mnt/external_realdata' /etc/fstab ||
echo '/dev/sdg1 /mnt/external_realdata ext4 defaults,nofail 0 2' >> /etc/fstab) &&
sudo systemctl daemon-reexec &&
sudo mount /mnt/external_realdata &&
sudo chown -R comma:comma /mnt/external_realdata &&
sudo chmod -R 775 /mnt/external_realdata &&
sudo mount -o remount,ro /
"""
subprocess.call(["sh", "-c", cmd])
self.debounced_refresh()
threading.Thread(target=_work, daemon=True).start()
def unmount_storage(self):
self._value_text = tr("unmounting")
self._state = ExternalStorageState.LOADING
def _work():
subprocess.call(["sh", "-c", "sudo umount /mnt/external_realdata"])
self.debounced_refresh()
threading.Thread(target=_work, daemon=True).start()
def format_storage(self):
self._formatting = True
self._value_text = tr("formatting")
self._state = ExternalStorageState.LOADING
def _work():
cmd = """
sudo wipefs -a /dev/sdg &&
sudo parted -s /dev/sdg mklabel gpt mkpart primary ext4 0% 100% &&
sudo mkfs.ext4 -F -L openpilot /dev/sdg1
"""
exitcode = subprocess.call(["sh", "-c", cmd])
def apply():
self._formatting = False
if exitcode == 0:
self.mount_storage()
else:
self._value_text = tr("needs format")
self._state = ExternalStorageState.FORMAT
apply()
threading.Thread(target=_work, daemon=True).start()
def external_storage_item(title: str | Callable[[], str], description: str | Callable[[], str]) -> ListItem:
return ListItem(
title=title,
description=description,
action_item=ExternalStorageAction()
)
+1 -1
View File
@@ -69,7 +69,7 @@ class ModularAssistiveDrivingSystem:
return False
def should_silent_lkas_enable(self, CS: structs.CarState) -> bool:
if self.steering_mode_on_brake == MadsSteeringModeOnBrake.PAUSE and self.pedal_pressed_non_gas_pressed(CS):
if self.steering_mode_on_brake == MadsSteeringModeOnBrake.PAUSE and (CS.brakePressed or CS.regenBraking or self.pedal_pressed_non_gas_pressed(CS)):
return False
if self.events_sp.contains_in_list(GEARS_ALLOW_PAUSED_SILENT):
@@ -0,0 +1,242 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from cereal import log, custom
from opendbc.car import structs
from openpilot.selfdrive.selfdrived.events import Events
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
from openpilot.sunnypilot.mads.helpers import MadsSteeringModeOnBrake, read_steering_mode_param
from openpilot.sunnypilot.mads.mads import ModularAssistiveDrivingSystem
from opendbc.sunnypilot.car.tesla.values import TeslaFlagsSP
State = custom.ModularAssistiveDrivingSystem.ModularAssistiveDrivingSystemState
EventName = log.OnroadEvent.EventName
EventNameSP = custom.OnroadEventSP.EventName
SafetyModel = structs.CarParams.SafetyModel
def make_car_state(brake_pressed=False, regen_braking=False, standstill=False, v_ego=0.0):
cs = structs.CarState()
cs.brakePressed = brake_pressed
cs.regenBraking = regen_braking
cs.standstill = standstill
cs.vEgo = v_ego
cs.cruiseState.available = True
return cs
def make_panda_state(mocker, controls_allowed_lateral=True):
ps = mocker.MagicMock()
ps.controlsAllowedLateral = controls_allowed_lateral
ps.safetyModel = SafetyModel.hyundai
return ps
def make_mads(mocker, steering_mode):
sd = mocker.MagicMock()
sd.CP = structs.CarParams()
sd.CP.brand = "hyundai"
sd.CP_SP = structs.CarParamsSP()
sd.params = mocker.MagicMock()
sd.params.get_bool = mocker.MagicMock(side_effect=lambda k: {
"Mads": True, "MadsMainCruiseAllowed": False,
"DisengageOnAccelerator": True, "MadsUnifiedEngagementMode": False,
}.get(k, False))
sd.params.get = mocker.MagicMock(return_value=steering_mode)
sd.events = Events()
sd.events_sp = EventsSP()
sd.enabled = False
sd.enabled_prev = False
sd.initialized = True
sd.CS_prev = make_car_state()
sd.sm = {'pandaStates': [make_panda_state(mocker)]}
sd.state_machine = mocker.MagicMock()
mads = ModularAssistiveDrivingSystem(sd)
mads.enabled_toggle = True
mads.steering_mode_on_brake = steering_mode
return mads, sd
def run_frames(mads, sd, cs, n=1):
for _ in range(n):
mads.update(cs)
sd.CS_prev = cs
sd.events.clear()
sd.events_sp.clear()
# should_silent_lkas_enable across all modes
class TestShouldSilentLkasEnable:
@pytest.mark.parametrize("brake,regen", [(True, False), (False, True)])
def test_pause_blocks_reenable_on_braking_at_standstill(self, mocker, brake, regen):
mads, _ = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
cs = make_car_state(brake_pressed=brake, regen_braking=regen, standstill=True)
assert mads.should_silent_lkas_enable(cs) is False
def test_pause_allows_reenable_on_brake_release(self, mocker):
mads, _ = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
cs = make_car_state(standstill=True)
assert mads.should_silent_lkas_enable(cs) is True
def test_remain_active_ignores_brake(self, mocker):
mads, _ = make_mads(mocker, MadsSteeringModeOnBrake.REMAIN_ACTIVE)
cs = make_car_state(brake_pressed=True, standstill=True)
assert mads.should_silent_lkas_enable(cs) is True
def test_disengage_ignores_brake_for_silent_enable(self, mocker):
mads, _ = make_mads(mocker, MadsSteeringModeOnBrake.DISENGAGE)
cs = make_car_state(brake_pressed=True, standstill=True)
assert mads.should_silent_lkas_enable(cs) is True
# pause
class TestPauseMode:
def test_stays_paused_at_standstill_brake_held(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
mads.state_machine.state = State.enabled
mads.enabled = True
mads.active = True
sd.events.add(EventName.pedalPressed)
run_frames(mads, sd, make_car_state(brake_pressed=True, v_ego=15.0))
assert mads.state_machine.state == State.paused
sd.sm['pandaStates'] = [make_panda_state(mocker, False)]
run_frames(mads, sd, make_car_state(brake_pressed=True, standstill=True), n=250)
assert mads.state_machine.state == State.paused
def test_resumes_on_brake_release_at_standstill(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
mads.state_machine.state = State.paused
mads.enabled = True
mads.active = False
run_frames(mads, sd, make_car_state(standstill=True))
assert mads.state_machine.state == State.enabled
def test_full_cycle_moving_to_standstill(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
mads.state_machine.state = State.enabled
mads.enabled = True
mads.active = True
sd.events.add(EventName.pedalPressed)
run_frames(mads, sd, make_car_state(brake_pressed=True, v_ego=15.0))
assert mads.state_machine.state == State.paused
sd.sm['pandaStates'] = [make_panda_state(mocker, False)]
run_frames(mads, sd, make_car_state(brake_pressed=True, standstill=True), n=250)
assert mads.state_machine.state == State.paused
sd.sm['pandaStates'] = [make_panda_state(mocker, True)]
run_frames(mads, sd, make_car_state(standstill=True))
assert mads.state_machine.state == State.enabled
# disengage
class TestDisengageMode:
def test_brake_while_enabled_disables(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.DISENGAGE)
mads.state_machine.state = State.enabled
mads.enabled = True
mads.active = True
sd.events.add(EventName.pedalPressed)
run_frames(mads, sd, make_car_state(brake_pressed=True, v_ego=10.0))
assert mads.state_machine.state == State.disabled
def test_brake_sends_lkas_disable_when_enabled(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.DISENGAGE)
mads.state_machine.state = State.enabled
mads.enabled = True
mads.active = True
sd.events.add(EventName.pedalPressed)
mads.update_events(make_car_state(brake_pressed=True, v_ego=5.0))
assert sd.events_sp.has(EventNameSP.lkasDisable)
# remain active
class TestRemainActiveMode:
def test_brake_does_not_pause_or_disable(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.REMAIN_ACTIVE)
mads.state_machine.state = State.enabled
mads.enabled = True
mads.active = True
sd.events.add(EventName.pedalPressed)
run_frames(mads, sd, make_car_state(brake_pressed=True, v_ego=10.0))
assert mads.state_machine.state == State.enabled
# lateral mismatch counter
class TestLateralMismatchCounter:
def test_no_accumulation_while_paused(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
mads.state_machine.state = State.paused
mads.enabled = True
mads.active = False
sd.sm['pandaStates'] = [make_panda_state(mocker, False)]
run_frames(mads, sd, make_car_state(brake_pressed=True, standstill=True), n=250)
assert mads.lateral_mismatch_counter == 0
def test_accumulates_when_active_and_panda_disagrees(self, mocker):
mads, sd = make_mads(mocker, MadsSteeringModeOnBrake.PAUSE)
mads.enabled = True
mads.active = True
sd.sm['pandaStates'] = [make_panda_state(mocker, False)]
for _ in range(200):
mads.data_sample()
assert mads.lateral_mismatch_counter == 200
# brand restrictions
class TestBrandSteeringModeRestrictions:
def test_rivian_forced_to_disengage(self, mocker):
CP = structs.CarParams()
CP.brand = "rivian"
CP_SP = structs.CarParamsSP()
params = mocker.MagicMock()
assert read_steering_mode_param(CP, CP_SP, params) == MadsSteeringModeOnBrake.DISENGAGE
params.get.assert_not_called()
def test_tesla_without_vehicle_bus_forced_to_disengage(self, mocker):
CP = structs.CarParams()
CP.brand = "tesla"
CP_SP = structs.CarParamsSP()
CP_SP.flags = 0
params = mocker.MagicMock()
assert read_steering_mode_param(CP, CP_SP, params) == MadsSteeringModeOnBrake.DISENGAGE
def test_tesla_with_vehicle_bus_uses_param(self, mocker):
CP = structs.CarParams()
CP.brand = "tesla"
CP_SP = structs.CarParamsSP()
CP_SP.flags = TeslaFlagsSP.HAS_VEHICLE_BUS
params = mocker.MagicMock()
params.get = mocker.MagicMock(return_value=MadsSteeringModeOnBrake.REMAIN_ACTIVE)
assert read_steering_mode_param(CP, CP_SP, params) == MadsSteeringModeOnBrake.REMAIN_ACTIVE
@pytest.mark.parametrize("brand", ["hyundai", "toyota", "honda", "gm"])
def test_other_brands_use_param(self, mocker, brand):
CP = structs.CarParams()
CP.brand = brand
CP_SP = structs.CarParamsSP()
params = mocker.MagicMock()
params.get = mocker.MagicMock(return_value=MadsSteeringModeOnBrake.REMAIN_ACTIVE)
assert read_steering_mode_param(CP, CP_SP, params) == MadsSteeringModeOnBrake.REMAIN_ACTIVE