mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-21 01:42:06 +08:00
03a4f7ef9a
* init: tizi_replay.py from pr 37123 * separate coverage folder * ui replay: adjust HOLD constant, fix coverage, use separate folder for coverage * openpilot prefix * fix directory * fix ui_state * fix settings click pos * remove * attempt merge replay files * remove * todo * fix recording * spacing * simplify * comment * refactor hold * refactor: remove layout definitions from VARIANTS and import conditionally in run_replay * refactor: remove VARIANTS config * add argparser with --big flag and improve coverage sources * refactor * lowercase * refactor: combine scripts * add types * refactor: move imports for gui_app and ui_state to improve coverage and organization * update * update script * comment * fix headless * todo * fix: get_time and get_frame_time determinism * todo * remove file accidently commited * fix: improve inject_click and handle_event for deterministic event timestamps * comment * simplify add * refactor script building * fix mici clicks * pass in pm * fix wifi state * refactor clicks * more refactor * click cancel instead of remove overlay * setup_send_fn * add setup fn * dummy update * change * remove todo * rename fn to frame_fn * refactor * fix workflow * rename raylib ui preview to old * rename mici workflow * fix diff videos * ignore sub html and mp4 files * rename for diff * rename for diff again (mici) * use ScriptEvent instead of DummyEvent, and move mouse events directly to it; rename hold to wait * fix: only import MouseEvent for type hint to fix coverage * adjust settings button click * clarify * move ScriptEvent to replay_script * add handle_event function * remove passing in setup function, and refactor click events * clean * formatting * refactor * no import * comment * refactor * refactor setup functions to replay_setup * refactor * add ReplayContext * refactor * move more setup functions * refactor and simplify * refactor * refactor: add Script class * refactor: enhance Script event handling and add wait functionality * refactor * 
remove setup_and_click * use script.setup instead * comments * rename wait_frames to wait_after * add comments * revert workflows * revert rename * move arg parsing to main * remove quotes * add type * return types * type * VariantType * rename to LayoutVariant * clarify * switch * todo * Revert "fix diff videos" This reverts commit 7a6e45a409cb7e6d7a330317639fcee74ef8bd31. * add todos * add more coverage * wait 2 frames by default * add comment * comment * switch * fix space Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * remove extra Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Remove unnecessary blank line in ReplayContext class * simplify --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
250 lines
9.1 KiB
Python
250 lines
9.1 KiB
Python
from __future__ import annotations
|
|
from typing import TYPE_CHECKING
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
|
|
from cereal import car, log, messaging
|
|
from cereal.messaging import PubMaster
|
|
from openpilot.common.basedir import BASEDIR
|
|
from openpilot.common.params import Params
|
|
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
|
|
from openpilot.selfdrive.ui.tests.diff.replay import FPS, LayoutVariant
|
|
from openpilot.system.updated.updated import parse_release_notes
|
|
|
|
# Default number of frames to wait after events (half of FPS)
WAIT = int(FPS * 0.5)

# Short aliases for the alert enums carried by selfdriveState messages
AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus

# Deliberately very long branch name, written to the "UpdaterTargetBranch" param and the update description
BRANCH_NAME = "this-is-a-really-super-mega-ultra-max-extreme-ultimate-long-branch-name"
|
|
|
|
|
|
@dataclass
class ScriptEvent:
  """A single scripted step of the UI replay.

  Every field is optional and defaults to None, so `ScriptEvent()` is an
  empty event that carries no setup, no mouse input and no sender change.
  """

  if TYPE_CHECKING:
    # Only import for type checking to avoid excluding the application code from coverage
    from openpilot.system.ui.lib.application import MouseEvent

  # Setup function to run prior to adding mouse events
  setup: Callable | None = None
  # Mouse events to send to the application on this event's frame
  mouse_events: list[MouseEvent] | None = None
  # When set, the main loop uses this as the new persistent sender
  send_fn: Callable | None = None


