Compare commits

..

14 Commits

Author SHA1 Message Date
royjr fb724b9612 Merge branch 'master' into developer-panel-external-storage 2026-06-09 11:44:46 -04:00
royjr 71e4f251d2 Merge branch 'master' into developer-panel-external-storage 2026-03-02 02:42:38 -05:00
royjr dbefa8afbd Merge branch 'master' into developer-panel-external-storage 2026-02-20 22:29:35 -05:00
royjr fb54689300 Merge branch 'master' into developer-panel-external-storage 2025-12-29 09:57:43 -05:00
royjr db8e56687f Update developer.py 2025-12-21 17:00:44 -05:00
royjr 88e5e3d23d Update developer.py 2025-12-21 16:55:16 -05:00
royjr 0b00470999 Merge branch 'master' into developer-panel-external-storage 2025-12-21 16:51:48 -05:00
royjr 65dcbf698e lint 2025-11-27 22:12:18 -05:00
royjr ac99ce017c cleanup 2025-11-27 22:07:03 -05:00
royjr 508abb227c sudo 2025-11-27 22:04:41 -05:00
royjr b609622398 init 2025-11-27 21:38:18 -05:00
discountchubbs c9f2756264 double translate 2025-11-27 12:05:07 -08:00
discountchubbs 3580656d78 comment out 2025-11-27 11:57:03 -08:00
discountchubbs f973b7fdcb ui: developer panel 2025-11-27 11:53:30 -08:00
16 changed files with 600 additions and 107 deletions
+1 -30
View File
@@ -1,34 +1,5 @@
sunnypilot Version 2026.002.000 (2026-06-28)
sunnypilot Version 2026.002.000 (2026-xx-xx)
========================
* What's Changed (sunnypilot/sunnypilot)
* ui: update gates for certain toggles by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1830
* release: ignore upstream IsReleaseBranch by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1831
* manager: disable DEVELOPMENT_ONLY reset by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1833
* sunnylink: fix max time offroad values by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1835
* ui: show default model name by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1837
* sunnylink: add CarParams fallback for brand-specific capabilities by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1839
* sunnylink SDUI: tweak DisableUpdate param for clarity by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1842
* Revert "DM: Lancia Delta HF Integrale model" by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1849
* modeld_v2: safe model validation by @Discountchubbs in https://github.com/sunnypilot/sunnypilot/pull/1855
* Revert "deprecate `carState.brake`" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/sunnypilot/pull/1860
* sunnylink: deprecate legacy params metadata by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1862
* ui: reset Enforce Torque Control and NNLC if both are enabled by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1863
* What's Changed (sunnypilot/opendbc)
* Rivian: suppress ACM hold-the-wheel warning during MADS-only lateral by @lukasloetkolben in https://github.com/sunnypilot/opendbc/pull/465
* Sync: `commaai/opendbc:master``sunnypilot/opendbc:master` by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/479
* safety: add option to ignore frequency check for RX checks by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/480
* Revert "deprecate carState.brake" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/opendbc/pull/481
* New Contributors (sunnypilot/sunnypilot)
* @mvl-boston made their first contribution in https://github.com/sunnypilot/sunnypilot/pull/1860
* Full Changelog: https://github.com/sunnypilot/sunnypilot/compare/v2026.001.007...v2026.002.000
************************
* Synced with commaai's openpilot (v0.11.1)
* master commit 69e2c321e49760e52f7983eaa0a5f77cb95de637 (June 02, 2026)
* New driver monitoring model
* Improved image processing pipeline for driver camera
* Improved thermal policy for comma four
* Acura MDX 2022-24 support thanks to mvl-boston!
* Rivian R1S and R1T 2025 support thanks to lukasloetkolben!
sunnypilot Version 2026.001.000 (2026-05-06)
========================
+1
View File
@@ -80,6 +80,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
{"LiveParameters", {PERSISTENT, JSON}},
{"LiveParametersV2", {PERSISTENT, BYTES}},
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
@@ -20,6 +20,7 @@ from openpilot.system.ui.widgets.list_view import button_item
from openpilot.system.ui.sunnypilot.widgets.html_render import HtmlModalSP
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.external_storage import external_storage_item
PREBUILT_PATH = os.path.join(Paths.comma_home(), "prebuilt") if PC else "/data/openpilot/prebuilt"
@@ -52,7 +53,11 @@ class DeveloperLayoutSP(DeveloperLayout):
self.error_log_btn = button_item(tr("Error Log"), tr("VIEW"), tr("View the error log for sunnypilot crashes."), callback=self._on_error_log_clicked)
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle, self.error_log_btn,]
self.external_storage = external_storage_item(tr("External Storage"), description=tr("Extend your comma device's storage by inserting a USB drive " +
"into the aux port."))
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle,
self.external_storage, self.error_log_btn,]
@staticmethod
def _on_prebuilt_toggled(state):
@@ -0,0 +1,261 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
import threading
import subprocess
import copy
from enum import Enum
from collections.abc import Callable
from openpilot.common.params import Params
from openpilot.system.hardware import PC
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.multilang import tr, tr_noop
from openpilot.system.ui.widgets import DialogResult
from openpilot.system.ui.widgets.button import Button, ButtonStyle
from openpilot.system.ui.widgets.confirm_dialog import alert_dialog, ConfirmDialog
from openpilot.system.ui.widgets.list_view import (
ItemAction,
ListItem,
BUTTON_HEIGHT,
BUTTON_BORDER_RADIUS,
BUTTON_FONT_SIZE,
BUTTON_WIDTH,
)
VALUE_FONT_SIZE = 48
class ExternalStorageState(Enum):
DISABLED = tr_noop("DISABLED")
LOADING = tr_noop("LOADING")
CHECK = tr_noop("CHECK")
MOUNT = tr_noop("MOUNT")
UNMOUNT = tr_noop("UNMOUNT")
FORMAT = tr_noop("FORMAT")
class ExternalStorageAction(ItemAction):
MAX_WIDTH = 500
def __init__(self):
super().__init__(self.MAX_WIDTH, True)
self._params = Params()
self._error_message = ""
self._text_font = gui_app.font(FontWeight.NORMAL)
self._button = Button(
"",
click_callback=self._handle_button_click,
button_style=ButtonStyle.LIST_ACTION,
border_radius=BUTTON_BORDER_RADIUS,
font_size=BUTTON_FONT_SIZE,
)
self._value_text = ""
self._formatting = False
self._refresh_pending = False
self._state = ExternalStorageState.CHECK
self._refresh_state()
self.refresh()
def set_touch_valid_callback(self, callback):
def wrapped():
if self._state == ExternalStorageState.DISABLED:
return False
return callback()
super().set_touch_valid_callback(wrapped)
self._button.set_touch_valid_callback(wrapped)
def _run(self, cmd: str) -> bool:
return subprocess.call(["sh", "-c", cmd]) == 0
def _run_output(self, cmd: str) -> str:
try:
out = subprocess.check_output(["sh", "-c", cmd], universal_newlines=True)
return out.strip()
except Exception:
return ""
def _render(self, rect: rl.Rectangle) -> bool:
if self._error_message:
msg = copy.copy(self._error_message)
gui_app.set_modal_overlay(alert_dialog(msg))
self._error_message = ""
if self._value_text:
text_size = measure_text_cached(self._text_font, self._value_text, VALUE_FONT_SIZE)
rl.draw_text_ex(
self._text_font,
self._value_text,
(rect.x + rect.width - BUTTON_WIDTH - text_size.x - 30,
rect.y + (rect.height - text_size.y) / 2),
VALUE_FONT_SIZE,
1.0,
rl.Color(170, 170, 170, 255),
)
button_rect = rl.Rectangle(
rect.x + rect.width - BUTTON_WIDTH,
rect.y + (rect.height - BUTTON_HEIGHT) / 2,
BUTTON_WIDTH,
BUTTON_HEIGHT
)
self._button.set_rect(button_rect)
self._button.set_text(tr(self._state.value))
self._button.set_enabled(self._state not in (ExternalStorageState.LOADING,
ExternalStorageState.DISABLED))
self._button.render(button_rect)
return False
def _refresh_state(self):
if PC:
self._state = ExternalStorageState.DISABLED
self._button.set_enabled(False)
self._value_text = ""
def debounced_refresh(self):
if self._refresh_pending:
return
self._refresh_pending = True
def _timer():
import time
time.sleep(0.25)
self._refresh_pending = False
self.refresh()
threading.Thread(target=_timer, daemon=True).start()
def refresh(self):
def _work():
is_mounted = self._run("findmnt -n /mnt/external_realdata")
has_drive = self._run("lsblk -f /dev/sdg")
has_fs = self._run("lsblk -f /dev/sdg1 | grep -q ext4")
has_label = self._run("blkid /dev/sdg1 | grep -q 'LABEL=\"openpilot\"'")
info = ""
if is_mounted and has_label:
info = self._run_output(
"df -h /mnt/external_realdata | awk 'NR==2 {print $3 \"/\" $2}'"
)
def apply():
if self._formatting:
self._value_text = tr("formatting")
self._state = ExternalStorageState.FORMAT
return
if not has_drive:
self._value_text = tr("insert drive")
self._state = ExternalStorageState.CHECK
elif not has_fs or not has_label:
self._value_text = tr("needs format")
self._state = ExternalStorageState.FORMAT
elif is_mounted:
self._value_text = info
self._state = ExternalStorageState.UNMOUNT
else:
self._value_text = tr("drive detected")
self._state = ExternalStorageState.MOUNT
apply()
threading.Thread(target=_work, daemon=True).start()
def _handle_button_click(self):
st = self._state
if st == ExternalStorageState.DISABLED:
return
if st in (ExternalStorageState.CHECK, ExternalStorageState.MOUNT):
self.mount_storage()
elif st == ExternalStorageState.UNMOUNT:
self.unmount_storage()
elif st == ExternalStorageState.FORMAT:
dialog = ConfirmDialog(
tr("Are you sure you want to format this drive? This will erase all data."),
confirm_text=tr("Format"),
cancel_text=tr("Cancel"),
)
gui_app.set_modal_overlay(dialog, callback=self._confirm_format)
def _confirm_format(self, result: DialogResult):
if result == DialogResult.CONFIRM:
self.format_storage()
def mount_storage(self):
self._value_text = tr("mounting")
self._state = ExternalStorageState.LOADING
def _work():
cmd = """
sudo mount -o remount,rw / &&
sudo mkdir -p /mnt/external_realdata &&
(grep -q '/dev/sdg1 /mnt/external_realdata' /etc/fstab ||
echo '/dev/sdg1 /mnt/external_realdata ext4 defaults,nofail 0 2' >> /etc/fstab) &&
sudo systemctl daemon-reexec &&
sudo mount /mnt/external_realdata &&
sudo chown -R comma:comma /mnt/external_realdata &&
sudo chmod -R 775 /mnt/external_realdata &&
sudo mount -o remount,ro /
"""
subprocess.call(["sh", "-c", cmd])
self.debounced_refresh()
threading.Thread(target=_work, daemon=True).start()
def unmount_storage(self):
self._value_text = tr("unmounting")
self._state = ExternalStorageState.LOADING
def _work():
subprocess.call(["sh", "-c", "sudo umount /mnt/external_realdata"])
self.debounced_refresh()
threading.Thread(target=_work, daemon=True).start()
def format_storage(self):
self._formatting = True
self._value_text = tr("formatting")
self._state = ExternalStorageState.LOADING
def _work():
cmd = """
sudo wipefs -a /dev/sdg &&
sudo parted -s /dev/sdg mklabel gpt mkpart primary ext4 0% 100% &&
sudo mkfs.ext4 -F -L openpilot /dev/sdg1
"""
exitcode = subprocess.call(["sh", "-c", cmd])
def apply():
self._formatting = False
if exitcode == 0:
self.mount_storage()
else:
self._value_text = tr("needs format")
self._state = ExternalStorageState.FORMAT
apply()
threading.Thread(target=_work, daemon=True).start()
def external_storage_item(title: str | Callable[[], str], description: str | Callable[[], str]) -> ListItem:
return ListItem(
title=title,
description=description,
action_item=ExternalStorageAction()
)
+41 -1
View File
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
create_connection)
import cereal.messaging as messaging
from cereal import log
from cereal import car, log
from cereal.services import SERVICE_LIST
from openpilot.common.api import Api, get_key_pair
from openpilot.common.utils import CallbackReader, get_upload_stream
@@ -45,6 +45,7 @@ from openpilot.system.hardware.hw import Paths
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
LOCAL_PORT_WHITELIST = {22, } # SSH
WEBRTCD_PORT = 5001
LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
@@ -567,6 +568,16 @@ def getSshAuthorizedKeys() -> str:
def getGithubUsername() -> str:
return cast(str, Params().get("GithubUsername") or "")
@dispatcher.add_method
def getNotCar() -> bool:
cp_bytes = Params().get("CarParamsPersistent")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
return CP.notCar
return False
@dispatcher.add_method
def getSimInfo():
return HARDWARE.get_sim_info()
@@ -588,6 +599,35 @@ def getNetworks():
return HARDWARE.get_networks()
@dispatcher.add_method
def startStream(sdp: str) -> dict:
from openpilot.system.webrtc.webrtcd import StreamRequestBody
bridge_services_in = []
# get live car params to avoid stale notCar edge case
cp_bytes = Params().get("CarParams")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
if CP.notCar:
bridge_services_in.append("testJoystick")
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
try:
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
json=asdict(body), timeout=10)
if not resp.ok:
try:
error_body = resp.json()
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
except ValueError:
resp.raise_for_status()
return resp.json()
except requests.ConnectTimeout as e:
raise Exception("webrtc took too long to respond. is it on?") from e
except requests.ConnectionError as e:
raise Exception("webrtc is not running. turn on comma body ignition.") from e
@dispatcher.add_method
def takeSnapshot() -> str | dict[str, str] | None:
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
-3
View File
@@ -313,9 +313,6 @@ class Tici(HardwareBase):
continue
gov = 'ondemand' if powersave_enabled else 'performance'
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
if not powersave_enabled:
# cap max core freq to 1689 Mhz
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
# *** IRQ config ***
+1
View File
@@ -26,6 +26,7 @@ public:
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
virtual void encoder_open() = 0;
virtual void encoder_close() = 0;
virtual void set_bitrate(int bitrate) = 0;
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
+4
View File
@@ -72,6 +72,10 @@ void FfmpegEncoder::encoder_close() {
is_open = false;
}
void FfmpegEncoder::set_bitrate(int bitrate) {
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
}
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
assert(buf->width == this->in_width);
assert(buf->height == this->in_height);
+1
View File
@@ -21,6 +21,7 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int segment_num = -1;
+21
View File
@@ -155,6 +155,8 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
current_bitrate = encoder_settings.bitrate;
adaptive_bitrate = encoder_info.adaptive_bitrate;
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
struct v4l2_format fmt_out = {
@@ -304,6 +306,25 @@ void V4LEncoder::encoder_close() {
this->is_open = false;
}
void V4LEncoder::set_bitrate(int bitrate) {
if (!adaptive_bitrate || bitrate == current_bitrate) return;
if (bitrate <= 0) {
LOGE("invalid livestream encoder bitrate %d", bitrate);
return;
}
struct v4l2_control ctrl = {
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
.value = bitrate,
};
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
return;
}
current_bitrate = bitrate;
}
V4LEncoder::~V4LEncoder() {
encoder_close();
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
+3
View File
@@ -13,6 +13,7 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int fd;
@@ -20,6 +21,8 @@ private:
bool is_open = false;
int segment_num = -1;
int counter = 0;
int current_bitrate = -1;
bool adaptive_bitrate;
SafeQueue<VisionIpcBufExtra> extras;
+15
View File
@@ -44,11 +44,24 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
}
}
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
static Params params;
std::string val = params.get("LivestreamEncoderBitrate");
if (val.empty()) return;
int bitrate = std::stoi(val);
for (auto &e : encoders) {
e->set_bitrate(bitrate);
}
}
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
util::set_thread_name(cam_info.thread_name);
std::vector<std::unique_ptr<Encoder>> encoders;
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
[](const auto &ei) { return ei.adaptive_bitrate; });
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
std::unique_ptr<JpegEncoder> jpeg_encoder;
@@ -108,6 +121,8 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
++cur_seg;
}
if (has_adaptive) apply_bitrate(encoders);
// encode a frame
for (int i = 0; i < encoders.size(); ++i) {
int out_id = encoders[i]->encode_frame(buf, &extra);
+9 -5
View File
@@ -47,8 +47,8 @@ struct EncoderSettings {
}
static EncoderSettings StreamEncoderSettings() {
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
}
};
@@ -59,6 +59,7 @@ public:
const char *filename = NULL;
bool record = true;
bool include_audio = false;
bool adaptive_bitrate = false;
int frame_width = -1;
int frame_height = -1;
int fps = MAIN_FPS;
@@ -104,6 +105,7 @@ const EncoderInfo stream_road_encoder_info = {
.publish_name = "livestreamRoadEncodeData",
//.thumbnail_name = "thumbnail",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
};
@@ -111,6 +113,7 @@ const EncoderInfo stream_road_encoder_info = {
const EncoderInfo stream_wide_road_encoder_info = {
.publish_name = "livestreamWideRoadEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
};
@@ -118,6 +121,7 @@ const EncoderInfo stream_wide_road_encoder_info = {
const EncoderInfo stream_driver_encoder_info = {
.publish_name = "livestreamDriverEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
};
@@ -153,19 +157,19 @@ const LogCameraInfo driver_camera_info{
const LogCameraInfo stream_road_camera_info{
.thread_name = "road_cam_encoder",
.stream_type = VISION_STREAM_ROAD,
.encoder_infos = {stream_road_encoder_info}
.encoder_infos = {stream_road_encoder_info},
};
const LogCameraInfo stream_wide_road_camera_info{
.thread_name = "wide_road_cam_encoder",
.stream_type = VISION_STREAM_WIDE_ROAD,
.encoder_infos = {stream_wide_road_encoder_info}
.encoder_infos = {stream_wide_road_encoder_info},
};
const LogCameraInfo stream_driver_camera_info{
.thread_name = "driver_cam_encoder",
.stream_type = VISION_STREAM_DRIVER,
.encoder_infos = {stream_driver_encoder_info}
.encoder_infos = {stream_driver_encoder_info},
};
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
+31 -4
View File
@@ -1,4 +1,5 @@
import asyncio
import struct
import time
import av
@@ -7,6 +8,13 @@ from teleoprtc.tracks import TiciVideoStreamTrack
from cereal import messaging
from openpilot.common.realtime import DT_MDL, DT_DMON
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
TIMING_SEI_UUID = bytes([
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
])
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
camera_to_sock_mapping = {
@@ -19,9 +27,30 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
dt = DT_DMON if camera_type == "driver" else DT_MDL
super().__init__(camera_type, dt)
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
self._sock = self._make_sock(camera_type)
self._pts = 0
self._t0_ns = time.monotonic_ns()
self.timing_sei_enabled = False
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
def switch_camera(self, camera_type: str) -> None:
self._sock = self._make_sock(camera_type)
def _build_frame_data(self, msg) -> bytes:
encode_data = getattr(msg, msg.which())
if not self.timing_sei_enabled:
return encode_data.header + encode_data.data
idx = encode_data.idx
sei_nal = _SEI_PREFIX + struct.pack('>4d',
(idx.timestampEof - idx.timestampSof) / 1e6,
(msg.logMonoTime - idx.timestampEof) / 1e6,
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
time.time() * 1000, # noqa: TID251
) + b'\x80'
return encode_data.header + sei_nal + encode_data.data
async def recv(self):
while True:
@@ -30,9 +59,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
break
await asyncio.sleep(0.005)
evta = getattr(msg, msg.which())
packet = av.Packet(evta.header + evta.data)
packet = av.Packet(self._build_frame_data(msg))
packet.time_base = self._time_base
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
+204 -62
View File
@@ -1,7 +1,11 @@
#!/usr/bin/env python3
from abc import abstractmethod
import os
import time
import argparse
import asyncio
import contextlib
import json
import uuid
import logging
@@ -19,11 +23,39 @@ if TYPE_CHECKING:
from aiortc.rtcdatachannel import RTCDataChannel
from openpilot.system.webrtc.schema import generate_field
from openpilot.common.params import Params
from cereal import messaging, log
class CerealOutgoingMessageProxy:
class AsyncTaskRunner:
def __init__(self):
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
async def stop(self):
if self.task is None:
return
task = self.task
self.task = None
if task.done():
return
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
@abstractmethod
async def run(self):
pass
class CerealOutgoingMessageProxy(AsyncTaskRunner):
def __init__(self, sm: messaging.SubMaster):
super().__init__()
self.sm = sm
self.channels: list[RTCDataChannel] = []
@@ -55,6 +87,19 @@ class CerealOutgoingMessageProxy:
for channel in self.channels:
channel.send(encoded_msg)
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class CerealIncomingMessageProxy:
def __init__(self, pm: messaging.PubMaster):
@@ -72,37 +117,6 @@ class CerealIncomingMessageProxy:
self.pm.send(msg_type, msg)
class CerealProxyRunner:
def __init__(self, proxy: CerealOutgoingMessageProxy):
self.proxy = proxy
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
def stop(self):
if self.task is None or self.task.done():
return
self.task.cancel()
self.task = None
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.proxy.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class DynamicPubMaster(messaging.PubMaster):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -115,21 +129,90 @@ class DynamicPubMaster(messaging.PubMaster):
self.sock[service] = messaging.pub_sock(service)
class LivestreamBitrateController(AsyncTaskRunner):
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
sample_interval = 0.2
high_level = 0.1 # drop immediately
med_level = 0.05 # drop after # of samples
low_level = 0 # raise after # of samples
down_samples = 5 # 1s
param_name = "LivestreamEncoderBitrate"
def __init__(self, peer_connection: Any):
super().__init__()
self.pc = peer_connection
self.params = Params()
self.level = 0
self.prev_lost, self.prev_sent = None, None
self.counter = 0
self.up_samples = 5 # 1s
self._auto = True
async def run(self):
while True:
await asyncio.sleep(self.sample_interval)
if not self._auto:
continue
loss_rate = await self._sample()
if loss_rate is None:
continue
if loss_rate >= self.med_level and self.level > 0:
self.counter += 1
if self.counter >= self.down_samples or loss_rate >= self.high_level:
self.level -= 1
self.up_samples *= 2 # exponential backoff before raising again
self.counter = 0
self._publish(self.bitrates[self.level])
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
self.counter -= 1
if -self.counter >= self.up_samples:
self.level += 1
self.counter = 0
self._publish(self.bitrates[self.level])
async def _sample(self) -> float | None:
report = await self.pc.getStats()
packets_lost = packets_sent = 0
for s in report.values():
if s.type == "remote-inbound-rtp":
packets_lost += s.packetsLost
elif s.type == "outbound-rtp":
packets_sent += s.packetsSent
if self.prev_lost is None:
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return None
lost_delta = max(0, packets_lost - self.prev_lost)
sent_delta = max(0, packets_sent - self.prev_sent)
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return lost_delta / sent_delta if sent_delta else 0.0
def _publish(self, bitrate: float):
self.params.put(self.param_name, bitrate)
def set_quality(self, quality):
if quality in self.label_to_bitrate:
self._publish(self.label_to_bitrate[quality])
self._auto = False
elif quality == "auto":
self._auto = True
class StreamSession:
shared_pub_master = DynamicPubMaster([])
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
from aiortc.mediastreams import VideoStreamTrack
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from teleoprtc import WebRTCAnswerBuilder
from teleoprtc.info import parse_info_from_offer
config = parse_info_from_offer(sdp)
builder = WebRTCAnswerBuilder(sdp)
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
for cam in cameras:
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
builder.add_video_stream(init_camera, self.video_track)
self.stream = builder.stream()
self.identifier = str(uuid.uuid4())
@@ -137,35 +220,60 @@ class StreamSession:
self.incoming_bridge: CerealIncomingMessageProxy | None = None
self.incoming_bridge_services = incoming_services
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
self.outgoing_bridge_runner: CerealProxyRunner | None = None
self.bitrate_controller: LivestreamBitrateController | None = None
if len(incoming_services) > 0:
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
if len(outgoing_services) > 0:
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
self.run_task: asyncio.Task | None = None
self._cleanup_lock = asyncio.Lock()
self._cleanup_done = False
self.logger = logging.getLogger("webrtcd")
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
self.identifier, cameras, incoming_services, outgoing_services)
self.logger.info(
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
self.identifier, init_camera, incoming_services, outgoing_services,
)
def start(self):
self.run_task = asyncio.create_task(self.run())
def stop(self):
if self.run_task.done():
return
self.run_task.cancel()
async def stop(self):
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
self.run_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.run_task
self.run_task = None
asyncio.run(self.post_run_cleanup())
await self.post_run_cleanup()
async def get_answer(self):
return await self.stream.start()
async def message_handler(self, message: bytes):
def message_handler(self, message: bytes):
assert self.incoming_bridge is not None
try:
self.incoming_bridge.send(message)
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
if isinstance(payload, dict):
msg_type = payload.get("type")
match msg_type:
case "livestreamCameraSwitch":
self.video_track.switch_camera(payload["data"]["camera"])
case "livestreamSettings":
self.bitrate_controller.set_quality(payload["data"]["quality"])
case "clockSync":
pong = json.dumps({"type": "clockSync", "data": {
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
}})
self.stream.get_messaging_channel().send(pong)
case "enableTimingSei":
if hasattr(self.video_track, 'timing_sei_enabled'):
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
case _:
if payload.get("type") not in self.incoming_bridge_services:
return
self.incoming_bridge.send(message)
except Exception:
self.logger.exception("Cereal incoming proxy failure")
@@ -176,29 +284,38 @@ class StreamSession:
if self.incoming_bridge is not None:
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
self.stream.set_message_handler(self.message_handler)
if self.outgoing_bridge_runner is not None:
if self.outgoing_bridge is not None:
channel = self.stream.get_messaging_channel()
self.outgoing_bridge_runner.proxy.add_channel(channel)
self.outgoing_bridge_runner.start()
self.outgoing_bridge.add_channel(channel)
self.outgoing_bridge.start()
self.bitrate_controller.start()
self.logger.info("Stream session (%s) connected", self.identifier)
await self.stream.wait_for_disconnection()
await self.post_run_cleanup()
self.logger.info("Stream session (%s) ended", self.identifier)
except Exception:
self.logger.exception("Stream session failure")
finally:
await self.post_run_cleanup()
async def post_run_cleanup(self):
await self.stream.stop()
if self.outgoing_bridge is not None:
self.outgoing_bridge_runner.stop()
async with self._cleanup_lock:
if self._cleanup_done:
return
self._cleanup_done = True
if self.bitrate_controller is not None:
await self.bitrate_controller.stop()
if self.outgoing_bridge is not None:
await self.outgoing_bridge.stop()
await self.stream.stop()
@dataclass
class StreamRequestBody:
sdp: str
cameras: list[str]
initCamera: str
bridge_services_in: list[str] = field(default_factory=list)
bridge_services_out: list[str] = field(default_factory=list)
@@ -208,11 +325,33 @@ async def get_stream(request: 'web.Request'):
raw_body = await request.json()
body = StreamRequestBody(**raw_body)
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
answer = await session.get_answer()
session.start()
async with request.app['stream_lock']:
# Fully disconnect any other active stream before starting the replacement.
for sid, s in list(stream_dict.items()):
if s.run_task and not s.run_task.done():
try:
ch = s.stream.get_messaging_channel()
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
except Exception:
pass
await s.stop()
del stream_dict[sid]
stream_dict[session.identifier] = session
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
try:
answer = await session.get_answer()
except ValueError as e:
await session.stop()
raise web.HTTPBadRequest(
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
content_type="application/json",
) from e
except Exception:
await session.stop()
raise
session.start()
stream_dict[session.identifier] = session
return web.json_response({"sdp": answer.sdp, "type": answer.type})
@@ -224,6 +363,7 @@ async def get_schema(request: 'web.Request'):
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
return web.json_response(schema_dict)
async def post_notify(request: 'web.Request'):
try:
payload = await request.json()
@@ -239,9 +379,10 @@ async def post_notify(request: 'web.Request'):
return web.Response(status=200, text="OK")
async def on_shutdown(app: 'web.Application'):
for session in app['streams'].values():
session.stop()
await session.stop()
del app['streams']
@@ -254,6 +395,7 @@ def webrtcd_thread(host: str, port: int, debug: bool):
app = web.Application()
app['streams'] = dict()
app['stream_lock'] = asyncio.Lock()
app['debug'] = debug
app.on_shutdown.append(on_shutdown)
app.router.add_post("/stream", get_stream)
+1 -1
View File
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
async def offer(request: 'web.Request'):
params = await request.json()
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
body_json = json.dumps(dataclasses.asdict(body))
logger.info("Sending offer to webrtcd...")