remote teleop frame timing support (#38014)

* athenad and webrtcd updates

* remove feature stream services from webrtcd split

* stream encoder thread

* reduce diff

* wire webrtc to livestream camera encoder

* request livestream camera switch service

* add frame timing headers

* rfctr

* diff

* clean

* remove camera list in favour of init camera field

* remove cors

* clean

* remove unused

* remove extra try except

* remove video_tracks unused

* add back exception trace

* add stream road camera info to stream cameras

* fix

* add back assert

* clean diff

* clean diff

* add testJoystick only on body

* fix camera list

* remove reference to future service

* video_tracks list add back

* encode all cameras and swap in video track in webrtc

* clean

* explicitly gate bridge send

* clean leftover

* rearrange video.py

* fix lint
This commit is contained in:
stef
2026-05-22 19:30:48 -07:00
committed by GitHub
parent 9844075bb2
commit 1f227bbe20
2 changed files with 48 additions and 8 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import struct
import time
import av
@@ -7,6 +8,13 @@ from teleoprtc.tracks import TiciVideoStreamTrack
from cereal import messaging
from openpilot.common.realtime import DT_MDL, DT_DMON
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
TIMING_SEI_UUID = bytes([
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
])
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
camera_to_sock_mapping = {
@@ -22,6 +30,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
self._sock = self._make_sock(camera_type)
self._pts = 0
self._t0_ns = time.monotonic_ns()
self.timing_sei_enabled = False
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
@@ -29,6 +38,20 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
def switch_camera(self, camera_type: str) -> None:
self._sock = self._make_sock(camera_type)
def _build_frame_data(self, msg) -> bytes:
encode_data = getattr(msg, msg.which())
if not self.timing_sei_enabled:
return encode_data.header + encode_data.data
idx = encode_data.idx
sei_nal = _SEI_PREFIX + struct.pack('>4d',
(idx.timestampEof - idx.timestampSof) / 1e6,
(msg.logMonoTime - idx.timestampEof) / 1e6,
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
time.time() * 1000, # noqa: TID251
) + b'\x80'
return encode_data.header + sei_nal + encode_data.data
async def recv(self):
while True:
msg = messaging.recv_one_or_none(self._sock)
@@ -36,9 +59,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
break
await asyncio.sleep(0.005)
evta = getattr(msg, msg.which())
packet = av.Packet(evta.header + evta.data)
packet = av.Packet(self._build_frame_data(msg))
packet.time_base = self._time_base
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import time
import argparse
import asyncio
import contextlib
@@ -173,12 +174,30 @@ class StreamSession:
def message_handler(self, message: bytes):
assert self.incoming_bridge is not None
try:
msg_json = json.loads(message)
if msg_json.get("type") == "livestreamCameraSwitch" and hasattr(self.video_track, "switch_camera"):
self.video_track.switch_camera(msg_json["data"]["camera"])
return
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
if isinstance(payload, dict):
msg_type = payload.get("type")
if msg_json.get("type") not in self.incoming_bridge_services:
if msg_type == "livestreamCameraSwitch":
self.video_track.switch_camera(payload["data"]["camera"])
return
if msg_type == "clockSync":
data = payload.get("data", {})
pong = json.dumps({"type": "clockSync", "data": {
"action": "pong", "browserSendTime": data.get("browserSendTime"), "deviceTime": time.time() * 1000, # noqa: TID251
}})
self.stream.get_messaging_channel().send(pong)
return
if msg_type == "enableTimingSei":
enabled = bool(payload.get("data", {}).get("enabled"))
for track in self.video_tracks:
if hasattr(track, 'timing_sei_enabled'):
track.timing_sei_enabled = enabled
return
if payload.get("type") not in self.incoming_bridge_services:
return
self.incoming_bridge.send(message)
except Exception: