feat: Squash all min-features into full

This commit is contained in:
Rick Lan
2026-06-11 20:00:23 +08:00
parent ce6611af1f
commit d328bb844d
447 changed files with 108504 additions and 163 deletions
+3
View File
@@ -88,3 +88,6 @@ build/
.context/
PLAN.md
TASK.md
# rick - keep panda_tici standalone
panda_tici/
+140
View File
@@ -0,0 +1,140 @@
# ALKA (Always-on Lane Keeping Assist) Design v3
## Overview
ALKA enables lateral control (steering) when ACC Main is ON, without requiring cruise to be engaged. This allows lane keeping assist to function independently of longitudinal control.
**Simplified Behavior (v3):**
- All brands use direct tracking: `lkas_on = acc_main_on`
- No button/toggle tracking (removed TJA, LKAS button, LKAS HUD)
- ACC Main ON = ALKA enabled, ACC Main OFF = ALKA disabled
---
## Per-Brand Summary
| Brand | Status | ACC Main Source | Notes |
|-------|--------|-----------------|-------|
| Body | Disabled | - | No steering capability |
| Chrysler | Disabled | - | Needs special handling |
| Ford | Enabled | EngBrakeData (0x165) CcStat | |
| GM | Disabled | - | No ACC Main signal |
| Honda Nidec | Enabled | SCM_FEEDBACK (0x326) MAIN_ON | |
| Honda Bosch | Enabled | SCM_FEEDBACK (0x326) MAIN_ON | |
| Hyundai | Enabled | SCC11 (0x420) bit 0 | |
| Hyundai CAN-FD | Enabled | SCC_CONTROL (0x1A0) bit 66 | |
| Hyundai Legacy | Enabled | SCC11 (0x420) bit 0 | |
| Mazda | Enabled | CRZ_CTRL (0x21C) bit 17 | |
| Nissan | Enabled | CRUISE_THROTTLE (0x239) bit 17 | |
| PSA | Disabled | - | Not implemented |
| Rivian | Disabled | - | Different architecture |
| Subaru | Enabled | CruiseControl (0x240) bit 40 | |
| Subaru Preglobal | Enabled | CruiseControl (0x144) bit 48 | |
| Tesla | Disabled | - | Different architecture |
| Toyota | Enabled | PCM_CRUISE_2 (0x1D3) bit 15 | |
| Toyota (UNSUPPORTED_DSU) | Enabled | DSU_CRUISE (0x365) bit 0 | |
| VW MQB | Enabled | TSK_06 TSK_Status (>=2) | |
| VW PQ | Enabled | Motor_5 (0x480) bit 50 (long) | |
---
## Permission Model
Lateral control requires checks at both layers. Normal path uses `controls_allowed`, ALKA path uses additional checks.
| Check | Panda | openpilot | Notes |
|-------|:-----:|:---------:|-------|
| **Normal Path** |
| `controls_allowed` (cruise engaged) | ✓ | ✓ | Either this OR ALKA path |
| **ALKA Path** |
| `alka_allowed` (brand supports) | ✓ | ✓ | Set per brand in safety init |
| `ALT_EXP_ALKA` (user enabled) | ✓ | ✓ | alternativeExperience flag |
| `lkas_on` (ACC Main ON) | ✓ | ✓ | Tracked via CAN messages |
| `vehicle_moving` / `!standstill` | ✓ | ✓ | |
| **openpilot Additional** |
| `gear_ok` (not P/N/R) | ✗ | ✓ | Python layer only |
| `calibrated` | ✗ | ✓ | Python layer only |
| `seatbelt latched` | ✗ | ✓ | Python layer only |
| `doors closed` | ✗ | ✓ | Python layer only |
| `!steerFaultTemporary` | ✗ | ✓ | Python layer only |
| `!steerFaultPermanent` | ✗ | ✓ | Python layer only |
---
## Data Flow
```
┌─────────────────────────────────────────────────────────────────────┐
│ CAN Bus │
└─────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ Safety Layer (panda C code) │ │ Python Layer │
│ │ │ │
│ rx_hook: │ │ carstate.py: │
│ - Parse ACC Main signal │ │ - Parse cruiseState.available │
│ - Set lkas_on = acc_main_on │ │ - Set self.lkas_on │
│ │ │ │
│ lat_control_allowed(): │ └─────────────┬───────────────────┘
│ - Check lkas_on + other flags │ │
│ - Gate steering commands │ ▼
└─────────────────────────────────┘ ┌─────────────────────────────────┐
│ card.py: │
│ - Publish carStateExt.lkasOn │
└─────────────┬───────────────────┘
┌─────────────────────────────────┐
│ controlsd.py: │
│ - Read carStateExt.lkasOn │
│ - Check ALKA conditions │
│ - Set CC.latActive │
└─────────────────────────────────┘
```
### Key Files
| File | Purpose |
|------|---------|
| `custom.capnp` | Defines `CarStateExt` struct with `lkasOn` field |
| `log.capnp` | Includes `carStateExt` in event union |
| `interfaces.py` | Defines `self.lkas_on = False` default in `CarStateBase` |
| `carstate.py` (per brand) | Tracks `lkas_on` based on ACC Main |
| `card.py` | Publishes `carStateExt.lkasOn` from `CI.CS.lkas_on` |
| `controlsd.py` | Reads `carStateExt.lkasOn` to determine `alka_active` |
---
## ACC Main Tracking
All brands use simple direct tracking:
```c
// Panda (C code)
if (alka_allowed && (alternative_experience & ALT_EXP_ALKA)) {
lkas_on = acc_main_on; // or GET_BIT(msg, bit_position)
}
```
```python
# Python carstate.py
self.lkas_on = ret.cruiseState.available
```
This guard ensures:
1. Brand supports ALKA (`alka_allowed`)
2. User enabled ALKA (`ALT_EXP_ALKA`)
Without both conditions, no ACC Main tracking occurs, and ALKA remains disabled.
---
## Testing
Safety tests verify:
- `alka_allowed` flag set correctly per brand
- ACC Main tracking updates `lkas_on` directly
- `lat_control_allowed()` returns true only when all conditions met
- Steering TX blocked when ALKA conditions not met
- Bus routing variants (camera_scc, unsupported_dsu)
+3 -1
View File
@@ -120,7 +120,7 @@ env = Environment(
LIBPATH=[
"#common",
"#msgq_repo",
"#selfdrive/pandad",
"#selfdrive/pandad_tici" if "TICI_DOS" in os.environ else "#selfdrive/pandad",
"#rednose/helpers",
[x.LIB_DIR for x in pkgs],
],
@@ -237,6 +237,7 @@ Export('messaging')
# Build other submodules
SConscript(['panda/SConscript'])
SConscript(['panda_tici/SConscript'])
# Build rednose library
SConscript(['rednose/SConscript'])
@@ -252,6 +253,7 @@ if arch == "larch64":
# Build selfdrive
SConscript([
'selfdrive/pandad/SConscript',
'selfdrive/pandad_tici/SConscript',
'selfdrive/controls/lib/lateral_mpc_lib/SConscript',
'selfdrive/controls/lib/longitudinal_mpc_lib/SConscript',
'selfdrive/locationd/SConscript',
+12 -4
View File
@@ -10,16 +10,24 @@ $Cxx.namespace("cereal");
# DO rename the structs
# DON'T change the identifier (e.g. @0x81c2f05a394cf4af)
struct CustomReserved0 @0x81c2f05a394cf4af {
struct ControlsStateExt @0x81c2f05a394cf4af {
alkaActive @0 :Bool;
}
struct CustomReserved1 @0xaedffd8f31e7b55d {
struct CarStateExt @0xaedffd8f31e7b55d {
# dp - ALKA: lkasOn state from carstate (mirrors panda's lkas_on)
lkasOn @0 :Bool;
}
struct CustomReserved2 @0xf35cc4560bbf6ec2 {
struct ModelExt @0xf35cc4560bbf6ec2 {
leftEdgeDetected @0 :Bool;
rightEdgeDetected @1 :Bool;
}
struct CustomReserved3 @0xda96579883444c35 {
struct DashyState @0xda96579883444c35 {
# Pre-serialized JSON bytes for dashy UI
# Aggregates all topics needed by dashy into single message
json @0 :Data;
}
struct CustomReserved4 @0x80ae746ee2596b11 {
+4 -4
View File
@@ -2548,10 +2548,10 @@ struct Event {
# DO change the name of the field and struct
# DON'T change the ID (e.g. @107)
# DON'T change which struct it points to
customReserved0 @107 :Custom.CustomReserved0;
customReserved1 @108 :Custom.CustomReserved1;
customReserved2 @109 :Custom.CustomReserved2;
customReserved3 @110 :Custom.CustomReserved3;
controlsStateExt @107 :Custom.ControlsStateExt;
carStateExt @108 :Custom.CarStateExt;
modelExt @109 :Custom.ModelExt;
dashyState @110 :Custom.DashyState;
customReserved4 @111 :Custom.CustomReserved4;
customReserved5 @112 :Custom.CustomReserved5;
customReserved6 @113 :Custom.CustomReserved6;
+4
View File
@@ -93,6 +93,10 @@ _services: dict[str, tuple] = {
"livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM),
"livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM),
"customReservedRawData0": (True, 0.),
"controlsStateExt": (True, 100.),
"carStateExt": (True, 100.),
"modelExt": (True, 20.),
"dashyState": (True, 0.), # Aggregated dashy UI state (optional)
}
SERVICE_LIST = {name: Service(*vals) for
idx, (name, vals) in enumerate(_services.items())}
+1
View File
@@ -132,4 +132,5 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"Version", {PERSISTENT, STRING}},
{"dp_dev_last_log", {CLEAR_ON_ONROAD_TRANSITION, STRING}},
{"dp_dev_reset_conf", {CLEAR_ON_MANAGER_START, BOOL, "0"}},
{"dp_dev_go_off_road", {CLEAR_ON_MANAGER_START, BOOL, "0"}},
};
+543
View File
@@ -0,0 +1,543 @@
#!/usr/bin/env python3
"""
Copyright (c) 2026, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Dashy State Aggregation Daemon
Aggregates all cereal topics needed by dashy UI into a single dashyState message.
serverd then forwards that one message over WebSocket, avoiding per-topic
serialization for every connected client.
All display formatting (units, distances, times) is done here so the frontend
can be a pure display layer with no conversion logic.
Publishes: dashyState (pre-serialized JSON at 15Hz)
"""
import json
import cereal.messaging as messaging
from openpilot.common.params import Params
from openpilot.common.realtime import Ratekeeper
from openpilot.common.swaglog import cloudlog
from opendbc.car.common.conversions import Conversions
# Main loop rate
LOOP_RATE = 15 # Hz
# Downsample factor for modelV2 arrays (33 points -> 17 points)
DOWNSAMPLE_FACTOR = 2
# Unit conversion constants
M_TO_FT = 3.28084
# Global state (refreshed periodically)
_is_metric = True
_params = None
_car_params_cache = None
def _ensure_params():
"""Ensure Params instance exists."""
global _params
if _params is None:
_params = Params()
return _params
def refresh_metric_preference():
"""Refresh metric preference from params (called periodically)."""
global _is_metric
try:
_is_metric = _ensure_params().get_bool("IsMetric")
except Exception:
_is_metric = True
def get_car_params_from_params():
"""Read carParams from Params storage (for immediate availability at startup)."""
global _car_params_cache
if _car_params_cache is not None:
return _car_params_cache
try:
from cereal import car
cp_bytes = _ensure_params().get("CarParams")
if cp_bytes:
with car.CarParams.from_bytes(cp_bytes) as cp:
_car_params_cache = {
'openpilotLongitudinalControl': bool(cp.openpilotLongitudinalControl),
}
return _car_params_cache
except Exception:
pass
return {'openpilotLongitudinalControl': False}
def format_speed(speed_ms: float) -> str:
"""Format speed for display (m/s -> km/h or mph)."""
if _is_metric:
return f"{max(0, speed_ms * Conversions.MS_TO_KPH):.0f}"
return f"{max(0, speed_ms * Conversions.MS_TO_MPH):.0f}"
def get_speed_unit() -> str:
"""Get current speed unit string."""
return "km/h" if _is_metric else "mph"
def get_distance_unit() -> str:
"""Get current distance unit string."""
return "km" if _is_metric else "mi"
SET_SPEED_NA = 255
def get_cruise_speed(v_cruise_cluster: float) -> int:
"""Get cruise speed value for display.
Returns the set speed in display units (km/h or mph), or 255 if not set.
"""
if not (0 < v_cruise_cluster < SET_SPEED_NA):
return SET_SPEED_NA
set_speed = v_cruise_cluster
if not _is_metric:
set_speed *= Conversions.KPH_TO_MPH
return round(set_speed)
def downsample(arr):
"""Downsample list by factor."""
if not arr:
return []
return list(arr[::DOWNSAMPLE_FACTOR])
def safe_get(obj, attr, default=None):
"""Safely get attribute from object."""
try:
return getattr(obj, attr, default)
except Exception:
return default
def extract_car_state(sm):
"""Extract carState fields used by dashy."""
cs = sm['carState']
v_ego = float(cs.vEgo)
v_ego_cluster = float(cs.vEgoCluster)
# Set speed: prefer carState.vCruiseCluster, fall back to carState.vCruise
# if the cluster value isn't populated. Both are live on carState in 0.11.1;
# the old controlsState set-speed fields moved into its deprecated group.
v_cruise = float(cs.vCruiseCluster)
if not (0 < v_cruise < SET_SPEED_NA):
v_cruise = float(cs.vCruise)
set_speed = get_cruise_speed(v_cruise)
return {
'vEgo': v_ego,
'vEgoCluster': v_ego_cluster,
'gearShifter': str(cs.gearShifter),
'aEgo': float(cs.aEgo),
'steeringAngleDeg': float(cs.steeringAngleDeg),
'steeringPressed': bool(cs.steeringPressed),
'gasPressed': bool(cs.gasPressed),
'leftBlinker': bool(cs.leftBlinker),
'rightBlinker': bool(cs.rightBlinker),
'leftBlindspot': bool(cs.leftBlindspot),
'rightBlindspot': bool(cs.rightBlindspot),
'cruiseEnabled': bool(cs.cruiseState.enabled),
# Pre-formatted display values
'speedDisplay': format_speed(v_ego),
'speedClusterDisplay': format_speed(v_ego_cluster) if v_ego_cluster > 0 else format_speed(v_ego),
'setSpeed': set_speed, # 255 = not set, otherwise display value
'speedUnit': get_speed_unit(),
}
def extract_selfdrive_state(sm):
"""Extract selfdriveState fields used by dashy."""
ss = sm['selfdriveState']
return {
'enabled': bool(ss.enabled),
'activeOverride': int(safe_get(ss, 'activeOverride', 0)),
'experimentalMode': bool(ss.experimentalMode),
'alertText1': str(ss.alertText1),
'alertText2': str(ss.alertText2),
'alertSize': str(ss.alertSize),
'alertStatus': str(ss.alertStatus),
}
def extract_device_state(sm):
"""Extract deviceState fields used by dashy."""
ds = sm['deviceState']
temp_c = float(safe_get(ds, 'maxTempC', 0))
# Pre-format temperature for display
if _is_metric:
temp_display = f"{temp_c:.0f}°" if temp_c > 0 else "--"
else:
temp_f = temp_c * 9 / 5 + 32
temp_display = f"{temp_f:.0f}°" if temp_c > 0 else "--"
return {
'cpuUsagePercent': list(ds.cpuUsagePercent) if ds.cpuUsagePercent else [],
'gpuUsagePercent': int(ds.gpuUsagePercent),
'memoryUsagePercent': int(ds.memoryUsagePercent),
'freeSpacePercent': float(ds.freeSpacePercent),
'maxTempC': temp_c,
'thermalStatus': str(ds.thermalStatus), # 'green' | 'yellow' | 'red' | 'danger'
'fanSpeedPercentDesired': int(ds.fanSpeedPercentDesired),
'powerDrawW': float(safe_get(ds, 'powerDrawW', 0)),
'deviceType': str(ds.deviceType),
'tempDisplay': temp_display,
}
def extract_lead(lead, sm):
"""Extract lead vehicle data."""
d_rel = float(lead.dRel)
v_rel = float(lead.vRel)
y_rel = float(lead.yRel)
has_lead = bool(lead.status)
# Pre-format lead display values. Each metric ships as a
# (value, unit) pair so the HUD can tabular-align numbers without
# regex-parsing on the JS side.
dist_value = "--"
dist_unit = ""
speed_value = "--"
speed_unit_str = ""
ttc_value = ""
ttc_unit = "s"
ttc_urgent = False
if has_lead:
speed_unit_str = "km/h" if _is_metric else "mph"
dist_unit = "m" if _is_metric else "ft"
conv = Conversions.MS_TO_KPH if _is_metric else Conversions.MS_TO_MPH
dist_value = f"{d_rel:.1f}" if _is_metric else f"{d_rel * M_TO_FT:.1f}"
v_ego = float(sm['carState'].vEgo) if sm.valid['carState'] else 0
# Lead's absolute speed = ego + relative (clamped to 0).
lead_speed_disp = max(0.0, v_ego + v_rel) * conv
speed_value = f"{lead_speed_disp:.1f}"
if v_ego > 0:
ttc = d_rel / v_ego
if ttc < 5.0:
ttc_value = f"{ttc:.1f}"
ttc_urgent = True
return {
'status': has_lead,
'dRel': d_rel,
'yRel': y_rel,
'vRel': v_rel,
'distValue': dist_value,
'distUnit': dist_unit,
'speedValue': speed_value,
'speedUnit': speed_unit_str,
'ttcValue': ttc_value,
'ttcUnit': ttc_unit,
'ttcUrgent': ttc_urgent,
}
def extract_radar_state(sm):
"""Extract radarState fields used by dashy."""
rs = sm['radarState']
return {
'leadOne': extract_lead(rs.leadOne, sm),
'leadTwo': extract_lead(rs.leadTwo, sm),
}
def extract_live_tracks(sm):
"""Extract liveTracks radar points for bird's eye view.
Filters out tracks that are already shown as leadOne or leadTwo.
Uses radarTrackId matching: when radarState matches a liveTrack to a lead,
radarTrackId changes from -1 (vision-only) to the track's ID.
"""
try:
lt = sm['liveTracks']
points = []
# Get lead vehicle radar track IDs to filter them out
# radarTrackId = -1 means vision-only (no radar match)
# radarTrackId >= 0 means matched to a radar track
lead_track_ids = set()
if sm.valid.get('radarState', False):
rs = sm['radarState']
if rs.leadOne.status and rs.leadOne.radarTrackId >= 0:
lead_track_ids.add(rs.leadOne.radarTrackId)
if rs.leadTwo.status and rs.leadTwo.radarTrackId >= 0:
lead_track_ids.add(rs.leadTwo.radarTrackId)
if hasattr(lt, 'points'):
for pt in lt.points:
# Skip if this track is already shown as a lead vehicle
if pt.trackId in lead_track_ids:
continue
# Drop stale tracks — radar's predicting, not measuring
if not pt.measured:
continue
# Drop stationary clutter (sign posts, guardrails,
# parked cars). |vRel| < 0.5 m/s ≈ standing still
# relative to ego; not relevant traffic.
if abs(pt.vRel) < 0.5:
continue
points.append({
'd': float(pt.dRel),
'y': float(pt.yRel),
'v': float(pt.vRel),
'm': bool(pt.measured),
})
return {'points': points}
except Exception as e:
cloudlog.warning(f"extract_live_tracks error: {e}")
return {'points': []}
def extract_model_v2(sm):
"""Extract modelV2 fields used by dashy (downsampled)."""
model = sm['modelV2']
# Position
pos = model.position
position = {
'x': downsample(list(pos.x)),
'y': downsample(list(pos.y)),
'z': downsample(list(pos.z)),
}
# Lane lines (4 lines)
lane_lines = []
for line in model.laneLines:
lane_lines.append({
'x': downsample(list(line.x)),
'y': downsample(list(line.y)),
'z': downsample(list(line.z)),
})
# Road edges (2 edges)
road_edges = []
for edge in model.roadEdges:
road_edges.append({
'x': downsample(list(edge.x)),
'y': downsample(list(edge.y)),
'z': downsample(list(edge.z)),
})
return {
'position': position,
'laneLines': lane_lines,
'laneLineProbs': list(model.laneLineProbs) if hasattr(model, 'laneLineProbs') else [0, 0, 0, 0],
'roadEdges': road_edges,
'roadEdgeStds': list(model.roadEdgeStds) if hasattr(model, 'roadEdgeStds') else [1, 1],
}
def extract_live_calibration(sm):
"""Extract liveCalibration fields used by dashy."""
cal = sm['liveCalibration']
return {
'rpyCalib': list(cal.rpyCalib) if hasattr(cal, 'rpyCalib') and cal.rpyCalib else [],
'calStatus': str(cal.calStatus) if hasattr(cal, 'calStatus') else 'uncalibrated',
'height': list(cal.height) if hasattr(cal, 'height') else [],
}
def extract_longitudinal_plan(sm):
"""Extract longitudinalPlan fields used by dashy."""
lp = sm['longitudinalPlan']
return {
'allowThrottle': bool(safe_get(lp, 'allowThrottle', True)),
}
def extract_controls_state_ext(sm):
"""Extract controlsStateExt fields used by dashy."""
cse = sm['controlsStateExt']
return {
'alkaActive': bool(safe_get(cse, 'alkaActive', False)),
}
def extract_car_params(sm):
"""Extract carParams fields used by dashy."""
cp = sm['carParams']
return {
'openpilotLongitudinalControl': bool(safe_get(cp, 'openpilotLongitudinalControl', False)),
}
# =============================================================================
# TOPIC CONFIGURATION
# =============================================================================
# Single source of truth for all subscribed topics.
# Comment out a line to disable that topic entirely.
#
# Fields:
# extractor: function(sm) -> dict, extracts data from message
# rate: 'fast' = every frame when updated
# number = slow poll divider (e.g., LOOP_RATE = 1Hz)
# 'valid' = just track valid state, no extraction
# 'subscribe' = subscribed but extracted within other extractors
# default: initial cache value (None if not specified)
# =============================================================================
TOPICS = {
# Fast topics - extract every frame when updated
'carState': {'extractor': extract_car_state, 'rate': 'fast'},
'selfdriveState': {'extractor': extract_selfdrive_state, 'rate': 'fast'},
'radarState': {'extractor': extract_radar_state, 'rate': 'fast'},
'liveTracks': {'extractor': extract_live_tracks, 'rate': 'fast'},
'modelV2': {'extractor': extract_model_v2, 'rate': 'fast'},
'longitudinalPlan': {'extractor': extract_longitudinal_plan, 'rate': 'fast'},
# Slow topics - poll at fixed intervals
'deviceState': {'extractor': extract_device_state, 'rate': LOOP_RATE // 2},
'liveCalibration': {'extractor': extract_live_calibration, 'rate': LOOP_RATE},
'carParams': {'extractor': extract_car_params, 'rate': LOOP_RATE * 2},
# Valid-only topics - just track valid state
'roadCameraState': {'rate': 'valid', 'default': False},
# Subscribe-only topics - subscribed but extracted within other extractors
'controlsState': {'rate': 'subscribe'},
# Optional/dragonpilot-specific topics - comment out to disable
'controlsStateExt': {'extractor': extract_controls_state_ext, 'rate': 'fast', 'default': {'alkaActive': False}},
}
def _available_topics(topics_cfg):
"""Filter TOPICS to only services this cereal schema knows about.
Lets dragonpilot-specific topics like controlsStateExt drop out
cleanly on a vanilla openpilot schema instead of crashing SubMaster.
Topics that drop out keep their default cache value (see 'default'
in the TOPICS entry), which the frontend already null-checks.
"""
try:
from cereal.services import SERVICE_LIST as _services
except ImportError:
try:
from cereal.services import services as _services
except ImportError:
return topics_cfg
out = {}
for name, cfg in topics_cfg.items():
if name in _services:
out[name] = cfg
else:
cloudlog.info(f"dashyd: cereal service '{name}' not available, skipping")
return out
def main():
cloudlog.info("dashyd: starting")
# Initialize metric preference
refresh_metric_preference()
topics = _available_topics(TOPICS)
# Derive services list from filtered topics
services = list(topics.keys())
sm = messaging.SubMaster(services)
pm = messaging.PubMaster(['dashyState'])
rk = Ratekeeper(LOOP_RATE)
# Initialize cache from TOPICS defaults (always include all topics so
# the frontend gets default values for dropped optional ones too).
cache = {t: cfg.get('default') for t, cfg in TOPICS.items() if cfg.get('rate') != 'subscribe'}
cache['carParams'] = get_car_params_from_params() # special: init from Params
# Build topic lists from the filtered topics (only subscribed ones run their extractors)
fast_topics = {t: cfg['extractor'] for t, cfg in topics.items() if cfg.get('rate') == 'fast'}
slow_topics = {t: (cfg['extractor'], cfg['rate']) for t, cfg in topics.items()
if isinstance(cfg.get('rate'), int)}
valid_topics = [t for t, cfg in topics.items() if cfg.get('rate') == 'valid']
cache_dirty = True
frame_count = 0
while True:
sm.update(0)
frame_count += 1
# Refresh metric preference every ~2 seconds
if frame_count % (LOOP_RATE * 2) == 0:
refresh_metric_preference()
cache_dirty = True # Force re-format with new units
# Fast topics - extract when updated
for topic, extractor in fast_topics.items():
if sm.updated[topic]:
cache[topic] = extractor(sm)
cache_dirty = True
# Slow topics - extract at fixed intervals (ignore sm.updated)
for topic, (extractor, divider) in slow_topics.items():
if frame_count % divider == 0:
cache[topic] = extractor(sm)
cache_dirty = True
# Valid-only topics - just track valid state
for topic in valid_topics:
if sm.updated[topic]:
new_val = sm.valid[topic]
if cache[topic] != new_val:
cache[topic] = new_val
cache_dirty = True
# Only serialize and publish if something changed
if cache_dirty:
# Only publish when critical openpilot data exists
critical_ready = (
cache.get('carState') is not None and
cache.get('modelV2') is not None and
cache.get('selfdriveState') is not None
)
if critical_ready:
state = {
'ts': sm.logMonoTime['carState'],
'display': {
'isMetric': _is_metric,
'speedUnit': get_speed_unit(),
'distanceUnit': get_distance_unit(),
},
**cache, # include all cached topics
}
msg = messaging.new_message('dashyState')
msg.dashyState.json = json.dumps(state).encode()
pm.send('dashyState', msg)
cache_dirty = False
rk.keep_time()
if __name__ == "__main__":
main()
+854
View File
@@ -0,0 +1,854 @@
#!/usr/bin/env python3
"""
Copyright (c) 2026, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Dashy HTTP Server
Provides REST API and static file serving for the dashy web UI.
- Settings management (read/write params)
- File browser for drive logs
- Static file serving for web UI
"""
import argparse
import ast
import asyncio
import json
import operator
import os
import logging
import time
from datetime import datetime
from functools import wraps
from urllib.parse import quote
from aiohttp import web
from cereal import messaging
from openpilot.common.params import Params
from openpilot.system.hardware import PC, HARDWARE
from openpilot.system.ui.lib.multilang import multilang as base_multilang
from dragonpilot.settings import SETTINGS
try:
from openpilot.system.version import get_build_metadata as _get_build_metadata
except Exception:
_get_build_metadata = None
# --- Configuration ---
DEFAULT_DIR = os.path.realpath(os.path.join(os.path.dirname(__file__), '..') if PC else '/data/media/0/realdata')
WEB_DIST_PATH = os.path.join(os.path.dirname(__file__), "web", "dist")
CAR_PARAMS_CACHE_TTL = 30 # seconds
logger = logging.getLogger("dashy")
class MockParams:
"""In-memory params mock for dev mode."""
_store = {}
def get(self, key, default=None): return self._store.get(key, default)
def get_bool(self, key, default=False): return bool(self._store.get(key)) if key in self._store else default
def put(self, key, value): self._store[key] = value
def put_bool(self, key, value): self._store[key] = value
def remove(self, key): self._store.pop(key, None)
def check_key(self, key): return True
# --- Caching Layer ---
class AppCache:
"""Centralized cache for expensive operations."""
def __init__(self):
self._params = None
self._car_params = None
self._car_params_time = 0
self._context = None
self._context_time = 0
self._settings_cache = None
self._settings_cache_time = 0
@property
def params(self):
"""Get shared Params instance (or mock if unavailable)."""
if self._params is None:
try:
self._params = Params()
except Exception as e:
logger.warning(f"Params unavailable, using mock: {e}")
self._params = MockParams()
return self._params
def get_car_params(self):
"""Get cached CarParams data (brand, longitudinal control)."""
now = time.time()
if self._car_params is None or (now - self._car_params_time) > CAR_PARAMS_CACHE_TTL:
self._car_params = self._parse_car_params()
self._car_params_time = now
return self._car_params
def _parse_car_params(self):
"""Parse CarParams from Params store."""
result = {'brand': '', 'openpilot_longitudinal_control': False}
try:
car_params_bytes = self.params.get("CarParams")
if car_params_bytes:
from cereal import car
with car.CarParams.from_bytes(car_params_bytes) as cp:
result['brand'] = cp.brand
result['openpilot_longitudinal_control'] = cp.openpilotLongitudinalControl
except Exception as e:
logger.debug(f"Could not parse CarParams: {e}")
return result
def get_settings_context(self):
"""Get context dict for settings condition evaluation."""
now = time.time()
if self._context is None or (now - self._context_time) > CAR_PARAMS_CACHE_TTL:
car_params = self.get_car_params()
self._context = {
'brand': car_params['brand'],
'openpilotLongitudinalControl': car_params['openpilot_longitudinal_control'],
'LITE': os.getenv("LITE") is not None,
'MICI': self._check_mici(),
# Upstream-mirror items gate on these.
'DASHY': True,
'IS_RELEASE': self._is_release_channel(),
}
self._context_time = now
return self._context
def _check_mici(self):
"""Check if device is MICI type."""
try:
return HARDWARE.get_device_type() == "mici"
except Exception:
return False
def _is_release_channel(self):
if _get_build_metadata is None:
return False
try:
return bool(_get_build_metadata().release_channel)
except Exception:
return False
def get_bool_safe(self, key, default=False):
"""Safely get a boolean param with default."""
try:
return self.params.get_bool(key)
except Exception:
return default
def invalidate(self):
"""Invalidate all caches."""
self._car_params = None
self._context = None
self._settings_cache = None
# --- Helper Functions ---
def api_handler(func):
"""Decorator for API handlers with consistent error handling."""
@wraps(func)
async def wrapper(request):
try:
return await func(request)
except web.HTTPException:
raise
except Exception as e:
logger.error(f"{func.__name__} error: {e}", exc_info=True)
return web.json_response({'error': str(e)}, status=500)
return wrapper
def get_safe_path(requested_path):
"""Ensures the requested path is within DEFAULT_DIR."""
combined_path = os.path.join(DEFAULT_DIR, requested_path.lstrip('/'))
safe_path = os.path.realpath(combined_path)
if os.path.commonpath((safe_path, DEFAULT_DIR)) == DEFAULT_DIR:
return safe_path
return None
_CMP_OPS = {
ast.Eq: operator.eq,
ast.NotEq: operator.ne,
ast.Lt: operator.lt,
ast.LtE: operator.le,
ast.Gt: operator.gt,
ast.GtE: operator.ge,
}
def _eval_node(node, context):
"""Evaluate a tightly restricted AST node against a context dict.
Only the operators that SETTINGS conditions actually use are supported:
Name lookup, literal Constants, and / or / not, and the six numeric
comparisons. No function calls, attribute access, subscripts, or
arithmetic — those would re-open the eval-sandbox escape paths.
"""
if isinstance(node, ast.Expression):
return _eval_node(node.body, context)
if isinstance(node, ast.Constant):
return node.value
if isinstance(node, ast.Name):
return context.get(node.id, False)
if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
return not _eval_node(node.operand, context)
if isinstance(node, ast.BoolOp):
values = [_eval_node(v, context) for v in node.values]
if isinstance(node.op, ast.And):
return all(values)
if isinstance(node.op, ast.Or):
return any(values)
if isinstance(node, ast.Compare) and len(node.ops) == 1 and len(node.comparators) == 1:
op_type = type(node.ops[0])
if op_type in _CMP_OPS:
left = _eval_node(node.left, context)
right = _eval_node(node.comparators[0], context)
return _CMP_OPS[op_type](left, right)
raise ValueError(f"Unsupported node: {type(node).__name__}")
def eval_condition(condition, context):
"""Evaluate a SETTINGS condition expression in a sandboxed AST walker."""
if not condition:
return True
try:
tree = ast.parse(condition, mode='eval')
return bool(_eval_node(tree, context))
except Exception as e:
logger.debug(f"Condition evaluation failed: {condition}, error: {e}")
return False
def resolve_value(value):
"""Resolve callable values (lambdas) for JSON serialization."""
return value() if callable(value) else value
# Map of settings-declared param keys to their setting dict.
# Used as an allowlist for /api/settings/params/{name} read/write so
# LAN clients can only touch keys that the UI knowingly exposes.
def _build_param_setting_map():
out = {}
for section in SETTINGS:
for setting in section.get('settings', []):
key = setting.get('key')
if not key:
continue
# action_item entries use `key` as the action name, not a real
# param — skip so they don't leak into the param read/write
# allowlist.
if setting.get('type') == 'action_item':
continue
out[key] = setting
return out
_PARAM_SETTINGS = _build_param_setting_map()
# Control-tab / one-off params the UI legitimately reads or writes that
# are not part of the SETTINGS schema. Kept as an explicit allowlist so
# the broader 'unknown param' guard still blocks arbitrary writes.
_CONTROL_PARAMS = {
'dp_dev_go_off_road', # Controls tab: force-offroad toggle
'DoReboot', # Controls tab: reboot button
'ExperimentalMode', # Tesla HUD: tap set-speed circle to toggle
}
def _param_allowed(key):
return key in _PARAM_SETTINGS or key in _CONTROL_PARAMS
# --- API Endpoints ---
@api_handler
async def init_api(request):
"""Provide initial data to the client."""
cache: AppCache = request.app['cache']
return web.json_response({
'dp_dev_dashy': cache.get_bool_safe("dp_dev_dashy", True),
'isOffroad': cache.get_bool_safe("IsOffroad", False),
})
@api_handler
async def list_files_api(request):
"""List files and folders."""
path_param = request.query.get('path', '/')
safe_path = get_safe_path(path_param)
if not safe_path or not os.path.isdir(safe_path):
return web.json_response({'error': 'Invalid or Not Found Path'}, status=404)
items = []
for entry in os.listdir(safe_path):
full_path = os.path.join(safe_path, entry)
# Skip entries whose real target escapes DEFAULT_DIR (e.g., symlinks).
# get_safe_path only validates the requested directory itself; each
# child has to be re-checked to prevent listing files outside the tree.
real_full = os.path.realpath(full_path)
if os.path.commonpath((real_full, DEFAULT_DIR)) != DEFAULT_DIR:
continue
try:
stat = os.stat(full_path)
is_dir = os.path.isdir(full_path)
items.append({
'name': entry,
'is_dir': is_dir,
'mtime': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M'),
'size': stat.st_size if not is_dir else 0
})
except FileNotFoundError:
continue
# Sort: directories first (by mtime desc), then files (by mtime desc)
dirs = sorted([i for i in items if i['is_dir']], key=lambda x: x['mtime'], reverse=True)
files = sorted([i for i in items if not i['is_dir']], key=lambda x: x['mtime'], reverse=True)
relative_path = os.path.relpath(safe_path, DEFAULT_DIR)
return web.json_response({
'path': '' if relative_path == '.' else relative_path,
'files': dirs + files
})
@api_handler
async def serve_player_api(request):
"""Serve the HLS player page."""
file_path = request.query.get('file')
if not file_path:
return web.Response(text="File parameter is required.", status=400)
if get_safe_path(file_path) is None:
return web.Response(text="Invalid file path.", status=400)
player_html_path = os.path.join(WEB_DIST_PATH, 'pages', 'player.html')
try:
with open(player_html_path, 'r') as f:
html_template = f.read()
except FileNotFoundError:
return web.Response(text="Player HTML not found.", status=500)
html = html_template.replace('{{FILE_PATH}}', quote(file_path, safe=''))
return web.Response(text=html, content_type='text/html')
@api_handler
async def serve_manifest_api(request):
"""Dynamically generate m3u8 playlist."""
file_path = request.query.get('file', '').lstrip('/')
if not file_path:
return web.Response(text="File parameter is required.", status=400)
if get_safe_path(file_path) is None:
return web.Response(text="Invalid file path.", status=400)
encoded_path = quote(file_path)
manifest = f"#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:60\n#EXT-X-PLAYLIST-TYPE:VOD\n#EXTINF:60.0,\n/media/{encoded_path}\n#EXT-X-ENDLIST\n"
return web.Response(text=manifest, content_type='application/vnd.apple.mpegurl')
@api_handler
async def get_settings_config_api(request):
"""Get the settings configuration from settings.py."""
cache: AppCache = request.app['cache']
# Return cached settings if fresh (2 second TTL)
now = time.time()
if cache._settings_cache is not None and (now - cache._settings_cache_time) < 2:
return web.json_response(cache._settings_cache)
params = cache.params
# Update language if changed
current_lang = params.get("LanguageSetting")
if current_lang:
lang_str = current_lang.decode() if isinstance(current_lang, bytes) else str(current_lang)
lang_str = lang_str.removeprefix("main_")
if lang_str != base_multilang.language and lang_str in base_multilang.languages.values():
base_multilang._language = lang_str
base_multilang.setup()
context = cache.get_settings_context()
settings_with_values = []
for section in SETTINGS:
if not eval_condition(section.get('condition'), context):
continue
section_copy = section.copy()
settings_list = []
for setting in section.get('settings', []):
if not eval_condition(setting.get('condition'), context):
continue
setting_copy = setting.copy()
key = setting['key']
# Resolve callable values
for field in ['title', 'description', 'suffix', 'special_value_text']:
if field in setting_copy:
setting_copy[field] = resolve_value(setting_copy[field])
if 'options' in setting_copy:
setting_copy['options'] = [resolve_value(opt) for opt in setting_copy['options']]
# Get current value based on type
setting_copy['current_value'] = _get_setting_value(params, setting)
settings_list.append(setting_copy)
if settings_list:
section_copy['settings'] = settings_list
settings_with_values.append(section_copy)
response_data = {'settings': settings_with_values}
cache._settings_cache = response_data
cache._settings_cache_time = now
return web.json_response(response_data)
def _get_setting_value(params, setting):
"""Get current value for a setting from Params."""
key = setting['key']
setting_type = setting['type']
default = setting.get('default', 0)
try:
if setting_type == 'toggle_item':
return params.get_bool(key)
elif setting_type == 'double_spin_button_item':
value = params.get(key)
return float(value) if value is not None else float(default)
elif setting_type in ('text_input_item', 'text_display_item'):
value = params.get(key)
if value is None:
return ''
return value.decode('utf-8', errors='replace') if isinstance(value, bytes) else str(value)
elif setting_type == 'action_item':
# Pure action buttons have no stored value; return None so the
# UI treats it as display-only.
return None
else: # spin_button_item, text_spin_button_item
value = params.get(key)
return int(value) if value is not None else int(default)
except Exception as e:
logger.warning(f"Error getting value for {key}: {e}")
if setting_type == 'toggle_item':
return False
elif setting_type == 'double_spin_button_item':
return float(default)
elif setting_type in ('text_input_item', 'text_display_item'):
return ''
elif setting_type == 'action_item':
return None
return int(default)
@api_handler
async def save_param_api(request):
"""Save a single param value.
Usage: POST /api/settings/params/{name}
Body: { "value": <value> }
"""
param_name = request.match_info.get('param_name')
if not param_name:
return web.json_response({'error': 'param_name is required'}, status=400)
if not _param_allowed(param_name):
return web.json_response({'error': 'Unknown param'}, status=403)
setting = _PARAM_SETTINGS.get(param_name)
if setting is not None and setting.get('type') == 'text_display_item':
return web.json_response({'error': 'Read-only param'}, status=403)
cache: AppCache = request.app['cache']
params = cache.params
data = await request.json()
if 'value' not in data:
return web.json_response({'error': 'value is required in body'}, status=400)
_save_param(params, param_name, data['value'])
cache.invalidate()
logger.info(f"Param saved: {param_name}={data['value']}")
return web.json_response({'status': 'success', 'key': param_name, 'value': data['value']})
def _save_param(params, key, value):
"""Save a single param value with proper type handling."""
try:
param_type = params.get_type(key)
if param_type == 1: # BOOL
params.put_bool(key, bool(value))
elif param_type == 2: # INT
params.put(key, int(value))
elif param_type == 3: # FLOAT
params.put(key, float(value))
elif isinstance(value, bool):
params.put_bool(key, value)
else:
params.put(key, str(value) if not isinstance(value, str) else value)
logger.debug(f"Saved {key}={value} (type={param_type})")
except Exception as e:
logger.error(f"Error saving param {key}={value}: {e}")
raise
def _get_param_value(params, key):
"""Get a single param value via its declared setting type, or as a
bool for control-only params that have no SETTINGS entry."""
setting = _PARAM_SETTINGS.get(key)
if setting is not None:
return _get_setting_value(params, setting)
if key in _CONTROL_PARAMS:
try:
return params.get_bool(key)
except Exception:
return False
return None
@api_handler
async def get_param_api(request):
"""Get a single param value."""
param_name = request.match_info.get('param_name')
if not param_name:
return web.json_response({'error': 'param_name is required'}, status=400)
if not _param_allowed(param_name):
return web.json_response({'error': 'Unknown param'}, status=403)
cache: AppCache = request.app['cache']
try:
value = _get_param_value(cache.params, param_name)
except Exception:
value = None
return web.json_response({'key': param_name, 'value': value})
# --- Action endpoints ---
# Named side-effectful operations declared by settings items via the
# `action` field (text_input_item / action_item). Each handler receives
# the parsed JSON body and the AppCache; it returns a dict that is
# serialized as the JSON response. Errors should be raised — the wrapper
# converts them to 502/500 responses.
SSH_KEY_FETCH_TIMEOUT_S = 10
SSH_KEY_MAX_BYTES = 16 * 1024 # plenty for any realistic ~/.ssh/authorized_keys
GITHUB_USERNAME_MAX_LEN = 39 # github's own limit
def _validate_github_username(username):
"""GitHub username: 1-39 chars, alnum or single hyphen, no leading/trailing hyphen."""
if not username or len(username) > GITHUB_USERNAME_MAX_LEN:
return False
if username.startswith('-') or username.endswith('-'):
return False
if '--' in username:
return False
return all(c.isalnum() or c == '-' for c in username)
async def _fetch_github_ssh_keys(username):
"""Fetch https://github.com/{username}.keys. Returns the body text on
HTTP 200; raises web.HTTPException with an upstream-derived status on
failure so the action endpoint surfaces the real reason."""
import aiohttp
url = f"https://github.com/{quote(username, safe='')}.keys"
timeout = aiohttp.ClientTimeout(total=SSH_KEY_FETCH_TIMEOUT_S)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
if resp.status == 404:
raise web.HTTPNotFound(reason=f"GitHub user '{username}' not found")
if resp.status != 200:
raise web.HTTPBadGateway(reason=f"github.com returned HTTP {resp.status}")
body = await resp.content.read(SSH_KEY_MAX_BYTES + 1)
if len(body) > SSH_KEY_MAX_BYTES:
raise web.HTTPBadGateway(reason="SSH key response too large")
return body.decode('utf-8', errors='replace')
async def _action_ssh_key_set(request, payload, cache):
"""Fetch the user's GitHub SSH keys and write both params atomically.
Body: { "value": "<github-username>" }. On success the device's
sshd_config drop-in is updated by openpilot's own SSH manager."""
username = (payload.get('value') or '').strip()
if not _validate_github_username(username):
raise web.HTTPBadRequest(reason="Invalid GitHub username")
keys_body = await _fetch_github_ssh_keys(username)
if not keys_body.strip():
raise web.HTTPBadRequest(reason=f"GitHub user '{username}' has no public SSH keys")
params = cache.params
# Write keys first; only commit the username if keys were stored
# successfully — keeps the two params consistent.
params.put('GithubSshKeys', keys_body)
params.put('GithubUsername', username)
cache.invalidate()
logger.info(f"SSH keys set from github.com/{username} ({len(keys_body)} bytes)")
return {'status': 'ok', 'username': username, 'key_bytes': len(keys_body)}
async def _action_ssh_key_clear(request, payload, cache):
params = cache.params
params.put('GithubSshKeys', '')
params.put('GithubUsername', '')
cache.invalidate()
logger.info("SSH keys cleared")
return {'status': 'ok'}
_ACTION_HANDLERS = {
'ssh_key_set': _action_ssh_key_set,
'ssh_key_clear': _action_ssh_key_clear,
}
@api_handler
async def run_action_api(request):
"""Dispatch /api/action/{name} → registered handler."""
name = request.match_info.get('name', '')
handler = _ACTION_HANDLERS.get(name)
if handler is None:
return web.json_response({'error': f'Unknown action: {name}'}, status=404)
try:
payload = await request.json()
except Exception:
payload = {}
cache: AppCache = request.app['cache']
result = await handler(request, payload, cache)
return web.json_response(result)
@api_handler
async def get_model_list_api(request):
"""Get the model list and current selection."""
cache: AppCache = request.app['cache']
params = cache.params
# Get model list. JSON-typed params come back already-parsed in
# newer dragonpilot; older builds returned bytes/str — handle both.
model_list = {}
try:
raw = params.get("dp_dev_model_list")
if raw:
if isinstance(raw, (bytes, str)):
model_list = json.loads(raw)
elif isinstance(raw, dict):
model_list = raw
except Exception as e:
logger.debug(f"Could not parse dp_dev_model_list: {e}")
# Get current selection
selected_model = ""
try:
selected_raw = params.get("dp_dev_model_selected")
if selected_raw:
selected_model = selected_raw.decode('utf-8') if isinstance(selected_raw, bytes) else str(selected_raw)
except Exception as e:
logger.debug(f"Could not get dp_dev_model_selected: {e}")
return web.json_response({
'model_list': model_list,
'selected_model': selected_model
})
@api_handler
async def save_model_selection_api(request):
"""Save the selected model."""
cache: AppCache = request.app['cache']
params = cache.params
data = await request.json()
selected_model = data.get('selected_model', '')
if not selected_model or selected_model == "[AUTO]":
params.put("dp_dev_model_selected", "")
logger.info("Model selection cleared (AUTO mode)")
else:
params.put("dp_dev_model_selected", selected_model)
logger.info(f"Model selection saved: {selected_model}")
return web.json_response({'status': 'success'})
# --- WebSocket endpoint for data streaming ---
# One shared publisher task polls the dashyState SubMaster and fans out
# to every connected client. The previous per-connection design ran
# blocking ZMQ I/O on the event loop, which starved every other request
# under multi-client load.
async def _publisher_loop(app):
# IMPORTANT: ZMQ sockets are thread-affined. Construct the SubMaster on
# the asyncio main thread and call update() on the same thread — using
# asyncio.to_thread bounces between worker threads and silently breaks
# the receive. The 0-timeout update is cheap enough on the event loop;
# the per-client send is what we actually need to be async for.
try:
sm = messaging.SubMaster(['dashyState'])
except Exception as e:
logger.warning(f"Publisher disabled (SubMaster init failed): {e}")
return
logger.info("dashyState publisher loop started")
while True:
try:
sm.update(0)
if sm.updated['dashyState']:
json_data = sm['dashyState'].json
if isinstance(json_data, bytes):
json_data = json_data.decode('utf-8')
clients = list(app['ws_clients'])
for ws in clients:
if ws.closed:
app['ws_clients'].discard(ws)
continue
try:
await ws.send_str(json_data)
except Exception as e:
logger.debug(f"WebSocket send failed, dropping client: {e}")
app['ws_clients'].discard(ws)
await asyncio.sleep(0.01)
except asyncio.CancelledError:
raise
except Exception as e:
# Don't let a transient error tear down the loop silently.
logger.exception(f"Publisher loop error: {e}")
await asyncio.sleep(0.1)
async def websocket_handler(request):
"""WebSocket endpoint for data-only connections - streams dashyState directly."""
ws = web.WebSocketResponse()
await ws.prepare(request)
logger.info("WebSocket client connected")
request.app['ws_clients'].add(ws)
try:
# Wait until the client disconnects; no inbound traffic expected.
async for _ in ws:
pass
except Exception as e:
logger.warning(f"WebSocket error: {e}")
finally:
request.app['ws_clients'].discard(ws)
logger.info("WebSocket client disconnected")
return ws
# --- No-cache middleware for web assets ---
# Dashy is a same-origin LAN app; no CORS headers are emitted so that
# browsers will block cross-origin JS from mutating settings via the
# JSON endpoints (the preflight will fail for non-simple requests).
@web.middleware
async def no_cache_middleware(request, handler):
response = await handler(request)
path = request.path.lower()
if path.endswith(('.html', '.js', '.css')) or path == '/':
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
# --- Application Setup ---
async def on_startup(app):
"""Initialize app-level resources."""
app['cache'] = AppCache()
app['ws_clients'] = set()
app['publisher_task'] = asyncio.create_task(_publisher_loop(app))
logger.info("Dashy server started")
async def on_cleanup(app):
"""Cleanup app-level resources."""
task = app.get('publisher_task')
if task and not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
logger.info("Dashy server stopped")
def setup_aiohttp_app(host: str, port: int, debug: bool):
logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
app = web.Application(middlewares=[no_cache_middleware])
# API routes
app.router.add_get("/api/init", init_api)
app.router.add_get("/api/files", list_files_api)
app.router.add_get("/api/play", serve_player_api)
app.router.add_get("/api/manifest.m3u8", serve_manifest_api)
app.router.add_get("/api/settings", get_settings_config_api)
app.router.add_get("/api/settings/params/{param_name}", get_param_api)
app.router.add_post("/api/settings/params/{param_name}", save_param_api)
app.router.add_get("/api/models", get_model_list_api)
app.router.add_post("/api/models/select", save_model_selection_api)
app.router.add_post("/api/action/{name}", run_action_api)
app.router.add_get("/api/ws", websocket_handler) # WebSocket for data streaming
# Static files
app.router.add_static('/media', path=DEFAULT_DIR, name='media', show_index=False, follow_symlinks=False)
app.router.add_static('/download', path=DEFAULT_DIR, name='download', show_index=False, follow_symlinks=False)
app.router.add_get("/", lambda r: web.FileResponse(os.path.join(WEB_DIST_PATH, "index.html")))
app.router.add_static("/", path=WEB_DIST_PATH)
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)
return app
def main():
parser = argparse.ArgumentParser(description="Dashy Server")
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to listen on")
parser.add_argument("--port", type=int, default=5088, help="Port to listen on")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
app = setup_aiohttp_app(args.host, args.port, args.debug)
web.run_app(app, host=args.host, port=args.port)
if __name__ == "__main__":
main()
File diff suppressed because one or more lines are too long
Binary file not shown.
Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 145 KiB

+37
View File
@@ -0,0 +1,37 @@
<!DOCTYPE html>
<html lang="en" data-theme="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=no, viewport-fit=cover">
<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate">
<meta http-equiv="Pragma" content="no-cache">
<meta http-equiv="Expires" content="0">
<title>Dashy by dragonpilot</title>
<link rel="icon" href="/icons/icon-192x192.png">
<link rel="stylesheet" href="/css/styles.css">
</head>
<body>
<div id="app-container" class="w-full h-full">
<!-- Loading Overlay (shown when server is unreachable, hidden by JS when connected) -->
<div id="loading-screen" style="display: flex" class="fixed inset-0 z-[400] flex-col items-center justify-center bg-black/95">
<img src="/icons/dashy.png" alt="Dashy" class="w-24 h-24 mb-6">
<span class="loading loading-spinner loading-lg text-primary mb-4"></span>
<div id="loading-status" class="text-white/70 text-sm">Connecting to device...</div>
</div>
<!-- HUD Page (full-screen, default when enabled) -->
<div id="hud-page" class="hud-page">
<div id="hud-page-content" class="relative w-full h-full">
<!-- Video element created dynamically by theme if needed -->
<canvas id="uiCanvas" class="absolute inset-0 w-full h-full pointer-events-none z-10"></canvas>
</div>
</div>
</div>
<!-- Main app -->
<script src="/js/app.js"></script>
</body>
</html>
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+25
View File
@@ -0,0 +1,25 @@
<!DOCTYPE html>
<html>
<head>
<title>HLS Player</title>
<style>
body, html { margin: 0; padding: 0; height: 100%; background-color: #000; }
#video { width: 100%; height: 100%; }
</style>
<script src="/lib/hls.js"></script>
</head>
<body>
<video id="video" controls autoplay></video>
<script>
var v = document.getElementById('video');
var s = '/api/manifest.m3u8?file={{FILE_PATH}}';
if (Hls.isSupported()) {
var h = new Hls();
h.loadSource(s);
h.attachMedia(v);
} else if (v.canPlayType('application/vnd.apple.mpegurl')) {
v.src = s;
}
</script>
</body>
</html>
+161
View File
@@ -0,0 +1,161 @@
"""
Copyright (c) 2025, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import time
import numpy as np
from openpilot.common.swaglog import cloudlog
# Configuration parameters
SPEED_RATIO = 0.98 # Must be within 2% over cruise speed
TTC_THRESHOLD = 3.0 # seconds - disable ACM when lead is within this time
# Emergency thresholds - IMMEDIATELY disable ACM
EMERGENCY_TTC = 2.0 # seconds - emergency situation
EMERGENCY_RELATIVE_SPEED = 10.0 # m/s (~36 km/h closing speed - only for rapid closing)
EMERGENCY_DECEL_THRESHOLD = -1.5 # m/s² - if MPC wants this much braking, emergency disable
# Safety cooldown after lead detection
LEAD_COOLDOWN_TIME = 0.5 # seconds - brief cooldown to handle sensor glitches
# Speed-based distance scaling - more practical for real traffic
SPEED_BP = [0., 10., 20., 30.] # m/s (0, 36, 72, 108 km/h)
MIN_DIST_V = [15., 20., 25., 30.] # meters - closer to original 25m baseline
class ACM:
def __init__(self):
self.enabled = False
self._is_speed_over_cruise = False
self._has_lead = False
self._active_prev = False
self._last_lead_time = 0.0 # Track when we last saw a lead
self.active = False
self.just_disabled = False
def _check_emergency_conditions(self, lead, v_ego, current_time):
"""Check for emergency conditions that require immediate ACM disable."""
if not lead or not lead.status:
return False
self.lead_ttc = lead.dRel / max(v_ego, 0.1)
relative_speed = v_ego - lead.vLead # Positive = closing
# Speed-adaptive minimum distance
min_dist_for_speed = np.interp(v_ego, SPEED_BP, MIN_DIST_V)
# Emergency disable conditions - only for truly dangerous situations
# Require BOTH close distance AND (fast closing OR very short TTC)
if lead.dRel < min_dist_for_speed and (
self.lead_ttc < EMERGENCY_TTC or
relative_speed > EMERGENCY_RELATIVE_SPEED):
self._last_lead_time = current_time
if self.active: # Only log if we're actually disabling
cloudlog.warning(f"ACM emergency disable: dRel={lead.dRel:.1f}m, TTC={self.lead_ttc:.1f}s, relSpeed={relative_speed:.1f}m/s")
return True
return False
def _update_lead_status(self, lead, v_ego, current_time):
"""Update lead vehicle detection status."""
if lead and lead.status:
self.lead_ttc = lead.dRel / max(v_ego, 0.1)
if self.lead_ttc < TTC_THRESHOLD:
self._has_lead = True
self._last_lead_time = current_time
else:
self._has_lead = False
else:
self._has_lead = False
self.lead_ttc = float('inf')
def _check_cooldown(self, current_time):
"""Check if we're still in cooldown period after lead detection."""
time_since_lead = current_time - self._last_lead_time
return time_since_lead < LEAD_COOLDOWN_TIME
def _should_activate(self, user_ctrl_lon, v_ego, v_cruise, in_cooldown):
"""Determine if ACM should be active based on all conditions."""
self._is_speed_over_cruise = v_ego > (v_cruise * SPEED_RATIO)
return (not user_ctrl_lon and
not self._has_lead and
not in_cooldown and
self._is_speed_over_cruise)
def update_states(self, cc, rs, user_ctrl_lon, v_ego, v_cruise):
"""Update ACM state with multiple safety checks."""
# Basic validation
if not self.enabled or len(cc.orientationNED) != 3:
self.active = False
return
current_time = time.monotonic()
lead = rs.leadOne
# Check emergency conditions first (highest priority)
if self._check_emergency_conditions(lead, v_ego, current_time):
self.active = False
self._active_prev = self.active
return
# Update normal lead status
self._update_lead_status(lead, v_ego, current_time)
# Check cooldown period
in_cooldown = self._check_cooldown(current_time)
# Determine if ACM should be active
self.active = self._should_activate(user_ctrl_lon, v_ego, v_cruise, in_cooldown)
# Track state changes for logging
self.just_disabled = self._active_prev and not self.active
if self.active and not self._active_prev:
cloudlog.info(f"ACM activated: v_ego={v_ego*3.6:.1f} km/h, v_cruise={v_cruise*3.6:.1f} km/h")
elif self.just_disabled:
cloudlog.info("ACM deactivated")
self._active_prev = self.active
def update_a_desired_trajectory(self, a_desired_trajectory):
"""
Modify acceleration trajectory to allow coasting.
SAFETY: Check for any strong braking request and abort.
"""
if not self.active:
return a_desired_trajectory
# SAFETY CHECK: If MPC wants significant braking, DON'T suppress it
min_accel = np.min(a_desired_trajectory)
if min_accel < EMERGENCY_DECEL_THRESHOLD:
cloudlog.warning(f"ACM aborting: MPC requested {min_accel:.2f} m/s² braking")
self.active = False # Immediately deactivate
return a_desired_trajectory # Return unmodified trajectory
# Only suppress very mild braking (> -1.0 m/s²)
# This allows coasting but preserves any meaningful braking
modified_trajectory = np.copy(a_desired_trajectory)
for i in range(len(modified_trajectory)):
if -1.0 < modified_trajectory[i] < 0:
# Only suppress very gentle braking for cruise control
modified_trajectory[i] = 0.0
# Any braking stronger than -1.0 m/s² is preserved!
return modified_trajectory
+74
View File
@@ -0,0 +1,74 @@
"""
Copyright (c) 2025, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import time
import numpy as np
from openpilot.selfdrive.modeld.constants import ModelConstants
# Cooldown times (how long to stay in experimental mode after trigger)
AEM_COOLDOWN_TTC = 3.0 # seconds - for lead TTC events
AEM_DECEL_FOR_STOP = 2.5 # m/s² - assumed deceleration for stop cooldown calc
AEM_STOP_BUFFER = 2.0 # seconds - extra buffer for model latency
# Stop sign/light detection thresholds
SLOW_DOWN_BP = [0., 2.78, 5.56, 8.34, 11.12, 13.89, 15.28]
SLOW_DOWN_DIST = [10, 30., 50., 70., 80., 90., 120.]
# TTC-based triggering thresholds
TTC_THRESHOLD = 1.8 # seconds - trigger when TTC drops below this
MIN_SPEED_FOR_TTC = 5.0 # m/s (~18 km/h) - TTC meaningless at low speeds
MIN_CLOSING_SPEED = 0.5 # m/s - must be closing at least this fast
class AEM:
def __init__(self):
self._active = False
self._cooldown_end_time = 0.0
def _perform_experimental_mode(self, cooldown: float = AEM_COOLDOWN_TTC):
self._active = True
# Extend cooldown if new trigger comes in
new_end = time.monotonic() + cooldown
self._cooldown_end_time = max(self._cooldown_end_time, new_end)
def get_mode(self, mode):
# override mode
if time.monotonic() < self._cooldown_end_time:
mode = 'blended'
else:
self._active = False
return mode
def update_states(self, model_msg, radar_msg, v_ego):
# Stop sign/light detection - model predicts stopping ahead
# Uses max() so it can't shorten an existing longer cooldown
if len(model_msg.orientation.x) == len(model_msg.position.x) == ModelConstants.IDX_N and \
model_msg.position.x[ModelConstants.IDX_N - 1] < np.interp(v_ego, SLOW_DOWN_BP, SLOW_DOWN_DIST):
self._perform_experimental_mode(v_ego / AEM_DECEL_FOR_STOP + AEM_STOP_BUFFER)
# TTC-based triggering - lead car braking hard
if v_ego > MIN_SPEED_FOR_TTC and radar_msg.leadOne.status:
# vRel is negative when closing in on lead
closing_speed = -radar_msg.leadOne.vRel
if closing_speed > MIN_CLOSING_SPEED:
d_rel = radar_msg.leadOne.dRel
if d_rel > 0:
ttc = d_rel / closing_speed
if ttc < TTC_THRESHOLD:
self._perform_experimental_mode(AEM_COOLDOWN_TTC)
+41
View File
@@ -0,0 +1,41 @@
"""
Copyright (c) 2026, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from cereal import log
# Hysteresis thresholds (km/h -> m/s)
APM_ACTIVATE_SPEED = 60 * 1000 / 3600 # 60 km/h — switch to aggressive below this
APM_DEACTIVATE_SPEED = 70 * 1000 / 3600 # 70 km/h — restore user personality above this
class APM:
def __init__(self):
self._active = False
def get_personality(self, v_ego, personality):
if self._active:
if v_ego > APM_DEACTIVATE_SPEED:
self._active = False
else:
if v_ego < APM_ACTIVATE_SPEED:
self._active = True
if self._active:
return log.LongitudinalPersonality.aggressive
return personality
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
Copyright (c) 2019, rav4kumar, Rick Lan, dragonpilot community, and a number of other of contributors.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import numpy as np
NEARSIDE_PROB = 0.2
EDGE_PROB = 0.35
class RoadEdgeDetector:
def __init__(self, enabled = False):
self._is_enabled = enabled
self.left_edge_detected = False
self.right_edge_detected = False
def update(self, road_edge_stds, lane_line_probs):
if not self._is_enabled:
return
left_road_edge_prob = np.clip(1.0 - road_edge_stds[0], 0.0, 1.0)
left_lane_nearside_prob = lane_line_probs[0]
right_road_edge_prob = np.clip(1.0 - road_edge_stds[1], 0.0, 1.0)
right_lane_nearside_prob = lane_line_probs[3]
self.left_edge_detected = bool(
left_road_edge_prob > EDGE_PROB and
left_lane_nearside_prob < NEARSIDE_PROB and
right_lane_nearside_prob >= left_lane_nearside_prob
)
self.right_edge_detected = bool(
right_road_edge_prob > EDGE_PROB and
right_lane_nearside_prob < NEARSIDE_PROB and
left_lane_nearside_prob >= right_lane_nearside_prob
)
def set_enabled(self, enabled):
self._is_enabled = enabled
def is_enabled(self):
return self._is_enabled
+152
View File
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""
Copyright (c) 2025, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import subprocess
import time
from cereal import car, messaging
from openpilot.common.realtime import Ratekeeper
import threading
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
class Beepd:
def __init__(self, test=False):
self.current_alert = AudibleAlert.none
self._test = test
self.enable_gpio()
def enable_gpio(self):
try:
if self._test:
print("enabling GPIO")
subprocess.run("echo 42 | sudo tee /sys/class/gpio/export",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
except Exception:
if self._test:
print("GPIO failed to enable")
pass
subprocess.run("echo \"out\" | sudo tee /sys/class/gpio/gpio42/direction",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
def _beep(self, on):
val = "1" if on else "0"
subprocess.run(f"echo \"{val}\" | sudo tee /sys/class/gpio/gpio42/value",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
def engage(self):
if self._test:
print("beepd: engage")
self._beep(True)
time.sleep(0.05)
self._beep(False)
def disengage(self):
if self._test:
print("beepd: disengage")
for _ in range(2):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def prompt(self):
if self._test:
print("beepd: prompt")
for _ in range(3):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def warning_immediate(self):
if self._test:
print("beepd: warning_immediate")
for _ in range(5):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def dispatch_beep(self, func):
threading.Thread(target=func, daemon=True).start()
def update_alert(self, new_alert):
current_alert_played_once = self.current_alert == AudibleAlert.none
if self.current_alert != new_alert and (new_alert != AudibleAlert.none or current_alert_played_once):
self.current_alert = new_alert
if new_alert == AudibleAlert.engage:
self.dispatch_beep(self.engage)
if new_alert == AudibleAlert.disengage:
self.dispatch_beep(self.disengage)
if new_alert == AudibleAlert.prompt:
self.dispatch_beep(self.prompt)
if new_alert == AudibleAlert.warningImmediate:
self.dispatch_beep(self.warning_immediate)
def get_audible_alert(self, sm):
if sm.updated['selfdriveState']:
new_alert = sm['selfdriveState'].alertSound.raw
self.update_alert(new_alert)
def test_beepd_thread(self):
frame = 0
rk = Ratekeeper(20)
pm = messaging.PubMaster(['selfdriveState'])
while True:
cs = messaging.new_message('selfdriveState')
if frame == 20:
cs.selfdriveState.alertSound = AudibleAlert.engage
if frame == 40:
cs.selfdriveState.alertSound = AudibleAlert.disengage
if frame == 60:
cs.selfdriveState.alertSound = AudibleAlert.prompt
if frame == 80:
cs.selfdriveState.alertSound = AudibleAlert.warningImmediate
pm.send("selfdriveState", cs)
frame += 1
rk.keep_time()
def beepd_thread(self):
if self._test:
threading.Thread(target=self.test_beepd_thread).start()
sm = messaging.SubMaster(['selfdriveState'])
rk = Ratekeeper(20)
while True:
sm.update(0)
self.get_audible_alert(sm)
rk.keep_time()
def main():
s = Beepd(test=False)
s.beepd_thread()
if __name__ == "__main__":
main()
+93
View File
@@ -0,0 +1,93 @@
import socket
import time
import pyray as rl
import qrcode
import numpy as np
from openpilot.common.swaglog import cloudlog
IP_REFRESH_INTERVAL = 5 # seconds
class DashyQR:
"""Shared QR code generator for dashy web UI."""
def __init__(self):
self._qr_texture: rl.Texture | None = None
self._last_qr_url: str | None = None
self._last_ip_check: float = 0
@property
def texture(self):
return self._qr_texture
@property
def url(self) -> str | None:
return self._last_qr_url
@staticmethod
def get_local_ip() -> str | None:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return None
@staticmethod
def get_web_ui_url() -> str:
ip = DashyQR.get_local_ip()
return f"http://{ip if ip else 'localhost'}:5088"
def _generate_qr_code(self, url: str) -> None:
try:
qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=0)
qr.add_data(url)
qr.make(fit=True)
pil_img = qr.make_image(fill_color="white", back_color="black").convert('RGBA')
img_array = np.array(pil_img, dtype=np.uint8)
if self._qr_texture and self._qr_texture.id != 0:
rl.unload_texture(self._qr_texture)
rl_image = rl.Image()
rl_image.data = rl.ffi.cast("void *", img_array.ctypes.data)
rl_image.width = pil_img.width
rl_image.height = pil_img.height
rl_image.mipmaps = 1
rl_image.format = rl.PixelFormat.PIXELFORMAT_UNCOMPRESSED_R8G8B8A8
self._qr_texture = rl.load_texture_from_image(rl_image)
self._last_qr_url = url
except Exception as e:
cloudlog.warning(f"QR code generation failed: {e}")
self._qr_texture = None
def update(self, force: bool = False) -> bool:
"""Update QR code if needed. Returns True if updated."""
now = time.monotonic()
if not force and now - self._last_ip_check < IP_REFRESH_INTERVAL and self._qr_texture:
return False
self._last_ip_check = now
url = self.get_web_ui_url()
if url != self._last_qr_url:
self._generate_qr_code(url)
return True
return False
def force_update(self):
"""Force immediate IP check and QR regeneration."""
self._last_ip_check = 0
def cleanup(self):
"""Unload texture resources."""
if self._qr_texture and self._qr_texture.id != 0:
rl.unload_texture(self._qr_texture)
self._qr_texture = None
def __del__(self):
self.cleanup()
@@ -0,0 +1,60 @@
import pyray as rl
from openpilot.system.ui.widgets import Widget
from openpilot.system.ui.widgets.label import UnifiedLabel
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.multilang import tr
from dragonpilot.selfdrive.ui.dashy_qr import DashyQR
class DashyQRCode(Widget):
def __init__(self):
super().__init__()
self._qr = DashyQR()
self._title_label = UnifiedLabel(tr("scan to access"), font_size=32, font_weight=FontWeight.BOLD,
text_color=rl.WHITE, wrap_text=True)
self._subtitle_label = UnifiedLabel("dashy", font_size=48, font_weight=FontWeight.DISPLAY,
text_color=rl.WHITE)
self._or_label = UnifiedLabel(tr("or open browser"), font_size=24, font_weight=FontWeight.NORMAL,
text_color=rl.GRAY)
self._url_label = UnifiedLabel("", font_size=20, font_weight=FontWeight.NORMAL,
text_color=rl.GRAY, wrap_text=True)
def show_event(self):
self._qr.force_update()
def _render(self, rect: rl.Rectangle):
# Skip if off-screen (scroller renders all items, add small buffer for float precision)
if rect.x + rect.width < 1 or rect.x > gui_app.width - 1:
return
if self._qr.update():
self._url_label.set_text(self._qr.url or "")
# Left side: QR code (square, full height)
if self._qr.texture:
scale = rect.height / self._qr.texture.height
pos = rl.Vector2(rect.x, rect.y)
rl.draw_texture_ex(self._qr.texture, pos, 0.0, scale, rl.WHITE)
# Right side: Text
text_x = rect.x + rect.height + 16
text_width = int(rect.width - text_x)
# Title: "scan to access"
self._title_label.set_max_width(text_width)
self._title_label.set_position(text_x, rect.y)
self._title_label.render()
# Subtitle: "dashy"
self._subtitle_label.set_position(text_x, rect.y + 32)
self._subtitle_label.render()
# "or open browser"
self._or_label.set_position(text_x, rect.y + rect.height - 24 - 28)
self._or_label.render()
# URL
self._url_label.set_max_width(text_width)
self._url_label.set_position(text_x, rect.y + rect.height - 24)
self._url_label.render()
+8
View File
@@ -53,6 +53,9 @@ SECTION_ORDER = [
"Longitudinal",
"UI",
"Device",
# Upstream openpilot toggle mirrors (dashy-only, gated by `condition: "DASHY"`).
"Openpilot",
"Developer",
]
# Brand-gated sections: the whole header + its items are hidden when the
@@ -72,6 +75,11 @@ _KNOWN_ITEM_KEYS = _UI_REQUIRED_KEYS | {
"description", "default", "min_val", "max_val", "step", "suffix",
"special_value_text", "options", "brands", "condition",
"depends_on", "param_name", "callback",
# Dashy-only fields (no factory on the device dp panel; web UI consumes them).
# text_display_item: read-only render of a param's value.
# text_input_item: text field that POSTs typed value to the named action endpoint.
# action_item: button that POSTs to the named action endpoint with no payload.
"action",
# Param-storage fields (consumed by generate_settings.py, ignored by UI)
"flags", "param_type",
}
@@ -0,0 +1,16 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_audible_alert_mode",
"type": "text_spin_button_item",
"title": lambda: tr("Audible Alert"),
"description": lambda: tr("Std.: Stock behaviour.<br>Warning: Only emits sound when there is a warning.<br>Off: Does not emit any sound at all."),
"options": [lambda: tr("Std."), lambda: tr("Warning"), lambda: tr("Off")],
"default": "0",
"condition": "not LITE",
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,14 @@
settings:
- key: dp_dev_audible_alert_mode
type: text_spin_button_item
title: "Audible Alert"
description: "Std.: Stock behaviour.<br>Warning: Only emits sound when there is a warning.<br>Off: Does not emit any sound at all."
category: "Device"
condition: "not LITE"
default: 0
options:
- "Std."
- "Warning"
- "Off"
flags: PERSISTENT
param_type: INT
@@ -0,0 +1,20 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_auto_shutdown_in",
"type": "spin_button_item",
"title": lambda: tr("Auto Shutdown After"),
"description": lambda: tr("0 min = Immediately"),
"default": "-5",
"min_val": -5,
"max_val": 300,
"step": 5,
"suffix": lambda: tr("min"),
"special_value_text": lambda: tr("Off"),
"condition": "not MICI",
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,15 @@
settings:
- key: dp_dev_auto_shutdown_in
type: spin_button_item
title: "Auto Shutdown After"
description: "0 min = Immediately"
category: "Device"
condition: "not MICI"
default: -5
min_val: -5
max_val: 300
step: 5
suffix: "min"
special_value_text: "Off"
flags: PERSISTENT
param_type: INT
@@ -0,0 +1,14 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_dashy",
"type": "toggle_item",
"title": lambda: tr("Enable dashy Visual"),
"description": lambda: tr("dashy - dragonpilot's all-in-one system hub.<br><br>Visit http://&lt;device_ip&gt;:5088 to access.<br><br>Enable this to use Tesla Visual/HUD."),
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,21 @@
settings:
- key: dp_dev_dashy
type: toggle_item
title: "dashy HUD"
description: "dashy - dragonpilot's all-in-one system hub for you.<br><br>Visit http://<device_ip>:5088 to access.<br><br>Enable this to use Tesla HUD."
category: "Device"
flags: PERSISTENT
param_type: BOOL
default: "0"
- key: dp_maa_route
category: "Device"
flags: CLEAR_ON_MANAGER_START
param_type: JSON
- key: dp_maa_destination
category: "Device"
flags: PERSISTENT
param_type: JSON
- key: dp_maa_places
category: "Device"
flags: PERSISTENT
param_type: JSON
@@ -0,0 +1,19 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_delay_loggerd",
"type": "spin_button_item",
"title": lambda: tr("Delay Starting Loggerd for:"),
"description": lambda: tr("Delays the startup of loggerd and its related processes when the device goes on-road.<br>This prevents the initial moments of a drive from being recorded, protecting location privacy at the start of a trip."),
"default": "0",
"min_val": 0,
"max_val": 300,
"step": 5,
"suffix": lambda: tr("sec"),
"special_value_text": lambda: tr("Off"),
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,14 @@
settings:
- key: dp_dev_delay_loggerd
type: spin_button_item
title: "Delay Starting Loggerd for:"
description: "Delays the startup of loggerd and its related processes when the device goes on-road.<br>This prevents the initial moments of a drive from being recorded, protecting location privacy at the start of a trip."
category: "Device"
default: 0
min_val: 0
max_val: 300
step: 5
suffix: "sec"
special_value_text: "Off"
flags: PERSISTENT
param_type: INT
@@ -0,0 +1,14 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_disable_connect",
"type": "toggle_item",
"title": lambda: tr("Disable Comma Connect"),
"description": lambda: tr("Disable Comma connect service if you do not wish to upload / being tracked by the service."),
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,9 @@
settings:
- key: dp_dev_disable_connect
type: toggle_item
title: "Disable Comma Connect"
description: "Disable Comma connect service if you do not wish to upload / being tracked by the service."
category: "Device"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,4 @@
ITEMS = [
{"key": "dp_dev_model_selected", "flags": "PERSISTENT", "param_type": "STRING"},
{"key": "dp_dev_model_list", "flags": "PERSISTENT", "param_type": "JSON"},
]
@@ -0,0 +1,9 @@
settings:
- key: dp_dev_model_selected
category: "Device"
flags: PERSISTENT
param_type: STRING
- key: dp_dev_model_list
category: "Device"
flags: PERSISTENT
param_type: STRING
@@ -0,0 +1,26 @@
{
"settings": [
{
"key": "dp_dev_is_rhd",
"type": "toggle_item",
"title": "Enable Right-Hand Drive Mode",
"description": "Allow openpilot to obey right-hand traffic conventions on right driver seat.",
"category": "Device",
"condition": "LITE",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0"
},
{
"key": "dp_dev_beep",
"type": "toggle_item",
"title": "Enable Beep (Warning)",
"description": "Use Buzzer for audiable alerts.",
"category": "Device",
"condition": "LITE",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0"
}
]
}
@@ -0,0 +1,26 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_is_rhd",
"type": "toggle_item",
"title": lambda: tr("Enable Right-Hand Drive Mode"),
"description": lambda: tr("Allow openpilot to obey right-hand traffic conventions on right driver seat."),
"condition": "LITE",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
{
"section": "Device",
"key": "dp_dev_beep",
"type": "toggle_item",
"title": lambda: tr("Enable Beep (Warning)"),
"description": lambda: tr("Use Buzzer for audiable alerts."),
"condition": "LITE",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,20 @@
settings:
- key: dp_dev_is_rhd
type: toggle_item
title: "Enable Right-Hand Drive Mode"
description: "Allow openpilot to obey right-hand traffic conventions on right driver seat."
category: "Device"
condition: "LITE"
flags: PERSISTENT
param_type: BOOL
default: "0"
- key: dp_dev_beep
type: toggle_item
title: "Enable Beep (Warning)"
description: "Use Buzzer for audiable alerts."
category: "Device"
condition: "LITE"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,14 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_dev_opview",
"type": "toggle_item",
"title": lambda: tr("Enable opview"),
"description": lambda: tr("Broadcasts telemetry to the opview App (available on Android). Requires the companion App to be running on an external display."),
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,9 @@
settings:
- key: dp_dev_opview
type: toggle_item
title: "Enable opview"
description: "Broadcasts telemetry to the opview App (available on Android). Requires the companion App to be running on an external display."
category: "Device"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,3 @@
ITEMS = [
{"key": "dp_dev_tethering", "flags": "PERSISTENT", "param_type": "BOOL", "default": "0"},
]
@@ -0,0 +1,6 @@
settings:
- key: dp_dev_tethering
category: "Device"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Lateral",
"key": "dp_lat_alka",
"type": "toggle_item",
"title": lambda: tr("Always-on Lane Keeping Assist (ALKA)"),
"description": lambda: tr("Enable lateral control even when ACC/cruise is disengaged, using ACC Main or LKAS button to toggle. Vehicle must be moving."),
"brands": ["toyota", "hyundai", "honda", "volkswagen", "subaru", "mazda", "nissan", "ford"],
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,10 @@
settings:
- key: dp_lat_alka
type: toggle_item
title: "Always-on Lane Keeping Assist (ALKA)"
description: "Enable lateral control even when ACC/cruise is disengaged, using ACC Main or LKAS button to toggle. Vehicle must be moving."
category: "Lateral"
brands: ["toyota", "hyundai", "honda", "volkswagen", "subaru", "mazda", "nissan", "ford"]
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,18 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Lateral",
"key": "dp_lat_offset_cm",
"type": "spin_button_item",
"title": lambda: tr("Position Offset"),
"description": lambda: tr("Fine-tune where the car drives within the lane. Positive values move the car left, negative values move right.<br>Recommended to start with small values (±5cm) and adjust based on preference."),
"default": "0",
"min_val": -15,
"max_val": 15,
"step": 1,
"suffix": lambda: tr("cm"),
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,13 @@
settings:
- key: dp_lat_offset_cm
type: spin_button_item
title: "Position Offset"
description: "Fine-tune where the car drives within the lane. Positive values move the car left, negative values move right.<br>Recommended to start with small values (±5cm) and adjust based on preference."
category: "Lateral"
default: 0
min_val: -15
max_val: 15
step: 1
suffix: "cm"
flags: PERSISTENT
param_type: INT
+35
View File
@@ -0,0 +1,35 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Lateral",
"key": "dp_lat_lca_speed",
"type": "spin_button_item",
"title": lambda: tr("Lane Change Assist At:"),
"description": lambda: tr("Off = Disable LCA.<br>1 mph = 1.2 km/h."),
"default": "20",
"min_val": 0,
"max_val": 100,
"step": 5,
"suffix": lambda: tr("mph"),
"special_value_text": lambda: tr("Off"),
"flags": "PERSISTENT",
"param_type": "INT",
},
{
"section": "Lateral",
"key": "dp_lat_lca_auto_sec",
"type": "double_spin_button_item",
"title": lambda: tr("+ Auto Lane Change after:"),
"description": lambda: tr("Off = Disable Auto Lane Change."),
"default": "0.0",
"min_val": 0.0,
"max_val": 5.0,
"step": 0.5,
"suffix": lambda: tr("sec"),
"special_value_text": lambda: tr("Off"),
"depends_on": "dp_lat_lca_speed > 0",
"flags": "PERSISTENT",
"param_type": "FLOAT",
},
]
@@ -0,0 +1,38 @@
settings:
- key: dp_lat_lca_speed
type: spin_button_item
title: "Lane Change Assist At:"
description: "Off = Disable LCA.<br>1 mph = 1.2 km/h."
category: "Lateral"
default: 20
min_val: 0
max_val: 100
step: 5
suffix: "mph"
special_value_text: "Off"
on_change:
- target: dp_lat_lca_auto_sec
action: set_enabled
condition: "value > 0"
flags: PERSISTENT
param_type: INT
default: "20"
- key: dp_lat_lca_auto_sec
type: double_spin_button_item
title: "+ Auto Lane Change after:"
description: "Off = Disable Auto Lane Change."
category: "Lateral"
default: 0.0
min_val: 0.0
max_val: 5.0
step: 0.5
suffix: "sec"
special_value_text: "Off"
initially_enabled_by:
param: dp_lat_lca_speed
condition: "value > 0"
default: 20
flags: PERSISTENT
param_type: FLOAT
default: "0.0"
@@ -0,0 +1,14 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Lateral",
"key": "dp_lat_road_edge_detection",
"type": "toggle_item",
"title": lambda: tr("Road Edge Detection (RED)"),
"description": lambda: tr("Block lane change assist when the system detects the road edge.<br>NOTE: This will show 'Car Detected in Blindspot' warning."),
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,9 @@
settings:
- key: dp_lat_road_edge_detection
type: toggle_item
title: "Road Edge Detection (RED)"
description: "Block lane change assist when the system detects the road edge.<br>NOTE: This will show 'Car Detected in Blindspot' warning."
category: "Lateral"
flags: PERSISTENT
param_type: BOOL
default: "0"
+15
View File
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Longitudinal",
"key": "dp_lon_acm",
"type": "toggle_item",
"title": lambda: tr("Enable Adaptive Coasting Mode (ACM)"),
"description": lambda: tr("Adaptive Coasting Mode (ACM) reduces braking to allow smoother coasting when appropriate."),
"condition": "openpilotLongitudinalControl",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,10 @@
settings:
- key: dp_lon_acm
type: toggle_item
title: "Enable Adaptive Coasting Mode (ACM)"
description: "Adaptive Coasting Mode (ACM) reduces braking to allow smoother coasting when appropriate."
category: "Longitudinal"
condition: "openpilotLongitudinalControl"
flags: PERSISTENT
param_type: BOOL
default: "0"
+15
View File
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Longitudinal",
"key": "dp_lon_aem",
"type": "toggle_item",
"title": lambda: tr("Adaptive Experimental Mode (AEM)"),
"description": lambda: tr("Adaptive mode switcher between ACC and Blended based on driving context."),
"condition": "openpilotLongitudinalControl",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,10 @@
settings:
- key: dp_lon_aem
type: toggle_item
title: "Adaptive Experimental Mode (AEM)"
description: "Adaptive mode switcher between ACC and Blended based on driving context."
category: "Longitudinal"
condition: "openpilotLongitudinalControl"
flags: PERSISTENT
param_type: BOOL
default: "0"
+15
View File
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Longitudinal",
"key": "dp_lon_apm",
"type": "toggle_item",
"title": lambda: tr("Adaptive Personality Mode (APM)"),
"description": lambda: tr("Automatically switches personality to \"Aggressive\" below 30 km/h and restores your selected personality above 40 km/h."),
"condition": "openpilotLongitudinalControl",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,10 @@
settings:
- key: dp_lon_apm
type: toggle_item
title: "Adaptive Personality Mode (APM)"
description: "Automatically switches personality to \"Aggressive\" below 30 km/h and restores your selected personality above 40 km/h."
category: "Longitudinal"
condition: "openpilotLongitudinalControl"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,14 @@
{
"settings": [
{
"key": "dp_lon_ext_radar",
"type": "toggle_item",
"title": "Use External Radar",
"description": "See https://github.com/eFiniLan/openpilot-ext-radar-addon for more information.",
"category": "Device",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0"
}
]
}
@@ -0,0 +1,14 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "Device",
"key": "dp_lon_ext_radar",
"type": "toggle_item",
"title": lambda: tr("Use External Radar"),
"description": lambda: tr("See https://github.com/eFiniLan/openpilot-ext-radar-addon for more information."),
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,9 @@
settings:
- key: dp_lon_ext_radar
type: toggle_item
title: "Use External Radar"
description: "See https://github.com/eFiniLan/openpilot-ext-radar-addon for more information."
category: "Device"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "UI",
"key": "dp_ui_display_mode",
"type": "text_spin_button_item",
"title": lambda: tr("Display Mode"),
"description": lambda: tr("Std.: Stock behavior.<br>MAIN+: ACC MAIN on = Display ON.<br>OP+: OP enabled = Display ON.<br>MAIN-: ACC MAIN on = Display OFF<br>OP-: OP enabled = Display OFF."),
"options": [lambda: tr("Std."), lambda: tr("MAIN+"), lambda: tr("OP+"), lambda: tr("MAIN-"), lambda: tr("OP-")],
"default": "0",
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,15 @@
settings:
- key: dp_ui_display_mode
type: text_spin_button_item
title: "Display Mode"
description: "Std.: Stock behavior.<br>MAIN+: ACC MAIN on = Display ON.<br>OP+: OP enabled = Display ON.<br>MAIN-: ACC MAIN on = Display OFF<br>OP-: OP enabled = Display OFF."
category: "UI"
options:
- "Std."
- "MAIN+"
- "OP+"
- "MAIN-"
- "OP-"
flags: PERSISTENT
param_type: INT
default: "0"
@@ -0,0 +1,20 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "UI",
"key": "dp_ui_hide_hud_speed_kph",
"type": "spin_button_item",
"title": lambda: tr("Hide HUD When Moves above:"),
"description": lambda: tr("To prevent screen burn-in, hide Speed, MAX Speed, and Steering/DM Icons when the car moves.<br>Off = Stock Behavior<br>1 km/h = 0.6 mph"),
"default": "0",
"min_val": 0,
"max_val": 120,
"step": 5,
"suffix": lambda: tr("km/h"),
"special_value_text": lambda: tr("Off"),
"condition": "not MICI",
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,15 @@
settings:
- key: dp_ui_hide_hud_speed_kph
type: spin_button_item
title: "Hide HUD When Moves above:"
description: "To prevent screen burn-in, hide Speed, MAX Speed, and Steering/DM Icons when the car moves.<br>Off = Stock Behavior<br>1 km/h = 0.6 mph"
category: "UI"
condition: "not MICI"
default: 0
min_val: 0
max_val: 120
step: 5
suffix: "km/h"
special_value_text: "Off"
flags: PERSISTENT
param_type: INT
@@ -0,0 +1,16 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "UI",
"key": "dp_ui_lead",
"type": "text_spin_button_item",
"title": lambda: tr("Display Lead Stats"),
"description": lambda: tr("Display the statistics of lead car and/or radar tracking points.<br>Lead: Lead stats only<br>Radar: Radar tracking point stats only<br>All: Lead and Radar stats<br>NOTE: Radar option only works on certain vehicle models."),
"options": [lambda: tr("Off"), lambda: tr("Lead"), lambda: tr("Radar"), lambda: tr("All")],
"default": "0",
"condition": "not MICI",
"flags": "PERSISTENT",
"param_type": "INT",
},
]
@@ -0,0 +1,15 @@
settings:
- key: dp_ui_lead
type: text_spin_button_item
title: "Display Lead Stats"
description: "Display the statistics of lead car and/or radar tracking points.<br>Lead: Lead stats only<br>Radar: Radar tracking point stats only<br>All: Lead and Radar stats<br>NOTE: Radar option only works on certain vehicle models."
category: "UI"
condition: "not MICI"
default: 0
options:
- "Off"
- "Lead"
- "Radar"
- "All"
flags: PERSISTENT
param_type: INT
@@ -0,0 +1,15 @@
from dragonpilot.settings import tr
ITEMS = [
{
"section": "UI",
"key": "dp_ui_rainbow",
"type": "toggle_item",
"title": lambda: tr("Rainbow Driving Path like Tesla"),
"description": lambda: tr("Why not?"),
"condition": "not MICI",
"flags": "PERSISTENT",
"param_type": "BOOL",
"default": "0",
},
]
@@ -0,0 +1,10 @@
settings:
- key: dp_ui_rainbow
type: toggle_item
title: "Rainbow Driving Path like Tesla"
description: "Why not?"
category: "UI"
condition: "not MICI"
flags: PERSISTENT
param_type: BOOL
default: "0"
@@ -0,0 +1,88 @@
"""
Mirror of upstream openpilot's DeveloperLayout settings for the dashy web UI.
See openpilot.toggles.py for the rationale (DASHY-gated, no param_type, etc.).
Notes:
- AlphaLongitudinalEnabled / JoystickDebugMode / LongitudinalManeuverMode are
hidden on release builds in the device UI (DeveloperLayout._update_toggles).
Mirror them with `condition: "DASHY and not IS_RELEASE"` so release-branch
dashy hides them too.
- SSH Keys: rendered as a text_input (GithubUsername) + text_display
(GithubSshKeys) + clear button. The actual github.com fetch is handled by
the dashy action endpoint /api/action/ssh_key_set since it has side effects
(HTTP request, atomic two-param write, error handling) that don't fit a
declarative "set this param" model.
- "Show Last Errors": text_display of dp_dev_last_log — the device modal is
also still available via the existing DeveloperLayout button.
"""
from dragonpilot.settings import tr
from openpilot.selfdrive.ui.layouts.settings.developer import DESCRIPTIONS as _DEV_DESC
_SEC = "Developer"
_DASHY = "DASHY"
_DASHY_ALPHA = "DASHY and not IS_RELEASE"
ITEMS = [
{
"section": _SEC, "key": "AdbEnabled", "type": "toggle_item",
"title": lambda: tr("Enable ADB"),
"description": lambda: tr(_DEV_DESC["enable_adb"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "SshEnabled", "type": "toggle_item",
"title": lambda: tr("Enable SSH"),
"condition": _DASHY,
},
{
"section": _SEC, "key": "JoystickDebugMode", "type": "toggle_item",
"title": lambda: tr("Joystick Debug Mode"),
"condition": _DASHY_ALPHA,
},
{
"section": _SEC, "key": "LongitudinalManeuverMode", "type": "toggle_item",
"title": lambda: tr("Longitudinal Maneuver Mode"),
"condition": _DASHY_ALPHA,
},
{
"section": _SEC, "key": "AlphaLongitudinalEnabled", "type": "toggle_item",
"title": lambda: tr("openpilot Longitudinal Control (Alpha)"),
"description": lambda: tr(_DEV_DESC["alpha_longitudinal"]),
"condition": _DASHY_ALPHA,
},
{
"section": _SEC, "key": "ShowDebugInfo", "type": "toggle_item",
"title": lambda: tr("UI Debug Mode"),
"condition": _DASHY,
},
# SSH Keys — typed input + display + clear. github.com fetch lives in the
# dashy ssh_key_set action; see dragonpilot/dashy/serverd.py.
{
"section": _SEC, "key": "GithubUsername", "type": "text_input_item",
"title": lambda: tr("GitHub Username (SSH Keys)"),
"description": lambda: tr(_DEV_DESC["ssh_key"]),
"action": "ssh_key_set",
"condition": _DASHY,
},
{
"section": _SEC, "key": "GithubSshKeys", "type": "text_display_item",
"title": lambda: tr("Stored SSH Keys"),
"condition": _DASHY,
},
{
"section": _SEC, "key": "ssh_key_clear", "type": "action_item",
"title": lambda: tr("Clear SSH Keys"),
"action": "ssh_key_clear",
"condition": _DASHY,
},
# Last error log — read-only display of dp_dev_last_log (already declared by
# core-feat/panel). Device side still has the "Show Last Errors" modal button.
{
"section": _SEC, "key": "dp_dev_last_log", "type": "text_display_item",
"title": lambda: tr("Last Errors"),
"condition": _DASHY,
},
]
+93
View File
@@ -0,0 +1,93 @@
"""
Mirror of upstream openpilot's TogglesLayout settings for the dashy web UI.
The device's own Toggles panel remains the source of truth for the device touchscreen.
These entries surface the same params in dashy so a user on a small-screen device
(comma4 / mici) can flip them from their phone instead.
Items are gated by `condition: "DASHY"` so they only render in the web context —
dashy's settings-context dict sets DASHY=True; the device dp panel leaves it unset
(falsy), hiding these to avoid duplicating the existing Toggles tab.
No flags/param_type/default on items: the params are owned by upstream openpilot
and already declared in common/params_keys.h. generate_settings.py won't emit
duplicates because these have no flags+param_type pair.
Drift detection: see system/tests/test_openpilot_mirror.py.
"""
from dragonpilot.settings import tr
from openpilot.selfdrive.ui.layouts.settings.toggles import DESCRIPTIONS as _TOGGLES_DESC
_SEC = "Openpilot"
_DASHY = "DASHY"
ITEMS = [
{
"section": _SEC, "key": "OpenpilotEnabledToggle", "type": "toggle_item",
"title": lambda: tr("Enable openpilot"),
"description": lambda: tr(_TOGGLES_DESC["OpenpilotEnabledToggle"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "ExperimentalMode", "type": "toggle_item",
"title": lambda: tr("Experimental Mode"),
"condition": _DASHY,
},
{
"section": _SEC, "key": "DisengageOnAccelerator", "type": "toggle_item",
"title": lambda: tr("Disengage on Accelerator Pedal"),
"description": lambda: tr(_TOGGLES_DESC["DisengageOnAccelerator"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "IsLdwEnabled", "type": "toggle_item",
"title": lambda: tr("Enable Lane Departure Warnings"),
"description": lambda: tr(_TOGGLES_DESC["IsLdwEnabled"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "AlwaysOnDM", "type": "toggle_item",
"title": lambda: tr("Always-On Driver Monitoring"),
"description": lambda: tr(_TOGGLES_DESC["AlwaysOnDM"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "RecordFront", "type": "toggle_item",
"title": lambda: tr("Record and Upload Driver Camera"),
"description": lambda: tr(_TOGGLES_DESC["RecordFront"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "RecordAudio", "type": "toggle_item",
"title": lambda: tr("Record and Upload Microphone Audio"),
"description": lambda: tr(_TOGGLES_DESC["RecordAudio"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "IsMetric", "type": "toggle_item",
"title": lambda: tr("Use Metric System"),
"description": lambda: tr(_TOGGLES_DESC["IsMetric"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "DisableLogging", "type": "toggle_item",
"title": lambda: tr("Disable Logging"),
"description": lambda: tr(_TOGGLES_DESC["DisableLogging"]),
"condition": _DASHY,
},
{
"section": _SEC, "key": "DisableUpdates", "type": "toggle_item",
"title": lambda: tr("Disable Updates"),
"description": lambda: tr(_TOGGLES_DESC["DisableUpdates"]),
"condition": _DASHY,
},
# Longitudinal driving personality — text_spin selector mirroring upstream's multi-button widget.
{
"section": _SEC, "key": "LongitudinalPersonality", "type": "text_spin_button_item",
"title": lambda: tr("Driving Personality"),
"description": lambda: tr(_TOGGLES_DESC["LongitudinalPersonality"]),
"options": [lambda: tr("Aggressive"), lambda: tr("Standard"), lambda: tr("Relaxed")],
"default": "1", # Standard
"condition": _DASHY,
},
]
+77
View File
@@ -28,6 +28,81 @@ function agnos_init {
fi
}
set_tici_hw() {
if grep -q "tici" /sys/firmware/devicetree/base/model 2>/dev/null; then
export TICI_HW=1
echo "Querying panda MCU type..."
# Loop for a maximum of 10 attempts
for attempt in {1..10}; do
# Initial wait or wait between retries
sleep 3
MCU_OUTPUT=$(python -c "from panda_tici import Panda; p = Panda(cli=False); print(p.get_mcu_type()); p.close()" 2>/dev/null)
if [[ "$MCU_OUTPUT" == *"McuType.F4"* ]]; then
echo "TICI (DOS) detected"
mount_nvme
export TICI_DOS=1
return 0 # Success, exit function
elif [[ "$MCU_OUTPUT" == *"McuType.H7"* ]]; then
echo "TICI (TRES) detected"
export TICI_TRES=1
return 0 # Success, exit function
fi
# If we reach here, it was UNKNOWN
echo "TICI (UNKNOWN) detected. Attempt $attempt of 10..."
done
# If the loop finishes without returning, we failed 10 times
echo "TICI (UNKNOWN) detected after 10 attempts, stop processing."
exit 1
fi
}
mount_nvme() {
for i in $(seq 1 10); do
[ -b /dev/nvme0n1p1 ] && break
sleep 1
done
# Returns 0 (success) so the boot process continues without errors
if [ ! -b /dev/nvme0n1p1 ]; then
return 0
fi
# We assume /data/media/0/realdata exists per defaults
if ! mountpoint -q /data/media/0/realdata; then
mount /dev/nvme0n1p1 /data/media/0/realdata
fi
if mountpoint -q /data/media/0/realdata; then
OWNER="$(stat -c '%U' /data/media/0/realdata)"
GROUP="$(stat -c '%G' /data/media/0/realdata)"
PERM="$(stat -c '%a' /data/media/0/realdata)"
if [ "$OWNER" != "comma" ] || [ "$GROUP" != "comma" ]; then
chown comma:comma /data/media/0/realdata
fi
if [ "$PERM" != "755" ]; then
chmod 755 /data/media/0/realdata
fi
fi
}
set_lite_hw() {
if grep -q "tici" /sys/firmware/devicetree/base/model 2>/dev/null; then
output=$(i2cget -y 0 0x10 0x00 2>/dev/null)
if [ -z "$output" ]; then
echo "Lite HW"
export LITE=1
fi
fi
}
function launch {
# Remove orphaned git lock if it exists on boot
[ -f "$DIR/.git/index.lock" ] && rm -f $DIR/.git/index.lock
@@ -72,6 +147,8 @@ function launch {
# hardware specific init
if [ -f /AGNOS ]; then
set_tici_hw
set_lite_hw
agnos_init
fi
+7 -7
View File
@@ -81,9 +81,9 @@ def can_fingerprint(can_recv: CanRecvCallable) -> tuple[str | None, dict[int, di
# **** for use live only ****
def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback,
cached_params: CarParamsT | None) -> tuple[str | None, dict, str, list[CarParams.CarFw], CarParams.FingerprintSource, bool]:
fixed_fingerprint = os.environ.get('FINGERPRINT', "")
def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int,
cached_params: CarParamsT | None, dp_fingerprint: str = "") -> tuple[str | None, dict, str, list[CarParams.CarFw], CarParams.FingerprintSource, bool]:
fixed_fingerprint = os.environ.get('FINGERPRINT', dp_fingerprint)
skip_fw_query = os.environ.get('SKIP_FW_QUERY', False)
disable_fw_cache = os.environ.get('DISABLE_FW_CACHE', False)
ecu_rx_addrs = set()
@@ -103,8 +103,8 @@ def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_mu
set_obd_multiplexing(True)
# VIN query only reliably works through OBDII
vin_rx_addr, vin_rx_bus, vin = get_vin(can_recv, can_send, (0, 1))
ecu_rx_addrs = get_present_ecus(can_recv, can_send, set_obd_multiplexing)
car_fw = get_fw_versions_ordered(can_recv, can_send, set_obd_multiplexing, vin, ecu_rx_addrs)
ecu_rx_addrs = get_present_ecus(can_recv, can_send, set_obd_multiplexing, num_pandas=num_pandas)
car_fw = get_fw_versions_ordered(can_recv, can_send, set_obd_multiplexing, vin, ecu_rx_addrs, num_pandas=num_pandas)
cached = False
exact_fw_match, fw_candidates = match_fw_to_car(car_fw, vin)
@@ -149,8 +149,8 @@ def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_mu
def get_car(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, alpha_long_allowed: bool,
is_release: bool, dp_params: int = 0, cached_params: CarParamsT | None = None):
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(can_recv, can_send, set_obd_multiplexing, cached_params)
is_release: bool, num_pandas: int = 1, dp_params: int = 0, cached_params: CarParamsT | None = None, dp_fingerprint: str = ""):
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(can_recv, can_send, set_obd_multiplexing, num_pandas, cached_params, dp_fingerprint=dp_fingerprint)
if candidate is None:
carlog.error({"event": "car doesn't match any fingerprints", "fingerprints": repr(fingerprints)})
@@ -113,6 +113,9 @@ class CarState(CarStateBase):
*create_button_events(self.lc_button, prev_lc_button, {1: ButtonType.lkas}),
]
# dp - ALKA: direct tracking - lkas_on follows acc_main (cruiseState.available)
self.lkas_on = ret.cruiseState.available
return ret
@staticmethod
+1
View File
@@ -292,6 +292,7 @@ FW_QUERY_CONFIG = FwQueryConfig(
[StdQueries.TESTER_PRESENT_RESPONSE, StdQueries.MANUFACTURER_SOFTWARE_VERSION_RESPONSE],
whitelist_ecus=[Ecu.abs, Ecu.debug, Ecu.engine, Ecu.eps, Ecu.fwdCamera, Ecu.fwdRadar, Ecu.shiftByWire],
bus=0,
auxiliary=True,
),
*[Request(
[StdQueries.TESTER_PRESENT_REQUEST, ford_asbuilt_block_request(block_id)],
@@ -1,3 +1,4 @@
import copy
from dataclasses import dataclass, field
import struct
from collections.abc import Callable
@@ -89,6 +90,8 @@ class Request:
whitelist_ecus: list[Ecu] = field(default_factory=list)
rx_offset: int = 0x8
bus: int = 1
# Whether this query should be run on the first auxiliary panda (CAN FD cars for example)
auxiliary: bool = False
# FW responses from these queries will not be used for fingerprinting
logging: bool = False
# pandad toggles OBD multiplexing on/off as needed
@@ -127,6 +130,17 @@ class FwQueryConfig:
assert len(request_obj.request) == len(request_obj.response), ("Request and response lengths do not match: " +
f"{request_obj.request} vs. {request_obj.response}")
# No request on the OBD port (bus 1, multiplexed) should be run on an aux panda
assert not (request_obj.auxiliary and request_obj.bus == 1 and request_obj.obd_multiplexing), ("OBD multiplexed request should not " +
f"be marked auxiliary: {request_obj}")
# Add aux requests (second panda) for all requests that are marked as auxiliary
for i in range(len(self.requests)):
if self.requests[i].auxiliary:
new_request = copy.deepcopy(self.requests[i])
new_request.bus += 4
self.requests.append(new_request)
def get_all_ecus(self, offline_fw_versions: OfflineFwVersions,
include_extra_ecus: bool = True) -> set[EcuAddrSubAddr]:
# Add ecus in database + extra ecus
+12 -4
View File
@@ -170,13 +170,17 @@ def match_fw_to_car(fw_versions: list[CarParams.CarFw], vin: str, allow_exact: b
return True, set()
def get_present_ecus(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback) -> set[EcuAddrBusType]:
def get_present_ecus(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int = 1) -> set[EcuAddrBusType]:
# queries are split by OBD multiplexing mode
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []}
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []}
responses: set[EcuAddrBusType] = set()
for brand, config, r in REQUESTS:
# Skip query if no panda available
if r.bus > num_pandas * 4 - 1:
continue
for ecu_type, addr, sub_addr in config.get_all_ecus(VERSIONS[brand]):
# Only query ecus in whitelist if whitelist is not empty
if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus:
@@ -225,7 +229,7 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, list[b
def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str,
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, progress: bool = False) -> list[CarParams.CarFw]:
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, progress: bool = False) -> list[CarParams.CarFw]:
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
all_car_fw = []
@@ -238,7 +242,7 @@ def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable
if True not in brand_matches[brand]:
continue
car_fw = get_fw_versions(can_recv, can_send, set_obd_multiplexing, query_brand=brand, timeout=timeout, progress=progress)
car_fw = get_fw_versions(can_recv, can_send, set_obd_multiplexing, query_brand=brand, timeout=timeout, num_pandas=num_pandas, progress=progress)
all_car_fw.extend(car_fw)
# If there is a match using this brand's FW alone, finish querying early
@@ -250,7 +254,7 @@ def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable
def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str | None = None,
extra: OfflineFwVersions | None = None, timeout: float = 0.1, progress: bool = False) -> list[CarParams.CarFw]:
extra: OfflineFwVersions | None = None, timeout: float = 0.1, num_pandas: int = 1, progress: bool = False) -> list[CarParams.CarFw]:
versions = VERSIONS.copy()
if query_brand is not None:
@@ -287,6 +291,10 @@ def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_ob
for addr_group in tqdm(addrs, disable=not progress): # split by subaddr, if any
for addr_chunk in chunks(addr_group):
for brand, config, r in requests:
# Skip query if no panda available
if r.bus > num_pandas * 4 - 1:
continue
# Toggle OBD multiplexing for each request
if r.bus % 4 == 1:
set_obd_multiplexing(r.obd_multiplexing)
@@ -230,6 +230,9 @@ class CarState(CarStateBase):
*create_button_events(self.cruise_setting, prev_cruise_setting, SETTINGS_BUTTONS_DICT),
]
# dp - ALKA: direct tracking - lkas_on follows acc_main (cruiseState.available)
self.lkas_on = ret.cruiseState.available
return ret
def get_can_parsers(self, CP):
@@ -129,11 +129,11 @@ class CarController(CarControllerBase):
can_sends = []
# HUD messages
sys_warning, sys_state, left_lane_warning, right_lane_warning = process_hud_alert(CC.enabled, self.car_fingerprint,
sys_warning, sys_state, left_lane_warning, right_lane_warning = process_hud_alert(CC.latActive, self.car_fingerprint,
hud_control)
can_sends.append(hyundaican.create_lkas11(self.packer, self.frame, self.CP, apply_torque, apply_steer_req,
torque_fault, CS.lkas11, sys_warning, sys_state, CC.enabled,
torque_fault, CS.lkas11, sys_warning, sys_state, CC.latActive,
hud_control.leftLaneVisible, hud_control.rightLaneVisible,
left_lane_warning, right_lane_warning))
@@ -159,7 +159,7 @@ class CarController(CarControllerBase):
# 20 Hz LFA MFA message
if self.frame % 5 == 0 and self.CP.flags & HyundaiFlags.SEND_LFA.value:
can_sends.append(hyundaican.create_lfahda_mfc(self.packer, CC.enabled))
can_sends.append(hyundaican.create_lfahda_mfc(self.packer, CC.latActive))
# 5 Hz ACC options
if self.frame % 20 == 0 and self.CP.openpilotLongitudinalControl:
@@ -178,7 +178,7 @@ class CarController(CarControllerBase):
lka_steering_long = lka_steering and self.CP.openpilotLongitudinalControl
# steering control
can_sends.extend(hyundaicanfd.create_steering_messages(self.packer, self.CP, self.CAN, CC.enabled, apply_steer_req, apply_torque))
can_sends.extend(hyundaicanfd.create_steering_messages(self.packer, self.CP, self.CAN, CC.latActive, apply_steer_req, apply_torque))
# prevent LFA from activating on LKA steering cars by sending "no lane lines detected" to ADAS ECU
if self.frame % 5 == 0 and lka_steering:
@@ -187,7 +187,7 @@ class CarController(CarControllerBase):
# LFA and HDA icons
if self.frame % 5 == 0 and (not lka_steering or lka_steering_long):
can_sends.append(hyundaicanfd.create_lfahda_cluster(self.packer, self.CAN, CC.enabled))
can_sends.append(hyundaicanfd.create_lfahda_cluster(self.packer, self.CAN, CC.latActive))
# blinkers
if lka_steering and self.CP.flags & HyundaiFlags.CANFD_ENABLE_BLINKERS:
@@ -193,6 +193,9 @@ class CarState(CarStateBase):
*create_button_events(self.main_buttons[-1], prev_main_buttons, {1: ButtonType.mainCruise}),
*create_button_events(self.lda_button, prev_lda_button, {1: ButtonType.lkas})]
# dp - ALKA: direct tracking - lkas_on follows acc_main (cruiseState.available)
self.lkas_on = ret.cruiseState.available
ret.blockPcmEnable = not self.recent_button_interaction()
# low speed steer alert hysteresis logic (only for cars with steer cut off above 10 m/s)
@@ -291,6 +294,13 @@ class CarState(CarStateBase):
*create_button_events(self.main_buttons[-1], prev_main_buttons, {1: ButtonType.mainCruise}),
*create_button_events(self.lda_button, prev_lda_button, {1: ButtonType.lkas})]
# dp - ALKA: direct tracking - lkas_on follows acc_main
if not self.CP.openpilotLongitudinalControl:
cp_cruise_info = cp_cam if self.CP.flags & HyundaiFlags.CANFD_CAMERA_SCC else cp
self.lkas_on = cp_cruise_info.vl["SCC_CONTROL"]["MainMode_ACC"] == 1
else:
self.lkas_on = ret.cruiseState.available
ret.blockPcmEnable = not self.recent_button_interaction()
return ret
@@ -737,11 +737,13 @@ FW_QUERY_CONFIG = FwQueryConfig(
[HYUNDAI_VERSION_REQUEST_LONG],
[HYUNDAI_VERSION_RESPONSE],
bus=0,
auxiliary=True,
),
Request(
[HYUNDAI_VERSION_REQUEST_LONG],
[HYUNDAI_VERSION_RESPONSE],
bus=1,
auxiliary=True,
obd_multiplexing=False,
),
@@ -751,6 +753,7 @@ FW_QUERY_CONFIG = FwQueryConfig(
[HYUNDAI_ECU_MANUFACTURING_DATE],
[HYUNDAI_VERSION_RESPONSE],
bus=0,
auxiliary=True,
logging=True,
),
@@ -759,12 +762,14 @@ FW_QUERY_CONFIG = FwQueryConfig(
[HYUNDAI_VERSION_REQUEST_ALT],
[HYUNDAI_VERSION_RESPONSE],
bus=0,
auxiliary=True,
logging=True,
),
Request(
[HYUNDAI_VERSION_REQUEST_ALT],
[HYUNDAI_VERSION_RESPONSE],
bus=1,
auxiliary=True,
logging=True,
obd_multiplexing=False,
),
+3
View File
@@ -281,6 +281,9 @@ class CarStateBase(ABC):
self.cluster_min_speed = 0.0 # min speed before dropping to 0
self.secoc_key: bytes = b"00" * 16
# dp - ALKA: lkas_on state (mirrors panda's lkas_on for Python-panda sync)
self.lkas_on = False
Q = [[0.0, 0.0], [0.0, 100.0]]
R = 0.3
A = [[1.0, DT_CTRL], [0.0, 1.0]]
@@ -125,6 +125,9 @@ class CarState(CarStateBase):
*create_button_events(self.decel_button, prev_decel_button, {1: ButtonType.decelCruise}),
]
# dp - ALKA: use ACC main state
self.lkas_on = ret.cruiseState.available
return ret
@staticmethod
@@ -61,7 +61,8 @@ class CarController(CarControllerBase):
# Below are the HUD messages. We copy the stock message and modify
if self.CP.carFingerprint != CAR.NISSAN_ALTIMA:
if self.frame % 2 == 0:
can_sends.append(nissancan.create_lkas_hud_msg(self.packer, CS.lkas_hud_msg, CC.enabled, hud_control.leftLaneVisible, hud_control.rightLaneVisible,
# dp - ALKA: use latActive to show HUD when ALKA is active
can_sends.append(nissancan.create_lkas_hud_msg(self.packer, CS.lkas_hud_msg, CC.latActive, hud_control.leftLaneVisible, hud_control.rightLaneVisible,
hud_control.leftLaneDepart, hud_control.rightLaneDepart))
if self.frame % 50 == 0:
@@ -128,6 +128,9 @@ class CarState(CarStateBase):
ret.buttonEvents = create_button_events(self.distance_button, prev_distance_button, {1: ButtonType.gapAdjustCruise})
# dp - ALKA: use ACC main state
self.lkas_on = ret.cruiseState.available
return ret
@staticmethod
+5 -1
View File
@@ -1,7 +1,11 @@
import time
from contextlib import AbstractContextManager
from panda import Panda
import os
if "TICI_DOS" in os.environ:
from panda_tici import Panda
else:
from panda import Panda
from opendbc.car.car_helpers import get_car
from opendbc.car.can_definitions import CanData
from opendbc.car.structs import CarParams, CarControl
+162
View File
@@ -0,0 +1,162 @@
"""
Copyright (c) 2025, Rick Lan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, and/or sublicense,
for non-commercial purposes only, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
- Commercial use (e.g. use in a product, service, or activity intended to
generate revenue) is prohibited without explicit written permission from
the copyright holder.
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from opendbc.car.interfaces import RadarInterfaceBase
from opendbc.can.parser import CANParser
from opendbc.car.structs import RadarData
from typing import List, Tuple
# car head to radar
DREL_OFFSET = -1.52
# typically max lane width is 3.7m
LANE_WIDTH = 3.8
LANE_WIDTH_HALF = LANE_WIDTH/2
LANE_CENTER_MIN_LAT = 0.
LANE_CENTER_MAX_LAT = LANE_WIDTH_HALF
LANE_CENTER_MIN_DIST = 5.
LANE_SIDE_MIN_LAT = LANE_WIDTH_HALF
LANE_SIDE_MAX_LAT = LANE_WIDTH_HALF + LANE_WIDTH
LANE_SIDE_MIN_DIST = 10.
# lat distance, typically max lane width is 3.7m
MAX_LAT_DIST = 6.
# objects to ignore thats really close to the vehicle (after DREL_OFFSET applied)
MIN_DIST = 5.
# ignore oncoming objects
IGNORE_OBJ_STATE = 2
# ignore objects that we haven't seen for 5 secs
NOT_SEEN_INIT = 33
def _create_radar_parser():
return CANParser('u_radar', [("Status", float('nan')), ("ObjectData", float('nan'))], 1)
class RadarInterface(RadarInterfaceBase):
def __init__(self, CP):
super().__init__(CP)
self.updated_messages = set()
self.rcp = _create_radar_parser()
self._pts_cache = dict()
self._pts_not_seen = {key: 0 for key in range(255)}
self._should_clear_cache = False
# called by card.py, 100hz
def update(self, can_strings):
vls = self.rcp.update(can_strings)
self.updated_messages.update(vls)
if 1546 in self.updated_messages:
self._should_clear_cache = True
if 1547 in self.updated_messages:
all_objects = zip(
self.rcp.vl_all['ObjectData']['ID'],
self.rcp.vl_all['ObjectData']['DistLong'],
self.rcp.vl_all['ObjectData']['DistLat'],
self.rcp.vl_all['ObjectData']['VRelLong'],
self.rcp.vl_all['ObjectData']['VRelLat'],
self.rcp.vl_all['ObjectData']['DynProp'],
self.rcp.vl_all['ObjectData']['Class'],
self.rcp.vl_all['ObjectData']['RCS'],
)
# clean cache when we see a 0x60a then a 0x60b
if self._should_clear_cache:
self._pts_cache.clear()
self._should_clear_cache = False
for track_id, dist_long, dist_lat, vrel_long, vrel_lat, dyn_prop, obj_class, rcs in all_objects:
d_rel = dist_long + DREL_OFFSET
y_rel = -dist_lat
should_ignore = False
# ignore point (obj_class = 0)
if not should_ignore and int(obj_class) == 0:
should_ignore = True
# ignore oncoming objects
# @todo remove this because it's always 0 ?
if not should_ignore and int(dyn_prop) == IGNORE_OBJ_STATE:
should_ignore = True
# far away lane object, ignore
if not should_ignore and abs(y_rel) > LANE_SIDE_MAX_LAT:
should_ignore = True
# close object, ignore, use vision
if not should_ignore and LANE_CENTER_MIN_LAT > abs(y_rel) > LANE_CENTER_MAX_LAT and d_rel < LANE_CENTER_MIN_DIST:
should_ignore = True
# close object, ignore, use vision
if not should_ignore and LANE_SIDE_MIN_LAT > abs(y_rel) > LANE_SIDE_MAX_LAT and d_rel < LANE_SIDE_MIN_DIST:
should_ignore = True
if not should_ignore and track_id not in self._pts_cache:
self._pts_cache[track_id] = RadarData.RadarPoint()
self._pts_cache[track_id].trackId = track_id
if should_ignore:
self._pts_not_seen[track_id] = -1
else:
self._pts_not_seen[track_id] = NOT_SEEN_INIT
# init cache
if track_id not in self._pts_cache:
self._pts_cache[track_id] = RadarData.RadarPoint()
self._pts_cache[track_id].trackId = track_id
# add/update to cache
self._pts_cache[track_id].dRel = d_rel
self._pts_cache[track_id].yRel = y_rel
self._pts_cache[track_id].vRel = float(vrel_long)
self._pts_cache[track_id].yvRel = float('nan')
self._pts_cache[track_id].aRel = float('nan')
self._pts_cache[track_id].measured = True
self.updated_messages.clear()
# publish to cereal
if self.frame % 3 == 0:
keys_to_remove = [key for key in self.pts if key not in self._pts_cache]
for key in keys_to_remove:
self._pts_not_seen[key] -= 1
if self._pts_not_seen[key] <= 0:
del self.pts[key]
self.pts.update(self._pts_cache)
ret = RadarData()
if not self.rcp.can_valid:
ret.errors.canError = True
ret.points = list(self.pts.values())
return ret
return None
+2
View File
@@ -20,4 +20,6 @@ CarControlT = capnp.lib.capnp._StructModule
CarParamsT = capnp.lib.capnp._StructModule
class DPFlags:
LatALKA = 1
ExtRadar = 2
pass
@@ -98,7 +98,7 @@ class CarController(CarControllerBase):
can_sends.append(subarucan.create_es_dashstatus(self.packer, self.frame // 10, CS.es_dashstatus_msg, CC.enabled,
self.CP.openpilotLongitudinalControl, CC.longActive, hud_control.leadVisible))
can_sends.append(subarucan.create_es_lkas_state(self.packer, self.frame // 10, CS.es_lkas_state_msg, CC.enabled, hud_control.visualAlert,
can_sends.append(subarucan.create_es_lkas_state(self.packer, self.frame // 10, CS.es_lkas_state_msg, CC.latActive, hud_control.visualAlert,
hud_control.leftLaneVisible, hud_control.rightLaneVisible,
hud_control.leftLaneDepart, hud_control.rightLaneDepart))
@@ -137,6 +137,9 @@ class CarState(CarStateBase):
if self.CP.flags & SubaruFlags.SEND_INFOTAINMENT:
self.es_infotainment_msg = copy.copy(cp_cam.vl["ES_Infotainment"])
# dp - ALKA: use ACC main state
self.lkas_on = ret.cruiseState.available
return ret
@staticmethod
@@ -287,9 +287,10 @@ class CarController(CarControllerBase):
send_ui = True
if self.frame % 20 == 0 or send_ui:
# dp - ALKA: use lat_active to show HUD when ALKA is active
can_sends.append(toyotacan.create_ui_command(self.packer, steer_alert, pcm_cancel_cmd, hud_control.leftLaneVisible,
hud_control.rightLaneVisible, hud_control.leftLaneDepart,
hud_control.rightLaneDepart, CC.enabled, CS.lkas_hud))
hud_control.rightLaneDepart, lat_active, CS.lkas_hud))
if (self.frame % 100 == 0 or send_ui) and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
can_sends.append(toyotacan.create_fcw_command(self.packer, fcw_alert))
@@ -200,6 +200,10 @@ class CarState(CarStateBase):
buttonEvents += create_button_events(self.distance_button, prev_distance_button, {1: ButtonType.gapAdjustCruise})
ret.buttonEvents = buttonEvents
# dp - ALKA: Toyota requires main ON to use ACC/LKA, use main as switch
self.lkas_on = ret.cruiseState.available
return ret
@staticmethod
+4 -1
View File
@@ -3,7 +3,7 @@ from opendbc.car.toyota.carstate import CarState
from opendbc.car.toyota.carcontroller import CarController
from opendbc.car.toyota.radar_interface import RadarInterface
from opendbc.car.toyota.values import Ecu, CAR, DBC, ToyotaFlags, CarControllerParams, TSS2_CAR, RADAR_ACC_CAR, MIN_ACC_SPEED, \
EPS_SCALE, ANGLE_CONTROL_CAR, ToyotaSafetyFlags
EPS_SCALE, ANGLE_CONTROL_CAR, ToyotaSafetyFlags, UNSUPPORTED_DSU_CAR
from opendbc.car.disable_ecu import disable_ecu
from opendbc.car.interfaces import CarInterfaceBase
@@ -27,6 +27,9 @@ class CarInterface(CarInterfaceBase):
ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.toyota)]
ret.safetyConfigs[0].safetyParam = EPS_SCALE[candidate]
if candidate in UNSUPPORTED_DSU_CAR:
ret.safetyConfigs[0].safetyParam |= ToyotaSafetyFlags.UNSUPPORTED_DSU.value
# BRAKE_MODULE is on a different address for these cars
if DBC[candidate][Bus.pt] == "toyota_new_mc_pt_generated":
ret.safetyConfigs[0].safetyParam |= ToyotaSafetyFlags.ALT_BRAKE.value
@@ -56,6 +56,7 @@ class ToyotaSafetyFlags(IntFlag):
STOCK_LONGITUDINAL = (2 << 8)
LTA = (4 << 8)
SECOC = (8 << 8)
UNSUPPORTED_DSU = (16 << 8) # dp - use DSU_CRUISE (0x365) for ACC main instead of PCM_CRUISE_2 (0x1D3)
class ToyotaFlags(IntFlag):
@@ -137,6 +137,9 @@ class CarState(CarStateBase):
ret.lowSpeedAlert = self.update_low_speed_alert(ret.vEgo)
# dp - ALKA: use ACC main state
self.lkas_on = ret.cruiseState.available
self.frame += 1
return ret
@@ -228,6 +231,9 @@ class CarState(CarStateBase):
ret.lowSpeedAlert = self.update_low_speed_alert(ret.vEgo)
# dp - ALKA: use ACC main state
self.lkas_on = ret.cruiseState.available
self.frame += 1
return ret
+85
View File
@@ -0,0 +1,85 @@
VERSION ""
NS_ :
NS_DESC_
CM_
BA_DEF_
BA_
VAL_
CAT_DEF_
CAT_
FILTER
BA_DEF_DEF_
EV_DATA_
ENVVAR_DATA_
SGTYPE_
SGTYPE_VAL_
BA_DEF_SGTYPE_
BA_SGTYPE_
SIG_TYPE_REF_
VAL_TABLE_
SIG_GROUP_
SIG_VALTYPE_
SIGTYPE_VALTYPE_
BO_TX_BU_
BA_DEF_REL_
BA_REL_
BA_DEF_DEF_REL_
BU_SG_REL_
BU_EV_REL_
BU_BO_REL_
SG_MUL_VAL_
BS_:
BU_: RADAR
BO_ 513 RadarState: 8 XXX
SG_ NVMReadStatus : 6|1@0+ (1,0) [0|1] "" XXX
SG_ NVMWriteStatus : 7|1@0+ (1,0) [0|1] "" XXX
SG_ MaxDistanceCfg : 15|10@0+ (2,0) [0|2046] "m" XXX
SG_ RadarPowerCfg : 25|3@0+ (1,0) [0|7] "" XXX
SG_ SensorID : 34|3@0+ (1,0) [0|7] "" XXX
SG_ SortIndex : 38|3@0+ (1,0) [0|7] "" XXX
SG_ CtrlRelayCfg : 41|1@0+ (1,0) [0|1] "" XXX
SG_ OutputTypeCfg : 43|2@0+ (1,0) [0|3] "" XXX
SG_ QualityInfoCfg : 44|1@0+ (1,0) [0|1] "" XXX
SG_ ExtInfoCfg : 45|1@0+ (1,0) [0|1] "" XXX
SG_ CANBaudRate : 55|3@0+ (1,0) [0|7] "" XXX
SG_ InterfaceType : 57|2@0+ (1,0) [0|3] "" XXX
SG_ RCSThreshold : 58|3@1+ (1,0) [0|7] "" XXX
SG_ CalibrationEnabled : 63|2@0+ (1,0) [0|3] "" XXX
VAL_ 513 NVMReadStatus 0 "Failed" 1 "Successful";
VAL_ 513 NVMWriteStatus 0 "Failed" 1 "Successful";
VAL_ 513 RadarPowerCfg 0 "Standard" 1 "-3dB Gain" 2 "-6dB Gain" 3 "-9dB Gain";
VAL_ 513 SortIndex 0 "No Sorting" 1 "Sort By Range" 2 "Sort By RCS";
VAL_ 513 CtrlRelayCfg 0 "Off" 1 "On";
VAL_ 513 OutputTypeCfg 0 "None" 1 "Objects" 2 "Clusters";
VAL_ 513 QualityInfoCfg 0 "Off" 1 "On";
VAL_ 513 ExtInfoCfg 0 "Off" 1 "On";
VAL_ 513 CANBaudRate 0 "500K" 1 "250K" 2 "1M";
VAL_ 513 RCSThreshold 0 "Standard" 1 "High Sensitivity";
VAL_ 513 CalibrationEnabled 1 "Enabled" 2 "Initial Recovery";
BO_ 1546 Status: 8 RADAR
SG_ NoOfObjects : 7|8@0+ (1,0) [0|255] "" XXX
SG_ MeasCount : 15|16@0+ (1,0) [0|65535] "" XXX
SG_ InterfaceVersion : 31|4@0+ (1,0) [0|15] "" XXX
BO_ 1547 ObjectData: 8 RADAR
SG_ ID : 7|8@0+ (1,0) [0|255] "" XXX
SG_ DistLong : 15|13@0+ (0.2,-500) [-500|1138.2] "m" XXX
SG_ DistLat : 18|11@0+ (0.2,-204.6) [-204.6|204.8] "m" XXX
SG_ VRelLong : 39|10@0+ (0.25,-128) [-128|127.75] "m/s" XXX
SG_ VRelLat : 45|9@0+ (0.25,-64) [-64|63.75] "m/s" XXX
SG_ DynProp : 50|3@0+ (1,0) [0|7] "" XXX
SG_ Class : 52|2@0+ (1,0) [0|3] "" XXX
SG_ RCS : 63|8@0+ (0.5,-64) [-64|63.75] "dBm2" XXX
CM_ BO_ 1547 "Object detection and tracking information";
VAL_ 1547 DynProp 0 "moving" 1 "stationary" 2 "oncoming" 3 "crossing_left" 4 "crossing_right" 5 "unknown" 6 "stopped";
VAL_ 1547 Class 0 "point" 1 "vehicle";
+1
View File
@@ -8,3 +8,4 @@ class ALTERNATIVE_EXPERIENCE:
DISABLE_STOCK_AEB = 2
RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX = 8
ALLOW_AEB = 16
ALKA = 1024 # dp - ALKA (Always-on Lane Keeping Assist)
+6 -1
View File
@@ -3,7 +3,12 @@
static const unsigned char dlc_to_len[] = {0U, 1U, 2U, 3U, 4U, 5U, 6U, 7U, 8U, 12U, 16U, 20U, 24U, 32U, 48U, 64U};
#define CANPACKET_HEAD_SIZE 6U // non-data portion of CANPacket_t
#define CANPACKET_DATA_SIZE_MAX 64U
#ifdef CANFD
#define CANPACKET_DATA_SIZE_MAX 64U
#else
#define CANPACKET_DATA_SIZE_MAX 8U
#endif
typedef struct {
unsigned char fd : 1;

Some files were not shown because too many files have changed in this diff Show More