mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-02 20:12:07 +08:00
ui: show personality updates (#33406)
* personality-alerts * Update events.py * Update car.capnp * no AudibleAlert * shorter * no VisualAlert * one liner * smaller * normal * what am I doing wrong? * it works, but is it messy? * arg all * lil more * update that --------- Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 35da93fa9503aa8c56aba59cf42efc88ab4d882f
This commit is contained in:
@@ -117,6 +117,7 @@ struct CarEvent @0x9b1657f34caf3ad3 {
|
||||
paramsdPermanentError @119;
|
||||
actuatorsApiUnavailable @120;
|
||||
espActive @121;
|
||||
personalityChanged @122;
|
||||
|
||||
radarCanErrorDEPRECATED @15;
|
||||
communityFeatureDisallowedDEPRECATED @62;
|
||||
|
||||
@@ -655,6 +655,7 @@ class Controls:
|
||||
if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents):
|
||||
self.personality = (self.personality - 1) % 3
|
||||
self.params.put_nonblocking('LongitudinalPersonality', str(self.personality))
|
||||
self.events.add(EventName.personalityChanged)
|
||||
|
||||
return CC, lac_log
|
||||
|
||||
@@ -714,7 +715,8 @@ class Controls:
|
||||
if self.enabled:
|
||||
clear_event_types.add(ET.NO_ENTRY)
|
||||
|
||||
alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer])
|
||||
pers = {v: k for k, v in log.LongitudinalPersonality.schema.enumerants.items()}[self.personality]
|
||||
alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer, pers])
|
||||
self.AM.add_many(self.sm.frame, alerts)
|
||||
current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)
|
||||
if current_alert:
|
||||
|
||||
@@ -205,35 +205,35 @@ def get_display_speed(speed_ms: float, metric: bool) -> str:
|
||||
|
||||
# ********** alert callback functions **********
|
||||
|
||||
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int], Alert]
|
||||
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int, log.ControlsState], Alert]
|
||||
|
||||
|
||||
def soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
|
||||
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
if soft_disable_time < int(0.5 / DT_CTRL):
|
||||
return ImmediateDisableAlert(alert_text_2)
|
||||
return SoftDisableAlert(alert_text_2)
|
||||
return func
|
||||
|
||||
def user_soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
|
||||
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
if soft_disable_time < int(0.5 / DT_CTRL):
|
||||
return ImmediateDisableAlert(alert_text_2)
|
||||
return UserSoftDisableAlert(alert_text_2)
|
||||
return func
|
||||
|
||||
def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
branch = get_short_branch() # Ensure get_short_branch is cached to avoid lags on startup
|
||||
if "REPLAY" in os.environ:
|
||||
branch = "replay"
|
||||
|
||||
return StartupAlert("WARNING: This branch is not tested", branch, alert_status=AlertStatus.userPrompt)
|
||||
|
||||
def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
return NoEntryAlert(f"Drive above {get_display_speed(CP.minEnableSpeed, metric)} to engage")
|
||||
|
||||
|
||||
def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
return Alert(
|
||||
f"Steer Unavailable Below {get_display_speed(CP.minSteerSpeed, metric)}",
|
||||
"",
|
||||
@@ -241,7 +241,7 @@ def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.S
|
||||
Priority.LOW, VisualAlert.steerRequired, AudibleAlert.prompt, 0.4)
|
||||
|
||||
|
||||
def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
first_word = 'Recalibration' if sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.recalibrating else 'Calibration'
|
||||
return Alert(
|
||||
f"{first_word} in Progress: {sm['liveCalibration'].calPerc:.0f}%",
|
||||
@@ -252,37 +252,37 @@ def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messag
|
||||
|
||||
# *** debug alerts ***
|
||||
|
||||
def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
full_perc = round(100. - sm['deviceState'].freeSpacePercent)
|
||||
return NormalPermanentAlert("Out of Storage", f"{full_perc}% full")
|
||||
|
||||
|
||||
def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
mdl = sm['modelV2'].velocity.x[0] if len(sm['modelV2'].velocity.x) else math.nan
|
||||
err = CS.vEgo - mdl
|
||||
msg = f"Speed Error: {err:.1f} m/s"
|
||||
return NoEntryAlert(msg, alert_text_1="Posenet Speed Invalid")
|
||||
|
||||
|
||||
def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning]
|
||||
msg = ', '.join(not_running)
|
||||
return NoEntryAlert(msg, alert_text_1="Process Not Running")
|
||||
|
||||
|
||||
def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
bs = [s for s in sm.data.keys() if not sm.all_checks([s, ])]
|
||||
msg = ', '.join(bs[:4]) # can't fit too many on one line
|
||||
return NoEntryAlert(msg, alert_text_1="Communication Issue Between Processes")
|
||||
|
||||
|
||||
def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
all_cams = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState')
|
||||
bad_cams = [s.replace('State', '') for s in all_cams if s in sm.data.keys() and not sm.all_checks([s, ])]
|
||||
return NormalPermanentAlert("Camera Malfunction", ', '.join(bad_cams))
|
||||
|
||||
|
||||
def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
rpy = sm['liveCalibration'].rpyCalib
|
||||
yaw = math.degrees(rpy[2] if len(rpy) == 3 else math.nan)
|
||||
pitch = math.degrees(rpy[1] if len(rpy) == 3 else math.nan)
|
||||
@@ -290,39 +290,43 @@ def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging
|
||||
return NormalPermanentAlert("Calibration Invalid", angles)
|
||||
|
||||
|
||||
def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
cpu = max(sm['deviceState'].cpuTempC, default=0.)
|
||||
gpu = max(sm['deviceState'].gpuTempC, default=0.)
|
||||
temp = max((cpu, gpu, sm['deviceState'].memoryTempC))
|
||||
return NormalPermanentAlert("System Overheated", f"{temp:.0f} °C")
|
||||
|
||||
|
||||
def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
return NormalPermanentAlert("Low Memory", f"{sm['deviceState'].memoryUsagePercent}% used")
|
||||
|
||||
|
||||
def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
x = max(sm['deviceState'].cpuUsagePercent, default=0.)
|
||||
return NormalPermanentAlert("High CPU Usage", f"{x}% used")
|
||||
|
||||
|
||||
def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
return NormalPermanentAlert("Driving Model Lagging", f"{sm['modelV2'].frameDropPerc:.1f}% frames dropped")
|
||||
|
||||
|
||||
def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
text = "Enable Adaptive Cruise to Engage"
|
||||
if CP.carName == "honda":
|
||||
text = "Enable Main Switch to Engage"
|
||||
return NoEntryAlert(text)
|
||||
|
||||
|
||||
def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
|
||||
def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
axes = sm['testJoystick'].axes
|
||||
gb, steer = list(axes)[:2] if len(axes) else (0., 0.)
|
||||
vals = f"Gas: {round(gb * 100.)}%, Steer: {round(steer * 100.)}%"
|
||||
return NormalPermanentAlert("Joystick Mode", vals)
|
||||
|
||||
def personality_changed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
|
||||
personality = str(personality).title()
|
||||
return NormalPermanentAlert(f"Driving Personality: {personality}", duration=1.5)
|
||||
|
||||
|
||||
|
||||
EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
|
||||
@@ -955,6 +959,10 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
|
||||
ET.NO_ENTRY: NoEntryAlert("Vehicle Sensors Calibrating"),
|
||||
},
|
||||
|
||||
EventName.personalityChanged: {
|
||||
ET.WARNING: personality_changed_alert,
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -973,7 +981,7 @@ if __name__ == '__main__':
|
||||
for i, alerts in EVENTS.items():
|
||||
for et, alert in alerts.items():
|
||||
if callable(alert):
|
||||
alert = alert(CP, CS, sm, False, 1)
|
||||
alert = alert(CP, CS, sm, False, 1, log.LongitudinalPersonality.standard)
|
||||
alerts_by_type[et][alert.priority].append(event_names[i])
|
||||
|
||||
all_alerts: dict[str, list[tuple[Priority, list[str]]]] = {}
|
||||
|
||||
Reference in New Issue
Block a user