mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-27 11:42:10 +08:00
Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2ac48084a8 | |||
| 978366d817 | |||
| e97dd7f9cd | |||
| d3c05b2ef8 | |||
| 74f4f0f10e | |||
| 0491242b4a | |||
| 1b89608ccc | |||
| 53a24655d2 | |||
| c9f92a8c76 | |||
| 10b1d673c9 | |||
| 7080167daf | |||
| c7a1c70504 | |||
| c6a6caf6ff | |||
| 8d49a44f52 | |||
| 3434ca9d3e | |||
| e4f8a5edd1 | |||
| 1f4f9bd4bd | |||
| 455e730c4c | |||
| b243d4e356 | |||
| de0550d47b |
+1
-30
@@ -1,34 +1,5 @@
|
||||
sunnypilot Version 2026.002.000 (2026-06-28)
|
||||
sunnypilot Version 2026.002.000 (2026-xx-xx)
|
||||
========================
|
||||
* What's Changed (sunnypilot/sunnypilot)
|
||||
* ui: update gates for certain toggles by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1830
|
||||
* release: ignore upstream IsReleaseBranch by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1831
|
||||
* manager: disable DEVELOPMENT_ONLY reset by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1833
|
||||
* sunnylink: fix max time offroad values by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1835
|
||||
* ui: show default model name by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1837
|
||||
* sunnylink: add CarParams fallback for brand-specific capabilities by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1839
|
||||
* sunnylink SDUI: tweak DisableUpdate param for clarity by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1842
|
||||
* Revert "DM: Lancia Delta HF Integrale model" by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1849
|
||||
* modeld_v2: safe model validation by @Discountchubbs in https://github.com/sunnypilot/sunnypilot/pull/1855
|
||||
* Revert "deprecate `carState.brake`" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/sunnypilot/pull/1860
|
||||
* sunnylink: deprecate legacy params metadata by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1862
|
||||
* ui: reset Enforce Torque Control and NNLC if both are enabled by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1863
|
||||
* What's Changed (sunnypilot/opendbc)
|
||||
* Rivian: suppress ACM hold-the-wheel warning during MADS-only lateral by @lukasloetkolben in https://github.com/sunnypilot/opendbc/pull/465
|
||||
* Sync: `commaai/opendbc:master` → `sunnypilot/opendbc:master` by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/479
|
||||
* safety: add option to ignore frequency check for RX checks by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/480
|
||||
* Revert "deprecate carState.brake" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/opendbc/pull/481
|
||||
* New Contributors (sunnypilot/sunnypilot)
|
||||
* @mvl-boston made their first contribution in https://github.com/sunnypilot/sunnypilot/pull/1860
|
||||
* Full Changelog: https://github.com/sunnypilot/sunnypilot/compare/v2026.001.007...v2026.002.000
|
||||
************************
|
||||
* Synced with commaai's openpilot (v0.11.1)
|
||||
* master commit 69e2c321e49760e52f7983eaa0a5f77cb95de637 (June 02, 2026)
|
||||
* New driver monitoring model
|
||||
* Improved image processing pipeline for driver camera
|
||||
* Improved thermal policy for comma four
|
||||
* Acura MDX 2022-24 support thanks to mvl-boston!
|
||||
* Rivian R1S and R1T 2025 support thanks to lukasloetkolben!
|
||||
|
||||
sunnypilot Version 2026.001.000 (2026-05-06)
|
||||
========================
|
||||
|
||||
@@ -54,6 +54,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
|
||||
{"GsmRoaming", {PERSISTENT | BACKUP, BOOL}},
|
||||
{"HardwareSerial", {PERSISTENT, STRING}},
|
||||
{"HasAcceptedTerms", {PERSISTENT, STRING, "0"}},
|
||||
{"HideCamera", {PERSISTENT | BACKUP, BOOL, "0"}},
|
||||
{"InstallDate", {PERSISTENT, TIME}},
|
||||
{"IsDriverViewEnabled", {CLEAR_ON_MANAGER_START, BOOL}},
|
||||
{"IsEngaged", {PERSISTENT, BOOL}},
|
||||
@@ -80,6 +81,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
|
||||
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
|
||||
{"LiveParameters", {PERSISTENT, JSON}},
|
||||
{"LiveParametersV2", {PERSISTENT, BYTES}},
|
||||
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
|
||||
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
|
||||
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
|
||||
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
|
||||
|
||||
@@ -213,6 +213,8 @@ class AugmentedRoadView(CameraView):
|
||||
|
||||
# Render the base camera view
|
||||
super()._render(self._content_rect)
|
||||
if ui_state.hide_camera:
|
||||
rl.draw_rectangle_rec(self._content_rect, rl.BLACK)
|
||||
|
||||
# Draw all UI overlays
|
||||
self._model_renderer.render(self._content_rect)
|
||||
|
||||
@@ -86,6 +86,8 @@ class AugmentedRoadView(CameraView, AugmentedRoadViewSP):
|
||||
|
||||
# Render the base camera view
|
||||
super()._render(rect)
|
||||
if ui_state.hide_camera:
|
||||
rl.draw_rectangle_rec(self._content_rect, rl.BLACK)
|
||||
|
||||
# Draw all UI overlays
|
||||
self.model_renderer.render(self._content_rect)
|
||||
|
||||
@@ -93,6 +93,11 @@ class VisualsLayout(Widget):
|
||||
"This displays what the car is currently doing, not what the planner is requesting."),
|
||||
None,
|
||||
),
|
||||
"HideCamera": (
|
||||
lambda: tr("Hide Camera"),
|
||||
tr("Hide the camera live view from the driving screen."),
|
||||
None,
|
||||
),
|
||||
}
|
||||
self._toggles = {}
|
||||
for param, (title, desc, callback) in self._toggle_defs.items():
|
||||
|
||||
@@ -150,6 +150,7 @@ class UIStateSP:
|
||||
self.chevron_metrics = self.params.get("ChevronInfo")
|
||||
self.custom_interactive_timeout = self.params.get("InteractivityTimeout", return_default=True)
|
||||
self.developer_ui = self.params.get("DevUIInfo")
|
||||
self.hide_camera = self.params.get_bool("HideCamera")
|
||||
self.hide_v_ego_ui = self.params.get_bool("HideVEgoUI")
|
||||
self.onroad_brightness = int(float(self.params.get("OnroadScreenOffBrightness", return_default=True)))
|
||||
self.onroad_brightness_timer_param = self.params.get("OnroadScreenOffTimer", return_default=True)
|
||||
|
||||
@@ -1296,6 +1296,12 @@
|
||||
"title": "Display Turn Signals",
|
||||
"description": "When enabled, visual turn indicators are drawn on the HUD."
|
||||
},
|
||||
{
|
||||
"key": "HideCamera",
|
||||
"widget": "toggle",
|
||||
"title": "Hide Camera",
|
||||
"description": "Hide the camera live view from the driving screen."
|
||||
},
|
||||
{
|
||||
"key": "RoadNameToggle",
|
||||
"widget": "toggle",
|
||||
|
||||
@@ -24,6 +24,10 @@ sections:
|
||||
widget: toggle
|
||||
title: Display Turn Signals
|
||||
description: When enabled, visual turn indicators are drawn on the HUD.
|
||||
- key: HideCamera
|
||||
widget: toggle
|
||||
title: Hide Camera
|
||||
description: Hide the camera live view from the driving screen.
|
||||
- key: RoadNameToggle
|
||||
widget: toggle
|
||||
title: Display Road Name
|
||||
|
||||
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
|
||||
create_connection)
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import log
|
||||
from cereal import car, log
|
||||
from cereal.services import SERVICE_LIST
|
||||
from openpilot.common.api import Api, get_key_pair
|
||||
from openpilot.common.utils import CallbackReader, get_upload_stream
|
||||
@@ -45,6 +45,7 @@ from openpilot.system.hardware.hw import Paths
|
||||
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
|
||||
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
|
||||
LOCAL_PORT_WHITELIST = {22, } # SSH
|
||||
WEBRTCD_PORT = 5001
|
||||
|
||||
LOG_ATTR_NAME = 'user.upload'
|
||||
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
|
||||
@@ -567,6 +568,16 @@ def getSshAuthorizedKeys() -> str:
|
||||
def getGithubUsername() -> str:
|
||||
return cast(str, Params().get("GithubUsername") or "")
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getNotCar() -> bool:
|
||||
cp_bytes = Params().get("CarParamsPersistent")
|
||||
if cp_bytes is not None:
|
||||
with car.CarParams.from_bytes(cp_bytes) as CP:
|
||||
return CP.notCar
|
||||
return False
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getSimInfo():
|
||||
return HARDWARE.get_sim_info()
|
||||
@@ -588,6 +599,35 @@ def getNetworks():
|
||||
return HARDWARE.get_networks()
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def startStream(sdp: str) -> dict:
|
||||
from openpilot.system.webrtc.webrtcd import StreamRequestBody
|
||||
bridge_services_in = []
|
||||
|
||||
# get live car params to avoid stale notCar edge case
|
||||
cp_bytes = Params().get("CarParams")
|
||||
if cp_bytes is not None:
|
||||
with car.CarParams.from_bytes(cp_bytes) as CP:
|
||||
if CP.notCar:
|
||||
bridge_services_in.append("testJoystick")
|
||||
|
||||
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
|
||||
try:
|
||||
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
|
||||
json=asdict(body), timeout=10)
|
||||
if not resp.ok:
|
||||
try:
|
||||
error_body = resp.json()
|
||||
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
|
||||
except ValueError:
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.ConnectTimeout as e:
|
||||
raise Exception("webrtc took too long to respond. is it on?") from e
|
||||
except requests.ConnectionError as e:
|
||||
raise Exception("webrtc is not running. turn on comma body ignition.") from e
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def takeSnapshot() -> str | dict[str, str] | None:
|
||||
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
|
||||
|
||||
@@ -313,9 +313,6 @@ class Tici(HardwareBase):
|
||||
continue
|
||||
gov = 'ondemand' if powersave_enabled else 'performance'
|
||||
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
|
||||
if not powersave_enabled:
|
||||
# cap max core freq to 1689 Mhz
|
||||
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
|
||||
|
||||
# *** IRQ config ***
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ public:
|
||||
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
|
||||
virtual void encoder_open() = 0;
|
||||
virtual void encoder_close() = 0;
|
||||
virtual void set_bitrate(int bitrate) = 0;
|
||||
|
||||
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
|
||||
|
||||
|
||||
@@ -72,6 +72,10 @@ void FfmpegEncoder::encoder_close() {
|
||||
is_open = false;
|
||||
}
|
||||
|
||||
void FfmpegEncoder::set_bitrate(int bitrate) {
|
||||
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
|
||||
}
|
||||
|
||||
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
|
||||
assert(buf->width == this->in_width);
|
||||
assert(buf->height == this->in_height);
|
||||
|
||||
@@ -21,6 +21,7 @@ public:
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open();
|
||||
void encoder_close();
|
||||
void set_bitrate(int bitrate);
|
||||
|
||||
private:
|
||||
int segment_num = -1;
|
||||
|
||||
@@ -155,6 +155,8 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
|
||||
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
|
||||
|
||||
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
|
||||
current_bitrate = encoder_settings.bitrate;
|
||||
adaptive_bitrate = encoder_info.adaptive_bitrate;
|
||||
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
|
||||
|
||||
struct v4l2_format fmt_out = {
|
||||
@@ -304,6 +306,25 @@ void V4LEncoder::encoder_close() {
|
||||
this->is_open = false;
|
||||
}
|
||||
|
||||
void V4LEncoder::set_bitrate(int bitrate) {
|
||||
if (!adaptive_bitrate || bitrate == current_bitrate) return;
|
||||
if (bitrate <= 0) {
|
||||
LOGE("invalid livestream encoder bitrate %d", bitrate);
|
||||
return;
|
||||
}
|
||||
|
||||
struct v4l2_control ctrl = {
|
||||
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
|
||||
.value = bitrate,
|
||||
};
|
||||
|
||||
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
|
||||
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
|
||||
return;
|
||||
}
|
||||
current_bitrate = bitrate;
|
||||
}
|
||||
|
||||
V4LEncoder::~V4LEncoder() {
|
||||
encoder_close();
|
||||
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
|
||||
|
||||
@@ -13,6 +13,7 @@ public:
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open();
|
||||
void encoder_close();
|
||||
void set_bitrate(int bitrate);
|
||||
|
||||
private:
|
||||
int fd;
|
||||
@@ -20,6 +21,8 @@ private:
|
||||
bool is_open = false;
|
||||
int segment_num = -1;
|
||||
int counter = 0;
|
||||
int current_bitrate = -1;
|
||||
bool adaptive_bitrate;
|
||||
|
||||
SafeQueue<VisionIpcBufExtra> extras;
|
||||
|
||||
|
||||
@@ -44,11 +44,24 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
|
||||
}
|
||||
}
|
||||
|
||||
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
|
||||
static Params params;
|
||||
std::string val = params.get("LivestreamEncoderBitrate");
|
||||
if (val.empty()) return;
|
||||
int bitrate = std::stoi(val);
|
||||
for (auto &e : encoders) {
|
||||
e->set_bitrate(bitrate);
|
||||
}
|
||||
}
|
||||
|
||||
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
util::set_thread_name(cam_info.thread_name);
|
||||
|
||||
std::vector<std::unique_ptr<Encoder>> encoders;
|
||||
|
||||
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
|
||||
[](const auto &ei) { return ei.adaptive_bitrate; });
|
||||
|
||||
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
|
||||
|
||||
std::unique_ptr<JpegEncoder> jpeg_encoder;
|
||||
@@ -108,6 +121,8 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
++cur_seg;
|
||||
}
|
||||
|
||||
if (has_adaptive) apply_bitrate(encoders);
|
||||
|
||||
// encode a frame
|
||||
for (int i = 0; i < encoders.size(); ++i) {
|
||||
int out_id = encoders[i]->encode_frame(buf, &extra);
|
||||
|
||||
@@ -47,8 +47,8 @@ struct EncoderSettings {
|
||||
}
|
||||
|
||||
static EncoderSettings StreamEncoderSettings() {
|
||||
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
|
||||
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
|
||||
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
|
||||
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
|
||||
}
|
||||
};
|
||||
|
||||
@@ -59,6 +59,7 @@ public:
|
||||
const char *filename = NULL;
|
||||
bool record = true;
|
||||
bool include_audio = false;
|
||||
bool adaptive_bitrate = false;
|
||||
int frame_width = -1;
|
||||
int frame_height = -1;
|
||||
int fps = MAIN_FPS;
|
||||
@@ -104,6 +105,7 @@ const EncoderInfo stream_road_encoder_info = {
|
||||
.publish_name = "livestreamRoadEncodeData",
|
||||
//.thumbnail_name = "thumbnail",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
|
||||
};
|
||||
@@ -111,6 +113,7 @@ const EncoderInfo stream_road_encoder_info = {
|
||||
const EncoderInfo stream_wide_road_encoder_info = {
|
||||
.publish_name = "livestreamWideRoadEncodeData",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
|
||||
};
|
||||
@@ -118,6 +121,7 @@ const EncoderInfo stream_wide_road_encoder_info = {
|
||||
const EncoderInfo stream_driver_encoder_info = {
|
||||
.publish_name = "livestreamDriverEncodeData",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
|
||||
};
|
||||
@@ -153,19 +157,19 @@ const LogCameraInfo driver_camera_info{
|
||||
const LogCameraInfo stream_road_camera_info{
|
||||
.thread_name = "road_cam_encoder",
|
||||
.stream_type = VISION_STREAM_ROAD,
|
||||
.encoder_infos = {stream_road_encoder_info}
|
||||
.encoder_infos = {stream_road_encoder_info},
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_wide_road_camera_info{
|
||||
.thread_name = "wide_road_cam_encoder",
|
||||
.stream_type = VISION_STREAM_WIDE_ROAD,
|
||||
.encoder_infos = {stream_wide_road_encoder_info}
|
||||
.encoder_infos = {stream_wide_road_encoder_info},
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_driver_camera_info{
|
||||
.thread_name = "driver_cam_encoder",
|
||||
.stream_type = VISION_STREAM_DRIVER,
|
||||
.encoder_infos = {stream_driver_encoder_info}
|
||||
.encoder_infos = {stream_driver_encoder_info},
|
||||
};
|
||||
|
||||
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import struct
|
||||
import time
|
||||
|
||||
import av
|
||||
@@ -7,6 +8,13 @@ from teleoprtc.tracks import TiciVideoStreamTrack
|
||||
from cereal import messaging
|
||||
from openpilot.common.realtime import DT_MDL, DT_DMON
|
||||
|
||||
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
|
||||
TIMING_SEI_UUID = bytes([
|
||||
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
|
||||
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
|
||||
])
|
||||
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
|
||||
|
||||
|
||||
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
camera_to_sock_mapping = {
|
||||
@@ -19,9 +27,30 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
dt = DT_DMON if camera_type == "driver" else DT_MDL
|
||||
super().__init__(camera_type, dt)
|
||||
|
||||
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
|
||||
self._sock = self._make_sock(camera_type)
|
||||
self._pts = 0
|
||||
self._t0_ns = time.monotonic_ns()
|
||||
self.timing_sei_enabled = False
|
||||
|
||||
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
|
||||
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
|
||||
|
||||
def switch_camera(self, camera_type: str) -> None:
|
||||
self._sock = self._make_sock(camera_type)
|
||||
|
||||
def _build_frame_data(self, msg) -> bytes:
|
||||
encode_data = getattr(msg, msg.which())
|
||||
if not self.timing_sei_enabled:
|
||||
return encode_data.header + encode_data.data
|
||||
|
||||
idx = encode_data.idx
|
||||
sei_nal = _SEI_PREFIX + struct.pack('>4d',
|
||||
(idx.timestampEof - idx.timestampSof) / 1e6,
|
||||
(msg.logMonoTime - idx.timestampEof) / 1e6,
|
||||
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
|
||||
time.time() * 1000, # noqa: TID251
|
||||
) + b'\x80'
|
||||
return encode_data.header + sei_nal + encode_data.data
|
||||
|
||||
async def recv(self):
|
||||
while True:
|
||||
@@ -30,9 +59,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
break
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
evta = getattr(msg, msg.which())
|
||||
|
||||
packet = av.Packet(evta.header + evta.data)
|
||||
packet = av.Packet(self._build_frame_data(msg))
|
||||
packet.time_base = self._time_base
|
||||
|
||||
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
|
||||
|
||||
+204
-62
@@ -1,7 +1,11 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from abc import abstractmethod
|
||||
import os
|
||||
import time
|
||||
import argparse
|
||||
import asyncio
|
||||
import contextlib
|
||||
import json
|
||||
import uuid
|
||||
import logging
|
||||
@@ -19,11 +23,39 @@ if TYPE_CHECKING:
|
||||
from aiortc.rtcdatachannel import RTCDataChannel
|
||||
|
||||
from openpilot.system.webrtc.schema import generate_field
|
||||
from openpilot.common.params import Params
|
||||
from cereal import messaging, log
|
||||
|
||||
|
||||
class CerealOutgoingMessageProxy:
|
||||
class AsyncTaskRunner:
|
||||
def __init__(self):
|
||||
self.is_running = False
|
||||
self.task = None
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
|
||||
def start(self):
|
||||
assert self.task is None
|
||||
self.task = asyncio.create_task(self.run())
|
||||
|
||||
async def stop(self):
|
||||
if self.task is None:
|
||||
return
|
||||
task = self.task
|
||||
self.task = None
|
||||
if task.done():
|
||||
return
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
@abstractmethod
|
||||
async def run(self):
|
||||
pass
|
||||
|
||||
|
||||
class CerealOutgoingMessageProxy(AsyncTaskRunner):
|
||||
def __init__(self, sm: messaging.SubMaster):
|
||||
super().__init__()
|
||||
self.sm = sm
|
||||
self.channels: list[RTCDataChannel] = []
|
||||
|
||||
@@ -55,6 +87,19 @@ class CerealOutgoingMessageProxy:
|
||||
for channel in self.channels:
|
||||
channel.send(encoded_msg)
|
||||
|
||||
async def run(self):
|
||||
from aiortc.exceptions import InvalidStateError
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.update()
|
||||
except InvalidStateError:
|
||||
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Cereal outgoing proxy failure")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
class CerealIncomingMessageProxy:
|
||||
def __init__(self, pm: messaging.PubMaster):
|
||||
@@ -72,37 +117,6 @@ class CerealIncomingMessageProxy:
|
||||
self.pm.send(msg_type, msg)
|
||||
|
||||
|
||||
class CerealProxyRunner:
|
||||
def __init__(self, proxy: CerealOutgoingMessageProxy):
|
||||
self.proxy = proxy
|
||||
self.is_running = False
|
||||
self.task = None
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
|
||||
def start(self):
|
||||
assert self.task is None
|
||||
self.task = asyncio.create_task(self.run())
|
||||
|
||||
def stop(self):
|
||||
if self.task is None or self.task.done():
|
||||
return
|
||||
self.task.cancel()
|
||||
self.task = None
|
||||
|
||||
async def run(self):
|
||||
from aiortc.exceptions import InvalidStateError
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.proxy.update()
|
||||
except InvalidStateError:
|
||||
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Cereal outgoing proxy failure")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
class DynamicPubMaster(messaging.PubMaster):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
@@ -115,21 +129,90 @@ class DynamicPubMaster(messaging.PubMaster):
|
||||
self.sock[service] = messaging.pub_sock(service)
|
||||
|
||||
|
||||
class LivestreamBitrateController(AsyncTaskRunner):
|
||||
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
|
||||
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
|
||||
sample_interval = 0.2
|
||||
high_level = 0.1 # drop immediately
|
||||
med_level = 0.05 # drop after # of samples
|
||||
low_level = 0 # raise after # of samples
|
||||
down_samples = 5 # 1s
|
||||
param_name = "LivestreamEncoderBitrate"
|
||||
|
||||
def __init__(self, peer_connection: Any):
|
||||
super().__init__()
|
||||
self.pc = peer_connection
|
||||
self.params = Params()
|
||||
|
||||
self.level = 0
|
||||
self.prev_lost, self.prev_sent = None, None
|
||||
self.counter = 0
|
||||
self.up_samples = 5 # 1s
|
||||
self._auto = True
|
||||
|
||||
async def run(self):
|
||||
while True:
|
||||
await asyncio.sleep(self.sample_interval)
|
||||
if not self._auto:
|
||||
continue
|
||||
|
||||
loss_rate = await self._sample()
|
||||
if loss_rate is None:
|
||||
continue
|
||||
if loss_rate >= self.med_level and self.level > 0:
|
||||
self.counter += 1
|
||||
if self.counter >= self.down_samples or loss_rate >= self.high_level:
|
||||
self.level -= 1
|
||||
self.up_samples *= 2 # exponential backoff before raising again
|
||||
self.counter = 0
|
||||
self._publish(self.bitrates[self.level])
|
||||
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
|
||||
self.counter -= 1
|
||||
if -self.counter >= self.up_samples:
|
||||
self.level += 1
|
||||
self.counter = 0
|
||||
self._publish(self.bitrates[self.level])
|
||||
|
||||
async def _sample(self) -> float | None:
|
||||
report = await self.pc.getStats()
|
||||
packets_lost = packets_sent = 0
|
||||
for s in report.values():
|
||||
if s.type == "remote-inbound-rtp":
|
||||
packets_lost += s.packetsLost
|
||||
elif s.type == "outbound-rtp":
|
||||
packets_sent += s.packetsSent
|
||||
|
||||
if self.prev_lost is None:
|
||||
self.prev_lost, self.prev_sent = packets_lost, packets_sent
|
||||
return None
|
||||
lost_delta = max(0, packets_lost - self.prev_lost)
|
||||
sent_delta = max(0, packets_sent - self.prev_sent)
|
||||
self.prev_lost, self.prev_sent = packets_lost, packets_sent
|
||||
return lost_delta / sent_delta if sent_delta else 0.0
|
||||
|
||||
def _publish(self, bitrate: float):
|
||||
self.params.put(self.param_name, bitrate)
|
||||
|
||||
def set_quality(self, quality):
|
||||
if quality in self.label_to_bitrate:
|
||||
self._publish(self.label_to_bitrate[quality])
|
||||
self._auto = False
|
||||
elif quality == "auto":
|
||||
self._auto = True
|
||||
|
||||
|
||||
class StreamSession:
|
||||
shared_pub_master = DynamicPubMaster([])
|
||||
|
||||
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
|
||||
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
|
||||
from aiortc.mediastreams import VideoStreamTrack
|
||||
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
|
||||
from teleoprtc import WebRTCAnswerBuilder
|
||||
from teleoprtc.info import parse_info_from_offer
|
||||
|
||||
config = parse_info_from_offer(sdp)
|
||||
builder = WebRTCAnswerBuilder(sdp)
|
||||
|
||||
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
|
||||
for cam in cameras:
|
||||
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
|
||||
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
|
||||
builder.add_video_stream(init_camera, self.video_track)
|
||||
|
||||
self.stream = builder.stream()
|
||||
self.identifier = str(uuid.uuid4())
|
||||
@@ -137,35 +220,60 @@ class StreamSession:
|
||||
self.incoming_bridge: CerealIncomingMessageProxy | None = None
|
||||
self.incoming_bridge_services = incoming_services
|
||||
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
|
||||
self.outgoing_bridge_runner: CerealProxyRunner | None = None
|
||||
self.bitrate_controller: LivestreamBitrateController | None = None
|
||||
if len(incoming_services) > 0:
|
||||
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
|
||||
if len(outgoing_services) > 0:
|
||||
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
|
||||
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
|
||||
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
|
||||
|
||||
self.run_task: asyncio.Task | None = None
|
||||
self._cleanup_lock = asyncio.Lock()
|
||||
self._cleanup_done = False
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
|
||||
self.identifier, cameras, incoming_services, outgoing_services)
|
||||
self.logger.info(
|
||||
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
|
||||
self.identifier, init_camera, incoming_services, outgoing_services,
|
||||
)
|
||||
|
||||
def start(self):
|
||||
self.run_task = asyncio.create_task(self.run())
|
||||
|
||||
def stop(self):
|
||||
if self.run_task.done():
|
||||
return
|
||||
self.run_task.cancel()
|
||||
async def stop(self):
|
||||
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
|
||||
self.run_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await self.run_task
|
||||
self.run_task = None
|
||||
asyncio.run(self.post_run_cleanup())
|
||||
await self.post_run_cleanup()
|
||||
|
||||
async def get_answer(self):
|
||||
return await self.stream.start()
|
||||
|
||||
async def message_handler(self, message: bytes):
|
||||
def message_handler(self, message: bytes):
|
||||
assert self.incoming_bridge is not None
|
||||
try:
|
||||
self.incoming_bridge.send(message)
|
||||
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
|
||||
if isinstance(payload, dict):
|
||||
msg_type = payload.get("type")
|
||||
|
||||
match msg_type:
|
||||
case "livestreamCameraSwitch":
|
||||
self.video_track.switch_camera(payload["data"]["camera"])
|
||||
case "livestreamSettings":
|
||||
self.bitrate_controller.set_quality(payload["data"]["quality"])
|
||||
case "clockSync":
|
||||
pong = json.dumps({"type": "clockSync", "data": {
|
||||
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
|
||||
}})
|
||||
self.stream.get_messaging_channel().send(pong)
|
||||
case "enableTimingSei":
|
||||
if hasattr(self.video_track, 'timing_sei_enabled'):
|
||||
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
|
||||
case _:
|
||||
if payload.get("type") not in self.incoming_bridge_services:
|
||||
return
|
||||
self.incoming_bridge.send(message)
|
||||
except Exception:
|
||||
self.logger.exception("Cereal incoming proxy failure")
|
||||
|
||||
@@ -176,29 +284,38 @@ class StreamSession:
|
||||
if self.incoming_bridge is not None:
|
||||
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
|
||||
self.stream.set_message_handler(self.message_handler)
|
||||
if self.outgoing_bridge_runner is not None:
|
||||
if self.outgoing_bridge is not None:
|
||||
channel = self.stream.get_messaging_channel()
|
||||
self.outgoing_bridge_runner.proxy.add_channel(channel)
|
||||
self.outgoing_bridge_runner.start()
|
||||
self.outgoing_bridge.add_channel(channel)
|
||||
self.outgoing_bridge.start()
|
||||
self.bitrate_controller.start()
|
||||
|
||||
self.logger.info("Stream session (%s) connected", self.identifier)
|
||||
|
||||
await self.stream.wait_for_disconnection()
|
||||
await self.post_run_cleanup()
|
||||
|
||||
self.logger.info("Stream session (%s) ended", self.identifier)
|
||||
except Exception:
|
||||
self.logger.exception("Stream session failure")
|
||||
finally:
|
||||
await self.post_run_cleanup()
|
||||
|
||||
async def post_run_cleanup(self):
|
||||
await self.stream.stop()
|
||||
if self.outgoing_bridge is not None:
|
||||
self.outgoing_bridge_runner.stop()
|
||||
async with self._cleanup_lock:
|
||||
if self._cleanup_done:
|
||||
return
|
||||
self._cleanup_done = True
|
||||
if self.bitrate_controller is not None:
|
||||
await self.bitrate_controller.stop()
|
||||
if self.outgoing_bridge is not None:
|
||||
await self.outgoing_bridge.stop()
|
||||
await self.stream.stop()
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamRequestBody:
|
||||
sdp: str
|
||||
cameras: list[str]
|
||||
initCamera: str
|
||||
bridge_services_in: list[str] = field(default_factory=list)
|
||||
bridge_services_out: list[str] = field(default_factory=list)
|
||||
|
||||
@@ -208,11 +325,33 @@ async def get_stream(request: 'web.Request'):
|
||||
raw_body = await request.json()
|
||||
body = StreamRequestBody(**raw_body)
|
||||
|
||||
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
|
||||
answer = await session.get_answer()
|
||||
session.start()
|
||||
async with request.app['stream_lock']:
|
||||
# Fully disconnect any other active stream before starting the replacement.
|
||||
for sid, s in list(stream_dict.items()):
|
||||
if s.run_task and not s.run_task.done():
|
||||
try:
|
||||
ch = s.stream.get_messaging_channel()
|
||||
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
|
||||
except Exception:
|
||||
pass
|
||||
await s.stop()
|
||||
del stream_dict[sid]
|
||||
|
||||
stream_dict[session.identifier] = session
|
||||
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
|
||||
try:
|
||||
answer = await session.get_answer()
|
||||
except ValueError as e:
|
||||
await session.stop()
|
||||
raise web.HTTPBadRequest(
|
||||
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
|
||||
content_type="application/json",
|
||||
) from e
|
||||
except Exception:
|
||||
await session.stop()
|
||||
raise
|
||||
session.start()
|
||||
|
||||
stream_dict[session.identifier] = session
|
||||
|
||||
return web.json_response({"sdp": answer.sdp, "type": answer.type})
|
||||
|
||||
@@ -224,6 +363,7 @@ async def get_schema(request: 'web.Request'):
|
||||
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
|
||||
return web.json_response(schema_dict)
|
||||
|
||||
|
||||
async def post_notify(request: 'web.Request'):
|
||||
try:
|
||||
payload = await request.json()
|
||||
@@ -239,9 +379,10 @@ async def post_notify(request: 'web.Request'):
|
||||
|
||||
return web.Response(status=200, text="OK")
|
||||
|
||||
|
||||
async def on_shutdown(app: 'web.Application'):
|
||||
for session in app['streams'].values():
|
||||
session.stop()
|
||||
await session.stop()
|
||||
del app['streams']
|
||||
|
||||
|
||||
@@ -254,6 +395,7 @@ def webrtcd_thread(host: str, port: int, debug: bool):
|
||||
app = web.Application()
|
||||
|
||||
app['streams'] = dict()
|
||||
app['stream_lock'] = asyncio.Lock()
|
||||
app['debug'] = debug
|
||||
app.on_shutdown.append(on_shutdown)
|
||||
app.router.add_post("/stream", get_stream)
|
||||
|
||||
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
|
||||
|
||||
async def offer(request: 'web.Request'):
|
||||
params = await request.json()
|
||||
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
|
||||
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
|
||||
body_json = json.dumps(dataclasses.asdict(body))
|
||||
|
||||
logger.info("Sending offer to webrtcd...")
|
||||
|
||||
Reference in New Issue
Block a user