# A script entry pairs the frame an event is scheduled on with the event itself: (frame, event)
ScriptEntry = tuple[int, ScriptEvent]
|
|
|
|
|
|
class Script:
  """Builder that lays out `ScriptEvent`s on a frame-based timeline.

  `frame` is a cursor marking where the next event will be placed; each
  helper appends one or more `(frame, event)` entries to `entries` and
  advances the cursor by the requested number of frames.
  """

  def __init__(self, fps: int) -> None:
    self.fps = fps  # Frames per second, used to convert the frame cursor to seconds
    self.frame = 0  # Frame on which the next event will be scheduled
    self.entries: list[ScriptEntry] = []

  def get_frame_time(self) -> float:
    """Return the current frame cursor expressed in seconds."""
    return self.frame / self.fps

  def add(self, event: ScriptEvent, before: int = 0, after: int = 0) -> None:
    """Schedule `event`, waiting `before` frames ahead of it and `after` frames once it is placed."""
    scheduled_frame = self.frame + before
    self.entries.append((scheduled_frame, event))
    self.frame = scheduled_frame + after

  def end(self) -> None:
    """Terminate the script with a final empty event."""
    # Without this, it will just end on the last event without waiting for any specified delay after it
    self.add(ScriptEvent())

  def wait(self, frames: int) -> None:
    """Advance the cursor by `frames` and then place an empty event."""
    self.add(ScriptEvent(), before=frames)

  def setup(self, fn: Callable, wait_after: int = WAIT) -> None:
    """Schedule `fn` as a setup function on the current frame, then wait `wait_after` frames."""
    self.add(ScriptEvent(setup=fn), after=wait_after)

  def set_send(self, fn: Callable, wait_after: int = WAIT) -> None:
    """Schedule `fn` as the new persistent send function (called every frame), then wait `wait_after` frames."""
    self.add(ScriptEvent(send_fn=fn), after=wait_after)

  # TODO: Also add more complex gestures, like swipe or drag
  def click(self, x: int, y: int, wait_after: int = WAIT, wait_between: int = 2) -> None:
    """Schedule a click at (x, y): a press event, `wait_between` frames, a release event, then `wait_after` frames."""
    # NOTE: By default we wait a couple frames between mouse events so pressed states will be rendered
    from openpilot.system.ui.lib.application import MouseEvent, MousePos

    def mouse_event(pressed: bool) -> MouseEvent:
      # Timestamp is taken from the cursor at call time, so the release must be built after the press was added
      # TODO: Add support for long press (left_down=True)
      return MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=pressed, left_released=not pressed, left_down=False, t=self.get_frame_time())

    self.add(ScriptEvent(mouse_events=[mouse_event(True)]), after=wait_between)
    self.add(ScriptEvent(mouse_events=[mouse_event(False)]), after=wait_after)
|
|
|
|
|
|
# --- Setup functions ---
|
|
|
|
def put_update_params(params: Params | None = None) -> None:
  """Write the updater-related params.

  Stores the release notes parsed from BASEDIR as both the current and the
  new release notes, and BRANCH_NAME as the updater target branch.

  Args:
    params: Params instance to write to; a fresh `Params()` is created when omitted.
  """
  if params is None:
    params = Params()

  # Both keys receive the same notes, so parse them once instead of once per key
  release_notes = parse_release_notes(BASEDIR)
  params.put("UpdaterCurrentReleaseNotes", release_notes)
  params.put("UpdaterNewReleaseNotes", release_notes)
  params.put("UpdaterTargetBranch", BRANCH_NAME)
|
|
|
|
|
|
def setup_offroad_alerts() -> None:
  """Write the updater params and then enable three offroad alerts."""
  put_update_params(Params())

  # (alert name, extra keyword arguments) in the order they are raised
  alerts = (
    ("Offroad_TemperatureTooHigh", {"extra_text": '99C'}),
    ("Offroad_ExcessiveActuation", {"extra_text": 'longitudinal'}),
    ("Offroad_IsTakingSnapshot", {}),
  )
  for alert_name, extra_kwargs in alerts:
    set_offroad_alert(alert_name, True, **extra_kwargs)
|
|
|
|
|
|
def setup_update_available() -> None:
  """Flag an update as available, set its description, and write the updater params."""
  store = Params()
  store.put_bool("UpdateAvailable", True)

  new_description = f"0.10.2 / {BRANCH_NAME} / 0a1b2c3 / Jan 01"
  store.put("UpdaterNewDescription", new_description)

  put_update_params(store)
|
|
|
|
|
|
def setup_developer_params() -> None:
  """Store a CarParams with alphaLongitudinalAvailable enabled under "CarParamsPersistent"."""
  car_params = car.CarParams()
  car_params.alphaLongitudinalAvailable = True

  serialized = car_params.to_bytes()
  Params().put("CarParamsPersistent", serialized)
|
|
|
|
|
|
# --- Send functions ---
|
|
|
|
def send_onroad(pm: PubMaster) -> None:
  """Publish a started, wifi deviceState and a one-panda pandaStates (dos, ignition line on) on `pm`."""
  device_msg = messaging.new_message('deviceState')
  device_state = device_msg.deviceState
  device_state.started = True
  device_state.networkType = log.DeviceState.NetworkType.wifi

  panda_msg = messaging.new_message('pandaStates', 1)
  panda = panda_msg.pandaStates[0]
  panda.pandaType = log.PandaState.PandaType.dos
  panda.ignitionLine = True

  # deviceState goes out first, then pandaStates
  for service, msg in (('deviceState', device_msg), ('pandaStates', panda_msg)):
    pm.send(service, msg)
|
|
|
|
|
|
def make_network_state_setup(pm: PubMaster, network_type) -> Callable:
  """Return a no-argument function that publishes a deviceState message with the given network type on `pm`."""

  def _publish_network_state() -> None:
    msg = messaging.new_message('deviceState')
    msg.deviceState.networkType = network_type
    pm.send('deviceState', msg)

  return _publish_network_state
