import pyray as rl import numpy as np import time import threading from collections.abc import Callable from enum import Enum from cereal import messaging, car, log from openpilot.common.filter_simple import FirstOrderFilter from openpilot.common.params import Params from openpilot.common.realtime import drop_realtime from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.ui.lib.prime_state import PrimeState from openpilot.system.ui.lib.application import gui_app from openpilot.system.hardware import HARDWARE, PC BACKLIGHT_OFFROAD = 65 if HARDWARE.get_device_type() == "mici" else 50 PARAM_UPDATE_TIME = 1 / 5.0 class UIStatus(Enum): DISENGAGED = "disengaged" ENGAGED = "engaged" OVERRIDE = "override" ALKA = "alka" class UIState: _instance: 'UIState | None' = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialize() return cls._instance def _initialize(self): self.params = Params() self.sm = messaging.SubMaster( [ "modelV2", "controlsState", "onroadEvents", "liveCalibration", "radarState", "deviceState", "pandaStates", "carParams", "driverMonitoringState", "carState", "driverStateV2", "roadCameraState", "wideRoadCameraState", "managerState", "selfdriveState", "longitudinalPlan", "gpsLocationExternal", "carOutput", "carControl", "liveParameters", "testJoystick", "rawAudioData", "controlsStateExt", "liveTracks", # dp - for dp_ui_lead ] ) self.prime_state = PrimeState() # UI Status tracking self.status: UIStatus = UIStatus.DISENGAGED self.started_frame: int = 0 self.started_time: float = 0.0 self._engaged_prev: bool = False self._started_prev: bool = False # Core state variables self.is_metric: bool = self.params.get_bool("IsMetric") self.is_release = self.params.get_bool("IsReleaseBranch") self.always_on_dm: bool = self.params.get_bool("AlwaysOnDM") self.experimental_mode: bool = self.params.get_bool("ExperimentalMode") self.usbgpu: bool = self.params.get_bool("UsbGpuPresent") self.usbgpu_compiled: bool = self.params.get_bool("UsbGpuCompiled") self.started: bool = False self.ignition: bool = False self.recording_audio: bool = False self.panda_type: log.PandaState.PandaType = log.PandaState.PandaType.unknown self.personality: log.LongitudinalPersonality = log.LongitudinalPersonality.standard self.has_longitudinal_control: bool = False self.is_body: bool | None = None self.CP: car.CarParams | None = None self.light_sensor: float = -1.0 self._params_thread: threading.Thread | None = None # Callbacks self._offroad_transition_callbacks: list[Callable[[], None]] = [] self._engaged_transition_callbacks: list[Callable[[], None]] = [] self._on_body_changed_callbacks: list[Callable[[], None]] = [] # dp - ALKA self.dp_alka_active: bool = False # dp self.dp_ui_display_mode = 0 # dp self.dp_ui_hide_hud_speed_ms: float = float(int(self.params.get("dp_ui_hide_hud_speed_kph") or 0) * 0.278) # dp self.dp_ui_rainbow = self.params.get_bool("dp_ui_rainbow") # dp self.dp_ui_lead = int(self.params.get("dp_ui_lead") or 0) # dp self.dp_dev_disable_connect = self.params.get_bool("dp_dev_disable_connect") def add_offroad_transition_callback(self, callback: Callable[[], None]): self._offroad_transition_callbacks.append(callback) def add_engaged_transition_callback(self, callback: Callable[[], None]): self._engaged_transition_callbacks.append(callback) def add_on_body_changed_callbacks(self, callback: Callable[[], None]): self._on_body_changed_callbacks.append(callback) @property def engaged(self) -> bool: return self.started and self.sm["selfdriveState"].enabled def is_onroad(self) -> bool: return self.started def is_offroad(self) -> bool: return not self.started def update(self) -> None: self.prime_state.start() # start thread after manager forks ui if self._params_thread is None: self._params_thread = threading.Thread(target=self._params_refresh_worker, daemon=True) self._params_thread.start() self.sm.update(0) self._update_state() self._update_status() device.update() def _params_refresh_worker(self): drop_realtime() while True: self.update_params() time.sleep(PARAM_UPDATE_TIME) def _update_state(self) -> None: # Handle panda states updates if self.sm.updated["pandaStates"]: panda_states = self.sm["pandaStates"] if len(panda_states) > 0: # Get panda type from first panda self.panda_type = panda_states[0].pandaType # Check ignition status across all pandas if self.panda_type != log.PandaState.PandaType.unknown: self.ignition = any(state.ignitionLine or state.ignitionCan for state in panda_states) elif self.sm.frame - self.sm.recv_frame["pandaStates"] > 5 * rl.get_fps(): self.panda_type = log.PandaState.PandaType.unknown # Handle wide road camera state updates if self.sm.updated["wideRoadCameraState"]: cam_state = self.sm["wideRoadCameraState"] # rick - for c3: Scale factor based on sensor type scale = 6.0 if cam_state.sensor == 'ar0231' else 1.0 self.light_sensor = max(100.0 - scale * cam_state.exposureValPercent, 0.0) elif not self.sm.alive["wideRoadCameraState"] or not self.sm.valid["wideRoadCameraState"]: self.light_sensor = -1 # Update started state self.started = self.sm["deviceState"].started and self.ignition # Update body state if self.CP is not None and self.is_body != self.CP.notCar: self.is_body = self.CP.notCar for callback in self._on_body_changed_callbacks: callback() # dp - ALKA if self.sm.updated["controlsStateExt"]: self.dp_alka_active = self.sm["controlsStateExt"].alkaActive # dp self.dp_ui_display_mode = int(self.params.get("dp_ui_display_mode") or 0) self.dp_ui_display_mode_cruise_available = False self.dp_ui_display_mode_cruise_enabled = False def _update_status(self) -> None: if self.started and self.sm.updated["selfdriveState"]: ss = self.sm["selfdriveState"] state = ss.state if state in (log.SelfdriveState.OpenpilotState.preEnabled, log.SelfdriveState.OpenpilotState.overriding): self.status = UIStatus.OVERRIDE else: self.status = UIStatus.ENGAGED if ss.enabled else UIStatus.DISENGAGED # Check for engagement state changes if self.engaged != self._engaged_prev: for callback in self._engaged_transition_callbacks: callback() self._engaged_prev = self.engaged # Handle onroad/offroad transition if self.started != self._started_prev or self.sm.frame == 1: if self.started: self.status = UIStatus.DISENGAGED self.started_frame = self.sm.frame self.started_time = time.monotonic() for callback in self._offroad_transition_callbacks: callback() self._started_prev = self.started # dp if self.sm.updated["carState"]: self.dp_ui_display_mode_cruise_available = self.sm["carState"].cruiseState.available self.dp_ui_display_mode_cruise_enabled = self.sm["carState"].cruiseState.enabled def update_params(self) -> None: # For slower operations # Update longitudinal control state CP_bytes = self.params.get("CarParamsPersistent") if CP_bytes is not None: self.CP = messaging.log_from_bytes(CP_bytes, car.CarParams) if self.CP.alphaLongitudinalAvailable: self.has_longitudinal_control = self.params.get_bool("AlphaLongitudinalEnabled") else: self.has_longitudinal_control = self.CP.openpilotLongitudinalControl self.recording_audio = self.params.get_bool("RecordAudio") and self.started self.is_metric = self.params.get_bool("IsMetric") self.always_on_dm = self.params.get_bool("AlwaysOnDM") self.experimental_mode = self.params.get_bool("ExperimentalMode") self.usbgpu = self.params.get_bool("UsbGpuPresent") self.usbgpu_compiled = self.params.get_bool("UsbGpuCompiled") class Device: def __init__(self): self._ignition = False self._interaction_time: float = -1 self._override_interactive_timeout: int | None = None self._interactive_timeout_callbacks: list[Callable] = [] self._prev_timed_out = False self._awake: bool = True self._offroad_brightness: int = BACKLIGHT_OFFROAD self._last_brightness: int = 0 self._brightness_filter = FirstOrderFilter(BACKLIGHT_OFFROAD, 10.00, 1 / gui_app.target_fps) self._brightness_thread: threading.Thread | None = None self._brightness_event = threading.Event() self._brightness_target: int = 0 @property def awake(self) -> bool: return self._awake def set_override_interactive_timeout(self, timeout: int | None) -> None: # Override the interactive timeout duration temporarily self._override_interactive_timeout = timeout self._reset_interactive_timeout() @property def interactive_timeout(self) -> int: if self._override_interactive_timeout is not None: return self._override_interactive_timeout ignition_timeout = 30 if gui_app.big_ui() else 5 return ignition_timeout if ui_state.ignition else 60 def _reset_interactive_timeout(self) -> None: self._interaction_time = time.monotonic() + self.interactive_timeout def add_interactive_timeout_callback(self, callback: Callable): self._interactive_timeout_callbacks.append(callback) def update(self): self._start_brightness_thread() # start thread after manager forks ui # do initial reset if self._interaction_time <= 0: self._reset_interactive_timeout() self._update_brightness() self._update_wakefulness() def _start_brightness_thread(self): if self._brightness_thread is None or not self._brightness_thread.is_alive(): self._brightness_thread = threading.Thread(target=self._brightness_worker, daemon=True) self._brightness_thread.start() def _brightness_worker(self): drop_realtime() while True: self._brightness_event.wait() self._brightness_event.clear() HARDWARE.set_screen_brightness(self._brightness_target) def set_offroad_brightness(self, brightness: int | None): if brightness is None: brightness = BACKLIGHT_OFFROAD self._offroad_brightness = min(max(brightness, 0), 100) def _update_brightness(self): clipped_brightness = self._offroad_brightness if ui_state.started and ui_state.light_sensor >= 0: clipped_brightness = ui_state.light_sensor # CIE 1931 - https://www.photonstophotos.net/GeneralTopics/Exposure/Psychometric_Lightness_and_Gamma.htm if clipped_brightness <= 8: clipped_brightness = clipped_brightness / 903.3 else: clipped_brightness = ((clipped_brightness + 16.0) / 116.0) ** 3.0 clipped_brightness = float(np.interp(clipped_brightness, [0, 1], [30, 100])) brightness = round(self._brightness_filter.update(clipped_brightness)) if not self._awake: brightness = 0 if brightness != self._last_brightness: self._brightness_target = brightness self._brightness_event.set() self._last_brightness = brightness # // Display Mode # // 0 Std. - Stock behavior. # // 1 MAIN+ - ACC MAIN on = Display ON # // 2 OP+ - OP enabled = Display ON # // 3 MAIN- - ACC MAIN on = Display OFF # // 4 OP- - OP enabled = Display OFF def _ignition_state_ovrride(self, ignition): # 0 stock behaviour or ignition is off if ui_state.dp_ui_display_mode == 0 or not ignition: return ignition # 1 MAIN+ - ACC MAIN on = Display ON if ui_state.dp_ui_display_mode == 1: if ui_state.dp_ui_display_mode_cruise_available: return True else: return False # 2 OP+ - OP enabled = Display ON if ui_state.dp_ui_display_mode == 2: if ui_state.dp_ui_display_mode_cruise_enabled: return True else: return False # 3 MAIN- - ACC MAIN on = Display OFF if ui_state.dp_ui_display_mode == 3: if ui_state.dp_ui_display_mode_cruise_available: return False else: return True # 4 OP- - OP enabled = Display OFF if ui_state.dp_ui_display_mode == 4: if ui_state.dp_ui_display_mode_cruise_enabled: return False else: return True # oops return ignition def _update_wakefulness(self): # Handle interactive timeout ignition_just_turned_off = not ui_state.ignition and self._ignition self._ignition = ui_state.ignition if ignition_just_turned_off or any(ev.left_down for ev in gui_app.mouse_events): self._reset_interactive_timeout() interaction_timeout = time.monotonic() > self._interaction_time if interaction_timeout and not self._prev_timed_out: for callback in self._interactive_timeout_callbacks: callback() self._prev_timed_out = interaction_timeout ignition = self._ignition_state_ovrride(ui_state.ignition) self._set_awake(ignition or not interaction_timeout or PC) def _set_awake(self, on: bool): if on != self._awake: self._awake = on cloudlog.debug(f"setting display power {int(on)}") HARDWARE.set_display_power(on) gui_app.set_should_render(on) # Global instance ui_state = UIState() device = Device()