mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-27 22:32:07 +08:00
Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fb724b9612 | |||
| 71e4f251d2 | |||
| dbefa8afbd | |||
| fb54689300 | |||
| db8e56687f | |||
| 88e5e3d23d | |||
| 0b00470999 | |||
| 65dcbf698e | |||
| ac99ce017c | |||
| 508abb227c | |||
| b609622398 | |||
| c9f2756264 | |||
| 3580656d78 | |||
| f973b7fdcb |
+1
-30
@@ -1,34 +1,5 @@
|
||||
sunnypilot Version 2026.002.000 (2026-06-28)
|
||||
sunnypilot Version 2026.002.000 (2026-xx-xx)
|
||||
========================
|
||||
* What's Changed (sunnypilot/sunnypilot)
|
||||
* ui: update gates for certain toggles by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1830
|
||||
* release: ignore upstream IsReleaseBranch by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1831
|
||||
* manager: disable DEVELOPMENT_ONLY reset by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1833
|
||||
* sunnylink: fix max time offroad values by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1835
|
||||
* ui: show default model name by @nayan8teen in https://github.com/sunnypilot/sunnypilot/pull/1837
|
||||
* sunnylink: add CarParams fallback for brand-specific capabilities by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1839
|
||||
* sunnylink SDUI: tweak DisableUpdate param for clarity by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1842
|
||||
* Revert "DM: Lancia Delta HF Integrale model" by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1849
|
||||
* modeld_v2: safe model validation by @Discountchubbs in https://github.com/sunnypilot/sunnypilot/pull/1855
|
||||
* Revert "deprecate `carState.brake`" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/sunnypilot/pull/1860
|
||||
* sunnylink: deprecate legacy params metadata by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1862
|
||||
* ui: reset Enforce Torque Control and NNLC if both are enabled by @sunnyhaibin in https://github.com/sunnypilot/sunnypilot/pull/1863
|
||||
* What's Changed (sunnypilot/opendbc)
|
||||
* Rivian: suppress ACM hold-the-wheel warning during MADS-only lateral by @lukasloetkolben in https://github.com/sunnypilot/opendbc/pull/465
|
||||
* Sync: `commaai/opendbc:master` → `sunnypilot/opendbc:master` by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/479
|
||||
* safety: add option to ignore frequency check for RX checks by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/480
|
||||
* Revert "deprecate carState.brake" for Honda Gas Interceptor by @mvl-boston in https://github.com/sunnypilot/opendbc/pull/481
|
||||
* New Contributors (sunnypilot/sunnypilot)
|
||||
* @mvl-boston made their first contribution in https://github.com/sunnypilot/sunnypilot/pull/1860
|
||||
* Full Changelog: https://github.com/sunnypilot/sunnypilot/compare/v2026.001.007...v2026.002.000
|
||||
************************
|
||||
* Synced with commaai's openpilot (v0.11.1)
|
||||
* master commit 69e2c321e49760e52f7983eaa0a5f77cb95de637 (June 02, 2026)
|
||||
* New driver monitoring model
|
||||
* Improved image processing pipeline for driver camera
|
||||
* Improved thermal policy for comma four
|
||||
* Acura MDX 2022-24 support thanks to mvl-boston!
|
||||
* Rivian R1S and R1T 2025 support thanks to lukasloetkolben!
|
||||
|
||||
sunnypilot Version 2026.001.000 (2026-05-06)
|
||||
========================
|
||||
|
||||
@@ -80,6 +80,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
|
||||
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
|
||||
{"LiveParameters", {PERSISTENT, JSON}},
|
||||
{"LiveParametersV2", {PERSISTENT, BYTES}},
|
||||
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
|
||||
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
|
||||
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
|
||||
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
|
||||
|
||||
@@ -20,6 +20,7 @@ from openpilot.system.ui.widgets.list_view import button_item
|
||||
|
||||
from openpilot.system.ui.sunnypilot.widgets.html_render import HtmlModalSP
|
||||
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp
|
||||
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.external_storage import external_storage_item
|
||||
|
||||
PREBUILT_PATH = os.path.join(Paths.comma_home(), "prebuilt") if PC else "/data/openpilot/prebuilt"
|
||||
|
||||
@@ -52,7 +53,11 @@ class DeveloperLayoutSP(DeveloperLayout):
|
||||
|
||||
self.error_log_btn = button_item(tr("Error Log"), tr("VIEW"), tr("View the error log for sunnypilot crashes."), callback=self._on_error_log_clicked)
|
||||
|
||||
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle, self.error_log_btn,]
|
||||
self.external_storage = external_storage_item(tr("External Storage"), description=tr("Extend your comma device's storage by inserting a USB drive " +
|
||||
"into the aux port."))
|
||||
|
||||
self.items: list = [self.show_advanced_controls, self.enable_github_runner_toggle, self.enable_copyparty_toggle, self.prebuilt_toggle,
|
||||
self.external_storage, self.error_log_btn,]
|
||||
|
||||
@staticmethod
|
||||
def _on_prebuilt_toggled(state):
|
||||
|
||||
@@ -0,0 +1,261 @@
|
||||
"""
|
||||
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
|
||||
|
||||
This file is part of sunnypilot and is licensed under the MIT License.
|
||||
See the LICENSE.md file in the root directory for more details.
|
||||
"""
|
||||
import pyray as rl
|
||||
import threading
|
||||
import subprocess
|
||||
import copy
|
||||
from enum import Enum
|
||||
from collections.abc import Callable
|
||||
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.system.hardware import PC
|
||||
from openpilot.system.ui.lib.application import gui_app, FontWeight
|
||||
from openpilot.system.ui.lib.text_measure import measure_text_cached
|
||||
from openpilot.system.ui.lib.multilang import tr, tr_noop
|
||||
from openpilot.system.ui.widgets import DialogResult
|
||||
from openpilot.system.ui.widgets.button import Button, ButtonStyle
|
||||
from openpilot.system.ui.widgets.confirm_dialog import alert_dialog, ConfirmDialog
|
||||
from openpilot.system.ui.widgets.list_view import (
|
||||
ItemAction,
|
||||
ListItem,
|
||||
BUTTON_HEIGHT,
|
||||
BUTTON_BORDER_RADIUS,
|
||||
BUTTON_FONT_SIZE,
|
||||
BUTTON_WIDTH,
|
||||
)
|
||||
|
||||
VALUE_FONT_SIZE = 48
|
||||
|
||||
|
||||
class ExternalStorageState(Enum):
|
||||
DISABLED = tr_noop("DISABLED")
|
||||
LOADING = tr_noop("LOADING")
|
||||
CHECK = tr_noop("CHECK")
|
||||
MOUNT = tr_noop("MOUNT")
|
||||
UNMOUNT = tr_noop("UNMOUNT")
|
||||
FORMAT = tr_noop("FORMAT")
|
||||
|
||||
|
||||
class ExternalStorageAction(ItemAction):
|
||||
MAX_WIDTH = 500
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(self.MAX_WIDTH, True)
|
||||
|
||||
self._params = Params()
|
||||
self._error_message = ""
|
||||
self._text_font = gui_app.font(FontWeight.NORMAL)
|
||||
|
||||
self._button = Button(
|
||||
"",
|
||||
click_callback=self._handle_button_click,
|
||||
button_style=ButtonStyle.LIST_ACTION,
|
||||
border_radius=BUTTON_BORDER_RADIUS,
|
||||
font_size=BUTTON_FONT_SIZE,
|
||||
)
|
||||
|
||||
self._value_text = ""
|
||||
self._formatting = False
|
||||
self._refresh_pending = False
|
||||
|
||||
self._state = ExternalStorageState.CHECK
|
||||
self._refresh_state()
|
||||
self.refresh()
|
||||
|
||||
def set_touch_valid_callback(self, callback):
|
||||
def wrapped():
|
||||
if self._state == ExternalStorageState.DISABLED:
|
||||
return False
|
||||
return callback()
|
||||
super().set_touch_valid_callback(wrapped)
|
||||
self._button.set_touch_valid_callback(wrapped)
|
||||
|
||||
def _run(self, cmd: str) -> bool:
|
||||
return subprocess.call(["sh", "-c", cmd]) == 0
|
||||
|
||||
def _run_output(self, cmd: str) -> str:
|
||||
try:
|
||||
out = subprocess.check_output(["sh", "-c", cmd], universal_newlines=True)
|
||||
return out.strip()
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def _render(self, rect: rl.Rectangle) -> bool:
|
||||
if self._error_message:
|
||||
msg = copy.copy(self._error_message)
|
||||
gui_app.set_modal_overlay(alert_dialog(msg))
|
||||
self._error_message = ""
|
||||
|
||||
if self._value_text:
|
||||
text_size = measure_text_cached(self._text_font, self._value_text, VALUE_FONT_SIZE)
|
||||
rl.draw_text_ex(
|
||||
self._text_font,
|
||||
self._value_text,
|
||||
(rect.x + rect.width - BUTTON_WIDTH - text_size.x - 30,
|
||||
rect.y + (rect.height - text_size.y) / 2),
|
||||
VALUE_FONT_SIZE,
|
||||
1.0,
|
||||
rl.Color(170, 170, 170, 255),
|
||||
)
|
||||
|
||||
button_rect = rl.Rectangle(
|
||||
rect.x + rect.width - BUTTON_WIDTH,
|
||||
rect.y + (rect.height - BUTTON_HEIGHT) / 2,
|
||||
BUTTON_WIDTH,
|
||||
BUTTON_HEIGHT
|
||||
)
|
||||
self._button.set_rect(button_rect)
|
||||
self._button.set_text(tr(self._state.value))
|
||||
self._button.set_enabled(self._state not in (ExternalStorageState.LOADING,
|
||||
ExternalStorageState.DISABLED))
|
||||
self._button.render(button_rect)
|
||||
return False
|
||||
|
||||
def _refresh_state(self):
|
||||
if PC:
|
||||
self._state = ExternalStorageState.DISABLED
|
||||
self._button.set_enabled(False)
|
||||
self._value_text = ""
|
||||
|
||||
def debounced_refresh(self):
|
||||
if self._refresh_pending:
|
||||
return
|
||||
self._refresh_pending = True
|
||||
|
||||
def _timer():
|
||||
import time
|
||||
time.sleep(0.25)
|
||||
self._refresh_pending = False
|
||||
self.refresh()
|
||||
|
||||
threading.Thread(target=_timer, daemon=True).start()
|
||||
|
||||
def refresh(self):
|
||||
def _work():
|
||||
is_mounted = self._run("findmnt -n /mnt/external_realdata")
|
||||
has_drive = self._run("lsblk -f /dev/sdg")
|
||||
has_fs = self._run("lsblk -f /dev/sdg1 | grep -q ext4")
|
||||
has_label = self._run("blkid /dev/sdg1 | grep -q 'LABEL=\"openpilot\"'")
|
||||
|
||||
info = ""
|
||||
if is_mounted and has_label:
|
||||
info = self._run_output(
|
||||
"df -h /mnt/external_realdata | awk 'NR==2 {print $3 \"/\" $2}'"
|
||||
)
|
||||
|
||||
def apply():
|
||||
if self._formatting:
|
||||
self._value_text = tr("formatting")
|
||||
self._state = ExternalStorageState.FORMAT
|
||||
return
|
||||
|
||||
if not has_drive:
|
||||
self._value_text = tr("insert drive")
|
||||
self._state = ExternalStorageState.CHECK
|
||||
|
||||
elif not has_fs or not has_label:
|
||||
self._value_text = tr("needs format")
|
||||
self._state = ExternalStorageState.FORMAT
|
||||
|
||||
elif is_mounted:
|
||||
self._value_text = info
|
||||
self._state = ExternalStorageState.UNMOUNT
|
||||
|
||||
else:
|
||||
self._value_text = tr("drive detected")
|
||||
self._state = ExternalStorageState.MOUNT
|
||||
|
||||
apply()
|
||||
|
||||
threading.Thread(target=_work, daemon=True).start()
|
||||
|
||||
def _handle_button_click(self):
|
||||
st = self._state
|
||||
|
||||
if st == ExternalStorageState.DISABLED:
|
||||
return
|
||||
|
||||
if st in (ExternalStorageState.CHECK, ExternalStorageState.MOUNT):
|
||||
self.mount_storage()
|
||||
|
||||
elif st == ExternalStorageState.UNMOUNT:
|
||||
self.unmount_storage()
|
||||
|
||||
elif st == ExternalStorageState.FORMAT:
|
||||
dialog = ConfirmDialog(
|
||||
tr("Are you sure you want to format this drive? This will erase all data."),
|
||||
confirm_text=tr("Format"),
|
||||
cancel_text=tr("Cancel"),
|
||||
)
|
||||
gui_app.set_modal_overlay(dialog, callback=self._confirm_format)
|
||||
|
||||
def _confirm_format(self, result: DialogResult):
|
||||
if result == DialogResult.CONFIRM:
|
||||
self.format_storage()
|
||||
|
||||
def mount_storage(self):
|
||||
self._value_text = tr("mounting")
|
||||
self._state = ExternalStorageState.LOADING
|
||||
|
||||
def _work():
|
||||
cmd = """
|
||||
sudo mount -o remount,rw / &&
|
||||
sudo mkdir -p /mnt/external_realdata &&
|
||||
(grep -q '/dev/sdg1 /mnt/external_realdata' /etc/fstab ||
|
||||
echo '/dev/sdg1 /mnt/external_realdata ext4 defaults,nofail 0 2' >> /etc/fstab) &&
|
||||
sudo systemctl daemon-reexec &&
|
||||
sudo mount /mnt/external_realdata &&
|
||||
sudo chown -R comma:comma /mnt/external_realdata &&
|
||||
sudo chmod -R 775 /mnt/external_realdata &&
|
||||
sudo mount -o remount,ro /
|
||||
"""
|
||||
subprocess.call(["sh", "-c", cmd])
|
||||
self.debounced_refresh()
|
||||
|
||||
threading.Thread(target=_work, daemon=True).start()
|
||||
|
||||
def unmount_storage(self):
|
||||
self._value_text = tr("unmounting")
|
||||
self._state = ExternalStorageState.LOADING
|
||||
|
||||
def _work():
|
||||
subprocess.call(["sh", "-c", "sudo umount /mnt/external_realdata"])
|
||||
self.debounced_refresh()
|
||||
|
||||
threading.Thread(target=_work, daemon=True).start()
|
||||
|
||||
def format_storage(self):
|
||||
self._formatting = True
|
||||
self._value_text = tr("formatting")
|
||||
self._state = ExternalStorageState.LOADING
|
||||
|
||||
def _work():
|
||||
cmd = """
|
||||
sudo wipefs -a /dev/sdg &&
|
||||
sudo parted -s /dev/sdg mklabel gpt mkpart primary ext4 0% 100% &&
|
||||
sudo mkfs.ext4 -F -L openpilot /dev/sdg1
|
||||
"""
|
||||
exitcode = subprocess.call(["sh", "-c", cmd])
|
||||
|
||||
def apply():
|
||||
self._formatting = False
|
||||
if exitcode == 0:
|
||||
self.mount_storage()
|
||||
else:
|
||||
self._value_text = tr("needs format")
|
||||
self._state = ExternalStorageState.FORMAT
|
||||
|
||||
apply()
|
||||
|
||||
threading.Thread(target=_work, daemon=True).start()
|
||||
|
||||
def external_storage_item(title: str | Callable[[], str], description: str | Callable[[], str]) -> ListItem:
|
||||
return ListItem(
|
||||
title=title,
|
||||
description=description,
|
||||
action_item=ExternalStorageAction()
|
||||
)
|
||||
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
|
||||
create_connection)
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import log
|
||||
from cereal import car, log
|
||||
from cereal.services import SERVICE_LIST
|
||||
from openpilot.common.api import Api, get_key_pair
|
||||
from openpilot.common.utils import CallbackReader, get_upload_stream
|
||||
@@ -45,6 +45,7 @@ from openpilot.system.hardware.hw import Paths
|
||||
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
|
||||
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
|
||||
LOCAL_PORT_WHITELIST = {22, } # SSH
|
||||
WEBRTCD_PORT = 5001
|
||||
|
||||
LOG_ATTR_NAME = 'user.upload'
|
||||
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
|
||||
@@ -567,6 +568,16 @@ def getSshAuthorizedKeys() -> str:
|
||||
def getGithubUsername() -> str:
|
||||
return cast(str, Params().get("GithubUsername") or "")
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getNotCar() -> bool:
|
||||
cp_bytes = Params().get("CarParamsPersistent")
|
||||
if cp_bytes is not None:
|
||||
with car.CarParams.from_bytes(cp_bytes) as CP:
|
||||
return CP.notCar
|
||||
return False
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getSimInfo():
|
||||
return HARDWARE.get_sim_info()
|
||||
@@ -588,6 +599,35 @@ def getNetworks():
|
||||
return HARDWARE.get_networks()
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def startStream(sdp: str) -> dict:
|
||||
from openpilot.system.webrtc.webrtcd import StreamRequestBody
|
||||
bridge_services_in = []
|
||||
|
||||
# get live car params to avoid stale notCar edge case
|
||||
cp_bytes = Params().get("CarParams")
|
||||
if cp_bytes is not None:
|
||||
with car.CarParams.from_bytes(cp_bytes) as CP:
|
||||
if CP.notCar:
|
||||
bridge_services_in.append("testJoystick")
|
||||
|
||||
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
|
||||
try:
|
||||
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
|
||||
json=asdict(body), timeout=10)
|
||||
if not resp.ok:
|
||||
try:
|
||||
error_body = resp.json()
|
||||
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
|
||||
except ValueError:
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.ConnectTimeout as e:
|
||||
raise Exception("webrtc took too long to respond. is it on?") from e
|
||||
except requests.ConnectionError as e:
|
||||
raise Exception("webrtc is not running. turn on comma body ignition.") from e
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def takeSnapshot() -> str | dict[str, str] | None:
|
||||
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
|
||||
|
||||
@@ -313,9 +313,6 @@ class Tici(HardwareBase):
|
||||
continue
|
||||
gov = 'ondemand' if powersave_enabled else 'performance'
|
||||
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
|
||||
if not powersave_enabled:
|
||||
# cap max core freq to 1689 Mhz
|
||||
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
|
||||
|
||||
# *** IRQ config ***
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ public:
|
||||
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
|
||||
virtual void encoder_open() = 0;
|
||||
virtual void encoder_close() = 0;
|
||||
virtual void set_bitrate(int bitrate) = 0;
|
||||
|
||||
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
|
||||
|
||||
|
||||
@@ -72,6 +72,10 @@ void FfmpegEncoder::encoder_close() {
|
||||
is_open = false;
|
||||
}
|
||||
|
||||
void FfmpegEncoder::set_bitrate(int bitrate) {
|
||||
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
|
||||
}
|
||||
|
||||
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
|
||||
assert(buf->width == this->in_width);
|
||||
assert(buf->height == this->in_height);
|
||||
|
||||
@@ -21,6 +21,7 @@ public:
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open();
|
||||
void encoder_close();
|
||||
void set_bitrate(int bitrate);
|
||||
|
||||
private:
|
||||
int segment_num = -1;
|
||||
|
||||
@@ -155,6 +155,8 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
|
||||
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
|
||||
|
||||
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
|
||||
current_bitrate = encoder_settings.bitrate;
|
||||
adaptive_bitrate = encoder_info.adaptive_bitrate;
|
||||
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
|
||||
|
||||
struct v4l2_format fmt_out = {
|
||||
@@ -304,6 +306,25 @@ void V4LEncoder::encoder_close() {
|
||||
this->is_open = false;
|
||||
}
|
||||
|
||||
void V4LEncoder::set_bitrate(int bitrate) {
|
||||
if (!adaptive_bitrate || bitrate == current_bitrate) return;
|
||||
if (bitrate <= 0) {
|
||||
LOGE("invalid livestream encoder bitrate %d", bitrate);
|
||||
return;
|
||||
}
|
||||
|
||||
struct v4l2_control ctrl = {
|
||||
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
|
||||
.value = bitrate,
|
||||
};
|
||||
|
||||
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
|
||||
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
|
||||
return;
|
||||
}
|
||||
current_bitrate = bitrate;
|
||||
}
|
||||
|
||||
V4LEncoder::~V4LEncoder() {
|
||||
encoder_close();
|
||||
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
|
||||
|
||||
@@ -13,6 +13,7 @@ public:
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open();
|
||||
void encoder_close();
|
||||
void set_bitrate(int bitrate);
|
||||
|
||||
private:
|
||||
int fd;
|
||||
@@ -20,6 +21,8 @@ private:
|
||||
bool is_open = false;
|
||||
int segment_num = -1;
|
||||
int counter = 0;
|
||||
int current_bitrate = -1;
|
||||
bool adaptive_bitrate;
|
||||
|
||||
SafeQueue<VisionIpcBufExtra> extras;
|
||||
|
||||
|
||||
@@ -44,11 +44,24 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
|
||||
}
|
||||
}
|
||||
|
||||
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
|
||||
static Params params;
|
||||
std::string val = params.get("LivestreamEncoderBitrate");
|
||||
if (val.empty()) return;
|
||||
int bitrate = std::stoi(val);
|
||||
for (auto &e : encoders) {
|
||||
e->set_bitrate(bitrate);
|
||||
}
|
||||
}
|
||||
|
||||
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
util::set_thread_name(cam_info.thread_name);
|
||||
|
||||
std::vector<std::unique_ptr<Encoder>> encoders;
|
||||
|
||||
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
|
||||
[](const auto &ei) { return ei.adaptive_bitrate; });
|
||||
|
||||
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
|
||||
|
||||
std::unique_ptr<JpegEncoder> jpeg_encoder;
|
||||
@@ -108,6 +121,8 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
++cur_seg;
|
||||
}
|
||||
|
||||
if (has_adaptive) apply_bitrate(encoders);
|
||||
|
||||
// encode a frame
|
||||
for (int i = 0; i < encoders.size(); ++i) {
|
||||
int out_id = encoders[i]->encode_frame(buf, &extra);
|
||||
|
||||
@@ -47,8 +47,8 @@ struct EncoderSettings {
|
||||
}
|
||||
|
||||
static EncoderSettings StreamEncoderSettings() {
|
||||
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
|
||||
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
|
||||
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
|
||||
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
|
||||
}
|
||||
};
|
||||
|
||||
@@ -59,6 +59,7 @@ public:
|
||||
const char *filename = NULL;
|
||||
bool record = true;
|
||||
bool include_audio = false;
|
||||
bool adaptive_bitrate = false;
|
||||
int frame_width = -1;
|
||||
int frame_height = -1;
|
||||
int fps = MAIN_FPS;
|
||||
@@ -104,6 +105,7 @@ const EncoderInfo stream_road_encoder_info = {
|
||||
.publish_name = "livestreamRoadEncodeData",
|
||||
//.thumbnail_name = "thumbnail",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
|
||||
};
|
||||
@@ -111,6 +113,7 @@ const EncoderInfo stream_road_encoder_info = {
|
||||
const EncoderInfo stream_wide_road_encoder_info = {
|
||||
.publish_name = "livestreamWideRoadEncodeData",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
|
||||
};
|
||||
@@ -118,6 +121,7 @@ const EncoderInfo stream_wide_road_encoder_info = {
|
||||
const EncoderInfo stream_driver_encoder_info = {
|
||||
.publish_name = "livestreamDriverEncodeData",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
|
||||
};
|
||||
@@ -153,19 +157,19 @@ const LogCameraInfo driver_camera_info{
|
||||
const LogCameraInfo stream_road_camera_info{
|
||||
.thread_name = "road_cam_encoder",
|
||||
.stream_type = VISION_STREAM_ROAD,
|
||||
.encoder_infos = {stream_road_encoder_info}
|
||||
.encoder_infos = {stream_road_encoder_info},
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_wide_road_camera_info{
|
||||
.thread_name = "wide_road_cam_encoder",
|
||||
.stream_type = VISION_STREAM_WIDE_ROAD,
|
||||
.encoder_infos = {stream_wide_road_encoder_info}
|
||||
.encoder_infos = {stream_wide_road_encoder_info},
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_driver_camera_info{
|
||||
.thread_name = "driver_cam_encoder",
|
||||
.stream_type = VISION_STREAM_DRIVER,
|
||||
.encoder_infos = {stream_driver_encoder_info}
|
||||
.encoder_infos = {stream_driver_encoder_info},
|
||||
};
|
||||
|
||||
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import struct
|
||||
import time
|
||||
|
||||
import av
|
||||
@@ -7,6 +8,13 @@ from teleoprtc.tracks import TiciVideoStreamTrack
|
||||
from cereal import messaging
|
||||
from openpilot.common.realtime import DT_MDL, DT_DMON
|
||||
|
||||
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
|
||||
TIMING_SEI_UUID = bytes([
|
||||
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
|
||||
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
|
||||
])
|
||||
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
|
||||
|
||||
|
||||
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
camera_to_sock_mapping = {
|
||||
@@ -19,9 +27,30 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
dt = DT_DMON if camera_type == "driver" else DT_MDL
|
||||
super().__init__(camera_type, dt)
|
||||
|
||||
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
|
||||
self._sock = self._make_sock(camera_type)
|
||||
self._pts = 0
|
||||
self._t0_ns = time.monotonic_ns()
|
||||
self.timing_sei_enabled = False
|
||||
|
||||
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
|
||||
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
|
||||
|
||||
def switch_camera(self, camera_type: str) -> None:
|
||||
self._sock = self._make_sock(camera_type)
|
||||
|
||||
def _build_frame_data(self, msg) -> bytes:
|
||||
encode_data = getattr(msg, msg.which())
|
||||
if not self.timing_sei_enabled:
|
||||
return encode_data.header + encode_data.data
|
||||
|
||||
idx = encode_data.idx
|
||||
sei_nal = _SEI_PREFIX + struct.pack('>4d',
|
||||
(idx.timestampEof - idx.timestampSof) / 1e6,
|
||||
(msg.logMonoTime - idx.timestampEof) / 1e6,
|
||||
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
|
||||
time.time() * 1000, # noqa: TID251
|
||||
) + b'\x80'
|
||||
return encode_data.header + sei_nal + encode_data.data
|
||||
|
||||
async def recv(self):
|
||||
while True:
|
||||
@@ -30,9 +59,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
break
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
evta = getattr(msg, msg.which())
|
||||
|
||||
packet = av.Packet(evta.header + evta.data)
|
||||
packet = av.Packet(self._build_frame_data(msg))
|
||||
packet.time_base = self._time_base
|
||||
|
||||
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
|
||||
|
||||
+204
-62
@@ -1,7 +1,11 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from abc import abstractmethod
|
||||
import os
|
||||
import time
|
||||
import argparse
|
||||
import asyncio
|
||||
import contextlib
|
||||
import json
|
||||
import uuid
|
||||
import logging
|
||||
@@ -19,11 +23,39 @@ if TYPE_CHECKING:
|
||||
from aiortc.rtcdatachannel import RTCDataChannel
|
||||
|
||||
from openpilot.system.webrtc.schema import generate_field
|
||||
from openpilot.common.params import Params
|
||||
from cereal import messaging, log
|
||||
|
||||
|
||||
class CerealOutgoingMessageProxy:
|
||||
class AsyncTaskRunner:
|
||||
def __init__(self):
|
||||
self.is_running = False
|
||||
self.task = None
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
|
||||
def start(self):
|
||||
assert self.task is None
|
||||
self.task = asyncio.create_task(self.run())
|
||||
|
||||
async def stop(self):
|
||||
if self.task is None:
|
||||
return
|
||||
task = self.task
|
||||
self.task = None
|
||||
if task.done():
|
||||
return
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
@abstractmethod
|
||||
async def run(self):
|
||||
pass
|
||||
|
||||
|
||||
class CerealOutgoingMessageProxy(AsyncTaskRunner):
|
||||
def __init__(self, sm: messaging.SubMaster):
|
||||
super().__init__()
|
||||
self.sm = sm
|
||||
self.channels: list[RTCDataChannel] = []
|
||||
|
||||
@@ -55,6 +87,19 @@ class CerealOutgoingMessageProxy:
|
||||
for channel in self.channels:
|
||||
channel.send(encoded_msg)
|
||||
|
||||
async def run(self):
|
||||
from aiortc.exceptions import InvalidStateError
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.update()
|
||||
except InvalidStateError:
|
||||
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Cereal outgoing proxy failure")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
class CerealIncomingMessageProxy:
|
||||
def __init__(self, pm: messaging.PubMaster):
|
||||
@@ -72,37 +117,6 @@ class CerealIncomingMessageProxy:
|
||||
self.pm.send(msg_type, msg)
|
||||
|
||||
|
||||
class CerealProxyRunner:
|
||||
def __init__(self, proxy: CerealOutgoingMessageProxy):
|
||||
self.proxy = proxy
|
||||
self.is_running = False
|
||||
self.task = None
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
|
||||
def start(self):
|
||||
assert self.task is None
|
||||
self.task = asyncio.create_task(self.run())
|
||||
|
||||
def stop(self):
|
||||
if self.task is None or self.task.done():
|
||||
return
|
||||
self.task.cancel()
|
||||
self.task = None
|
||||
|
||||
async def run(self):
|
||||
from aiortc.exceptions import InvalidStateError
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.proxy.update()
|
||||
except InvalidStateError:
|
||||
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Cereal outgoing proxy failure")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
class DynamicPubMaster(messaging.PubMaster):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
@@ -115,21 +129,90 @@ class DynamicPubMaster(messaging.PubMaster):
|
||||
self.sock[service] = messaging.pub_sock(service)
|
||||
|
||||
|
||||
class LivestreamBitrateController(AsyncTaskRunner):
|
||||
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
|
||||
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
|
||||
sample_interval = 0.2
|
||||
high_level = 0.1 # drop immediately
|
||||
med_level = 0.05 # drop after # of samples
|
||||
low_level = 0 # raise after # of samples
|
||||
down_samples = 5 # 1s
|
||||
param_name = "LivestreamEncoderBitrate"
|
||||
|
||||
def __init__(self, peer_connection: Any):
|
||||
super().__init__()
|
||||
self.pc = peer_connection
|
||||
self.params = Params()
|
||||
|
||||
self.level = 0
|
||||
self.prev_lost, self.prev_sent = None, None
|
||||
self.counter = 0
|
||||
self.up_samples = 5 # 1s
|
||||
self._auto = True
|
||||
|
||||
async def run(self):
|
||||
while True:
|
||||
await asyncio.sleep(self.sample_interval)
|
||||
if not self._auto:
|
||||
continue
|
||||
|
||||
loss_rate = await self._sample()
|
||||
if loss_rate is None:
|
||||
continue
|
||||
if loss_rate >= self.med_level and self.level > 0:
|
||||
self.counter += 1
|
||||
if self.counter >= self.down_samples or loss_rate >= self.high_level:
|
||||
self.level -= 1
|
||||
self.up_samples *= 2 # exponential backoff before raising again
|
||||
self.counter = 0
|
||||
self._publish(self.bitrates[self.level])
|
||||
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
|
||||
self.counter -= 1
|
||||
if -self.counter >= self.up_samples:
|
||||
self.level += 1
|
||||
self.counter = 0
|
||||
self._publish(self.bitrates[self.level])
|
||||
|
||||
async def _sample(self) -> float | None:
|
||||
report = await self.pc.getStats()
|
||||
packets_lost = packets_sent = 0
|
||||
for s in report.values():
|
||||
if s.type == "remote-inbound-rtp":
|
||||
packets_lost += s.packetsLost
|
||||
elif s.type == "outbound-rtp":
|
||||
packets_sent += s.packetsSent
|
||||
|
||||
if self.prev_lost is None:
|
||||
self.prev_lost, self.prev_sent = packets_lost, packets_sent
|
||||
return None
|
||||
lost_delta = max(0, packets_lost - self.prev_lost)
|
||||
sent_delta = max(0, packets_sent - self.prev_sent)
|
||||
self.prev_lost, self.prev_sent = packets_lost, packets_sent
|
||||
return lost_delta / sent_delta if sent_delta else 0.0
|
||||
|
||||
def _publish(self, bitrate: float):
|
||||
self.params.put(self.param_name, bitrate)
|
||||
|
||||
def set_quality(self, quality):
|
||||
if quality in self.label_to_bitrate:
|
||||
self._publish(self.label_to_bitrate[quality])
|
||||
self._auto = False
|
||||
elif quality == "auto":
|
||||
self._auto = True
|
||||
|
||||
|
||||
class StreamSession:
|
||||
shared_pub_master = DynamicPubMaster([])
|
||||
|
||||
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
|
||||
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
|
||||
from aiortc.mediastreams import VideoStreamTrack
|
||||
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
|
||||
from teleoprtc import WebRTCAnswerBuilder
|
||||
from teleoprtc.info import parse_info_from_offer
|
||||
|
||||
config = parse_info_from_offer(sdp)
|
||||
builder = WebRTCAnswerBuilder(sdp)
|
||||
|
||||
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
|
||||
for cam in cameras:
|
||||
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
|
||||
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
|
||||
builder.add_video_stream(init_camera, self.video_track)
|
||||
|
||||
self.stream = builder.stream()
|
||||
self.identifier = str(uuid.uuid4())
|
||||
@@ -137,35 +220,60 @@ class StreamSession:
|
||||
self.incoming_bridge: CerealIncomingMessageProxy | None = None
|
||||
self.incoming_bridge_services = incoming_services
|
||||
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
|
||||
self.outgoing_bridge_runner: CerealProxyRunner | None = None
|
||||
self.bitrate_controller: LivestreamBitrateController | None = None
|
||||
if len(incoming_services) > 0:
|
||||
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
|
||||
if len(outgoing_services) > 0:
|
||||
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
|
||||
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
|
||||
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
|
||||
|
||||
self.run_task: asyncio.Task | None = None
|
||||
self._cleanup_lock = asyncio.Lock()
|
||||
self._cleanup_done = False
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
|
||||
self.identifier, cameras, incoming_services, outgoing_services)
|
||||
self.logger.info(
|
||||
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
|
||||
self.identifier, init_camera, incoming_services, outgoing_services,
|
||||
)
|
||||
|
||||
def start(self):
|
||||
self.run_task = asyncio.create_task(self.run())
|
||||
|
||||
def stop(self):
|
||||
if self.run_task.done():
|
||||
return
|
||||
self.run_task.cancel()
|
||||
async def stop(self):
|
||||
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
|
||||
self.run_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await self.run_task
|
||||
self.run_task = None
|
||||
asyncio.run(self.post_run_cleanup())
|
||||
await self.post_run_cleanup()
|
||||
|
||||
async def get_answer(self):
|
||||
return await self.stream.start()
|
||||
|
||||
async def message_handler(self, message: bytes):
|
||||
def message_handler(self, message: bytes):
|
||||
assert self.incoming_bridge is not None
|
||||
try:
|
||||
self.incoming_bridge.send(message)
|
||||
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
|
||||
if isinstance(payload, dict):
|
||||
msg_type = payload.get("type")
|
||||
|
||||
match msg_type:
|
||||
case "livestreamCameraSwitch":
|
||||
self.video_track.switch_camera(payload["data"]["camera"])
|
||||
case "livestreamSettings":
|
||||
self.bitrate_controller.set_quality(payload["data"]["quality"])
|
||||
case "clockSync":
|
||||
pong = json.dumps({"type": "clockSync", "data": {
|
||||
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
|
||||
}})
|
||||
self.stream.get_messaging_channel().send(pong)
|
||||
case "enableTimingSei":
|
||||
if hasattr(self.video_track, 'timing_sei_enabled'):
|
||||
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
|
||||
case _:
|
||||
if payload.get("type") not in self.incoming_bridge_services:
|
||||
return
|
||||
self.incoming_bridge.send(message)
|
||||
except Exception:
|
||||
self.logger.exception("Cereal incoming proxy failure")
|
||||
|
||||
@@ -176,29 +284,38 @@ class StreamSession:
|
||||
if self.incoming_bridge is not None:
|
||||
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
|
||||
self.stream.set_message_handler(self.message_handler)
|
||||
if self.outgoing_bridge_runner is not None:
|
||||
if self.outgoing_bridge is not None:
|
||||
channel = self.stream.get_messaging_channel()
|
||||
self.outgoing_bridge_runner.proxy.add_channel(channel)
|
||||
self.outgoing_bridge_runner.start()
|
||||
self.outgoing_bridge.add_channel(channel)
|
||||
self.outgoing_bridge.start()
|
||||
self.bitrate_controller.start()
|
||||
|
||||
self.logger.info("Stream session (%s) connected", self.identifier)
|
||||
|
||||
await self.stream.wait_for_disconnection()
|
||||
await self.post_run_cleanup()
|
||||
|
||||
self.logger.info("Stream session (%s) ended", self.identifier)
|
||||
except Exception:
|
||||
self.logger.exception("Stream session failure")
|
||||
finally:
|
||||
await self.post_run_cleanup()
|
||||
|
||||
async def post_run_cleanup(self):
|
||||
await self.stream.stop()
|
||||
if self.outgoing_bridge is not None:
|
||||
self.outgoing_bridge_runner.stop()
|
||||
async with self._cleanup_lock:
|
||||
if self._cleanup_done:
|
||||
return
|
||||
self._cleanup_done = True
|
||||
if self.bitrate_controller is not None:
|
||||
await self.bitrate_controller.stop()
|
||||
if self.outgoing_bridge is not None:
|
||||
await self.outgoing_bridge.stop()
|
||||
await self.stream.stop()
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamRequestBody:
|
||||
sdp: str
|
||||
cameras: list[str]
|
||||
initCamera: str
|
||||
bridge_services_in: list[str] = field(default_factory=list)
|
||||
bridge_services_out: list[str] = field(default_factory=list)
|
||||
|
||||
@@ -208,11 +325,33 @@ async def get_stream(request: 'web.Request'):
|
||||
raw_body = await request.json()
|
||||
body = StreamRequestBody(**raw_body)
|
||||
|
||||
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
|
||||
answer = await session.get_answer()
|
||||
session.start()
|
||||
async with request.app['stream_lock']:
|
||||
# Fully disconnect any other active stream before starting the replacement.
|
||||
for sid, s in list(stream_dict.items()):
|
||||
if s.run_task and not s.run_task.done():
|
||||
try:
|
||||
ch = s.stream.get_messaging_channel()
|
||||
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
|
||||
except Exception:
|
||||
pass
|
||||
await s.stop()
|
||||
del stream_dict[sid]
|
||||
|
||||
stream_dict[session.identifier] = session
|
||||
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
|
||||
try:
|
||||
answer = await session.get_answer()
|
||||
except ValueError as e:
|
||||
await session.stop()
|
||||
raise web.HTTPBadRequest(
|
||||
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
|
||||
content_type="application/json",
|
||||
) from e
|
||||
except Exception:
|
||||
await session.stop()
|
||||
raise
|
||||
session.start()
|
||||
|
||||
stream_dict[session.identifier] = session
|
||||
|
||||
return web.json_response({"sdp": answer.sdp, "type": answer.type})
|
||||
|
||||
@@ -224,6 +363,7 @@ async def get_schema(request: 'web.Request'):
|
||||
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
|
||||
return web.json_response(schema_dict)
|
||||
|
||||
|
||||
async def post_notify(request: 'web.Request'):
|
||||
try:
|
||||
payload = await request.json()
|
||||
@@ -239,9 +379,10 @@ async def post_notify(request: 'web.Request'):
|
||||
|
||||
return web.Response(status=200, text="OK")
|
||||
|
||||
|
||||
async def on_shutdown(app: 'web.Application'):
|
||||
for session in app['streams'].values():
|
||||
session.stop()
|
||||
await session.stop()
|
||||
del app['streams']
|
||||
|
||||
|
||||
@@ -254,6 +395,7 @@ def webrtcd_thread(host: str, port: int, debug: bool):
|
||||
app = web.Application()
|
||||
|
||||
app['streams'] = dict()
|
||||
app['stream_lock'] = asyncio.Lock()
|
||||
app['debug'] = debug
|
||||
app.on_shutdown.append(on_shutdown)
|
||||
app.router.add_post("/stream", get_stream)
|
||||
|
||||
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
|
||||
|
||||
async def offer(request: 'web.Request'):
|
||||
params = await request.json()
|
||||
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
|
||||
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
|
||||
body_json = json.dumps(dataclasses.asdict(body))
|
||||
|
||||
logger.info("Sending offer to webrtcd...")
|
||||
|
||||
Reference in New Issue
Block a user