|
|
|
|
|
|
def make_alert_setup(pm: PubMaster, size, text1, text2, status) -> Callable:
  """Return a no-argument function that publishes the onroad state followed by a selfdriveState alert.

  The alert's size, both text lines and status are taken from the arguments.
  """

  def _publish_alert() -> None:
    # Onroad messages are sent before the alert itself
    send_onroad(pm)

    msg = messaging.new_message('selfdriveState')
    state = msg.selfdriveState
    state.alertSize = size
    state.alertText1 = text1
    state.alertText2 = text2
    state.alertStatus = status
    pm.send('selfdriveState', msg)

  return _publish_alert
|
|
|
|
|
|
# --- Script builders ---
|
|
|
|
def build_mici_script(pm: PubMaster, main_layout, script: Script) -> None:
  """Populate `script` with the replay sequence for the mici layout.

  `pm` and `main_layout` are accepted for signature parity with the other
  builder but are not used here.
  """
  from openpilot.system.ui.lib.application import gui_app

  center_x, center_y = gui_app.width // 2, gui_app.height // 2

  # TODO: Explore more
  script.wait(FPS)
  script.click(center_x, center_y, wait_after=FPS)  # Open settings
  script.click(center_x, center_y, wait_after=FPS)  # Open toggles
  script.end()
|
|
|
|
|
|
def build_tizi_script(pm: PubMaster, main_layout, script: Script) -> None:
  """Populate `script` with the replay sequence for the tizi layout.

  The sequence walks the home screen, the settings panels, and the onroad
  view with a series of alerts, then terminates the script.
  """

  def make_home_refresh_setup(fn: Callable) -> Callable:
    """Return setup function that calls `fn` to modify state and then forces an immediate refresh on the home layout."""
    from openpilot.selfdrive.ui.layouts.main import MainState

    def _apply_and_refresh():
      fn()
      home_layout = main_layout._layouts[MainState.HOME]
      home_layout.last_refresh = 0

    return _apply_and_refresh

  def click_each(points) -> None:
    """Click every (x, y) position in order, using the default waits."""
    for x, y in points:
      script.click(x, y)

  def send_onroad_state() -> None:
    send_onroad(pm)

  # TODO: Better way of organizing the events

  # === Homescreen ===
  script.set_send(make_network_state_setup(pm, log.DeviceState.NetworkType.wifi))

  # === Offroad Alerts, then Update Available (both auto-transition via HomeLayout refresh) ===
  for state_fn in (setup_offroad_alerts, setup_update_available):
    script.setup(make_home_refresh_setup(state_fn))

  # === Settings - Device (click sidebar settings button) ===
  click_each((
    (150, 90),
    (1985, 790),  # reset calibration confirmation
    (650, 750),  # cancel
  ))

  # === Settings - Network ===
  click_each((
    (278, 450),
    (1880, 100),  # advanced network settings
    (630, 80),  # back
  ))

  # === Settings - Toggles ===
  click_each((
    (278, 600),
    (1200, 280),  # experimental mode description
  ))

  # === Settings - Software ===
  script.setup(put_update_params, wait_after=0)
  script.click(278, 720)

  # === Settings - Firehose ===
  script.click(278, 845)

  # === Settings - Developer (set CarParamsPersistent first) ===
  script.setup(setup_developer_params, wait_after=0)
  click_each((
    (278, 950),
    (2000, 960),  # toggle alpha long
    (1500, 875),  # confirm
  ))

  # === Keyboard modal (SSH keys button in developer panel) ===
  click_each((
    (1930, 470),  # click SSH keys
    (1930, 115),  # click cancel on keyboard
  ))

  # === Close settings ===
  script.click(250, 160)

  # === Onroad ===
  script.set_send(send_onroad_state)
  script.click(1000, 500)  # click onroad to toggle sidebar

  # === Onroad alerts ===
  onroad_alerts = (
    # Small alert (normal)
    (AlertSize.small, "Small Alert", "This is a small alert", AlertStatus.normal),
    # Medium alert (userPrompt)
    (AlertSize.mid, "Medium Alert", "This is a medium alert", AlertStatus.userPrompt),
    # Full alert (critical)
    (AlertSize.full, "DISENGAGE IMMEDIATELY", "Driver Distracted", AlertStatus.critical),
    # Full alert multiline
    (AlertSize.full, "Reverse\nGear", "", AlertStatus.normal),
    # Full alert long text
    (AlertSize.full, "TAKE CONTROL IMMEDIATELY", "Calibration Invalid: Remount Device & Recalibrate", AlertStatus.userPrompt),
  )
  for size, text1, text2, status in onroad_alerts:
    script.set_send(make_alert_setup(pm, size, text1, text2, status))

  # End
  script.end()
|
|
|
|
|
|
def build_script(pm: PubMaster, main_layout, variant: LayoutVariant) -> list[ScriptEntry]:
  """Build the replay script for the given layout variant and return its entries.

  The tizi builder is used when `variant` equals 'tizi'; any other value
  selects the mici builder.
  """
  print(f"Building {variant} replay script...")

  script = Script(FPS)
  if variant == 'tizi':
    build_tizi_script(pm, main_layout, script)
  else:
    build_mici_script(pm, main_layout, script)

  print(f"Built replay script with {len(script.entries)} events and {script.frame} frames ({script.get_frame_time():.2f} seconds)")

  return script.entries
|