Compare commits

...

9 Commits

Author SHA1 Message Date
Jason Wen 97b7088a38 Update CHANGELOG.md
(cherry picked from commit da6313dbe9)
2026-06-26 21:33:52 -04:00
Daniel Koepping fb9e8f4fd9 set max cpu core frequency (#38132)
cap cpu core frequency

(cherry picked from commit c01b8be9682f54d7305c79ed7f2c8f2812bcc486)
2026-06-26 21:14:10 -04:00
Jason Wen 1e66d34eb2 Revert "remote body teleop from connect support (#38011)"
This reverts commit aff9f9ffae.
2026-06-26 21:13:26 -04:00
Jason Wen 4fa26af61b Revert "remote teleop multi-stream on 1 video track (#38013)"
This reverts commit 9844075bb2.
2026-06-26 21:13:21 -04:00
Jason Wen f03c9dd5aa Revert "hw encoder: set_bitrate and apply_bitrate based on Param (#38095)"
This reverts commit ad522f8444.
2026-06-26 21:13:04 -04:00
Jason Wen 0846ab723e Revert "remote teleop frame timing support (#38014)"
This reverts commit 1f227bbe20.
2026-06-26 21:11:43 -04:00
Jason Wen 9526c749ce Revert "fix frame timing feature in webrtcd (#38090)"
This reverts commit 5af72796f2.
2026-06-26 21:11:36 -04:00
Jason Wen 0b6fddf995 Revert "webrtcd: turn CerealProxyRunner into more general AsyncTaskRunner (#38093)"
This reverts commit 4cd93f3eee.
2026-06-26 21:11:25 -04:00
Jason Wen 3cc89bc52a Revert "webrtc: livestream bitrate controller (#38120)"
This reverts commit 04e2351246.
2026-06-26 21:11:18 -04:00
14 changed files with 106 additions and 333 deletions
+30 -1
View File
@@ -1,5 +1,34 @@
sunnypilot Version 2026.002.000 (2026-xx-xx)
sunnypilot Version 2026.002.000 (2026-06-28)
========================
* What's Changed (sunnypilot/sunnypilot)
* ui: update gates for certain toggles by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1830
* release: ignore upstream IsReleaseBranch by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1831
* manager: disable DEVELOPMENT_ONLY reset by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1833
* sunnylink: fix max time offroad values by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1835
* ui: show default model name by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1837
* sunnylink: add CarParams fallback for brand-specific capabilities by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1839
* sunnylink SDUI: tweak DisableUpdate param for clarity by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1842
* Revert "DM: Lancia Delta HF Integrale model" by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1849
* modeld_v2: safe model validation by @Discountchubbs in https://github.com/sunnypilot/sunnypilot/pull/1855
* Revert "deprecate `carState.brake`" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/sunnypilot/pull/1860
* sunnylink: deprecate legacy params metadata by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1862
* ui: reset Enforce Torque Control and NNLC if both are enabled by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1863
* What's Changed (sunnypilot/opendbc)
* Rivian: suppress ACM hold-the-wheel warning during MADS-only lateral by @lukasloetkolben in https://github.com/sunnypilot/opendbc/pull/465
* Sync: `commaai/opendbc:master``sunnypilot/opendbc:master` by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/479
* safety: add option to ignore frequency check for RX checks by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/480
* Revert "deprecate carState.brake" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/opendbc/pull/481
* New Contributors (sunnypilot/sunnypilot)
* @mvl-boston made their first contribution in https://github.com/sunnypilot/sunnypilot/pull/1860
* Full Changelog: https://github.com/sunnypilot/sunnypilot/compare/v2026.001.007...v2026.002.000
************************
* Synced with commaai's openpilot (v0.11.1)
* master commit 69e2c321e49760e52f7983eaa0a5f77cb95de637 (June 02, 2026)
* New driver monitoring model
* Improved image processing pipeline for driver camera
* Improved thermal policy for comma four
* Acura MDX 2022-24 support thanks to mvl-boston!
* Rivian R1S and R1T 2025 support thanks to lukasloetkolben!
sunnypilot Version 2026.001.000 (2026-05-06)
========================
-1
View File
@@ -80,7 +80,6 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
{"LiveParameters", {PERSISTENT, JSON}},
{"LiveParametersV2", {PERSISTENT, BYTES}},
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
+1 -41
View File
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
create_connection)
import cereal.messaging as messaging
from cereal import car, log
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.api import Api, get_key_pair
from openpilot.common.utils import CallbackReader, get_upload_stream
@@ -45,7 +45,6 @@ from openpilot.system.hardware.hw import Paths
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
LOCAL_PORT_WHITELIST = {22, } # SSH
WEBRTCD_PORT = 5001
LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
@@ -568,16 +567,6 @@ def getSshAuthorizedKeys() -> str:
def getGithubUsername() -> str:
return cast(str, Params().get("GithubUsername") or "")
@dispatcher.add_method
def getNotCar() -> bool:
cp_bytes = Params().get("CarParamsPersistent")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
return CP.notCar
return False
@dispatcher.add_method
def getSimInfo():
return HARDWARE.get_sim_info()
@@ -599,35 +588,6 @@ def getNetworks():
return HARDWARE.get_networks()
@dispatcher.add_method
def startStream(sdp: str) -> dict:
from openpilot.system.webrtc.webrtcd import StreamRequestBody
bridge_services_in = []
# get live car params to avoid stale notCar edge case
cp_bytes = Params().get("CarParams")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
if CP.notCar:
bridge_services_in.append("testJoystick")
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
try:
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
json=asdict(body), timeout=10)
if not resp.ok:
try:
error_body = resp.json()
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
except ValueError:
resp.raise_for_status()
return resp.json()
except requests.ConnectTimeout as e:
raise Exception("webrtc took too long to respond. is it on?") from e
except requests.ConnectionError as e:
raise Exception("webrtc is not running. turn on comma body ignition.") from e
@dispatcher.add_method
def takeSnapshot() -> str | dict[str, str] | None:
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
+3
View File
@@ -313,6 +313,9 @@ class Tici(HardwareBase):
continue
gov = 'ondemand' if powersave_enabled else 'performance'
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
if not powersave_enabled:
# cap max core freq to 1689 Mhz
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
# *** IRQ config ***
-1
View File
@@ -26,7 +26,6 @@ public:
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
virtual void encoder_open() = 0;
virtual void encoder_close() = 0;
virtual void set_bitrate(int bitrate) = 0;
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
-4
View File
@@ -72,10 +72,6 @@ void FfmpegEncoder::encoder_close() {
is_open = false;
}
void FfmpegEncoder::set_bitrate(int bitrate) {
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
}
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
assert(buf->width == this->in_width);
assert(buf->height == this->in_height);
-1
View File
@@ -21,7 +21,6 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int segment_num = -1;
-21
View File
@@ -155,8 +155,6 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
current_bitrate = encoder_settings.bitrate;
adaptive_bitrate = encoder_info.adaptive_bitrate;
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
struct v4l2_format fmt_out = {
@@ -306,25 +304,6 @@ void V4LEncoder::encoder_close() {
this->is_open = false;
}
void V4LEncoder::set_bitrate(int bitrate) {
if (!adaptive_bitrate || bitrate == current_bitrate) return;
if (bitrate <= 0) {
LOGE("invalid livestream encoder bitrate %d", bitrate);
return;
}
struct v4l2_control ctrl = {
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
.value = bitrate,
};
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
return;
}
current_bitrate = bitrate;
}
V4LEncoder::~V4LEncoder() {
encoder_close();
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
-3
View File
@@ -13,7 +13,6 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int fd;
@@ -21,8 +20,6 @@ private:
bool is_open = false;
int segment_num = -1;
int counter = 0;
int current_bitrate = -1;
bool adaptive_bitrate;
SafeQueue<VisionIpcBufExtra> extras;
-15
View File
@@ -44,24 +44,11 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
}
}
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
static Params params;
std::string val = params.get("LivestreamEncoderBitrate");
if (val.empty()) return;
int bitrate = std::stoi(val);
for (auto &e : encoders) {
e->set_bitrate(bitrate);
}
}
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
util::set_thread_name(cam_info.thread_name);
std::vector<std::unique_ptr<Encoder>> encoders;
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
[](const auto &ei) { return ei.adaptive_bitrate; });
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
std::unique_ptr<JpegEncoder> jpeg_encoder;
@@ -121,8 +108,6 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
++cur_seg;
}
if (has_adaptive) apply_bitrate(encoders);
// encode a frame
for (int i = 0; i < encoders.size(); ++i) {
int out_id = encoders[i]->encode_frame(buf, &extra);
+5 -9
View File
@@ -47,8 +47,8 @@ struct EncoderSettings {
}
static EncoderSettings StreamEncoderSettings() {
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
}
};
@@ -59,7 +59,6 @@ public:
const char *filename = NULL;
bool record = true;
bool include_audio = false;
bool adaptive_bitrate = false;
int frame_width = -1;
int frame_height = -1;
int fps = MAIN_FPS;
@@ -105,7 +104,6 @@ const EncoderInfo stream_road_encoder_info = {
.publish_name = "livestreamRoadEncodeData",
//.thumbnail_name = "thumbnail",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
};
@@ -113,7 +111,6 @@ const EncoderInfo stream_road_encoder_info = {
const EncoderInfo stream_wide_road_encoder_info = {
.publish_name = "livestreamWideRoadEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
};
@@ -121,7 +118,6 @@ const EncoderInfo stream_wide_road_encoder_info = {
const EncoderInfo stream_driver_encoder_info = {
.publish_name = "livestreamDriverEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
};
@@ -157,19 +153,19 @@ const LogCameraInfo driver_camera_info{
const LogCameraInfo stream_road_camera_info{
.thread_name = "road_cam_encoder",
.stream_type = VISION_STREAM_ROAD,
.encoder_infos = {stream_road_encoder_info},
.encoder_infos = {stream_road_encoder_info}
};
const LogCameraInfo stream_wide_road_camera_info{
.thread_name = "wide_road_cam_encoder",
.stream_type = VISION_STREAM_WIDE_ROAD,
.encoder_infos = {stream_wide_road_encoder_info},
.encoder_infos = {stream_wide_road_encoder_info}
};
const LogCameraInfo stream_driver_camera_info{
.thread_name = "driver_cam_encoder",
.stream_type = VISION_STREAM_DRIVER,
.encoder_infos = {stream_driver_encoder_info},
.encoder_infos = {stream_driver_encoder_info}
};
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
+4 -31
View File
@@ -1,5 +1,4 @@
import asyncio
import struct
import time
import av
@@ -8,13 +7,6 @@ from teleoprtc.tracks import TiciVideoStreamTrack
from cereal import messaging
from openpilot.common.realtime import DT_MDL, DT_DMON
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
TIMING_SEI_UUID = bytes([
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
])
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
camera_to_sock_mapping = {
@@ -27,30 +19,9 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
dt = DT_DMON if camera_type == "driver" else DT_MDL
super().__init__(camera_type, dt)
self._sock = self._make_sock(camera_type)
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
self._pts = 0
self._t0_ns = time.monotonic_ns()
self.timing_sei_enabled = False
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
def switch_camera(self, camera_type: str) -> None:
self._sock = self._make_sock(camera_type)
def _build_frame_data(self, msg) -> bytes:
encode_data = getattr(msg, msg.which())
if not self.timing_sei_enabled:
return encode_data.header + encode_data.data
idx = encode_data.idx
sei_nal = _SEI_PREFIX + struct.pack('>4d',
(idx.timestampEof - idx.timestampSof) / 1e6,
(msg.logMonoTime - idx.timestampEof) / 1e6,
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
time.time() * 1000, # noqa: TID251
) + b'\x80'
return encode_data.header + sei_nal + encode_data.data
async def recv(self):
while True:
@@ -59,7 +30,9 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
break
await asyncio.sleep(0.005)
packet = av.Packet(self._build_frame_data(msg))
evta = getattr(msg, msg.which())
packet = av.Packet(evta.header + evta.data)
packet.time_base = self._time_base
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
+62 -204
View File
@@ -1,11 +1,7 @@
#!/usr/bin/env python3
from abc import abstractmethod
import os
import time
import argparse
import asyncio
import contextlib
import json
import uuid
import logging
@@ -23,39 +19,11 @@ if TYPE_CHECKING:
from aiortc.rtcdatachannel import RTCDataChannel
from openpilot.system.webrtc.schema import generate_field
from openpilot.common.params import Params
from cereal import messaging, log
class AsyncTaskRunner:
def __init__(self):
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
async def stop(self):
if self.task is None:
return
task = self.task
self.task = None
if task.done():
return
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
@abstractmethod
async def run(self):
pass
class CerealOutgoingMessageProxy(AsyncTaskRunner):
class CerealOutgoingMessageProxy:
def __init__(self, sm: messaging.SubMaster):
super().__init__()
self.sm = sm
self.channels: list[RTCDataChannel] = []
@@ -87,19 +55,6 @@ class CerealOutgoingMessageProxy(AsyncTaskRunner):
for channel in self.channels:
channel.send(encoded_msg)
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class CerealIncomingMessageProxy:
def __init__(self, pm: messaging.PubMaster):
@@ -117,6 +72,37 @@ class CerealIncomingMessageProxy:
self.pm.send(msg_type, msg)
class CerealProxyRunner:
def __init__(self, proxy: CerealOutgoingMessageProxy):
self.proxy = proxy
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
def stop(self):
if self.task is None or self.task.done():
return
self.task.cancel()
self.task = None
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.proxy.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class DynamicPubMaster(messaging.PubMaster):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -129,90 +115,21 @@ class DynamicPubMaster(messaging.PubMaster):
self.sock[service] = messaging.pub_sock(service)
class LivestreamBitrateController(AsyncTaskRunner):
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
sample_interval = 0.2
high_level = 0.1 # drop immediately
med_level = 0.05 # drop after # of samples
low_level = 0 # raise after # of samples
down_samples = 5 # 1s
param_name = "LivestreamEncoderBitrate"
def __init__(self, peer_connection: Any):
super().__init__()
self.pc = peer_connection
self.params = Params()
self.level = 0
self.prev_lost, self.prev_sent = None, None
self.counter = 0
self.up_samples = 5 # 1s
self._auto = True
async def run(self):
while True:
await asyncio.sleep(self.sample_interval)
if not self._auto:
continue
loss_rate = await self._sample()
if loss_rate is None:
continue
if loss_rate >= self.med_level and self.level > 0:
self.counter += 1
if self.counter >= self.down_samples or loss_rate >= self.high_level:
self.level -= 1
self.up_samples *= 2 # exponential backoff before raising again
self.counter = 0
self._publish(self.bitrates[self.level])
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
self.counter -= 1
if -self.counter >= self.up_samples:
self.level += 1
self.counter = 0
self._publish(self.bitrates[self.level])
async def _sample(self) -> float | None:
report = await self.pc.getStats()
packets_lost = packets_sent = 0
for s in report.values():
if s.type == "remote-inbound-rtp":
packets_lost += s.packetsLost
elif s.type == "outbound-rtp":
packets_sent += s.packetsSent
if self.prev_lost is None:
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return None
lost_delta = max(0, packets_lost - self.prev_lost)
sent_delta = max(0, packets_sent - self.prev_sent)
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return lost_delta / sent_delta if sent_delta else 0.0
def _publish(self, bitrate: float):
self.params.put(self.param_name, bitrate)
def set_quality(self, quality):
if quality in self.label_to_bitrate:
self._publish(self.label_to_bitrate[quality])
self._auto = False
elif quality == "auto":
self._auto = True
class StreamSession:
shared_pub_master = DynamicPubMaster([])
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
from aiortc.mediastreams import VideoStreamTrack
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from teleoprtc import WebRTCAnswerBuilder
from teleoprtc.info import parse_info_from_offer
config = parse_info_from_offer(sdp)
builder = WebRTCAnswerBuilder(sdp)
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
builder.add_video_stream(init_camera, self.video_track)
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
for cam in cameras:
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
self.stream = builder.stream()
self.identifier = str(uuid.uuid4())
@@ -220,60 +137,35 @@ class StreamSession:
self.incoming_bridge: CerealIncomingMessageProxy | None = None
self.incoming_bridge_services = incoming_services
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
self.bitrate_controller: LivestreamBitrateController | None = None
self.outgoing_bridge_runner: CerealProxyRunner | None = None
if len(incoming_services) > 0:
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
if len(outgoing_services) > 0:
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
self.run_task: asyncio.Task | None = None
self._cleanup_lock = asyncio.Lock()
self._cleanup_done = False
self.logger = logging.getLogger("webrtcd")
self.logger.info(
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
self.identifier, init_camera, incoming_services, outgoing_services,
)
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
self.identifier, cameras, incoming_services, outgoing_services)
def start(self):
self.run_task = asyncio.create_task(self.run())
async def stop(self):
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
self.run_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.run_task
def stop(self):
if self.run_task.done():
return
self.run_task.cancel()
self.run_task = None
await self.post_run_cleanup()
asyncio.run(self.post_run_cleanup())
async def get_answer(self):
return await self.stream.start()
def message_handler(self, message: bytes):
async def message_handler(self, message: bytes):
assert self.incoming_bridge is not None
try:
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
if isinstance(payload, dict):
msg_type = payload.get("type")
match msg_type:
case "livestreamCameraSwitch":
self.video_track.switch_camera(payload["data"]["camera"])
case "livestreamSettings":
self.bitrate_controller.set_quality(payload["data"]["quality"])
case "clockSync":
pong = json.dumps({"type": "clockSync", "data": {
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
}})
self.stream.get_messaging_channel().send(pong)
case "enableTimingSei":
if hasattr(self.video_track, 'timing_sei_enabled'):
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
case _:
if payload.get("type") not in self.incoming_bridge_services:
return
self.incoming_bridge.send(message)
self.incoming_bridge.send(message)
except Exception:
self.logger.exception("Cereal incoming proxy failure")
@@ -284,38 +176,29 @@ class StreamSession:
if self.incoming_bridge is not None:
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
self.stream.set_message_handler(self.message_handler)
if self.outgoing_bridge is not None:
if self.outgoing_bridge_runner is not None:
channel = self.stream.get_messaging_channel()
self.outgoing_bridge.add_channel(channel)
self.outgoing_bridge.start()
self.bitrate_controller.start()
self.outgoing_bridge_runner.proxy.add_channel(channel)
self.outgoing_bridge_runner.start()
self.logger.info("Stream session (%s) connected", self.identifier)
await self.stream.wait_for_disconnection()
await self.post_run_cleanup()
self.logger.info("Stream session (%s) ended", self.identifier)
except Exception:
self.logger.exception("Stream session failure")
finally:
await self.post_run_cleanup()
async def post_run_cleanup(self):
async with self._cleanup_lock:
if self._cleanup_done:
return
self._cleanup_done = True
if self.bitrate_controller is not None:
await self.bitrate_controller.stop()
if self.outgoing_bridge is not None:
await self.outgoing_bridge.stop()
await self.stream.stop()
await self.stream.stop()
if self.outgoing_bridge is not None:
self.outgoing_bridge_runner.stop()
@dataclass
class StreamRequestBody:
sdp: str
initCamera: str
cameras: list[str]
bridge_services_in: list[str] = field(default_factory=list)
bridge_services_out: list[str] = field(default_factory=list)
@@ -325,33 +208,11 @@ async def get_stream(request: 'web.Request'):
raw_body = await request.json()
body = StreamRequestBody(**raw_body)
async with request.app['stream_lock']:
# Fully disconnect any other active stream before starting the replacement.
for sid, s in list(stream_dict.items()):
if s.run_task and not s.run_task.done():
try:
ch = s.stream.get_messaging_channel()
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
except Exception:
pass
await s.stop()
del stream_dict[sid]
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
answer = await session.get_answer()
session.start()
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
try:
answer = await session.get_answer()
except ValueError as e:
await session.stop()
raise web.HTTPBadRequest(
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
content_type="application/json",
) from e
except Exception:
await session.stop()
raise
session.start()
stream_dict[session.identifier] = session
stream_dict[session.identifier] = session
return web.json_response({"sdp": answer.sdp, "type": answer.type})
@@ -363,7 +224,6 @@ async def get_schema(request: 'web.Request'):
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
return web.json_response(schema_dict)
async def post_notify(request: 'web.Request'):
try:
payload = await request.json()
@@ -379,10 +239,9 @@ async def post_notify(request: 'web.Request'):
return web.Response(status=200, text="OK")
async def on_shutdown(app: 'web.Application'):
for session in app['streams'].values():
await session.stop()
session.stop()
del app['streams']
@@ -395,7 +254,6 @@ def webrtcd_thread(host: str, port: int, debug: bool):
app = web.Application()
app['streams'] = dict()
app['stream_lock'] = asyncio.Lock()
app['debug'] = debug
app.on_shutdown.append(on_shutdown)
app.router.add_post("/stream", get_stream)
+1 -1
View File
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
async def offer(request: 'web.Request'):
params = await request.json()
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
body_json = json.dumps(dataclasses.asdict(body))
logger.info("Sending offer to webrtcd...")