mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-28 01:52:06 +08:00
konik
This commit is contained in:
@@ -14,6 +14,7 @@ from openpilot.common.params import Params
|
||||
from openpilot.common.time_helpers import system_time_valid
|
||||
from openpilot.system.athena.registration import register
|
||||
from openpilot.system.hardware import HARDWARE
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
from openpilot.system.version import get_build_metadata
|
||||
|
||||
from openpilot.starpilot.assets.theme_manager import ThemeManager
|
||||
@@ -27,6 +28,56 @@ from openpilot.starpilot.common.starpilot_variables import (
|
||||
)
|
||||
|
||||
|
||||
def _normalize_dongle_id(value):
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode("utf-8", errors="ignore")
|
||||
if value is None:
|
||||
return None
|
||||
value = str(value).strip()
|
||||
return value or None
|
||||
|
||||
|
||||
def _read_persisted_stock_dongle_id():
|
||||
persisted_dongle_id_path = Path(Paths.persist_root()) / "comma" / "dongle_id"
|
||||
if not persisted_dongle_id_path.is_file():
|
||||
return None
|
||||
return _normalize_dongle_id(persisted_dongle_id_path.read_text())
|
||||
|
||||
|
||||
def _ensure_stock_dongle_id(params):
|
||||
current_dongle_id = _normalize_dongle_id(params.get("DongleId"))
|
||||
konik_dongle_id = _normalize_dongle_id(params.get("KonikDongleId"))
|
||||
stock_dongle_id = _normalize_dongle_id(params.get("StockDongleId"))
|
||||
|
||||
if stock_dongle_id not in (None, konik_dongle_id):
|
||||
return stock_dongle_id
|
||||
|
||||
candidate = _read_persisted_stock_dongle_id()
|
||||
if candidate in (None, konik_dongle_id):
|
||||
candidate = current_dongle_id if current_dongle_id != konik_dongle_id else None
|
||||
|
||||
if candidate is not None and candidate != stock_dongle_id:
|
||||
params.put("StockDongleId", candidate)
|
||||
|
||||
return candidate
|
||||
|
||||
|
||||
def sync_konik_dongle_id(params):
|
||||
current_dongle_id = _normalize_dongle_id(params.get("DongleId"))
|
||||
konik_dongle_id = _normalize_dongle_id(params.get("KonikDongleId"))
|
||||
stock_dongle_id = _ensure_stock_dongle_id(params)
|
||||
|
||||
if use_konik_server():
|
||||
if konik_dongle_id is None:
|
||||
konik_dongle_id = _normalize_dongle_id(register(show_spinner=True, register_konik=True))
|
||||
if konik_dongle_id is not None:
|
||||
params.put("KonikDongleId", konik_dongle_id)
|
||||
if konik_dongle_id is not None and current_dongle_id != konik_dongle_id:
|
||||
params.put("DongleId", konik_dongle_id)
|
||||
elif current_dongle_id == konik_dongle_id and stock_dongle_id is not None:
|
||||
params.put("DongleId", stock_dongle_id)
|
||||
|
||||
|
||||
def seed_desktop_theme_assets():
|
||||
params = Params()
|
||||
params_memory = Params(memory=True)
|
||||
@@ -73,14 +124,7 @@ def starpilot_boot_functions(build_metadata, params):
|
||||
StarPilotVariables()
|
||||
ThemeManager(params, params_memory, boot_run=True).update_active_theme(time_validated=system_time_valid(), starpilot_toggles=get_starpilot_toggles(), boot_run=True)
|
||||
|
||||
if use_konik_server():
|
||||
if params.get("KonikDongleId") is not None:
|
||||
params.put("DongleId", params.get("KonikDongleId"))
|
||||
else:
|
||||
params.put("KonikDongleId", register(show_spinner=True, register_konik=True))
|
||||
params.put("DongleId", params.get("KonikDongleId"))
|
||||
elif params.get("DongleId") == params.get("KonikDongleId"):
|
||||
params.put("DongleId", params.get("StockDongleId"))
|
||||
sync_konik_dongle_id(params)
|
||||
|
||||
def boot_thread():
|
||||
while not system_time_valid():
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
from openpilot.starpilot.common import starpilot_functions as spf
|
||||
|
||||
|
||||
class FakeParams:
|
||||
def __init__(self, values=None):
|
||||
self.values = dict(values or {})
|
||||
|
||||
def get(self, key):
|
||||
return self.values.get(key)
|
||||
|
||||
def put(self, key, value):
|
||||
self.values[key] = value
|
||||
|
||||
|
||||
def test_sync_konik_dongle_id_preserves_stock_id_before_switching(monkeypatch, tmp_path):
|
||||
monkeypatch.setattr(spf.Paths, "persist_root", staticmethod(lambda: str(tmp_path)))
|
||||
monkeypatch.setattr(spf, "use_konik_server", lambda: True)
|
||||
monkeypatch.setattr(spf, "register", lambda **kwargs: "konik-dongle")
|
||||
|
||||
params = FakeParams({"DongleId": "stock-dongle"})
|
||||
|
||||
spf.sync_konik_dongle_id(params)
|
||||
|
||||
assert params.get("StockDongleId") == "stock-dongle"
|
||||
assert params.get("KonikDongleId") == "konik-dongle"
|
||||
assert params.get("DongleId") == "konik-dongle"
|
||||
|
||||
|
||||
def test_sync_konik_dongle_id_restores_stock_id_from_persist(monkeypatch, tmp_path):
|
||||
persist_root = tmp_path / "persist"
|
||||
persisted_dongle_id_path = persist_root / "comma" / "dongle_id"
|
||||
persisted_dongle_id_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
persisted_dongle_id_path.write_text("stock-dongle")
|
||||
|
||||
monkeypatch.setattr(spf.Paths, "persist_root", staticmethod(lambda: str(persist_root)))
|
||||
monkeypatch.setattr(spf, "use_konik_server", lambda: False)
|
||||
|
||||
params = FakeParams({
|
||||
"DongleId": "konik-dongle",
|
||||
"KonikDongleId": "konik-dongle",
|
||||
})
|
||||
|
||||
spf.sync_konik_dongle_id(params)
|
||||
|
||||
assert params.get("StockDongleId") == "stock-dongle"
|
||||
assert params.get("DongleId") == "stock-dongle"
|
||||
|
||||
|
||||
def test_sync_konik_dongle_id_skips_missing_stock_backup(monkeypatch, tmp_path):
|
||||
monkeypatch.setattr(spf.Paths, "persist_root", staticmethod(lambda: str(tmp_path)))
|
||||
monkeypatch.setattr(spf, "use_konik_server", lambda: False)
|
||||
|
||||
params = FakeParams({
|
||||
"DongleId": "konik-dongle",
|
||||
"KonikDongleId": "konik-dongle",
|
||||
})
|
||||
|
||||
spf.sync_konik_dongle_id(params)
|
||||
|
||||
assert params.get("DongleId") == "konik-dongle"
|
||||
assert params.get("StockDongleId") is None
|
||||
Reference in New Issue
Block a user