Compare commits

..

20 Commits

Author SHA1 Message Date
royjr 1e8d692804 Merge branch 'master' into visuals-radar-tracks 2026-06-09 00:13:12 -04:00
royjr 5d5dc6c50c yaml 2026-06-09 00:12:06 -04:00
royjr 5c64fe0695 Merge branch 'master' into visuals-radar-tracks 2026-06-08 23:19:24 -04:00
royjr 37eeb121cd Merge branch 'master' into visuals-radar-tracks 2026-03-27 16:27:10 -07:00
royjr 19a5202f4c Merge branch 'master' into visuals-radar-tracks 2026-03-02 02:44:58 -05:00
royjr 920ef49b32 no return 2026-02-05 01:01:07 -05:00
royjr c7a37ca89e Merge branch 'master' into visuals-radar-tracks 2026-02-05 00:52:12 -05:00
royjr 45ca076aef Merge branch 'master' into visuals-radar-tracks 2025-12-31 15:26:12 -05:00
royjr 70cf5e6d52 Update params_metadata.json 2025-12-29 12:03:57 -05:00
royjr f4c71e4918 lint 2025-12-29 10:05:17 -05:00
royjr 8408241495 Update toggles.py 2025-12-29 02:23:10 -05:00
royjr 640988673e mici toggle 2025-12-29 02:22:16 -05:00
royjr 1429975f1e Revert "force enable for now"
This reverts commit efd499448a.
2025-12-29 02:22:01 -05:00
royjr efd499448a force enable for now 2025-12-29 00:42:38 -05:00
royjr f7e636f089 use params 2025-12-29 00:42:19 -05:00
royjr 00187dc59e use checks 2025-12-29 00:37:19 -05:00
royjr cde6368cf7 cleaner 2025-12-29 00:30:15 -05:00
royjr 08b56f9b33 better sp 2025-12-29 00:28:12 -05:00
royjr 78efe3a476 init sp 2025-12-29 00:21:08 -05:00
royjr 54578675c2 init 2025-12-29 00:19:17 -05:00
24 changed files with 381 additions and 115 deletions
+1 -30
View File
@@ -1,34 +1,5 @@
sunnypilot Version 2026.002.000 (2026-06-28)
sunnypilot Version 2026.002.000 (2026-xx-xx)
========================
* What's Changed (sunnypilot/sunnypilot)
* ui: update gates for certain toggles by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1830
* release: ignore upstream IsReleaseBranch by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1831
* manager: disable DEVELOPMENT_ONLY reset by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1833
* sunnylink: fix max time offroad values by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1835
* ui: show default model name by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1837
* sunnylink: add CarParams fallback for brand-specific capabilities by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1839
* sunnylink SDUI: tweak DisableUpdate param for clarity by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1842
* Revert "DM: Lancia Delta HF Integrale model" by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1849
* modeld_v2: safe model validation by @Discountchubbs in https://github.com/sunnypilot/sunnypilot/pull/1855
* Revert "deprecate `carState.brake`" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/sunnypilot/pull/1860
* sunnylink: deprecate legacy params metadata by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1862
* ui: reset Enforce Torque Control and NNLC if both are enabled by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1863
* What's Changed (sunnypilot/opendbc)
* Rivian: suppress ACM hold-the-wheel warning during MADS-only lateral by @lukasloetkolben in https://github.com/sunnypilot/opendbc/pull/465
* Sync: `commaai/opendbc:master``sunnypilot/opendbc:master` by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/479
* safety: add option to ignore frequency check for RX checks by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/480
* Revert "deprecate carState.brake" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/opendbc/pull/481
* New Contributors (sunnypilot/sunnypilot)
* @mvl-boston made their first contribution in https://github.com/sunnypilot/sunnypilot/pull/1860
* Full Changelog: https://github.com/sunnypilot/sunnypilot/compare/v2026.001.007...v2026.002.000
************************
* Synced with commaai's openpilot (v0.11.1)
* master commit 69e2c321e49760e52f7983eaa0a5f77cb95de637 (June 02, 2026)
* New driver monitoring model
* Improved image processing pipeline for driver camera
* Improved thermal policy for comma four
* Acura MDX 2022-24 support thanks to mvl-boston!
* Rivian R1S and R1T 2025 support thanks to lukasloetkolben!
sunnypilot Version 2026.001.000 (2026-05-06)
========================
+2
View File
@@ -80,6 +80,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
{"LiveParameters", {PERSISTENT, JSON}},
{"LiveParametersV2", {PERSISTENT, BYTES}},
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
@@ -177,6 +178,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"OnroadUploads", {PERSISTENT | BACKUP, BOOL, "1"}},
{"QuickBootToggle", {PERSISTENT | BACKUP, BOOL, "0"}},
{"QuietMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RadarTracks", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RainbowMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RocketFuel", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowAdvancedControls", {PERSISTENT | BACKUP, BOOL, "0"}},
@@ -21,8 +21,10 @@ class TogglesLayoutMici(NavScroller):
record_front = BigParamControl("record & upload driver camera", "RecordFront", toggle_callback=restart_needed_callback)
record_mic = BigParamControl("record & upload mic audio", "RecordAudio", toggle_callback=restart_needed_callback)
enable_openpilot = BigParamControl("enable sunnypilot", "OpenpilotEnabledToggle", toggle_callback=restart_needed_callback)
radar_tracks = BigParamControl("radar tracks", "RadarTracks")
self._scroller.add_widgets([
radar_tracks,
self._personality_toggle,
self._experimental_btn,
is_metric_toggle,
@@ -35,6 +37,7 @@ class TogglesLayoutMici(NavScroller):
# Toggle lists
self._refresh_toggles = (
("RadarTracks", radar_tracks),
("ExperimentalMode", self._experimental_btn),
("IsMetric", is_metric_toggle),
("IsLdwEnabled", ldw_toggle),
@@ -153,6 +153,9 @@ class ModelRenderer(Widget, ModelRendererSP):
self._draw_lane_lines()
self._draw_path(sm)
if ui_state.radar_tracks and sm.valid['liveTracks'] and sm.recv_frame['liveTracks'] >= ui_state.started_frame:
self.radar_tracks.draw_radar_tracks(sm['liveTracks'], self._map_to_screen, self._path_offset_z, track_size=3)
# if render_lead_indicator and radar_state:
# self._draw_lead_indicator()
+3
View File
@@ -135,6 +135,9 @@ class ModelRenderer(Widget, ChevronMetrics, ModelRendererSP):
self._draw_lane_lines()
self._draw_path(sm)
if ui_state.radar_tracks and sm.valid['liveTracks'] and sm.recv_frame['liveTracks'] >= ui_state.started_frame:
self.radar_tracks.draw_radar_tracks(sm['liveTracks'], self._map_to_screen, self._path_offset_z)
if render_lead_indicator and radar_state:
self._draw_lead_indicator()
self.chevron_metrics.draw_lead_status(sm, radar_state, self._rect, self._lead_vehicles)
@@ -134,11 +134,6 @@ class SteeringLayout(Widget):
enforce_torque_enabled = self._torque_control_toggle.action_item.get_state()
nnlc_enabled = self._nnlc_toggle.action_item.get_state()
if enforce_torque_enabled and nnlc_enabled:
self._torque_control_toggle.action_item.set_state(False)
self._nnlc_toggle.action_item.set_state(False)
enforce_torque_enabled = False
nnlc_enabled = False
self._nnlc_toggle.action_item.set_enabled(ui_state.is_offroad() and torque_allowed and not enforce_torque_enabled)
self._torque_control_toggle.action_item.set_enabled(ui_state.is_offroad() and torque_allowed and not nnlc_enabled)
self._torque_customization_button.action_item.set_enabled(self._torque_control_toggle.action_item.get_state())
@@ -6,9 +6,12 @@ See the LICENSE.md file in the root directory for more details.
"""
from openpilot.selfdrive.ui.sunnypilot.onroad.chevron_metrics import ChevronMetrics
from openpilot.selfdrive.ui.sunnypilot.onroad.rainbow_path import RainbowPath
from openpilot.selfdrive.ui.sunnypilot.onroad.radar_tracks import RadarTracks
class ModelRendererSP:
def __init__(self):
self.rainbow_path = RainbowPath()
self.chevron_metrics = ChevronMetrics()
self.radar_tracks = RadarTracks()
@@ -0,0 +1,23 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import math
import pyray as rl
class RadarTracks:
def draw_radar_tracks(self, live_tracks, map_to_screen, path_offset_z, track_size=6):
for track in live_tracks.points:
d_rel, y_rel, v_rel, a_rel = track.dRel, track.yRel, track.vRel, track.aRel
if not (math.isfinite(d_rel) and math.isfinite(y_rel) and math.isfinite(v_rel) and math.isfinite(a_rel)):
continue
pt = map_to_screen(d_rel, -y_rel, path_offset_z)
if pt is None:
continue
x, y = pt
rl.draw_circle(int(x), int(y), track_size, rl.Color(0, 255, 64, 255))
+1 -4
View File
@@ -169,6 +169,7 @@ class UIStateSP:
self.turn_signals = self.params.get_bool("ShowTurnSignals")
self.boot_offroad_mode = self.params.get("DeviceBootMode", return_default=True)
self.always_offroad = self.params.get_bool("OffroadMode")
self.radar_tracks = self.params.get_bool("RadarTracks")
if not self._sp_initialized:
self._sp_initialized = True
@@ -179,10 +180,6 @@ class UIStateSP:
CP = self.CP
if CP is not None:
if self.params.get_bool("EnforceTorqueControl") and self.params.get_bool("NeuralNetworkLateralControl"):
self.params.put_bool("EnforceTorqueControl", False, block=True)
self.params.put_bool("NeuralNetworkLateralControl", False, block=True)
# Angle steering: no torque-based lateral controls
if CP.steerControlType == car.CarParams.SteerControlType.angle:
self.params.remove("EnforceTorqueControl")
+1
View File
@@ -63,6 +63,7 @@ class UIState(UIStateSP):
"liveParameters",
"testJoystick",
"rawAudioData",
"liveTracks",
] + self.sm_services_ext
)
+6
View File
@@ -1296,6 +1296,12 @@
"title": "Display Turn Signals",
"description": "When enabled, visual turn indicators are drawn on the HUD."
},
{
"key": "RadarTracks",
"widget": "toggle",
"title": "Radar Tracks",
"description": "Show radar tracks"
},
{
"key": "RoadNameToggle",
"widget": "toggle",
@@ -24,6 +24,10 @@ sections:
widget: toggle
title: Display Turn Signals
description: When enabled, visual turn indicators are drawn on the HUD.
- key: RadarTracks
widget: toggle
title: Radar Tracks
description: Show radar tracks
- key: RoadNameToggle
widget: toggle
title: Display Road Name
+41 -1
View File
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
create_connection)
import cereal.messaging as messaging
from cereal import log
from cereal import car, log
from cereal.services import SERVICE_LIST
from openpilot.common.api import Api, get_key_pair
from openpilot.common.utils import CallbackReader, get_upload_stream
@@ -45,6 +45,7 @@ from openpilot.system.hardware.hw import Paths
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
LOCAL_PORT_WHITELIST = {22, } # SSH
WEBRTCD_PORT = 5001
LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
@@ -567,6 +568,16 @@ def getSshAuthorizedKeys() -> str:
def getGithubUsername() -> str:
return cast(str, Params().get("GithubUsername") or "")
@dispatcher.add_method
def getNotCar() -> bool:
cp_bytes = Params().get("CarParamsPersistent")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
return CP.notCar
return False
@dispatcher.add_method
def getSimInfo():
return HARDWARE.get_sim_info()
@@ -588,6 +599,35 @@ def getNetworks():
return HARDWARE.get_networks()
@dispatcher.add_method
def startStream(sdp: str) -> dict:
from openpilot.system.webrtc.webrtcd import StreamRequestBody
bridge_services_in = []
# get live car params to avoid stale notCar edge case
cp_bytes = Params().get("CarParams")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
if CP.notCar:
bridge_services_in.append("testJoystick")
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
try:
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
json=asdict(body), timeout=10)
if not resp.ok:
try:
error_body = resp.json()
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
except ValueError:
resp.raise_for_status()
return resp.json()
except requests.ConnectTimeout as e:
raise Exception("webrtc took too long to respond. is it on?") from e
except requests.ConnectionError as e:
raise Exception("webrtc is not running. turn on comma body ignition.") from e
@dispatcher.add_method
def takeSnapshot() -> str | dict[str, str] | None:
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
-3
View File
@@ -313,9 +313,6 @@ class Tici(HardwareBase):
continue
gov = 'ondemand' if powersave_enabled else 'performance'
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
if not powersave_enabled:
# cap max core freq to 1689 Mhz
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
# *** IRQ config ***
+1
View File
@@ -26,6 +26,7 @@ public:
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
virtual void encoder_open() = 0;
virtual void encoder_close() = 0;
virtual void set_bitrate(int bitrate) = 0;
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
+4
View File
@@ -72,6 +72,10 @@ void FfmpegEncoder::encoder_close() {
is_open = false;
}
void FfmpegEncoder::set_bitrate(int bitrate) {
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
}
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
assert(buf->width == this->in_width);
assert(buf->height == this->in_height);
+1
View File
@@ -21,6 +21,7 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int segment_num = -1;
+21
View File
@@ -155,6 +155,8 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
current_bitrate = encoder_settings.bitrate;
adaptive_bitrate = encoder_info.adaptive_bitrate;
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
struct v4l2_format fmt_out = {
@@ -304,6 +306,25 @@ void V4LEncoder::encoder_close() {
this->is_open = false;
}
void V4LEncoder::set_bitrate(int bitrate) {
if (!adaptive_bitrate || bitrate == current_bitrate) return;
if (bitrate <= 0) {
LOGE("invalid livestream encoder bitrate %d", bitrate);
return;
}
struct v4l2_control ctrl = {
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
.value = bitrate,
};
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
return;
}
current_bitrate = bitrate;
}
V4LEncoder::~V4LEncoder() {
encoder_close();
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
+3
View File
@@ -13,6 +13,7 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int fd;
@@ -20,6 +21,8 @@ private:
bool is_open = false;
int segment_num = -1;
int counter = 0;
int current_bitrate = -1;
bool adaptive_bitrate;
SafeQueue<VisionIpcBufExtra> extras;
+15
View File
@@ -44,11 +44,24 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
}
}
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
static Params params;
std::string val = params.get("LivestreamEncoderBitrate");
if (val.empty()) return;
int bitrate = std::stoi(val);
for (auto &e : encoders) {
e->set_bitrate(bitrate);
}
}
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
util::set_thread_name(cam_info.thread_name);
std::vector<std::unique_ptr<Encoder>> encoders;
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
[](const auto &ei) { return ei.adaptive_bitrate; });
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
std::unique_ptr<JpegEncoder> jpeg_encoder;
@@ -108,6 +121,8 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
++cur_seg;
}
if (has_adaptive) apply_bitrate(encoders);
// encode a frame
for (int i = 0; i < encoders.size(); ++i) {
int out_id = encoders[i]->encode_frame(buf, &extra);
+9 -5
View File
@@ -47,8 +47,8 @@ struct EncoderSettings {
}
static EncoderSettings StreamEncoderSettings() {
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
}
};
@@ -59,6 +59,7 @@ public:
const char *filename = NULL;
bool record = true;
bool include_audio = false;
bool adaptive_bitrate = false;
int frame_width = -1;
int frame_height = -1;
int fps = MAIN_FPS;
@@ -104,6 +105,7 @@ const EncoderInfo stream_road_encoder_info = {
.publish_name = "livestreamRoadEncodeData",
//.thumbnail_name = "thumbnail",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
};
@@ -111,6 +113,7 @@ const EncoderInfo stream_road_encoder_info = {
const EncoderInfo stream_wide_road_encoder_info = {
.publish_name = "livestreamWideRoadEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
};
@@ -118,6 +121,7 @@ const EncoderInfo stream_wide_road_encoder_info = {
const EncoderInfo stream_driver_encoder_info = {
.publish_name = "livestreamDriverEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
};
@@ -153,19 +157,19 @@ const LogCameraInfo driver_camera_info{
const LogCameraInfo stream_road_camera_info{
.thread_name = "road_cam_encoder",
.stream_type = VISION_STREAM_ROAD,
.encoder_infos = {stream_road_encoder_info}
.encoder_infos = {stream_road_encoder_info},
};
const LogCameraInfo stream_wide_road_camera_info{
.thread_name = "wide_road_cam_encoder",
.stream_type = VISION_STREAM_WIDE_ROAD,
.encoder_infos = {stream_wide_road_encoder_info}
.encoder_infos = {stream_wide_road_encoder_info},
};
const LogCameraInfo stream_driver_camera_info{
.thread_name = "driver_cam_encoder",
.stream_type = VISION_STREAM_DRIVER,
.encoder_infos = {stream_driver_encoder_info}
.encoder_infos = {stream_driver_encoder_info},
};
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
+31 -4
View File
@@ -1,4 +1,5 @@
import asyncio
import struct
import time
import av
@@ -7,6 +8,13 @@ from teleoprtc.tracks import TiciVideoStreamTrack
from cereal import messaging
from openpilot.common.realtime import DT_MDL, DT_DMON
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
TIMING_SEI_UUID = bytes([
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
])
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
camera_to_sock_mapping = {
@@ -19,9 +27,30 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
dt = DT_DMON if camera_type == "driver" else DT_MDL
super().__init__(camera_type, dt)
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
self._sock = self._make_sock(camera_type)
self._pts = 0
self._t0_ns = time.monotonic_ns()
self.timing_sei_enabled = False
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
def switch_camera(self, camera_type: str) -> None:
self._sock = self._make_sock(camera_type)
def _build_frame_data(self, msg) -> bytes:
encode_data = getattr(msg, msg.which())
if not self.timing_sei_enabled:
return encode_data.header + encode_data.data
idx = encode_data.idx
sei_nal = _SEI_PREFIX + struct.pack('>4d',
(idx.timestampEof - idx.timestampSof) / 1e6,
(msg.logMonoTime - idx.timestampEof) / 1e6,
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
time.time() * 1000, # noqa: TID251
) + b'\x80'
return encode_data.header + sei_nal + encode_data.data
async def recv(self):
while True:
@@ -30,9 +59,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
break
await asyncio.sleep(0.005)
evta = getattr(msg, msg.which())
packet = av.Packet(evta.header + evta.data)
packet = av.Packet(self._build_frame_data(msg))
packet.time_base = self._time_base
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
+204 -62
View File
@@ -1,7 +1,11 @@
#!/usr/bin/env python3
from abc import abstractmethod
import os
import time
import argparse
import asyncio
import contextlib
import json
import uuid
import logging
@@ -19,11 +23,39 @@ if TYPE_CHECKING:
from aiortc.rtcdatachannel import RTCDataChannel
from openpilot.system.webrtc.schema import generate_field
from openpilot.common.params import Params
from cereal import messaging, log
class CerealOutgoingMessageProxy:
class AsyncTaskRunner:
def __init__(self):
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
async def stop(self):
if self.task is None:
return
task = self.task
self.task = None
if task.done():
return
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
@abstractmethod
async def run(self):
pass
class CerealOutgoingMessageProxy(AsyncTaskRunner):
def __init__(self, sm: messaging.SubMaster):
super().__init__()
self.sm = sm
self.channels: list[RTCDataChannel] = []
@@ -55,6 +87,19 @@ class CerealOutgoingMessageProxy:
for channel in self.channels:
channel.send(encoded_msg)
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class CerealIncomingMessageProxy:
def __init__(self, pm: messaging.PubMaster):
@@ -72,37 +117,6 @@ class CerealIncomingMessageProxy:
self.pm.send(msg_type, msg)
class CerealProxyRunner:
def __init__(self, proxy: CerealOutgoingMessageProxy):
self.proxy = proxy
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
def stop(self):
if self.task is None or self.task.done():
return
self.task.cancel()
self.task = None
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.proxy.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class DynamicPubMaster(messaging.PubMaster):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -115,21 +129,90 @@ class DynamicPubMaster(messaging.PubMaster):
self.sock[service] = messaging.pub_sock(service)
class LivestreamBitrateController(AsyncTaskRunner):
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
sample_interval = 0.2
high_level = 0.1 # drop immediately
med_level = 0.05 # drop after # of samples
low_level = 0 # raise after # of samples
down_samples = 5 # 1s
param_name = "LivestreamEncoderBitrate"
def __init__(self, peer_connection: Any):
super().__init__()
self.pc = peer_connection
self.params = Params()
self.level = 0
self.prev_lost, self.prev_sent = None, None
self.counter = 0
self.up_samples = 5 # 1s
self._auto = True
async def run(self):
while True:
await asyncio.sleep(self.sample_interval)
if not self._auto:
continue
loss_rate = await self._sample()
if loss_rate is None:
continue
if loss_rate >= self.med_level and self.level > 0:
self.counter += 1
if self.counter >= self.down_samples or loss_rate >= self.high_level:
self.level -= 1
self.up_samples *= 2 # exponential backoff before raising again
self.counter = 0
self._publish(self.bitrates[self.level])
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
self.counter -= 1
if -self.counter >= self.up_samples:
self.level += 1
self.counter = 0
self._publish(self.bitrates[self.level])
async def _sample(self) -> float | None:
report = await self.pc.getStats()
packets_lost = packets_sent = 0
for s in report.values():
if s.type == "remote-inbound-rtp":
packets_lost += s.packetsLost
elif s.type == "outbound-rtp":
packets_sent += s.packetsSent
if self.prev_lost is None:
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return None
lost_delta = max(0, packets_lost - self.prev_lost)
sent_delta = max(0, packets_sent - self.prev_sent)
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return lost_delta / sent_delta if sent_delta else 0.0
def _publish(self, bitrate: float):
self.params.put(self.param_name, bitrate)
def set_quality(self, quality):
if quality in self.label_to_bitrate:
self._publish(self.label_to_bitrate[quality])
self._auto = False
elif quality == "auto":
self._auto = True
class StreamSession:
shared_pub_master = DynamicPubMaster([])
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
from aiortc.mediastreams import VideoStreamTrack
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from teleoprtc import WebRTCAnswerBuilder
from teleoprtc.info import parse_info_from_offer
config = parse_info_from_offer(sdp)
builder = WebRTCAnswerBuilder(sdp)
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
for cam in cameras:
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
builder.add_video_stream(init_camera, self.video_track)
self.stream = builder.stream()
self.identifier = str(uuid.uuid4())
@@ -137,35 +220,60 @@ class StreamSession:
self.incoming_bridge: CerealIncomingMessageProxy | None = None
self.incoming_bridge_services = incoming_services
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
self.outgoing_bridge_runner: CerealProxyRunner | None = None
self.bitrate_controller: LivestreamBitrateController | None = None
if len(incoming_services) > 0:
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
if len(outgoing_services) > 0:
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
self.run_task: asyncio.Task | None = None
self._cleanup_lock = asyncio.Lock()
self._cleanup_done = False
self.logger = logging.getLogger("webrtcd")
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
self.identifier, cameras, incoming_services, outgoing_services)
self.logger.info(
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
self.identifier, init_camera, incoming_services, outgoing_services,
)
def start(self):
self.run_task = asyncio.create_task(self.run())
def stop(self):
if self.run_task.done():
return
self.run_task.cancel()
async def stop(self):
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
self.run_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.run_task
self.run_task = None
asyncio.run(self.post_run_cleanup())
await self.post_run_cleanup()
async def get_answer(self):
return await self.stream.start()
async def message_handler(self, message: bytes):
def message_handler(self, message: bytes):
assert self.incoming_bridge is not None
try:
self.incoming_bridge.send(message)
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
if isinstance(payload, dict):
msg_type = payload.get("type")
match msg_type:
case "livestreamCameraSwitch":
self.video_track.switch_camera(payload["data"]["camera"])
case "livestreamSettings":
self.bitrate_controller.set_quality(payload["data"]["quality"])
case "clockSync":
pong = json.dumps({"type": "clockSync", "data": {
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
}})
self.stream.get_messaging_channel().send(pong)
case "enableTimingSei":
if hasattr(self.video_track, 'timing_sei_enabled'):
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
case _:
if payload.get("type") not in self.incoming_bridge_services:
return
self.incoming_bridge.send(message)
except Exception:
self.logger.exception("Cereal incoming proxy failure")
@@ -176,29 +284,38 @@ class StreamSession:
if self.incoming_bridge is not None:
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
self.stream.set_message_handler(self.message_handler)
if self.outgoing_bridge_runner is not None:
if self.outgoing_bridge is not None:
channel = self.stream.get_messaging_channel()
self.outgoing_bridge_runner.proxy.add_channel(channel)
self.outgoing_bridge_runner.start()
self.outgoing_bridge.add_channel(channel)
self.outgoing_bridge.start()
self.bitrate_controller.start()
self.logger.info("Stream session (%s) connected", self.identifier)
await self.stream.wait_for_disconnection()
await self.post_run_cleanup()
self.logger.info("Stream session (%s) ended", self.identifier)
except Exception:
self.logger.exception("Stream session failure")
finally:
await self.post_run_cleanup()
async def post_run_cleanup(self):
await self.stream.stop()
if self.outgoing_bridge is not None:
self.outgoing_bridge_runner.stop()
async with self._cleanup_lock:
if self._cleanup_done:
return
self._cleanup_done = True
if self.bitrate_controller is not None:
await self.bitrate_controller.stop()
if self.outgoing_bridge is not None:
await self.outgoing_bridge.stop()
await self.stream.stop()
@dataclass
class StreamRequestBody:
sdp: str
cameras: list[str]
initCamera: str
bridge_services_in: list[str] = field(default_factory=list)
bridge_services_out: list[str] = field(default_factory=list)
@@ -208,11 +325,33 @@ async def get_stream(request: 'web.Request'):
raw_body = await request.json()
body = StreamRequestBody(**raw_body)
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
answer = await session.get_answer()
session.start()
async with request.app['stream_lock']:
# Fully disconnect any other active stream before starting the replacement.
for sid, s in list(stream_dict.items()):
if s.run_task and not s.run_task.done():
try:
ch = s.stream.get_messaging_channel()
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
except Exception:
pass
await s.stop()
del stream_dict[sid]
stream_dict[session.identifier] = session
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
try:
answer = await session.get_answer()
except ValueError as e:
await session.stop()
raise web.HTTPBadRequest(
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
content_type="application/json",
) from e
except Exception:
await session.stop()
raise
session.start()
stream_dict[session.identifier] = session
return web.json_response({"sdp": answer.sdp, "type": answer.type})
@@ -224,6 +363,7 @@ async def get_schema(request: 'web.Request'):
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
return web.json_response(schema_dict)
async def post_notify(request: 'web.Request'):
try:
payload = await request.json()
@@ -239,9 +379,10 @@ async def post_notify(request: 'web.Request'):
return web.Response(status=200, text="OK")
async def on_shutdown(app: 'web.Application'):
for session in app['streams'].values():
session.stop()
await session.stop()
del app['streams']
@@ -254,6 +395,7 @@ def webrtcd_thread(host: str, port: int, debug: bool):
app = web.Application()
app['streams'] = dict()
app['stream_lock'] = asyncio.Lock()
app['debug'] = debug
app.on_shutdown.append(on_shutdown)
app.router.add_post("/stream", get_stream)
+1 -1
View File
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
async def offer(request: 'web.Request'):
params = await request.json()
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
body_json = json.dumps(dataclasses.asdict(body))
logger.info("Sending offer to webrtcd...")