Files
sunnypilot/system/ui/mici_setup.py
Jason Wen 00afa068a1 Merge branch 'upstream/openpilot/master' into sync-20260304
# Conflicts:
#	selfdrive/ui/mici/layouts/onboarding.py
2026-03-05 01:27:07 -05:00

640 lines
23 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import re
import threading
import time
import urllib.request
import urllib.error
from urllib.parse import urlparse
import shutil
from collections.abc import Callable
import pyray as rl
from cereal import log
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.system.hardware import HARDWARE, TICI
from openpilot.common.realtime import config_realtime_process, set_core_affinity
from openpilot.common.swaglog import cloudlog
from openpilot.common.utils import run_cmd
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.wifi_manager import WifiManager
from openpilot.system.ui.widgets import Widget
from openpilot.system.ui.widgets.nav_widget import NavWidget
from openpilot.system.ui.widgets.button import SmallButton
from openpilot.system.ui.widgets.label import UnifiedLabel
from openpilot.system.ui.widgets.scroller import Scroller, NavScroller, ITEM_SPACING
from openpilot.system.ui.widgets.slider import LargerSlider, SmallSlider
from openpilot.selfdrive.ui.mici.layouts.settings.network import WifiNetworkButton
from openpilot.selfdrive.ui.mici.layouts.settings.network.wifi_ui import WifiUIMici
from openpilot.selfdrive.ui.mici.widgets.dialog import BigInputDialog
from openpilot.selfdrive.ui.mici.widgets.button import BigButton
NetworkType = log.DeviceState.NetworkType
OPENPILOT_URL = "https://openpilot.comma.ai"
USER_AGENT = f"AGNOSSetup-{HARDWARE.get_os_version()}"
CONTINUE_PATH = "/data/continue.sh"
TMP_CONTINUE_PATH = "/data/continue.sh.new"
INSTALL_PATH = "/data/openpilot"
VALID_CACHE_PATH = "/data/.openpilot_cache"
INSTALLER_SOURCE_PATH = "/usr/comma/installer"
INSTALLER_DESTINATION_PATH = "/tmp/installer"
INSTALLER_URL_PATH = "/tmp/installer_url"
CONTINUE = """#!/usr/bin/env bash
cd /data/openpilot
exec ./launch_openpilot.sh
"""
class NetworkConnectivityMonitor:
def __init__(self, should_check: Callable[[], bool] | None = None):
self.network_connected = threading.Event()
self.wifi_connected = threading.Event()
self._should_check = should_check or (lambda: True)
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
def start(self):
self._stop_event.clear()
if self._thread is None or not self._thread.is_alive():
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
if self._thread is not None:
self._stop_event.set()
self._thread.join()
self._thread = None
def reset(self):
self.network_connected.clear()
self.wifi_connected.clear()
def _run(self):
while not self._stop_event.is_set():
if self._should_check():
try:
request = urllib.request.Request(OPENPILOT_URL, method="HEAD")
urllib.request.urlopen(request, timeout=2.0)
self.network_connected.set()
if HARDWARE.get_network_type() == NetworkType.wifi:
self.wifi_connected.set()
except Exception:
self.reset()
else:
self.reset()
if self._stop_event.wait(timeout=1.0):
break
class StartPage(Widget):
def __init__(self):
super().__init__()
self._title = UnifiedLabel("start", 64, text_color=rl.Color(255, 255, 255, int(255 * 0.9)),
font_weight=FontWeight.DISPLAY, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER,
alignment_vertical=rl.GuiTextAlignmentVertical.TEXT_ALIGN_MIDDLE)
self._start_bg_txt = gui_app.texture("icons_mici/setup/start_button.png", 500, 224, keep_aspect_ratio=False)
self._start_bg_pressed_txt = gui_app.texture("icons_mici/setup/start_button_pressed.png", 500, 224, keep_aspect_ratio=False)
self._scale_filter = FirstOrderFilter(1.0, 0.1, 1 / gui_app.target_fps)
self._click_delay = 0.075
def _render(self, rect: rl.Rectangle):
scale = self._scale_filter.update(1.07 if self.is_pressed else 1.0)
base_draw_x = rect.x + (rect.width - self._start_bg_txt.width) / 2
base_draw_y = rect.y + (rect.height - self._start_bg_txt.height) / 2
draw_x = base_draw_x + (self._start_bg_txt.width * (1 - scale)) / 2
draw_y = base_draw_y + (self._start_bg_txt.height * (1 - scale)) / 2
texture = self._start_bg_pressed_txt if self.is_pressed else self._start_bg_txt
rl.draw_texture_ex(texture, (draw_x, draw_y), 0, scale, rl.WHITE)
self._title.render(rl.Rectangle(rect.x, rect.y + (draw_y - base_draw_y), rect.width, rect.height))
class SoftwareSelectionPage(NavWidget):
def __init__(self, use_openpilot_callback: Callable,
use_custom_software_callback: Callable):
super().__init__()
self._openpilot_slider = LargerSlider("slide to install\nopenpilot", use_openpilot_callback)
self._openpilot_slider.set_enabled(lambda: self.enabled and not self.is_dismissing)
self._custom_software_slider = LargerSlider("slide to install\nother software", use_custom_software_callback, green=False)
self._custom_software_slider.set_enabled(lambda: self.enabled and not self.is_dismissing)
def show_event(self):
super().show_event()
self._nav_bar._alpha = 0.0
def _update_state(self):
super()._update_state()
if self.is_dismissing:
self.reset()
def reset(self):
self._openpilot_slider.reset()
self._custom_software_slider.reset()
def _render(self, rect: rl.Rectangle):
self._openpilot_slider.set_opacity(1.0 - self._custom_software_slider.slider_percentage)
self._custom_software_slider.set_opacity(1.0 - self._openpilot_slider.slider_percentage)
openpilot_rect = rl.Rectangle(
rect.x + (rect.width - self._openpilot_slider.rect.width) / 2,
rect.y,
self._openpilot_slider.rect.width,
rect.height / 2,
)
self._openpilot_slider.render(openpilot_rect)
custom_software_rect = rl.Rectangle(
rect.x + (rect.width - self._custom_software_slider.rect.width) / 2,
rect.y + rect.height / 2,
self._custom_software_slider.rect.width,
rect.height / 2,
)
self._custom_software_slider.render(custom_software_rect)
class CustomSoftwareWarningPage(NavScroller):
def __init__(self, continue_callback: Callable, back_callback: Callable):
super().__init__()
self.set_back_callback(back_callback)
self._continue_button = BigPillButton("next")
self._continue_button.set_click_callback(continue_callback)
self._scroller.add_widgets([
GreyBigButton("use caution", "when installing\n3rd party software",
gui_app.texture("icons_mici/setup/warning.png", 64, 58)),
GreyBigButton("", "• It has not been tested by comma"),
GreyBigButton("", "• It may not comply with relevant safety standards."),
GreyBigButton("", "• It may cause damage to your device and/or vehicle."),
GreyBigButton("how to restore to a\nfactory state later", "https://flash.comma.ai",
gui_app.texture("icons_mici/setup/restore.png", 64, 64)),
self._continue_button,
])
class DownloadingPage(Widget):
def __init__(self):
super().__init__()
self._title_label = UnifiedLabel("downloading...", 64, text_color=rl.Color(255, 255, 255, int(255 * 0.9)),
font_weight=FontWeight.DISPLAY)
self._progress_label = UnifiedLabel("", 132, text_color=rl.Color(255, 255, 255, int(255 * 0.9 * 0.65)),
font_weight=FontWeight.ROMAN, alignment_vertical=rl.GuiTextAlignmentVertical.TEXT_ALIGN_BOTTOM)
self._progress = 0
def show_event(self):
super().show_event()
self.set_progress(0)
def set_progress(self, progress: int):
self._progress = progress
self._progress_label.set_text(f"{progress}%")
def _render(self, rect: rl.Rectangle):
rl.draw_rectangle_rec(rect, rl.BLACK)
self._title_label.render(rl.Rectangle(
rect.x + 12,
rect.y + 2,
rect.width,
64,
))
self._progress_label.render(rl.Rectangle(
rect.x + 12,
rect.y + 18,
rect.width,
rect.height,
))
class FailedPageBase(Widget):
def __init__(self, reboot_callback: Callable, retry_callback: Callable, title: str = "download failed"):
super().__init__()
self._title_label = UnifiedLabel(title, 64, text_color=rl.Color(255, 255, 255, int(255 * 0.9)),
font_weight=FontWeight.DISPLAY)
self._reason_label = UnifiedLabel("", 36, text_color=rl.Color(255, 255, 255, int(255 * 0.9 * 0.65)),
font_weight=FontWeight.ROMAN)
self._reboot_slider = SmallSlider("reboot", reboot_callback)
self._reboot_slider.set_enabled(lambda: self.enabled) # for nav stack
self._retry_button = SmallButton("retry")
self._retry_button.set_click_callback(retry_callback)
self._retry_button.set_enabled(lambda: self.enabled) # for nav stack
def set_reason(self, reason: str):
self._reason_label.set_text(reason)
def show_event(self):
super().show_event()
self._reboot_slider.reset()
def _render(self, rect: rl.Rectangle):
self._title_label.render(rl.Rectangle(
rect.x + 8,
rect.y + 10,
rect.width,
64,
))
self._reason_label.render(rl.Rectangle(
rect.x + 8,
rect.y + 10 + 64,
rect.width,
36,
))
self._retry_button.set_opacity(1 - self._reboot_slider.slider_percentage)
self._retry_button.render(rl.Rectangle(
self._rect.x + 8,
self._rect.y + self._rect.height - self._retry_button.rect.height,
self._retry_button.rect.width,
self._retry_button.rect.height,
))
self._reboot_slider.render(rl.Rectangle(
self._rect.x + self._rect.width - self._reboot_slider.rect.width,
self._rect.y + self._rect.height - self._reboot_slider.rect.height,
self._reboot_slider.rect.width,
self._reboot_slider.rect.height,
))
class FailedPage(FailedPageBase, NavWidget):
def __init__(self, reboot_callback: Callable, retry_callback: Callable, title: str = "download failed"):
super().__init__(reboot_callback, retry_callback, title)
self.set_back_callback(retry_callback)
class GreyBigButton(BigButton):
"""Users should manage newlines with this class themselves"""
LABEL_HORIZONTAL_PADDING = 30
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.set_touch_valid_callback(lambda: False)
self._rect.width = 476
self._label.set_font_size(36)
self._label.set_font_weight(FontWeight.BOLD)
self._label.set_line_height(1.0)
self._sub_label.set_font_size(36)
self._sub_label.set_text_color(rl.Color(255, 255, 255, int(255 * 0.9)))
self._sub_label.set_font_weight(FontWeight.DISPLAY_REGULAR)
self._sub_label.set_alignment_vertical(rl.GuiTextAlignmentVertical.TEXT_ALIGN_MIDDLE if not self._label.text else
rl.GuiTextAlignmentVertical.TEXT_ALIGN_BOTTOM)
self._sub_label.set_line_height(0.95)
@property
def LABEL_VERTICAL_PADDING(self):
return BigButton.LABEL_VERTICAL_PADDING if self._label.text else 18
def _width_hint(self) -> int:
return int(self._rect.width - self.LABEL_HORIZONTAL_PADDING * 2)
def _render(self, _):
rl.draw_rectangle_rounded(self._rect, 0.4, 10, rl.Color(255, 255, 255, int(255 * 0.15)))
self._draw_content(self._rect.y)
class BigPillButton(BigButton):
def __init__(self, *args, green: bool = False, disabled_background: bool = False, **kwargs):
self._green = green
self._disabled_background = disabled_background
super().__init__(*args, **kwargs)
self._label.set_font_size(48)
self._label.set_alignment(rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
self._label.set_alignment_vertical(rl.GuiTextAlignmentVertical.TEXT_ALIGN_MIDDLE)
def _load_images(self):
if self._green:
self._txt_default_bg = gui_app.texture("icons_mici/setup/start_button.png", 402, 180)
self._txt_pressed_bg = gui_app.texture("icons_mici/setup/start_button_pressed.png", 402, 180)
else:
self._txt_default_bg = gui_app.texture("icons_mici/setup/continue.png", 402, 180)
self._txt_pressed_bg = gui_app.texture("icons_mici/setup/continue_pressed.png", 402, 180)
self._txt_disabled_bg = gui_app.texture("icons_mici/setup/continue_disabled.png", 402, 180)
def set_green(self, green: bool):
if self._green != green:
self._green = green
self._load_images()
def _update_label_layout(self):
# Don't change label text size
pass
def _handle_background(self) -> tuple[rl.Texture, float, float, float]:
txt_bg, btn_x, btn_y, scale = super()._handle_background()
if self._disabled_background:
txt_bg = self._txt_disabled_bg
return txt_bg, btn_x, btn_y, scale
class NetworkSetupPageBase(Scroller):
def __init__(self, network_monitor: NetworkConnectivityMonitor, continue_callback: Callable[[bool], None],
disable_connect_hint: bool = False):
super().__init__()
self._wifi_manager = WifiManager()
self._wifi_manager.set_active(True)
self._network_monitor = network_monitor
self._custom_software = False
self._prev_has_internet = False
self._wifi_ui = WifiUIMici(self._wifi_manager)
self._connect_button = GreyBigButton("connect to\ninternet", "swipe down to go back",
gui_app.texture("icons_mici/setup/small_slider/slider_arrow.png", 64, 56, flip_x=True))
self._connect_button.set_visible(not disable_connect_hint)
self._wifi_button = WifiNetworkButton(self._wifi_manager)
self._wifi_button.set_click_callback(lambda: gui_app.push_widget(self._wifi_ui))
self._show_time = 0.0
self._pending_has_internet_scroll = False
self._pending_continue_grow_animation = False
self._pending_wifi_grow_animation = False
def on_waiting_click():
offset = (self._wifi_button.rect.x + self._wifi_button.rect.width / 2) - (self._rect.x + self._rect.width / 2)
self._scroller.scroll_to(offset, smooth=True, block_interaction=True)
# trigger grow when wifi button in view
self._pending_wifi_grow_animation = True
self._waiting_button = BigPillButton("waiting for\ninternet...", disabled_background=True)
self._waiting_button.set_click_callback(on_waiting_click)
self._continue_button = BigPillButton("install openpilot", green=True)
self._continue_button.set_click_callback(lambda: continue_callback(self._custom_software))
self._scroller.add_widgets([
self._connect_button,
self._wifi_button,
self._continue_button,
self._waiting_button,
])
gui_app.add_nav_stack_tick(self._nav_stack_tick)
def show_event(self):
super().show_event()
self._show_time = rl.get_time()
self._prev_has_internet = False
self._pending_has_internet_scroll = False
self._pending_continue_grow_animation = False
self._pending_wifi_grow_animation = False
def _nav_stack_tick(self):
self._wifi_manager.process_callbacks()
has_internet = self._network_monitor.network_connected.is_set()
if has_internet and not self._prev_has_internet:
self._pending_has_internet_scroll = True
self._prev_has_internet = has_internet
if self._pending_has_internet_scroll:
# Scrolls over to continue button, then grows once in view
elapsed = rl.get_time() - self._show_time
if elapsed > 0.5:
self._pending_has_internet_scroll = False
def scroll_to_download():
self._scroller._layout()
end_offset = -(self._scroller.content_size - self._rect.width)
remaining = self._scroller.scroll_panel.get_offset() - end_offset
self._scroller.scroll_to(remaining, smooth=True, block_interaction=True)
self._pending_continue_grow_animation = True
# Animate WifiUi down first before scroll
gui_app.pop_widgets_to(self, scroll_to_download)
def set_custom_software(self, custom_software: bool):
self._custom_software = custom_software
self._continue_button.set_text("install openpilot" if not custom_software else "choose software")
self._continue_button.set_green(not custom_software)
def set_is_updater(self):
self._continue_button.set_text("download\n& install")
self._continue_button.set_green(False)
def _update_state(self):
super()._update_state()
if self._pending_continue_grow_animation:
btn_right = self._continue_button.rect.x + self._continue_button.rect.width
visible_right = self._rect.x + self._rect.width
if btn_right < visible_right + 50:
self._pending_continue_grow_animation = False
self._continue_button.trigger_grow_animation()
if self._pending_wifi_grow_animation and abs(self._wifi_button.rect.x - ITEM_SPACING) < 50:
self._pending_wifi_grow_animation = False
self._wifi_button.trigger_grow_animation()
if self._network_monitor.network_connected.is_set():
self._continue_button.set_visible(True)
self._waiting_button.set_visible(False)
else:
self._continue_button.set_visible(False)
self._waiting_button.set_visible(True)
class NetworkSetupPage(NetworkSetupPageBase, NavScroller):
def __init__(self, network_monitor: NetworkConnectivityMonitor, continue_callback: Callable[[bool], None],
back_callback: Callable[[], None] | None):
super().__init__(network_monitor, continue_callback)
self.set_back_callback(back_callback)
class Setup(Widget):
def __init__(self):
super().__init__()
self.download_url = ""
self.download_progress = 0
self.download_thread = None
self._download_failed_reason: str | None = None
self._network_monitor = NetworkConnectivityMonitor()
self._network_monitor.start()
def getting_started_button_callback():
self._software_selection_page.reset()
gui_app.push_widget(self._software_selection_page)
self._start_page = StartPage()
self._start_page.set_click_callback(getting_started_button_callback)
self._start_page.set_enabled(lambda: self.enabled) # for nav stack
self._network_setup_page = NetworkSetupPage(self._network_monitor, self._network_setup_continue_callback, self._pop_to_software_selection)
self._software_selection_page = SoftwareSelectionPage(self._use_openpilot, lambda: gui_app.push_widget(self._custom_software_warning_page))
self._download_failed_page = FailedPage(HARDWARE.reboot, self._pop_to_software_selection)
self._custom_software_warning_page = CustomSoftwareWarningPage(lambda: self._push_network_setup(True), self._pop_to_software_selection)
self._downloading_page = DownloadingPage()
gui_app.add_nav_stack_tick(self._nav_stack_tick)
def _nav_stack_tick(self):
self._downloading_page.set_progress(self.download_progress)
if self._download_failed_reason is not None:
reason = self._download_failed_reason
self._download_failed_reason = None
self._download_failed_page.set_reason(reason)
gui_app.pop_widgets_to(self._software_selection_page, instant=True) # don't reset sliders
gui_app.push_widget(self._download_failed_page)
def _render(self, rect: rl.Rectangle):
self._start_page.render(rect)
def close(self):
self._network_monitor.stop()
def _pop_to_software_selection(self):
# reset sliders after dismiss completes
gui_app.pop_widgets_to(self._software_selection_page, self._software_selection_page.reset)
def _use_openpilot(self):
if os.path.isdir(INSTALL_PATH) and os.path.isfile(VALID_CACHE_PATH):
os.remove(VALID_CACHE_PATH)
with open(TMP_CONTINUE_PATH, "w") as f:
f.write(CONTINUE)
run_cmd(["chmod", "+x", TMP_CONTINUE_PATH])
shutil.move(TMP_CONTINUE_PATH, CONTINUE_PATH)
shutil.copyfile(INSTALLER_SOURCE_PATH, INSTALLER_DESTINATION_PATH)
# give time for installer UI to take over
time.sleep(0.1)
gui_app.request_close()
else:
self._push_network_setup()
def _push_network_setup(self, custom_software: bool = False):
# to fire the correct continue callback later
self._network_setup_page.set_custom_software(custom_software)
gui_app.pop_widgets_to(self._software_selection_page, lambda: gui_app.push_widget(self._network_setup_page))
def _network_setup_continue_callback(self, custom_software: bool):
if not custom_software:
gui_app.pop_widgets_to(self._software_selection_page, instant=True) # don't reset sliders
self._download(OPENPILOT_URL)
else:
def handle_keyboard_result(text):
url = text.strip()
if url:
gui_app.pop_widgets_to(self._software_selection_page, instant=True) # don't reset sliders
self._download(url)
keyboard = BigInputDialog("custom software URL...", confirm_callback=handle_keyboard_result, auto_return_to_letters="./")
gui_app.push_widget(keyboard)
def _download(self, url: str):
# autocomplete incomplete URLs
if re.match("^([^/.]+)/([^/]+)$", url):
url = f"https://installer.comma.ai/{url}"
parsed = urlparse(url, scheme='https')
self.download_url = (urlparse(f"https://{url}") if not parsed.netloc else parsed).geturl()
self.download_progress = 0
gui_app.push_widget(self._downloading_page)
self.download_thread = threading.Thread(target=self._download_thread, daemon=True)
self.download_thread.start()
def _download_thread(self):
try:
import tempfile
fd, tmpfile = tempfile.mkstemp(prefix="installer_")
headers = {"User-Agent": USER_AGENT,
"X-openpilot-serial": HARDWARE.get_serial(),
"X-openpilot-device-type": HARDWARE.get_device_type()}
req = urllib.request.Request(self.download_url, headers=headers)
with open(tmpfile, 'wb') as f, urllib.request.urlopen(req, timeout=30) as response:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
block_size = 8192
while True:
buffer = response.read(block_size)
if not buffer:
break
downloaded += len(buffer)
f.write(buffer)
if total_size:
self.download_progress = int(downloaded * 100 / total_size)
is_elf = False
with open(tmpfile, 'rb') as f:
header = f.read(4)
is_elf = header == b'\x7fELF'
if not is_elf:
self._download_failed_reason = "No custom software found at this URL."
return
# AGNOS might try to execute the installer before this process exits.
# Therefore, important to close the fd before renaming the installer.
os.close(fd)
os.rename(tmpfile, INSTALLER_DESTINATION_PATH)
with open(INSTALLER_URL_PATH, "w") as f:
f.write(self.download_url)
# give time for installer UI to take over
time.sleep(0.1)
gui_app.request_close()
except urllib.error.HTTPError as e:
if e.code == 409:
self._download_failed_reason = "Incompatible sunnypilot version"
except Exception:
self._download_failed_reason = "Invalid URL"
def main():
config_realtime_process(0, 51)
# attempt to affine. AGNOS will start setup with all cores, should only fail when manually launching with screen off
if TICI:
try:
set_core_affinity([5])
except OSError:
cloudlog.exception("Failed to set core affinity for setup process")
try:
gui_app.init_window("Setup")
setup = Setup()
gui_app.push_widget(setup)
for _ in gui_app.render():
pass
setup.close()
except Exception as e:
print(f"Setup error: {e}")
finally:
gui_app.close()
if __name__ == "__main__":
main()