mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-02 03:52:11 +08:00
alertmanager type hints (#2201)
* alertmanager types * clean that up * more cleanup * little more * space old-commit-hash: 15e4048318c2bf2e3e064c3f1e7e288edb9f902c
This commit is contained in:
@@ -1,24 +1,21 @@
|
||||
import os
|
||||
import copy
|
||||
import json
|
||||
from typing import List, Optional
|
||||
|
||||
from cereal import car, log
|
||||
from common.basedir import BASEDIR
|
||||
from common.params import Params
|
||||
from common.realtime import DT_CTRL
|
||||
from selfdrive.controls.lib.events import Alert
|
||||
from selfdrive.swaglog import cloudlog
|
||||
|
||||
AlertSize = log.ControlsState.AlertSize
|
||||
AlertStatus = log.ControlsState.AlertStatus
|
||||
VisualAlert = car.CarControl.HUDControl.VisualAlert
|
||||
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
|
||||
|
||||
|
||||
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f:
|
||||
OFFROAD_ALERTS = json.load(f)
|
||||
|
||||
|
||||
def set_offroad_alert(alert, show_alert, extra_text=None):
|
||||
def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = None) -> None:
|
||||
if show_alert:
|
||||
a = OFFROAD_ALERTS[alert]
|
||||
if extra_text is not None:
|
||||
@@ -29,19 +26,30 @@ def set_offroad_alert(alert, show_alert, extra_text=None):
|
||||
Params().delete(alert)
|
||||
|
||||
|
||||
class AlertManager():
|
||||
class AlertManager:
|
||||
|
||||
def __init__(self):
|
||||
self.activealerts = []
|
||||
self.activealerts: List[Alert] = []
|
||||
self.clear_current_alert()
|
||||
|
||||
def alert_present(self):
|
||||
def alert_present(self) -> bool:
|
||||
return len(self.activealerts) > 0
|
||||
|
||||
def add_many(self, frame, alerts, enabled=True):
|
||||
def clear_current_alert(self) -> None:
|
||||
self.alert_type: str = ""
|
||||
self.alert_text_1: str = ""
|
||||
self.alert_text_2: str = ""
|
||||
self.alert_status = log.ControlsState.AlertStatus.normal
|
||||
self.alert_size = log.ControlsState.AlertSize.none
|
||||
self.visual_alert = car.CarControl.HUDControl.VisualAlert.none
|
||||
self.audible_alert = car.CarControl.HUDControl.AudibleAlert.none
|
||||
self.alert_rate: float = 0.
|
||||
|
||||
def add_many(self, frame: int, alerts: List[Alert], enabled: bool = True) -> None:
|
||||
for a in alerts:
|
||||
self.add(frame, a, enabled=enabled)
|
||||
|
||||
def add(self, frame, alert, enabled=True):
|
||||
def add(self, frame: int, alert: Alert, enabled: bool = True) -> None:
|
||||
added_alert = copy.copy(alert)
|
||||
added_alert.start_time = frame * DT_CTRL
|
||||
|
||||
@@ -54,26 +62,18 @@ class AlertManager():
|
||||
# sort by priority first and then by start_time
|
||||
self.activealerts.sort(key=lambda k: (k.alert_priority, k.start_time), reverse=True)
|
||||
|
||||
def process_alerts(self, frame):
|
||||
def process_alerts(self, frame: int) -> None:
|
||||
cur_time = frame * DT_CTRL
|
||||
|
||||
# first get rid of all the expired alerts
|
||||
self.activealerts = [a for a in self.activealerts if a.start_time +
|
||||
max(a.duration_sound, a.duration_hud_alert, a.duration_text) > cur_time]
|
||||
|
||||
current_alert = self.activealerts[0] if self.alert_present() else None
|
||||
|
||||
# start with assuming no alerts
|
||||
self.alert_type = ""
|
||||
self.alert_text_1 = ""
|
||||
self.alert_text_2 = ""
|
||||
self.alert_status = AlertStatus.normal
|
||||
self.alert_size = AlertSize.none
|
||||
self.visual_alert = VisualAlert.none
|
||||
self.audible_alert = AudibleAlert.none
|
||||
self.alert_rate = 0.
|
||||
self.clear_current_alert()
|
||||
|
||||
if current_alert:
|
||||
current_alert = self.activealerts[0] if self.alert_present() else None
|
||||
if current_alert is not None:
|
||||
self.alert_type = current_alert.alert_type
|
||||
|
||||
if current_alert.start_time + current_alert.duration_sound > cur_time:
|
||||
|
||||
Reference in New Issue
Block a user