mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-29 17:52:07 +08:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 10f33ae9ea | |||
| 5553cafba5 | |||
| 64280eafb0 | |||
| 97b7088a38 | |||
| fb9e8f4fd9 | |||
| 1e66d34eb2 | |||
| 4fa26af61b | |||
| f03c9dd5aa | |||
| 0846ab723e | |||
| 9526c749ce | |||
| 0b6fddf995 | |||
| 3cc89bc52a |
@@ -35,36 +35,18 @@ jobs:
|
||||
- name: Init sunnypilot opendbc submodule
|
||||
run: git submodule update --init --depth 1 opendbc_repo
|
||||
|
||||
- name: Checkout upstream openpilot
|
||||
- name: Checkout upstream openpilot cereal
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
repository: 'commaai/openpilot'
|
||||
path: upstream_openpilot
|
||||
sparse-checkout: cereal
|
||||
ref: "refs/heads/master"
|
||||
|
||||
- name: Init upstream opendbc submodule
|
||||
working-directory: upstream_openpilot
|
||||
run: git submodule update --init --depth 1 opendbc_repo
|
||||
|
||||
- name: Locate upstream capnp paths
|
||||
id: locate-capnp
|
||||
run: |
|
||||
CEREAL_DIR=$(find upstream_openpilot -maxdepth 4 -name log.capnp -path '*/cereal/log.capnp' -printf '%h\n' -quit)
|
||||
if [ -z "$CEREAL_DIR" ]; then
|
||||
echo "::error::Could not locate cereal/log.capnp in upstream openpilot"
|
||||
exit 1
|
||||
fi
|
||||
echo "cereal_dir=$CEREAL_DIR" >> "$GITHUB_OUTPUT"
|
||||
echo "Found upstream cereal at: $CEREAL_DIR"
|
||||
|
||||
IMPORT_ARGS=""
|
||||
CAR_CAPNP=$(find upstream_openpilot -maxdepth 5 -name car.capnp -path '*/opendbc/car/car.capnp' -printf '%h\n' -quit)
|
||||
if [ -n "$CAR_CAPNP" ]; then
|
||||
IMPORT_ARGS="-I $CAR_CAPNP"
|
||||
echo "Found car.capnp at: $CAR_CAPNP"
|
||||
fi
|
||||
echo "import_args=$IMPORT_ARGS" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Install uv
|
||||
run: pip install uv
|
||||
|
||||
@@ -80,5 +62,4 @@ jobs:
|
||||
PYCAPNP_VER=$(python3 -c "import re; m=re.search(r'name = \"pycapnp\"\nversion = \"([^\"]+)\"', open('uv.lock').read()); print(m.group(1))")
|
||||
uv run --isolated --with "pycapnp==${PYCAPNP_VER}" \
|
||||
python3 cereal/messaging/tests/validate_sp_cereal_upstream.py \
|
||||
-r -f /tmp/sp_schema.json --cereal-dir ${{ steps.locate-capnp.outputs.cereal_dir }} \
|
||||
${{ steps.locate-capnp.outputs.import_args }}
|
||||
-r -f /tmp/sp_schema.json --cereal-dir upstream_openpilot/cereal
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
sunnypilot Version 2026.002.001 (2026-06-28)
|
||||
========================
|
||||
* What's Changed
|
||||
* safety: ignore frequency check for Toyota UNSUPPORTED_DSU cars by @sunnyhaibin in https://github.com/sunnypilot/opendbc/pull/489
|
||||
|
||||
sunnypilot Version 2026.002.000 (2026-06-28)
|
||||
========================
|
||||
* What's Changed (sunnypilot/sunnypilot)
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Validate sunnypilot routes are parseable by stock commaai/openpilot.
|
||||
"""Schema-level cereal compat check between sunnypilot and upstream openpilot.
|
||||
|
||||
Cap'n Proto is wire-compatible across renames, type relocations, and
|
||||
additive fields. The only breaking change is a union variant that
|
||||
upstream doesn't recognize — an unknown discriminant makes the entire
|
||||
union unreadable.
|
||||
|
||||
This script checks: for every struct with a union that exists in both
|
||||
schemas, does sunnypilot introduce union variants upstream doesn't have?
|
||||
Rules (per struct matched across sides by typeId):
|
||||
R1 shared ordinal must reference the same type.
|
||||
R2 sunnypilot-only ordinal in a union -> FAIL (unknown discriminant upstream).
|
||||
R3 sunnypilot-only ordinal on a regular field -> OK (additive struct evolution).
|
||||
R4 upstream-only ordinal -> OK.
|
||||
R5 sunnypilot-only struct referenced via an upstream-shared field -> FAIL.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -25,19 +24,46 @@ def hex_id(value: int) -> str:
|
||||
return f"0x{value:016x}"
|
||||
|
||||
|
||||
def encode_type(type_node: Any) -> dict:
|
||||
which = type_node.which()
|
||||
if which == "struct":
|
||||
return {"kind": "struct", "typeId": hex_id(type_node.struct.typeId)}
|
||||
if which == "enum":
|
||||
return {"kind": "enum", "typeId": hex_id(type_node.enum.typeId)}
|
||||
if which == "interface":
|
||||
return {"kind": "interface", "typeId": hex_id(type_node.interface.typeId)}
|
||||
if which == "list":
|
||||
return {"kind": "list", "element": encode_type(type_node.list.elementType)}
|
||||
if which == "anyPointer":
|
||||
return {"kind": "anyPointer"}
|
||||
return {"kind": which}
|
||||
|
||||
|
||||
def encode_field(name: str, field: Any) -> dict:
|
||||
proto = field.proto
|
||||
ordinal = proto.ordinal.explicit if proto.ordinal.which() == "explicit" else None
|
||||
discriminant = proto.discriminantValue if proto.discriminantValue != NO_DISCRIMINANT else None
|
||||
|
||||
if proto.which() == "group":
|
||||
type_desc = {"kind": "group", "typeId": hex_id(proto.group.typeId)}
|
||||
else:
|
||||
type_desc = encode_type(proto.slot.type)
|
||||
|
||||
return {
|
||||
"name": name,
|
||||
"ordinal": ordinal,
|
||||
"discriminant": discriminant,
|
||||
"type": type_desc,
|
||||
}
|
||||
|
||||
|
||||
def encode_struct(schema: Any) -> dict:
|
||||
node = schema.node
|
||||
fields = []
|
||||
for name, field in schema.fields.items():
|
||||
proto = field.proto
|
||||
ordinal = proto.ordinal.explicit if proto.ordinal.which() == "explicit" else None
|
||||
discriminant = proto.discriminantValue if proto.discriminantValue != NO_DISCRIMINANT else None
|
||||
fields.append({"name": name, "ordinal": ordinal, "discriminant": discriminant})
|
||||
return {
|
||||
"typeId": hex_id(node.id),
|
||||
"displayName": node.displayName,
|
||||
"hasUnion": node.struct.discriminantCount > 0,
|
||||
"fields": fields,
|
||||
"fields": [encode_field(name, field) for name, field in schema.fields.items()],
|
||||
}
|
||||
|
||||
|
||||
@@ -79,16 +105,15 @@ def collect_schema(root: Any) -> dict[str, dict]:
|
||||
return structs
|
||||
|
||||
|
||||
def load_log(cereal_dir: str, extra_imports: list[str] | None = None) -> Any:
|
||||
def load_log(cereal_dir: str) -> Any:
|
||||
import capnp
|
||||
cereal_dir = os.path.abspath(cereal_dir)
|
||||
capnp.remove_import_hook()
|
||||
imports = [cereal_dir] + [os.path.abspath(p) for p in (extra_imports or [])]
|
||||
return capnp.load(os.path.join(cereal_dir, "log.capnp"), imports=imports)
|
||||
return capnp.load(os.path.join(cereal_dir, "log.capnp"), imports=[cereal_dir])
|
||||
|
||||
|
||||
def dump_schema(cereal_dir: str, path: str, extra_imports: list[str] | None = None) -> None:
|
||||
log = load_log(cereal_dir, extra_imports)
|
||||
def dump_schema(cereal_dir: str, path: str) -> None:
|
||||
log = load_log(cereal_dir)
|
||||
payload = {
|
||||
"root": hex_id(log.Event.schema.node.id),
|
||||
"structs": collect_schema(log.Event.schema),
|
||||
@@ -98,37 +123,100 @@ def dump_schema(cereal_dir: str, path: str, extra_imports: list[str] | None = No
|
||||
print(f"wrote schema dump with {len(payload['structs'])} structs to {path}")
|
||||
|
||||
|
||||
def types_equal(a: dict, b: dict) -> bool:
|
||||
if a.get("kind") != b.get("kind"):
|
||||
return False
|
||||
kind = a["kind"]
|
||||
if kind in ("struct", "enum", "interface", "group"):
|
||||
return a.get("typeId") == b.get("typeId")
|
||||
if kind == "list":
|
||||
return types_equal(a["element"], b["element"])
|
||||
return True
|
||||
|
||||
|
||||
def type_repr(t: dict) -> str:
|
||||
kind = t.get("kind", "?")
|
||||
if kind in ("struct", "enum", "interface", "group"):
|
||||
return f"{kind}({t.get('typeId')})"
|
||||
if kind == "list":
|
||||
return f"list<{type_repr(t['element'])}>"
|
||||
return kind
|
||||
|
||||
|
||||
def field_is_union_variant(field: dict) -> bool:
|
||||
return field.get("discriminant") is not None
|
||||
|
||||
|
||||
def index_fields_by_ordinal(struct: dict) -> dict[int, dict]:
|
||||
indexed: dict[int, dict] = {}
|
||||
for field in struct["fields"]:
|
||||
ordinal = field.get("ordinal")
|
||||
if ordinal is None:
|
||||
continue
|
||||
indexed[ordinal] = field
|
||||
return indexed
|
||||
|
||||
|
||||
def compare(sunnypilot_dump: dict, upstream_dump: dict) -> list[str]:
|
||||
violations: list[str] = []
|
||||
sunnypilot_structs = sunnypilot_dump["structs"]
|
||||
upstream_structs = upstream_dump["structs"]
|
||||
sunnypilot_structs: dict[str, dict] = sunnypilot_dump["structs"]
|
||||
upstream_structs: dict[str, dict] = upstream_dump["structs"]
|
||||
|
||||
for type_id, sp_struct in sunnypilot_structs.items():
|
||||
if not sp_struct["hasUnion"]:
|
||||
continue
|
||||
up_struct = upstream_structs.get(type_id)
|
||||
if up_struct is None:
|
||||
sunnypilot_struct_referenced_from_shared: set[str] = set()
|
||||
|
||||
for type_id, sunnypilot_struct in sunnypilot_structs.items():
|
||||
upstream_struct = upstream_structs.get(type_id)
|
||||
if upstream_struct is None:
|
||||
continue
|
||||
|
||||
up_ordinals = {f["ordinal"] for f in up_struct["fields"] if f.get("discriminant") is not None}
|
||||
display = sp_struct["displayName"]
|
||||
sunnypilot_fields = index_fields_by_ordinal(sunnypilot_struct)
|
||||
upstream_fields = index_fields_by_ordinal(upstream_struct)
|
||||
display = sunnypilot_struct["displayName"]
|
||||
|
||||
for field in sp_struct["fields"]:
|
||||
if field.get("discriminant") is None:
|
||||
for ordinal, sunnypilot_field in sunnypilot_fields.items():
|
||||
upstream_field = upstream_fields.get(ordinal)
|
||||
if upstream_field is None:
|
||||
if field_is_union_variant(sunnypilot_field):
|
||||
violations.append(
|
||||
f"[R2] {display} @{ordinal} ('{sunnypilot_field['name']}', {type_repr(sunnypilot_field['type'])}): "
|
||||
f"union variant not present upstream. upstream cannot parse this discriminant."
|
||||
)
|
||||
continue
|
||||
if field["ordinal"] not in up_ordinals:
|
||||
|
||||
if not types_equal(sunnypilot_field["type"], upstream_field["type"]):
|
||||
violations.append(
|
||||
f"{display} @{field['ordinal']} '{field['name']}': "
|
||||
f"union variant not present upstream (discriminant={field['discriminant']})"
|
||||
f"[R1] {display} @{ordinal}: type mismatch. "
|
||||
f"sunnypilot='{sunnypilot_field['name']}' {type_repr(sunnypilot_field['type'])} vs "
|
||||
f"upstream='{upstream_field['name']}' {type_repr(upstream_field['type'])}."
|
||||
)
|
||||
continue
|
||||
|
||||
cursor = sunnypilot_field["type"]
|
||||
while cursor.get("kind") == "list":
|
||||
cursor = cursor["element"]
|
||||
if cursor.get("kind") in ("struct", "group", "interface") and cursor.get("typeId"):
|
||||
sunnypilot_struct_referenced_from_shared.add(cursor["typeId"])
|
||||
|
||||
for type_id, sunnypilot_struct in sunnypilot_structs.items():
|
||||
if type_id in upstream_structs:
|
||||
continue
|
||||
if type_id in sunnypilot_struct_referenced_from_shared:
|
||||
violations.append(
|
||||
f"[R5] struct {sunnypilot_struct['displayName']} ({type_id}) exists only on sunnypilot "
|
||||
f"but is referenced from an upstream-shared field. upstream cannot resolve this type."
|
||||
)
|
||||
|
||||
return violations
|
||||
|
||||
|
||||
def run_read(cereal_dir: str, peer_path: str, extra_imports: list[str] | None = None) -> int:
|
||||
log = load_log(cereal_dir, extra_imports)
|
||||
with open(peer_path, "r", encoding="utf-8") as f:
|
||||
peer_dump = json.load(f)
|
||||
def load_peer(path: str) -> dict:
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
return json.load(handle)
|
||||
|
||||
|
||||
def run_read(cereal_dir: str, peer_path: str) -> int:
|
||||
log = load_log(cereal_dir)
|
||||
peer_dump = load_peer(peer_path)
|
||||
local_dump = {
|
||||
"root": hex_id(log.Event.schema.node.id),
|
||||
"structs": collect_schema(log.Event.schema),
|
||||
@@ -136,29 +224,32 @@ def run_read(cereal_dir: str, peer_path: str, extra_imports: list[str] | None =
|
||||
violations = compare(sunnypilot_dump=peer_dump, upstream_dump=local_dump)
|
||||
|
||||
if not violations:
|
||||
print("cereal compat OK: upstream can parse sunnypilot routes.")
|
||||
print("cereal compat OK: upstream openpilot can parse sunnypilot routes "
|
||||
"(no leaked structs, no ordinal collisions).")
|
||||
return 0
|
||||
|
||||
print(f"cereal compat FAIL ({len(violations)} leaked union variant(s)):")
|
||||
print(f"cereal compat FAIL: upstream openpilot would misparse sunnypilot routes "
|
||||
f"({len(violations)} violation(s)):")
|
||||
for v in violations:
|
||||
print(f" {v}")
|
||||
return 1
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="sunnypilot cereal upstream compat check")
|
||||
parser = argparse.ArgumentParser(
|
||||
description="sunnypilot <-> upstream cereal compatibility validator (schema-level)."
|
||||
)
|
||||
mode = parser.add_mutually_exclusive_group(required=True)
|
||||
mode.add_argument("-g", "--generate", action="store_true", help="dump local schema to JSON")
|
||||
mode.add_argument("-r", "--read", action="store_true", help="validate against peer schema")
|
||||
parser.add_argument("-f", "--file", default="schema.json", help="JSON file path")
|
||||
parser.add_argument("--cereal-dir", required=True, help="path to cereal directory")
|
||||
parser.add_argument("-I", "--import-path", action="append", default=[], help="extra capnp import paths")
|
||||
mode.add_argument("-r", "--read", action="store_true", help="load peer JSON and diff against local")
|
||||
parser.add_argument("-f", "--file", default="schema.json", help="JSON file path (default: schema.json)")
|
||||
parser.add_argument("--cereal-dir", required=True, help="path to cereal directory containing log.capnp")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.generate:
|
||||
dump_schema(args.cereal_dir, args.file, args.import_path)
|
||||
dump_schema(args.cereal_dir, args.file)
|
||||
return 0
|
||||
return run_read(args.cereal_dir, args.file, args.import_path)
|
||||
return run_read(args.cereal_dir, args.file)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -80,7 +80,6 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
|
||||
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
|
||||
{"LiveParameters", {PERSISTENT, JSON}},
|
||||
{"LiveParametersV2", {PERSISTENT, BYTES}},
|
||||
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
|
||||
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
|
||||
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
|
||||
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
|
||||
|
||||
+1
-1
Submodule opendbc_repo updated: b9712d20ef...380abecc99
@@ -1 +1 @@
|
||||
#define SUNNYPILOT_VERSION "2026.002.000"
|
||||
#define SUNNYPILOT_VERSION "2026.002.001"
|
||||
|
||||
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
|
||||
create_connection)
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import car, log
|
||||
from cereal import log
|
||||
from cereal.services import SERVICE_LIST
|
||||
from openpilot.common.api import Api, get_key_pair
|
||||
from openpilot.common.utils import CallbackReader, get_upload_stream
|
||||
@@ -45,7 +45,6 @@ from openpilot.system.hardware.hw import Paths
|
||||
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
|
||||
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
|
||||
LOCAL_PORT_WHITELIST = {22, } # SSH
|
||||
WEBRTCD_PORT = 5001
|
||||
|
||||
LOG_ATTR_NAME = 'user.upload'
|
||||
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
|
||||
@@ -568,16 +567,6 @@ def getSshAuthorizedKeys() -> str:
|
||||
def getGithubUsername() -> str:
|
||||
return cast(str, Params().get("GithubUsername") or "")
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getNotCar() -> bool:
|
||||
cp_bytes = Params().get("CarParamsPersistent")
|
||||
if cp_bytes is not None:
|
||||
with car.CarParams.from_bytes(cp_bytes) as CP:
|
||||
return CP.notCar
|
||||
return False
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getSimInfo():
|
||||
return HARDWARE.get_sim_info()
|
||||
@@ -599,35 +588,6 @@ def getNetworks():
|
||||
return HARDWARE.get_networks()
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def startStream(sdp: str) -> dict:
|
||||
from openpilot.system.webrtc.webrtcd import StreamRequestBody
|
||||
bridge_services_in = []
|
||||
|
||||
# get live car params to avoid stale notCar edge case
|
||||
cp_bytes = Params().get("CarParams")
|
||||
if cp_bytes is not None:
|
||||
with car.CarParams.from_bytes(cp_bytes) as CP:
|
||||
if CP.notCar:
|
||||
bridge_services_in.append("testJoystick")
|
||||
|
||||
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
|
||||
try:
|
||||
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
|
||||
json=asdict(body), timeout=10)
|
||||
if not resp.ok:
|
||||
try:
|
||||
error_body = resp.json()
|
||||
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
|
||||
except ValueError:
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.ConnectTimeout as e:
|
||||
raise Exception("webrtc took too long to respond. is it on?") from e
|
||||
except requests.ConnectionError as e:
|
||||
raise Exception("webrtc is not running. turn on comma body ignition.") from e
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def takeSnapshot() -> str | dict[str, str] | None:
|
||||
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
|
||||
|
||||
@@ -313,6 +313,9 @@ class Tici(HardwareBase):
|
||||
continue
|
||||
gov = 'ondemand' if powersave_enabled else 'performance'
|
||||
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
|
||||
if not powersave_enabled:
|
||||
# cap max core freq to 1689 Mhz
|
||||
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
|
||||
|
||||
# *** IRQ config ***
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ public:
|
||||
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
|
||||
virtual void encoder_open() = 0;
|
||||
virtual void encoder_close() = 0;
|
||||
virtual void set_bitrate(int bitrate) = 0;
|
||||
|
||||
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
|
||||
|
||||
|
||||
@@ -72,10 +72,6 @@ void FfmpegEncoder::encoder_close() {
|
||||
is_open = false;
|
||||
}
|
||||
|
||||
void FfmpegEncoder::set_bitrate(int bitrate) {
|
||||
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
|
||||
}
|
||||
|
||||
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
|
||||
assert(buf->width == this->in_width);
|
||||
assert(buf->height == this->in_height);
|
||||
|
||||
@@ -21,7 +21,6 @@ public:
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open();
|
||||
void encoder_close();
|
||||
void set_bitrate(int bitrate);
|
||||
|
||||
private:
|
||||
int segment_num = -1;
|
||||
|
||||
@@ -155,8 +155,6 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
|
||||
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
|
||||
|
||||
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
|
||||
current_bitrate = encoder_settings.bitrate;
|
||||
adaptive_bitrate = encoder_info.adaptive_bitrate;
|
||||
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
|
||||
|
||||
struct v4l2_format fmt_out = {
|
||||
@@ -306,25 +304,6 @@ void V4LEncoder::encoder_close() {
|
||||
this->is_open = false;
|
||||
}
|
||||
|
||||
void V4LEncoder::set_bitrate(int bitrate) {
|
||||
if (!adaptive_bitrate || bitrate == current_bitrate) return;
|
||||
if (bitrate <= 0) {
|
||||
LOGE("invalid livestream encoder bitrate %d", bitrate);
|
||||
return;
|
||||
}
|
||||
|
||||
struct v4l2_control ctrl = {
|
||||
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
|
||||
.value = bitrate,
|
||||
};
|
||||
|
||||
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
|
||||
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
|
||||
return;
|
||||
}
|
||||
current_bitrate = bitrate;
|
||||
}
|
||||
|
||||
V4LEncoder::~V4LEncoder() {
|
||||
encoder_close();
|
||||
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
|
||||
|
||||
@@ -13,7 +13,6 @@ public:
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open();
|
||||
void encoder_close();
|
||||
void set_bitrate(int bitrate);
|
||||
|
||||
private:
|
||||
int fd;
|
||||
@@ -21,8 +20,6 @@ private:
|
||||
bool is_open = false;
|
||||
int segment_num = -1;
|
||||
int counter = 0;
|
||||
int current_bitrate = -1;
|
||||
bool adaptive_bitrate;
|
||||
|
||||
SafeQueue<VisionIpcBufExtra> extras;
|
||||
|
||||
|
||||
@@ -44,24 +44,11 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
|
||||
}
|
||||
}
|
||||
|
||||
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
|
||||
static Params params;
|
||||
std::string val = params.get("LivestreamEncoderBitrate");
|
||||
if (val.empty()) return;
|
||||
int bitrate = std::stoi(val);
|
||||
for (auto &e : encoders) {
|
||||
e->set_bitrate(bitrate);
|
||||
}
|
||||
}
|
||||
|
||||
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
util::set_thread_name(cam_info.thread_name);
|
||||
|
||||
std::vector<std::unique_ptr<Encoder>> encoders;
|
||||
|
||||
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
|
||||
[](const auto &ei) { return ei.adaptive_bitrate; });
|
||||
|
||||
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
|
||||
|
||||
std::unique_ptr<JpegEncoder> jpeg_encoder;
|
||||
@@ -121,8 +108,6 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
++cur_seg;
|
||||
}
|
||||
|
||||
if (has_adaptive) apply_bitrate(encoders);
|
||||
|
||||
// encode a frame
|
||||
for (int i = 0; i < encoders.size(); ++i) {
|
||||
int out_id = encoders[i]->encode_frame(buf, &extra);
|
||||
|
||||
@@ -47,8 +47,8 @@ struct EncoderSettings {
|
||||
}
|
||||
|
||||
static EncoderSettings StreamEncoderSettings() {
|
||||
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
|
||||
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
|
||||
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
|
||||
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
|
||||
}
|
||||
};
|
||||
|
||||
@@ -59,7 +59,6 @@ public:
|
||||
const char *filename = NULL;
|
||||
bool record = true;
|
||||
bool include_audio = false;
|
||||
bool adaptive_bitrate = false;
|
||||
int frame_width = -1;
|
||||
int frame_height = -1;
|
||||
int fps = MAIN_FPS;
|
||||
@@ -105,7 +104,6 @@ const EncoderInfo stream_road_encoder_info = {
|
||||
.publish_name = "livestreamRoadEncodeData",
|
||||
//.thumbnail_name = "thumbnail",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
|
||||
};
|
||||
@@ -113,7 +111,6 @@ const EncoderInfo stream_road_encoder_info = {
|
||||
const EncoderInfo stream_wide_road_encoder_info = {
|
||||
.publish_name = "livestreamWideRoadEncodeData",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
|
||||
};
|
||||
@@ -121,7 +118,6 @@ const EncoderInfo stream_wide_road_encoder_info = {
|
||||
const EncoderInfo stream_driver_encoder_info = {
|
||||
.publish_name = "livestreamDriverEncodeData",
|
||||
.record = false,
|
||||
.adaptive_bitrate = true,
|
||||
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
|
||||
};
|
||||
@@ -157,19 +153,19 @@ const LogCameraInfo driver_camera_info{
|
||||
const LogCameraInfo stream_road_camera_info{
|
||||
.thread_name = "road_cam_encoder",
|
||||
.stream_type = VISION_STREAM_ROAD,
|
||||
.encoder_infos = {stream_road_encoder_info},
|
||||
.encoder_infos = {stream_road_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_wide_road_camera_info{
|
||||
.thread_name = "wide_road_cam_encoder",
|
||||
.stream_type = VISION_STREAM_WIDE_ROAD,
|
||||
.encoder_infos = {stream_wide_road_encoder_info},
|
||||
.encoder_infos = {stream_wide_road_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_driver_camera_info{
|
||||
.thread_name = "driver_cam_encoder",
|
||||
.stream_type = VISION_STREAM_DRIVER,
|
||||
.encoder_infos = {stream_driver_encoder_info},
|
||||
.encoder_infos = {stream_driver_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import struct
|
||||
import time
|
||||
|
||||
import av
|
||||
@@ -8,13 +7,6 @@ from teleoprtc.tracks import TiciVideoStreamTrack
|
||||
from cereal import messaging
|
||||
from openpilot.common.realtime import DT_MDL, DT_DMON
|
||||
|
||||
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
|
||||
TIMING_SEI_UUID = bytes([
|
||||
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
|
||||
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
|
||||
])
|
||||
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
|
||||
|
||||
|
||||
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
camera_to_sock_mapping = {
|
||||
@@ -27,30 +19,9 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
dt = DT_DMON if camera_type == "driver" else DT_MDL
|
||||
super().__init__(camera_type, dt)
|
||||
|
||||
self._sock = self._make_sock(camera_type)
|
||||
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
|
||||
self._pts = 0
|
||||
self._t0_ns = time.monotonic_ns()
|
||||
self.timing_sei_enabled = False
|
||||
|
||||
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
|
||||
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
|
||||
|
||||
def switch_camera(self, camera_type: str) -> None:
|
||||
self._sock = self._make_sock(camera_type)
|
||||
|
||||
def _build_frame_data(self, msg) -> bytes:
|
||||
encode_data = getattr(msg, msg.which())
|
||||
if not self.timing_sei_enabled:
|
||||
return encode_data.header + encode_data.data
|
||||
|
||||
idx = encode_data.idx
|
||||
sei_nal = _SEI_PREFIX + struct.pack('>4d',
|
||||
(idx.timestampEof - idx.timestampSof) / 1e6,
|
||||
(msg.logMonoTime - idx.timestampEof) / 1e6,
|
||||
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
|
||||
time.time() * 1000, # noqa: TID251
|
||||
) + b'\x80'
|
||||
return encode_data.header + sei_nal + encode_data.data
|
||||
|
||||
async def recv(self):
|
||||
while True:
|
||||
@@ -59,7 +30,9 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
|
||||
break
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
packet = av.Packet(self._build_frame_data(msg))
|
||||
evta = getattr(msg, msg.which())
|
||||
|
||||
packet = av.Packet(evta.header + evta.data)
|
||||
packet.time_base = self._time_base
|
||||
|
||||
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
|
||||
|
||||
+62
-204
@@ -1,11 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from abc import abstractmethod
|
||||
import os
|
||||
import time
|
||||
import argparse
|
||||
import asyncio
|
||||
import contextlib
|
||||
import json
|
||||
import uuid
|
||||
import logging
|
||||
@@ -23,39 +19,11 @@ if TYPE_CHECKING:
|
||||
from aiortc.rtcdatachannel import RTCDataChannel
|
||||
|
||||
from openpilot.system.webrtc.schema import generate_field
|
||||
from openpilot.common.params import Params
|
||||
from cereal import messaging, log
|
||||
|
||||
|
||||
class AsyncTaskRunner:
|
||||
def __init__(self):
|
||||
self.is_running = False
|
||||
self.task = None
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
|
||||
def start(self):
|
||||
assert self.task is None
|
||||
self.task = asyncio.create_task(self.run())
|
||||
|
||||
async def stop(self):
|
||||
if self.task is None:
|
||||
return
|
||||
task = self.task
|
||||
self.task = None
|
||||
if task.done():
|
||||
return
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
@abstractmethod
|
||||
async def run(self):
|
||||
pass
|
||||
|
||||
|
||||
class CerealOutgoingMessageProxy(AsyncTaskRunner):
|
||||
class CerealOutgoingMessageProxy:
|
||||
def __init__(self, sm: messaging.SubMaster):
|
||||
super().__init__()
|
||||
self.sm = sm
|
||||
self.channels: list[RTCDataChannel] = []
|
||||
|
||||
@@ -87,19 +55,6 @@ class CerealOutgoingMessageProxy(AsyncTaskRunner):
|
||||
for channel in self.channels:
|
||||
channel.send(encoded_msg)
|
||||
|
||||
async def run(self):
|
||||
from aiortc.exceptions import InvalidStateError
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.update()
|
||||
except InvalidStateError:
|
||||
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Cereal outgoing proxy failure")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
class CerealIncomingMessageProxy:
|
||||
def __init__(self, pm: messaging.PubMaster):
|
||||
@@ -117,6 +72,37 @@ class CerealIncomingMessageProxy:
|
||||
self.pm.send(msg_type, msg)
|
||||
|
||||
|
||||
class CerealProxyRunner:
|
||||
def __init__(self, proxy: CerealOutgoingMessageProxy):
|
||||
self.proxy = proxy
|
||||
self.is_running = False
|
||||
self.task = None
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
|
||||
def start(self):
|
||||
assert self.task is None
|
||||
self.task = asyncio.create_task(self.run())
|
||||
|
||||
def stop(self):
|
||||
if self.task is None or self.task.done():
|
||||
return
|
||||
self.task.cancel()
|
||||
self.task = None
|
||||
|
||||
async def run(self):
|
||||
from aiortc.exceptions import InvalidStateError
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.proxy.update()
|
||||
except InvalidStateError:
|
||||
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Cereal outgoing proxy failure")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
class DynamicPubMaster(messaging.PubMaster):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
@@ -129,90 +115,21 @@ class DynamicPubMaster(messaging.PubMaster):
|
||||
self.sock[service] = messaging.pub_sock(service)
|
||||
|
||||
|
||||
class LivestreamBitrateController(AsyncTaskRunner):
|
||||
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
|
||||
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
|
||||
sample_interval = 0.2
|
||||
high_level = 0.1 # drop immediately
|
||||
med_level = 0.05 # drop after # of samples
|
||||
low_level = 0 # raise after # of samples
|
||||
down_samples = 5 # 1s
|
||||
param_name = "LivestreamEncoderBitrate"
|
||||
|
||||
def __init__(self, peer_connection: Any):
|
||||
super().__init__()
|
||||
self.pc = peer_connection
|
||||
self.params = Params()
|
||||
|
||||
self.level = 0
|
||||
self.prev_lost, self.prev_sent = None, None
|
||||
self.counter = 0
|
||||
self.up_samples = 5 # 1s
|
||||
self._auto = True
|
||||
|
||||
async def run(self):
|
||||
while True:
|
||||
await asyncio.sleep(self.sample_interval)
|
||||
if not self._auto:
|
||||
continue
|
||||
|
||||
loss_rate = await self._sample()
|
||||
if loss_rate is None:
|
||||
continue
|
||||
if loss_rate >= self.med_level and self.level > 0:
|
||||
self.counter += 1
|
||||
if self.counter >= self.down_samples or loss_rate >= self.high_level:
|
||||
self.level -= 1
|
||||
self.up_samples *= 2 # exponential backoff before raising again
|
||||
self.counter = 0
|
||||
self._publish(self.bitrates[self.level])
|
||||
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
|
||||
self.counter -= 1
|
||||
if -self.counter >= self.up_samples:
|
||||
self.level += 1
|
||||
self.counter = 0
|
||||
self._publish(self.bitrates[self.level])
|
||||
|
||||
async def _sample(self) -> float | None:
|
||||
report = await self.pc.getStats()
|
||||
packets_lost = packets_sent = 0
|
||||
for s in report.values():
|
||||
if s.type == "remote-inbound-rtp":
|
||||
packets_lost += s.packetsLost
|
||||
elif s.type == "outbound-rtp":
|
||||
packets_sent += s.packetsSent
|
||||
|
||||
if self.prev_lost is None:
|
||||
self.prev_lost, self.prev_sent = packets_lost, packets_sent
|
||||
return None
|
||||
lost_delta = max(0, packets_lost - self.prev_lost)
|
||||
sent_delta = max(0, packets_sent - self.prev_sent)
|
||||
self.prev_lost, self.prev_sent = packets_lost, packets_sent
|
||||
return lost_delta / sent_delta if sent_delta else 0.0
|
||||
|
||||
def _publish(self, bitrate: float):
|
||||
self.params.put(self.param_name, bitrate)
|
||||
|
||||
def set_quality(self, quality):
|
||||
if quality in self.label_to_bitrate:
|
||||
self._publish(self.label_to_bitrate[quality])
|
||||
self._auto = False
|
||||
elif quality == "auto":
|
||||
self._auto = True
|
||||
|
||||
|
||||
class StreamSession:
|
||||
shared_pub_master = DynamicPubMaster([])
|
||||
|
||||
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
|
||||
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
|
||||
from aiortc.mediastreams import VideoStreamTrack
|
||||
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
|
||||
from teleoprtc import WebRTCAnswerBuilder
|
||||
from teleoprtc.info import parse_info_from_offer
|
||||
|
||||
config = parse_info_from_offer(sdp)
|
||||
builder = WebRTCAnswerBuilder(sdp)
|
||||
|
||||
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
|
||||
builder.add_video_stream(init_camera, self.video_track)
|
||||
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
|
||||
for cam in cameras:
|
||||
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
|
||||
|
||||
self.stream = builder.stream()
|
||||
self.identifier = str(uuid.uuid4())
|
||||
@@ -220,60 +137,35 @@ class StreamSession:
|
||||
self.incoming_bridge: CerealIncomingMessageProxy | None = None
|
||||
self.incoming_bridge_services = incoming_services
|
||||
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
|
||||
self.bitrate_controller: LivestreamBitrateController | None = None
|
||||
self.outgoing_bridge_runner: CerealProxyRunner | None = None
|
||||
if len(incoming_services) > 0:
|
||||
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
|
||||
if len(outgoing_services) > 0:
|
||||
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
|
||||
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
|
||||
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
|
||||
|
||||
self.run_task: asyncio.Task | None = None
|
||||
self._cleanup_lock = asyncio.Lock()
|
||||
self._cleanup_done = False
|
||||
self.logger = logging.getLogger("webrtcd")
|
||||
self.logger.info(
|
||||
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
|
||||
self.identifier, init_camera, incoming_services, outgoing_services,
|
||||
)
|
||||
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
|
||||
self.identifier, cameras, incoming_services, outgoing_services)
|
||||
|
||||
def start(self):
|
||||
self.run_task = asyncio.create_task(self.run())
|
||||
|
||||
async def stop(self):
|
||||
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
|
||||
self.run_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await self.run_task
|
||||
def stop(self):
|
||||
if self.run_task.done():
|
||||
return
|
||||
self.run_task.cancel()
|
||||
self.run_task = None
|
||||
await self.post_run_cleanup()
|
||||
asyncio.run(self.post_run_cleanup())
|
||||
|
||||
async def get_answer(self):
|
||||
return await self.stream.start()
|
||||
|
||||
def message_handler(self, message: bytes):
|
||||
async def message_handler(self, message: bytes):
|
||||
assert self.incoming_bridge is not None
|
||||
try:
|
||||
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
|
||||
if isinstance(payload, dict):
|
||||
msg_type = payload.get("type")
|
||||
|
||||
match msg_type:
|
||||
case "livestreamCameraSwitch":
|
||||
self.video_track.switch_camera(payload["data"]["camera"])
|
||||
case "livestreamSettings":
|
||||
self.bitrate_controller.set_quality(payload["data"]["quality"])
|
||||
case "clockSync":
|
||||
pong = json.dumps({"type": "clockSync", "data": {
|
||||
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
|
||||
}})
|
||||
self.stream.get_messaging_channel().send(pong)
|
||||
case "enableTimingSei":
|
||||
if hasattr(self.video_track, 'timing_sei_enabled'):
|
||||
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
|
||||
case _:
|
||||
if payload.get("type") not in self.incoming_bridge_services:
|
||||
return
|
||||
self.incoming_bridge.send(message)
|
||||
self.incoming_bridge.send(message)
|
||||
except Exception:
|
||||
self.logger.exception("Cereal incoming proxy failure")
|
||||
|
||||
@@ -284,38 +176,29 @@ class StreamSession:
|
||||
if self.incoming_bridge is not None:
|
||||
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
|
||||
self.stream.set_message_handler(self.message_handler)
|
||||
if self.outgoing_bridge is not None:
|
||||
if self.outgoing_bridge_runner is not None:
|
||||
channel = self.stream.get_messaging_channel()
|
||||
self.outgoing_bridge.add_channel(channel)
|
||||
self.outgoing_bridge.start()
|
||||
self.bitrate_controller.start()
|
||||
|
||||
self.outgoing_bridge_runner.proxy.add_channel(channel)
|
||||
self.outgoing_bridge_runner.start()
|
||||
self.logger.info("Stream session (%s) connected", self.identifier)
|
||||
|
||||
await self.stream.wait_for_disconnection()
|
||||
await self.post_run_cleanup()
|
||||
|
||||
self.logger.info("Stream session (%s) ended", self.identifier)
|
||||
except Exception:
|
||||
self.logger.exception("Stream session failure")
|
||||
finally:
|
||||
await self.post_run_cleanup()
|
||||
|
||||
async def post_run_cleanup(self):
|
||||
async with self._cleanup_lock:
|
||||
if self._cleanup_done:
|
||||
return
|
||||
self._cleanup_done = True
|
||||
if self.bitrate_controller is not None:
|
||||
await self.bitrate_controller.stop()
|
||||
if self.outgoing_bridge is not None:
|
||||
await self.outgoing_bridge.stop()
|
||||
await self.stream.stop()
|
||||
await self.stream.stop()
|
||||
if self.outgoing_bridge is not None:
|
||||
self.outgoing_bridge_runner.stop()
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamRequestBody:
|
||||
sdp: str
|
||||
initCamera: str
|
||||
cameras: list[str]
|
||||
bridge_services_in: list[str] = field(default_factory=list)
|
||||
bridge_services_out: list[str] = field(default_factory=list)
|
||||
|
||||
@@ -325,33 +208,11 @@ async def get_stream(request: 'web.Request'):
|
||||
raw_body = await request.json()
|
||||
body = StreamRequestBody(**raw_body)
|
||||
|
||||
async with request.app['stream_lock']:
|
||||
# Fully disconnect any other active stream before starting the replacement.
|
||||
for sid, s in list(stream_dict.items()):
|
||||
if s.run_task and not s.run_task.done():
|
||||
try:
|
||||
ch = s.stream.get_messaging_channel()
|
||||
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
|
||||
except Exception:
|
||||
pass
|
||||
await s.stop()
|
||||
del stream_dict[sid]
|
||||
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
|
||||
answer = await session.get_answer()
|
||||
session.start()
|
||||
|
||||
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
|
||||
try:
|
||||
answer = await session.get_answer()
|
||||
except ValueError as e:
|
||||
await session.stop()
|
||||
raise web.HTTPBadRequest(
|
||||
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
|
||||
content_type="application/json",
|
||||
) from e
|
||||
except Exception:
|
||||
await session.stop()
|
||||
raise
|
||||
session.start()
|
||||
|
||||
stream_dict[session.identifier] = session
|
||||
stream_dict[session.identifier] = session
|
||||
|
||||
return web.json_response({"sdp": answer.sdp, "type": answer.type})
|
||||
|
||||
@@ -363,7 +224,6 @@ async def get_schema(request: 'web.Request'):
|
||||
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
|
||||
return web.json_response(schema_dict)
|
||||
|
||||
|
||||
async def post_notify(request: 'web.Request'):
|
||||
try:
|
||||
payload = await request.json()
|
||||
@@ -379,10 +239,9 @@ async def post_notify(request: 'web.Request'):
|
||||
|
||||
return web.Response(status=200, text="OK")
|
||||
|
||||
|
||||
async def on_shutdown(app: 'web.Application'):
|
||||
for session in app['streams'].values():
|
||||
await session.stop()
|
||||
session.stop()
|
||||
del app['streams']
|
||||
|
||||
|
||||
@@ -395,7 +254,6 @@ def webrtcd_thread(host: str, port: int, debug: bool):
|
||||
app = web.Application()
|
||||
|
||||
app['streams'] = dict()
|
||||
app['stream_lock'] = asyncio.Lock()
|
||||
app['debug'] = debug
|
||||
app.on_shutdown.append(on_shutdown)
|
||||
app.router.add_post("/stream", get_stream)
|
||||
|
||||
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
|
||||
|
||||
async def offer(request: 'web.Request'):
|
||||
params = await request.json()
|
||||
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
|
||||
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
|
||||
body_json = json.dumps(dataclasses.asdict(body))
|
||||
|
||||
logger.info("Sending offer to webrtcd...")
|
||||
|
||||
Reference in New Issue
Block a user