Files
StarPilot/starpilot/common/starpilot_functions.py
T
firestar5683 5bd1977c89 A440
2026-05-15 01:14:06 -05:00

306 lines
12 KiB
Python

#!/usr/bin/env python3
import dataclasses
import json
import requests
import threading
import time
from pathlib import Path
from types import SimpleNamespace
from cereal import messaging
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.time_helpers import system_time_valid
from openpilot.system.hardware import HARDWARE
from openpilot.system.version import get_build_metadata
from openpilot.starpilot.assets.theme_manager import ThemeManager
from openpilot.starpilot.common.starpilot_backups import backup_starpilot
from openpilot.starpilot.common.connect_server import sync_konik_dongle_id
from openpilot.starpilot.common.maps_catalog import normalize_schedule_value, sanitize_selected_locations_csv
from openpilot.starpilot.common.theme_asset_names import find_matching_theme_asset_file
from openpilot.starpilot.common.starpilot_utilities import get_starpilot_api_info, is_FrogsGoMoo, is_url_pingable, run_cmd
from openpilot.starpilot.common.starpilot_variables import (
ERROR_LOGS_PATH, STARPILOT_API, FROGS_GO_MOO_PATH, HD_LOGS_PATH, KONIK_LOGS_PATH, MAPS_PATH, THEME_SAVE_PATH,
StarPilotVariables, get_starpilot_toggles
)
def seed_desktop_theme_assets():
params = Params()
params_memory = Params(memory=True)
params_defaults = Params(return_defaults=True)
theme_manager = ThemeManager(params, params_memory, boot_run=True)
custom_themes = params_defaults.get_bool("CustomThemes")
random_themes = custom_themes and params_defaults.get_bool("RandomThemes")
starpilot_toggles = SimpleNamespace(
boot_logo=params_defaults.get("BootLogo", encoding="utf-8", default="starpilot"),
holiday_themes=params_defaults.get_bool("HolidayThemes"),
random_themes=random_themes,
random_themes_holidays=random_themes and params_defaults.get_bool("RandomThemesHolidays"),
color_scheme=params_defaults.get("ColorScheme", encoding="utf-8", default="stock") if custom_themes else "stock",
distance_icons=params_defaults.get("DistanceIconPack", encoding="utf-8", default="stock") if custom_themes else "stock",
icon_pack=params_defaults.get("IconPack", encoding="utf-8", default="stock") if custom_themes else "stock",
sound_pack=params_defaults.get("SoundPack", encoding="utf-8", default="stock") if custom_themes else "stock",
signal_icons=params_defaults.get("SignalAnimation", encoding="utf-8", default="stock") if custom_themes else "stock",
wheel_image=params_defaults.get("WheelIcon", encoding="utf-8", default="stock") if custom_themes else "stock",
)
theme_manager.update_active_theme(
time_validated=system_time_valid(),
starpilot_toggles=starpilot_toggles,
boot_run=True,
)
theme_manager.update_theme_asset("distance_icons", starpilot_toggles.distance_icons, boot_run=True)
theme_manager.update_wheel_image(starpilot_toggles.wheel_image, boot_run=True)
def starpilot_boot_functions(build_metadata, params):
params_memory = Params(memory=True)
maps_selected_raw = params.get("MapsSelected")
maps_selected = sanitize_selected_locations_csv(maps_selected_raw)
if isinstance(maps_selected_raw, bytes):
maps_selected_raw = maps_selected_raw.decode("utf-8", errors="ignore")
if maps_selected != (maps_selected_raw or ""):
params.put("MapsSelected", maps_selected)
params.put("BuildMetadata", json.dumps(dataclasses.asdict(build_metadata)))
StarPilotVariables()
ThemeManager(params, params_memory, boot_run=True).update_active_theme(time_validated=system_time_valid(), starpilot_toggles=get_starpilot_toggles(), boot_run=True)
sync_konik_dongle_id(params)
def boot_thread():
while not system_time_valid():
print("Waiting for system time to become valid...")
time.sleep(1)
backup_starpilot(build_metadata, params)
threading.Thread(target=boot_thread, daemon=True).start()
def install_starpilot(build_metadata, params):
paths = [
ERROR_LOGS_PATH,
HD_LOGS_PATH,
KONIK_LOGS_PATH,
MAPS_PATH,
THEME_SAVE_PATH
]
for path in paths:
path.mkdir(parents=True, exist_ok=True)
register_device(build_metadata, params)
update_boot_logo(starpilot=True, selected_logo=params.get("BootLogo"))
if is_FrogsGoMoo():
mount_options = run_cmd(["findmnt", "-n", "-o", "OPTIONS", "/persist"], "Successfully retrieved mount options", "Failed to retrieve mount options")
run_cmd(["sudo", "mount", "-o", "remount,rw", "/persist"], "Successfully remounted /persist as read-write", "Failed to remount /persist")
run_cmd(["sudo", "python3", FROGS_GO_MOO_PATH], "Successfully ran frogsgomoo.py", "Failed to run frogsgomoo.py")
run_cmd(["sudo", "mount", "-o", f"remount,{mount_options}", "/persist"], "Successfully restored /persist mount options", "Failed to restore /persist mount options")
def register_device(build_metadata, params):
def register_thread():
dongle_id = params.get("DongleId")
if isinstance(dongle_id, bytes):
dongle_id = dongle_id.decode("utf-8", errors="ignore")
starpilot_dongle_id = params.get("StarPilotDongleId")
if isinstance(starpilot_dongle_id, bytes):
starpilot_dongle_id = starpilot_dongle_id.decode("utf-8", errors="ignore")
# Keep a stable local identifier even if the remote registration endpoint
# is unavailable or slow to respond.
if dongle_id and not starpilot_dongle_id:
params.put("StarPilotDongleId", dongle_id)
while not is_url_pingable(STARPILOT_API):
time.sleep(60)
payload = {
"api_token": params.get("StarPilotApiToken"),
"build_metadata": dataclasses.asdict(build_metadata),
"device": HARDWARE.get_device_type(),
"dongle_id": dongle_id,
"starpilot_dongle_id": params.get("StarPilotDongleId"),
}
try:
response = requests.post(f"{STARPILOT_API}/register", json=payload, headers={"Content-Type": "application/json", "User-Agent": "starpilot-api/1.0"}, timeout=10)
response.raise_for_status()
data = response.json()
params.put("StarPilotApiToken", data.get("api_token", ""))
params.put("StarPilotDongleId", data.get("starpilot_dongle_id"))
except Exception:
pass
threading.Thread(target=register_thread, daemon=True).start()
def uninstall_starpilot():
update_boot_logo(stock=True)
HARDWARE.uninstall()
def update_boot_logo(starpilot=False, stock=False, selected_logo=None):
if HARDWARE.get_device_type() == "pc":
return
boot_logo_location = Path("/usr/comma/bg.jpg")
if starpilot:
target_logo = Path(BASEDIR) / "starpilot/assets/other_images/starpilot_boot_logo.jpg"
if selected_logo:
selected = selected_logo.decode("utf-8", "ignore") if isinstance(selected_logo, (bytes, bytearray)) else str(selected_logo)
selected = selected.strip()
if selected.lower() not in {"", "stock", "default"}:
matched_logo = find_matching_theme_asset_file(THEME_SAVE_PATH / "bootlogos", selected)
if matched_logo is not None:
target_logo = matched_logo
elif stock:
target_logo = Path(BASEDIR) / "starpilot/assets/other_images/stock_bg.jpg"
else:
print(f'Error: Must specify either "starpilot=True" or "stock=True"')
return
if not target_logo.is_file():
print(f"Error: Target logo file not found at {target_logo}")
return
source_logo = target_logo
staged_logo = Path("/tmp/starpilot_boot_logo.jpg")
try:
from PIL import Image
with Image.open(target_logo) as img:
# weston.service always writes a JPEG copy of /usr/comma/bg.jpg; make sure
# the source image is already RGB JPEG to avoid startup failure on RGBA assets.
if img.format != "JPEG" or img.mode != "RGB":
img.convert("RGB").save(staged_logo, format="JPEG", quality=95)
source_logo = staged_logo
except Exception as error:
print(f"Error normalizing boot logo {target_logo}: {error}")
if target_logo.suffix.lower() not in {".jpg", ".jpeg"}:
print("Skipping boot logo update to keep weston startup stable.")
return
current_logo = boot_logo_location.read_bytes() if boot_logo_location.is_file() else b""
desired_logo = source_logo.read_bytes()
if current_logo != desired_logo:
mount_options = run_cmd(["findmnt", "-n", "-o", "OPTIONS", "/"], "Successfully retrieved mount options", "Failed to retrieve mount options")
run_cmd(["sudo", "mount", "-o", "remount,rw", "/"], "Successfully remounted / as read-write", "Failed to remount /")
run_cmd(["sudo", "cp", source_logo, boot_logo_location], "Successfully replaced boot logo", "Failed to replace boot logo")
run_cmd(["sudo", "mount", "-o", f"remount,{mount_options}", "/"], "Successfully restored / mount options", "Failed to restore / mount options")
def update_maps(now, params, params_memory, manual_update=False):
maps_selected_raw = params.get("MapsSelected")
maps_selected = sanitize_selected_locations_csv(maps_selected_raw)
if not maps_selected:
return
if isinstance(maps_selected_raw, bytes):
maps_selected_raw = maps_selected_raw.decode("utf-8", errors="ignore")
if maps_selected != (maps_selected_raw or ""):
params.put("MapsSelected", maps_selected)
day = now.day
is_first = day == 1
is_sunday = now.weekday() == 6
schedule = normalize_schedule_value(params.get("PreferredSchedule"))
maps_downloaded = MAPS_PATH.exists() and any(path.is_file() for path in MAPS_PATH.rglob("*"))
if maps_downloaded and (schedule == 0 or (schedule == 1 and not is_sunday) or (schedule == 2 and not is_first)) and not manual_update:
return
suffix = "th" if 11 <= day <= 13 else {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
todays_date = now.strftime(f"%B {day}{suffix}, %Y")
if maps_downloaded and params.get("LastMapsUpdate") == todays_date and not manual_update:
return
pm = messaging.PubMaster(["mapdIn"])
sm = messaging.SubMaster(["mapdExtendedOut"])
time.sleep(1)
msg = messaging.new_message("mapdIn")
msg.mapdIn.type = 0
msg.mapdIn.str = maps_selected
pm.send("mapdIn", msg)
started = False
while True:
sm.update(1000)
if params_memory.get_bool("CancelDownloadMaps"):
msg = messaging.new_message("mapdIn")
msg.mapdIn.type = 27
pm.send("mapdIn", msg)
params_memory.remove("CancelDownloadMaps")
params_memory.remove("DownloadMaps")
return
if sm.updated["mapdExtendedOut"]:
progress = sm["mapdExtendedOut"].downloadProgress
if progress.active:
started = True
if not progress.active and started:
break
params.put("LastMapsUpdate", todays_date)
params_memory.remove("DownloadMaps")
def update_openpilot(thread_manager, params):
def update_available():
run_cmd(["pkill", "-SIGUSR1", "-f", "system.updated.updated"], "Checking for updates...", "Failed to check for update...", report=False)
while params.get("UpdaterState") != "checking...":
time.sleep(1)
while params.get("UpdaterState") == "checking...":
time.sleep(1)
if not params.get_bool("UpdaterFetchAvailable"):
return False
while params.get_bool("IsOnroad") or thread_manager.is_thread_alive("lock_doors"):
time.sleep(60)
run_cmd(["pkill", "-SIGHUP", "-f", "system.updated.updated"], "Update available, downloading...", "Failed to download update...", report=False)
while not params.get_bool("UpdateAvailable"):
time.sleep(60)
return True
if params.get("UpdaterState") != "idle":
return
while params.get_bool("IsOnroad") or thread_manager.is_thread_alive("lock_doors"):
time.sleep(60)
if not update_available():
return
while True:
if not update_available():
break
while params.get_bool("IsOnroad") or thread_manager.is_thread_alive("lock_doors"):
time.sleep(60)
HARDWARE.reboot()