Compare commits

..

169 Commits

Author SHA1 Message Date
royjr 9fca585f2a Merge branch 'master' into ccnc-port 2026-06-30 22:14:33 -04:00
royjr bb1259303e Update opendbc_repo 2026-06-30 22:14:23 -04:00
Jason Wen 31dc4d8e52 ci: fix cereal validation for upstream directory restructure (#1869) 2026-06-28 22:56:17 -04:00
Jason Wen da6313dbe9 Update CHANGELOG.md 2026-06-26 21:33:25 -04:00
royjr e135051ca8 Merge branch 'master' into ccnc-port 2026-06-13 22:38:22 -04:00
royjr 12bef55d8a Update opendbc_repo 2026-06-13 22:12:00 -04:00
royjr 2f7a45e6c8 Merge branch 'master' into ccnc-port 2026-06-08 22:01:57 -04:00
royjr 936ebfc12b Update opendbc_repo 2026-06-08 22:01:44 -04:00
royjr aa0c9dc0eb Merge branch 'master' into ccnc-port 2026-06-04 09:52:35 -04:00
royjr 7476a866e7 Update opendbc_repo 2026-06-04 09:52:06 -04:00
royjr 610d857e33 Merge branch 'master' into ccnc-port 2026-05-28 08:30:40 -04:00
royjr d2f47407d0 Update opendbc_repo 2026-05-12 21:03:18 -04:00
royjr db75ec76ea Merge branch 'master' into ccnc-port 2026-05-12 20:44:57 -04:00
royjr 24066465d7 Update opendbc_repo 2026-05-12 20:44:48 -04:00
royjr a7abbd6e25 Update opendbc_repo 2026-04-20 18:22:12 -04:00
royjr 878982447c Merge branch 'master' into ccnc-port 2026-04-19 12:06:46 -04:00
royjr 576527a36b Merge branch 'master' into ccnc-port 2026-04-17 05:42:35 -04:00
royjr ef8c35da24 Update opendbc_repo 2026-04-17 05:41:21 -04:00
royjr 85688b1040 Merge branch 'master' into ccnc-port 2026-04-16 19:48:39 -04:00
royjr 0fb2199130 Update opendbc_repo 2026-04-16 19:47:57 -04:00
royjr d48d756c1d Update opendbc_repo 2026-04-08 00:56:54 -04:00
royjr 2ed298a0c9 Merge branch 'master' into ccnc-port 2026-04-08 00:51:13 -04:00
royjr d68f038949 Update opendbc_repo 2026-04-08 00:51:11 -04:00
royjr 7231571e57 Merge branch 'master' into ccnc-port 2026-04-03 23:34:00 -04:00
royjr b37f1419d3 Update opendbc_repo 2026-04-03 23:33:19 -04:00
royjr cd85a66790 Merge branch 'master' into ccnc-port 2026-03-26 00:53:07 -04:00
royjr 305ea87daf Update opendbc_repo 2026-03-26 00:52:44 -04:00
royjr 4bbfc793e0 Merge branch 'master' into ccnc-port 2026-03-15 15:44:59 -04:00
royjr d5d983676e Update opendbc_repo 2026-03-13 16:41:05 -04:00
royjr de8a96a398 Merge branch 'master' into ccnc-port 2026-03-13 16:41:00 -04:00
royjr 0cbf45f699 Merge branch 'master' into ccnc-port 2026-03-11 23:28:59 -04:00
royjr 0d68a3a2ab Merge branch 'master' into ccnc-port 2026-03-09 19:58:32 -04:00
royjr 9e85a85059 Update opendbc_repo 2026-03-09 19:58:18 -04:00
royjr 0373c327c0 Update opendbc_repo 2026-03-02 10:10:38 -05:00
royjr efe9e5c200 Update opendbc_repo 2026-03-02 02:15:05 -05:00
royjr 8a249a45dc Update opendbc_repo 2026-03-02 02:06:16 -05:00
royjr bdbefe67f6 Update opendbc_repo 2026-03-02 01:40:31 -05:00
royjr 675bb166ad Merge branch 'master' into ccnc-port 2026-03-01 17:18:11 -05:00
royjr 1b717a7e88 Merge branch 'master' into ccnc-port 2026-03-01 13:23:30 -05:00
royjr 86f55a8ba9 Update opendbc_repo 2026-03-01 13:23:24 -05:00
royjr 629392d2f7 Update opendbc_repo 2026-02-27 16:31:59 -05:00
royjr bc414bdc8b Update opendbc_repo 2026-02-26 23:53:26 -05:00
royjr 7ca5649f2c Merge branch 'master' into ccnc-port 2026-02-26 23:52:46 -05:00
royjr 641ee8fa87 Update opendbc_repo 2026-02-26 23:52:27 -05:00
royjr 56c276158c Merge branch 'master' into ccnc-port 2026-02-24 14:12:12 -05:00
royjr c65308a8bd Update opendbc_repo 2026-02-24 14:12:01 -05:00
royjr 994e526460 Merge branch 'master' into ccnc-port 2026-02-18 12:06:05 -05:00
royjr 1defae36b7 Update opendbc_repo 2026-02-18 12:05:53 -05:00
royjr 8f029fd0ef Merge branch 'master' into ccnc-port 2026-02-13 23:01:41 -05:00
royjr ddb46284dc Update opendbc_repo 2026-02-13 23:01:16 -05:00
royjr 9effc754d9 Merge branch 'master' into ccnc-port 2026-02-06 01:16:06 -05:00
royjr e49ffc2a2d Update opendbc_repo 2026-02-06 01:15:59 -05:00
royjr 2cacd0b3e5 Merge branch 'master' into ccnc-port 2026-01-24 12:50:18 -05:00
royjr c4b8859dff Update opendbc_repo 2026-01-24 12:50:11 -05:00
royjr 8fb0953205 Merge branch 'master' into ccnc-port 2026-01-11 22:17:38 -05:00
royjr 63d1c8835f Merge branch 'master' into ccnc-port 2026-01-10 13:03:48 -05:00
royjr 17a185606d Merge branch 'master' into ccnc-port 2026-01-09 16:40:32 -05:00
royjr da10131392 Merge branch 'master' into ccnc-port 2025-12-28 17:18:51 -05:00
royjr 7107c2ba14 Merge branch 'master' into ccnc-port 2025-12-23 12:13:48 -05:00
royjr 95b6e877ac Update opendbc_repo 2025-12-23 12:13:29 -05:00
royjr eb02c6570e Update opendbc_repo 2025-12-21 15:43:04 -05:00
royjr 1be8ae31c4 Merge branch 'master' into ccnc-port 2025-12-19 01:04:42 -05:00
royjr 04dcd38856 Update opendbc_repo 2025-12-19 01:04:30 -05:00
royjr 22ccf0d72f Merge branch 'master' into ccnc-port 2025-12-15 17:02:50 -05:00
royjr 3c969bb627 Merge branch 'master' into ccnc-port 2025-12-13 23:22:22 -05:00
royjr 20f8011feb Update opendbc_repo 2025-12-13 23:22:11 -05:00
royjr 9cf17e74a1 Merge branch 'master' into ccnc-port 2025-12-12 23:19:56 -05:00
royjr 2c4efdf557 Merge branch 'master' into ccnc-port 2025-12-07 13:29:48 -05:00
royjr 4cd3d3c16c Merge branch 'master' into ccnc-port 2025-12-02 12:56:21 -05:00
royjr 637f3ae9c8 Merge branch 'master' into ccnc-port 2025-12-01 14:41:03 -05:00
royjr 464ee80f71 Merge branch 'master' into ccnc-port 2025-11-26 00:27:53 -05:00
royjr 2743a04613 Merge branch 'master' into ccnc-port 2025-11-24 18:44:13 -05:00
royjr 7f9978d001 Merge branch 'master' into ccnc-port 2025-11-22 00:21:15 -05:00
royjr 4b83961c67 Merge branch 'master' into ccnc-port 2025-11-21 16:23:22 -05:00
royjr c00eaf428a Update opendbc_repo 2025-11-21 16:23:01 -05:00
royjr 0a9993e8d4 Merge branch 'master' into ccnc-port 2025-11-19 16:49:59 -05:00
royjr 0af214a985 Update opendbc_repo 2025-11-19 16:49:51 -05:00
royjr af43385e3a Merge branch 'master' into ccnc-port 2025-11-11 10:19:51 -05:00
royjr 0ab2b8c590 Update opendbc_repo 2025-11-07 19:59:50 -05:00
royjr 67ab18a0de Merge branch 'master' into ccnc-port 2025-11-07 19:23:51 -05:00
royjr e87dc15b30 Update opendbc_repo 2025-11-07 19:23:37 -05:00
royjr 192d08516c Merge branch 'master' into ccnc-port 2025-11-02 19:23:00 -05:00
royjr 3cf001c59c Update opendbc_repo 2025-11-02 19:22:49 -05:00
royjr f2ccd021da Merge branch 'master' into ccnc-port 2025-11-02 14:07:54 -05:00
royjr c9fc900f64 Update opendbc_repo 2025-11-02 14:07:44 -05:00
royjr 3c37c5ce5d Update opendbc_repo 2025-10-30 11:28:49 -04:00
royjr 7c45889e4e Merge branch 'master' into ccnc-port 2025-10-30 11:27:50 -04:00
royjr 2aabb7aee8 Merge branch 'master' into ccnc-port 2025-10-24 14:16:09 -04:00
royjr 3859e9962f Update opendbc_repo 2025-10-24 14:15:51 -04:00
royjr 810efbab72 Merge branch 'master' into ccnc-port 2025-10-18 07:33:11 -04:00
royjr ec27bec326 Update opendbc_repo 2025-10-18 07:32:33 -04:00
royjr 250d553157 Merge branch 'master' into ccnc-port 2025-10-14 21:57:32 -04:00
royjr cea54a0ca8 Update opendbc_repo 2025-10-14 21:57:26 -04:00
royjr 8e72d783bd Update opendbc_repo 2025-10-13 22:41:21 -04:00
royjr 1b0dc103dc Merge branch 'master' into ccnc-port 2025-10-11 23:51:46 -04:00
royjr 6c364d292b Update opendbc_repo 2025-10-11 23:51:31 -04:00
royjr bcdec2ce84 Merge branch 'master' into ccnc-port 2025-10-10 17:29:05 -04:00
royjr 3deaeb3759 Merge branch 'master' into ccnc-port 2025-10-10 15:02:32 -04:00
royjr c669f0984a Update opendbc_repo 2025-10-10 15:02:17 -04:00
royjr 46dd946740 Merge branch 'master' into ccnc-port 2025-10-07 01:36:06 -04:00
royjr 9da4b3653e Update opendbc_repo 2025-10-07 01:35:58 -04:00
royjr 4e21ae7c50 Update opendbc_repo 2025-10-05 06:21:56 -04:00
royjr bb91e92237 Update opendbc_repo 2025-10-05 06:01:15 -04:00
royjr 14b4c4f85b Update opendbc_repo 2025-10-02 09:20:32 -04:00
royjr 0660b542c3 Merge branch 'master' into ccnc-port 2025-10-01 16:01:32 -04:00
royjr 2b893b90c9 Update opendbc_repo 2025-10-01 16:01:26 -04:00
royjr f5139178ed Merge branch 'master' into ccnc-port 2025-09-30 14:39:57 -04:00
royjr fb43b755f2 Update opendbc_repo 2025-09-30 14:39:50 -04:00
royjr 07f5b967d8 Merge branch 'master' into ccnc-port 2025-09-24 21:30:29 -04:00
royjr ea19c7d3bb Update opendbc_repo 2025-09-24 21:30:17 -04:00
royjr e461842cbb Merge branch 'master' into ccnc-port 2025-09-23 05:55:02 -04:00
royjr a73c9659d5 Update opendbc_repo 2025-09-23 05:54:50 -04:00
royjr cb796fbc76 Merge branch 'master' into ccnc-port 2025-09-18 20:10:43 -04:00
royjr 6bf75fc557 Update opendbc_repo 2025-09-18 20:10:37 -04:00
royjr 9a1fc28819 Merge branch 'master' into ccnc-port 2025-09-18 13:54:36 -04:00
royjr 0741d05e92 Update opendbc_repo 2025-09-18 13:54:23 -04:00
royjr 1ad008107d Merge branch 'master' into ccnc-port 2025-09-15 01:40:27 -04:00
royjr feebd9df93 Reapply "UI: Developer UI (#1233)"
This reverts commit 15e5d2efb9.
2025-09-15 01:40:21 -04:00
royjr c2e5ced3e5 Update opendbc_repo 2025-09-15 01:39:51 -04:00
royjr 15e5d2efb9 Revert "UI: Developer UI (#1233)"
This reverts commit 1bb4ca2547.
2025-09-12 02:10:29 -04:00
royjr a3929d0b54 Merge branch 'master' into ccnc-port 2025-09-12 01:40:29 -04:00
royjr 794f8f9991 Update opendbc_repo 2025-09-08 09:27:31 -04:00
royjr 68fa5e3f21 Merge branch 'master' into ccnc-port 2025-09-07 13:13:37 -04:00
royjr 86c6cc1f48 Merge branch 'master' into ccnc-port 2025-09-03 22:22:12 -04:00
royjr eb7ffbf093 Update opendbc_repo 2025-09-03 10:14:58 -04:00
royjr 3919095752 Update opendbc_repo 2025-09-03 10:05:35 -04:00
royjr 74d63be1c3 Merge branch 'master' into ccnc-port 2025-09-03 09:50:55 -04:00
royjr 8894486a1a Update opendbc_repo 2025-09-03 09:50:49 -04:00
royjr 810599315d Merge branch 'master' into ccnc-port 2025-08-31 16:53:30 -04:00
royjr 6f3ab810c8 Update opendbc_repo 2025-08-31 16:53:24 -04:00
royjr 230f78b8d3 Merge branch 'master' into ccnc-port 2025-08-26 12:14:46 -04:00
royjr f1affec088 Update opendbc_repo 2025-08-26 12:14:22 -04:00
royjr 97d8ef242c Merge branch 'master' into ccnc-port 2025-08-24 15:12:57 -04:00
royjr a63fff9b45 Update opendbc_repo 2025-08-24 15:12:46 -04:00
royjr cb3893daaa Merge branch 'master' into ccnc-port 2025-08-23 10:33:24 -04:00
royjr 29f60df74b Merge branch 'master' into ccnc-port 2025-08-22 11:18:27 -04:00
royjr c6c072e1f4 Update opendbc_repo 2025-08-22 11:18:18 -04:00
royjr d101cbb83e Update opendbc_repo 2025-08-13 16:00:04 -04:00
royjr 1536d59633 Update opendbc_repo 2025-08-13 15:28:37 -04:00
royjr dc99b865ae Merge branch 'master' into ccnc-port 2025-08-13 12:31:14 -04:00
royjr e59bc027ff Merge branch 'master' into ccnc-port 2025-08-13 11:58:05 -04:00
royjr cf7e5efaca Update opendbc_repo 2025-08-13 11:57:57 -04:00
royjr 4b44f2eb31 Merge branch 'master' into ccnc-port 2025-08-10 09:18:24 -04:00
royjr 107d2ab400 Update opendbc_repo 2025-08-10 09:18:15 -04:00
royjr 5432d9062c Merge branch 'master' into ccnc-port 2025-08-04 11:52:29 -04:00
royjr f533f6c843 Merge branch 'master' into ccnc-port 2025-08-02 06:53:54 -04:00
royjr 58e9ac763c Update opendbc_repo 2025-08-02 06:53:43 -04:00
royjr cb50d54169 Merge branch 'master-new' into ccnc-port 2025-07-24 20:23:46 -04:00
royjr bd5de4ed0a Merge branch 'master-new' into ccnc-port 2025-07-20 23:49:36 -04:00
royjr 0d4073fadb Merge branch 'master-new' into ccnc-port 2025-07-19 23:18:26 -04:00
royjr ebc70dcb52 Merge branch 'master-new' into ccnc-port 2025-07-19 14:37:17 -04:00
royjr 4d0426999e Update opendbc_repo 2025-07-19 14:36:59 -04:00
royjr 286da42573 Merge branch 'master-new' into ccnc-port 2025-07-16 23:48:23 -04:00
royjr 8a836710a9 Update opendbc_repo 2025-07-16 23:48:06 -04:00
royjr 5d515bcf33 Merge branch 'master-new' into ccnc-port 2025-07-07 05:35:55 -04:00
royjr 1c7f6d5133 Update opendbc_repo 2025-07-03 21:24:01 -04:00
royjr 05d57c7aeb Update opendbc_repo 2025-07-01 18:30:32 -04:00
royjr e4b0eaf352 Update opendbc_repo 2025-06-28 19:30:28 -04:00
royjr a710276472 Merge branch 'master-new' into ccnc-port 2025-06-28 19:22:59 -04:00
royjr af086db671 Merge branch 'master-new' into ccnc-port 2025-06-28 12:13:26 -04:00
royjr 0d9eb0e25e Update opendbc_repo submodule to latest commit
Advanced the opendbc_repo submodule to commit d309f7ec96e37267c94d12fc4bfe2672ad505b06. This pulls in the latest changes from the opendbc repository.
2025-06-26 14:03:56 -04:00
royjr 0616caed6d Merge branch 'master-new' into ccnc-port 2025-06-25 19:33:53 -04:00
royjr 095337b3c1 Update opendbc_repo 2025-06-25 19:33:38 -04:00
royjr 1edec2d22c Update opendbc_repo 2025-06-14 16:14:02 -04:00
royjr affabb9ee0 Update opendbc_repo 2025-06-14 15:55:07 -04:00
royjr dc27e8711c Update opendbc_repo 2025-06-14 14:54:32 -04:00
royjr cf7329a264 Merge branch 'master-new' into ccnc-port 2025-06-11 21:36:06 -04:00
royjr 5ee5ecd820 Merge branch 'master-new' into ccnc-port 2025-06-08 23:25:13 -04:00
royjr b064f730dd Update opendbc_repo 2025-06-08 17:54:43 -04:00
16 changed files with 402 additions and 218 deletions
+22 -3
View File
@@ -35,18 +35,36 @@ jobs:
- name: Init sunnypilot opendbc submodule
run: git submodule update --init --depth 1 opendbc_repo
- name: Checkout upstream openpilot cereal
- name: Checkout upstream openpilot
uses: actions/checkout@v6
with:
repository: 'commaai/openpilot'
path: upstream_openpilot
sparse-checkout: cereal
ref: "refs/heads/master"
- name: Init upstream opendbc submodule
working-directory: upstream_openpilot
run: git submodule update --init --depth 1 opendbc_repo
- name: Locate upstream capnp paths
id: locate-capnp
run: |
CEREAL_DIR=$(find upstream_openpilot -maxdepth 4 -name log.capnp -path '*/cereal/log.capnp' -printf '%h\n' -quit)
if [ -z "$CEREAL_DIR" ]; then
echo "::error::Could not locate cereal/log.capnp in upstream openpilot"
exit 1
fi
echo "cereal_dir=$CEREAL_DIR" >> "$GITHUB_OUTPUT"
echo "Found upstream cereal at: $CEREAL_DIR"
IMPORT_ARGS=""
CAR_CAPNP=$(find upstream_openpilot -maxdepth 5 -name car.capnp -path '*/opendbc/car/car.capnp' -printf '%h\n' -quit)
if [ -n "$CAR_CAPNP" ]; then
IMPORT_ARGS="-I $CAR_CAPNP"
echo "Found car.capnp at: $CAR_CAPNP"
fi
echo "import_args=$IMPORT_ARGS" >> "$GITHUB_OUTPUT"
- name: Install uv
run: pip install uv
@@ -62,4 +80,5 @@ jobs:
PYCAPNP_VER=$(python3 -c "import re; m=re.search(r'name = \"pycapnp\"\nversion = \"([^\"]+)\"', open('uv.lock').read()); print(m.group(1))")
uv run --isolated --with "pycapnp==${PYCAPNP_VER}" \
python3 cereal/messaging/tests/validate_sp_cereal_upstream.py \
-r -f /tmp/sp_schema.json --cereal-dir upstream_openpilot/cereal
-r -f /tmp/sp_schema.json --cereal-dir ${{ steps.locate-capnp.outputs.cereal_dir }} \
${{ steps.locate-capnp.outputs.import_args }}
@@ -1,12 +1,13 @@
#!/usr/bin/env python3
"""Schema-level cereal compat check between sunnypilot and upstream openpilot.
"""Validate sunnypilot routes are parseable by stock commaai/openpilot.
Rules (per struct matched across sides by typeId):
R1 shared ordinal must reference the same type.
R2 sunnypilot-only ordinal in a union -> FAIL (unknown discriminant upstream).
R3 sunnypilot-only ordinal on a regular field -> OK (additive struct evolution).
R4 upstream-only ordinal -> OK.
R5 sunnypilot-only struct referenced via an upstream-shared field -> FAIL.
Cap'n Proto is wire-compatible across renames, type relocations, and
additive fields. The only breaking change is a union variant that
upstream doesn't recognize — an unknown discriminant makes the entire
union unreadable.
This script checks: for every struct with a union that exists in both
schemas, does sunnypilot introduce union variants upstream doesn't have?
"""
from __future__ import annotations
@@ -24,46 +25,19 @@ def hex_id(value: int) -> str:
return f"0x{value:016x}"
def encode_type(type_node: Any) -> dict:
which = type_node.which()
if which == "struct":
return {"kind": "struct", "typeId": hex_id(type_node.struct.typeId)}
if which == "enum":
return {"kind": "enum", "typeId": hex_id(type_node.enum.typeId)}
if which == "interface":
return {"kind": "interface", "typeId": hex_id(type_node.interface.typeId)}
if which == "list":
return {"kind": "list", "element": encode_type(type_node.list.elementType)}
if which == "anyPointer":
return {"kind": "anyPointer"}
return {"kind": which}
def encode_field(name: str, field: Any) -> dict:
proto = field.proto
ordinal = proto.ordinal.explicit if proto.ordinal.which() == "explicit" else None
discriminant = proto.discriminantValue if proto.discriminantValue != NO_DISCRIMINANT else None
if proto.which() == "group":
type_desc = {"kind": "group", "typeId": hex_id(proto.group.typeId)}
else:
type_desc = encode_type(proto.slot.type)
return {
"name": name,
"ordinal": ordinal,
"discriminant": discriminant,
"type": type_desc,
}
def encode_struct(schema: Any) -> dict:
node = schema.node
fields = []
for name, field in schema.fields.items():
proto = field.proto
ordinal = proto.ordinal.explicit if proto.ordinal.which() == "explicit" else None
discriminant = proto.discriminantValue if proto.discriminantValue != NO_DISCRIMINANT else None
fields.append({"name": name, "ordinal": ordinal, "discriminant": discriminant})
return {
"typeId": hex_id(node.id),
"displayName": node.displayName,
"hasUnion": node.struct.discriminantCount > 0,
"fields": [encode_field(name, field) for name, field in schema.fields.items()],
"fields": fields,
}
@@ -105,15 +79,16 @@ def collect_schema(root: Any) -> dict[str, dict]:
return structs
def load_log(cereal_dir: str) -> Any:
def load_log(cereal_dir: str, extra_imports: list[str] | None = None) -> Any:
import capnp
cereal_dir = os.path.abspath(cereal_dir)
capnp.remove_import_hook()
return capnp.load(os.path.join(cereal_dir, "log.capnp"), imports=[cereal_dir])
imports = [cereal_dir] + [os.path.abspath(p) for p in (extra_imports or [])]
return capnp.load(os.path.join(cereal_dir, "log.capnp"), imports=imports)
def dump_schema(cereal_dir: str, path: str) -> None:
log = load_log(cereal_dir)
def dump_schema(cereal_dir: str, path: str, extra_imports: list[str] | None = None) -> None:
log = load_log(cereal_dir, extra_imports)
payload = {
"root": hex_id(log.Event.schema.node.id),
"structs": collect_schema(log.Event.schema),
@@ -123,100 +98,37 @@ def dump_schema(cereal_dir: str, path: str) -> None:
print(f"wrote schema dump with {len(payload['structs'])} structs to {path}")
def types_equal(a: dict, b: dict) -> bool:
if a.get("kind") != b.get("kind"):
return False
kind = a["kind"]
if kind in ("struct", "enum", "interface", "group"):
return a.get("typeId") == b.get("typeId")
if kind == "list":
return types_equal(a["element"], b["element"])
return True
def type_repr(t: dict) -> str:
kind = t.get("kind", "?")
if kind in ("struct", "enum", "interface", "group"):
return f"{kind}({t.get('typeId')})"
if kind == "list":
return f"list<{type_repr(t['element'])}>"
return kind
def field_is_union_variant(field: dict) -> bool:
return field.get("discriminant") is not None
def index_fields_by_ordinal(struct: dict) -> dict[int, dict]:
indexed: dict[int, dict] = {}
for field in struct["fields"]:
ordinal = field.get("ordinal")
if ordinal is None:
continue
indexed[ordinal] = field
return indexed
def compare(sunnypilot_dump: dict, upstream_dump: dict) -> list[str]:
violations: list[str] = []
sunnypilot_structs: dict[str, dict] = sunnypilot_dump["structs"]
upstream_structs: dict[str, dict] = upstream_dump["structs"]
sunnypilot_structs = sunnypilot_dump["structs"]
upstream_structs = upstream_dump["structs"]
sunnypilot_struct_referenced_from_shared: set[str] = set()
for type_id, sunnypilot_struct in sunnypilot_structs.items():
upstream_struct = upstream_structs.get(type_id)
if upstream_struct is None:
for type_id, sp_struct in sunnypilot_structs.items():
if not sp_struct["hasUnion"]:
continue
up_struct = upstream_structs.get(type_id)
if up_struct is None:
continue
sunnypilot_fields = index_fields_by_ordinal(sunnypilot_struct)
upstream_fields = index_fields_by_ordinal(upstream_struct)
display = sunnypilot_struct["displayName"]
up_ordinals = {f["ordinal"] for f in up_struct["fields"] if f.get("discriminant") is not None}
display = sp_struct["displayName"]
for ordinal, sunnypilot_field in sunnypilot_fields.items():
upstream_field = upstream_fields.get(ordinal)
if upstream_field is None:
if field_is_union_variant(sunnypilot_field):
violations.append(
f"[R2] {display} @{ordinal} ('{sunnypilot_field['name']}', {type_repr(sunnypilot_field['type'])}): "
f"union variant not present upstream. upstream cannot parse this discriminant."
)
for field in sp_struct["fields"]:
if field.get("discriminant") is None:
continue
if not types_equal(sunnypilot_field["type"], upstream_field["type"]):
if field["ordinal"] not in up_ordinals:
violations.append(
f"[R1] {display} @{ordinal}: type mismatch. "
f"sunnypilot='{sunnypilot_field['name']}' {type_repr(sunnypilot_field['type'])} vs "
f"upstream='{upstream_field['name']}' {type_repr(upstream_field['type'])}."
f"{display} @{field['ordinal']} '{field['name']}': "
f"union variant not present upstream (discriminant={field['discriminant']})"
)
continue
cursor = sunnypilot_field["type"]
while cursor.get("kind") == "list":
cursor = cursor["element"]
if cursor.get("kind") in ("struct", "group", "interface") and cursor.get("typeId"):
sunnypilot_struct_referenced_from_shared.add(cursor["typeId"])
for type_id, sunnypilot_struct in sunnypilot_structs.items():
if type_id in upstream_structs:
continue
if type_id in sunnypilot_struct_referenced_from_shared:
violations.append(
f"[R5] struct {sunnypilot_struct['displayName']} ({type_id}) exists only on sunnypilot "
f"but is referenced from an upstream-shared field. upstream cannot resolve this type."
)
return violations
def load_peer(path: str) -> dict:
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
def run_read(cereal_dir: str, peer_path: str) -> int:
log = load_log(cereal_dir)
peer_dump = load_peer(peer_path)
def run_read(cereal_dir: str, peer_path: str, extra_imports: list[str] | None = None) -> int:
log = load_log(cereal_dir, extra_imports)
with open(peer_path, "r", encoding="utf-8") as f:
peer_dump = json.load(f)
local_dump = {
"root": hex_id(log.Event.schema.node.id),
"structs": collect_schema(log.Event.schema),
@@ -224,32 +136,29 @@ def run_read(cereal_dir: str, peer_path: str) -> int:
violations = compare(sunnypilot_dump=peer_dump, upstream_dump=local_dump)
if not violations:
print("cereal compat OK: upstream openpilot can parse sunnypilot routes "
"(no leaked structs, no ordinal collisions).")
print("cereal compat OK: upstream can parse sunnypilot routes.")
return 0
print(f"cereal compat FAIL: upstream openpilot would misparse sunnypilot routes "
f"({len(violations)} violation(s)):")
print(f"cereal compat FAIL ({len(violations)} leaked union variant(s)):")
for v in violations:
print(f" {v}")
return 1
def main() -> int:
parser = argparse.ArgumentParser(
description="sunnypilot <-> upstream cereal compatibility validator (schema-level)."
)
parser = argparse.ArgumentParser(description="sunnypilot cereal upstream compat check")
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("-g", "--generate", action="store_true", help="dump local schema to JSON")
mode.add_argument("-r", "--read", action="store_true", help="load peer JSON and diff against local")
parser.add_argument("-f", "--file", default="schema.json", help="JSON file path (default: schema.json)")
parser.add_argument("--cereal-dir", required=True, help="path to cereal directory containing log.capnp")
mode.add_argument("-r", "--read", action="store_true", help="validate against peer schema")
parser.add_argument("-f", "--file", default="schema.json", help="JSON file path")
parser.add_argument("--cereal-dir", required=True, help="path to cereal directory")
parser.add_argument("-I", "--import-path", action="append", default=[], help="extra capnp import paths")
args = parser.parse_args()
if args.generate:
dump_schema(args.cereal_dir, args.file)
dump_schema(args.cereal_dir, args.file, args.import_path)
return 0
return run_read(args.cereal_dir, args.file)
return run_read(args.cereal_dir, args.file, args.import_path)
if __name__ == "__main__":
+1
View File
@@ -80,6 +80,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LiveDelay", {PERSISTENT | BACKUP, BYTES}},
{"LiveParameters", {PERSISTENT, JSON}},
{"LiveParametersV2", {PERSISTENT, BYTES}},
{"LivestreamEncoderBitrate", {CLEAR_ON_MANAGER_START | DONT_LOG, INT}},
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
+41 -1
View File
@@ -29,7 +29,7 @@ from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutExce
create_connection)
import cereal.messaging as messaging
from cereal import log
from cereal import car, log
from cereal.services import SERVICE_LIST
from openpilot.common.api import Api, get_key_pair
from openpilot.common.utils import CallbackReader, get_upload_stream
@@ -45,6 +45,7 @@ from openpilot.system.hardware.hw import Paths
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
LOCAL_PORT_WHITELIST = {22, } # SSH
WEBRTCD_PORT = 5001
LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
@@ -567,6 +568,16 @@ def getSshAuthorizedKeys() -> str:
def getGithubUsername() -> str:
return cast(str, Params().get("GithubUsername") or "")
@dispatcher.add_method
def getNotCar() -> bool:
cp_bytes = Params().get("CarParamsPersistent")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
return CP.notCar
return False
@dispatcher.add_method
def getSimInfo():
return HARDWARE.get_sim_info()
@@ -588,6 +599,35 @@ def getNetworks():
return HARDWARE.get_networks()
@dispatcher.add_method
def startStream(sdp: str) -> dict:
from openpilot.system.webrtc.webrtcd import StreamRequestBody
bridge_services_in = []
# get live car params to avoid stale notCar edge case
cp_bytes = Params().get("CarParams")
if cp_bytes is not None:
with car.CarParams.from_bytes(cp_bytes) as CP:
if CP.notCar:
bridge_services_in.append("testJoystick")
body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"])
try:
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
json=asdict(body), timeout=10)
if not resp.ok:
try:
error_body = resp.json()
raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}"))
except ValueError:
resp.raise_for_status()
return resp.json()
except requests.ConnectTimeout as e:
raise Exception("webrtc took too long to respond. is it on?") from e
except requests.ConnectionError as e:
raise Exception("webrtc is not running. turn on comma body ignition.") from e
@dispatcher.add_method
def takeSnapshot() -> str | dict[str, str] | None:
from openpilot.system.camerad.snapshot import jpeg_write, snapshot
-3
View File
@@ -313,9 +313,6 @@ class Tici(HardwareBase):
continue
gov = 'ondemand' if powersave_enabled else 'performance'
sudo_write(gov, f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_governor')
if not powersave_enabled:
# cap max core freq to 1689 Mhz
sudo_write('1689600', f'/sys/devices/system/cpu/cpufreq/policy{n}/scaling_max_freq')
# *** IRQ config ***
+1
View File
@@ -26,6 +26,7 @@ public:
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
virtual void encoder_open() = 0;
virtual void encoder_close() = 0;
virtual void set_bitrate(int bitrate) = 0;
void publisher_publish(int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
+4
View File
@@ -72,6 +72,10 @@ void FfmpegEncoder::encoder_close() {
is_open = false;
}
void FfmpegEncoder::set_bitrate(int bitrate) {
LOGE("adaptive bitrate is not supported for ffmpeg encoder %s", encoder_info.publish_name);
}
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
assert(buf->width == this->in_width);
assert(buf->height == this->in_height);
+1
View File
@@ -21,6 +21,7 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int segment_num = -1;
+21
View File
@@ -155,6 +155,8 @@ V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_hei
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
EncoderSettings encoder_settings = encoder_info.get_settings(in_width);
current_bitrate = encoder_settings.bitrate;
adaptive_bitrate = encoder_info.adaptive_bitrate;
bool is_h265 = encoder_settings.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C;
struct v4l2_format fmt_out = {
@@ -304,6 +306,25 @@ void V4LEncoder::encoder_close() {
this->is_open = false;
}
void V4LEncoder::set_bitrate(int bitrate) {
if (!adaptive_bitrate || bitrate == current_bitrate) return;
if (bitrate <= 0) {
LOGE("invalid livestream encoder bitrate %d", bitrate);
return;
}
struct v4l2_control ctrl = {
.id = V4L2_CID_MPEG_VIDEO_BITRATE,
.value = bitrate,
};
if (util::safe_ioctl(fd, VIDIOC_S_CTRL, &ctrl) == -1) {
LOGE("failed to update %s bitrate to %d", encoder_info.publish_name, bitrate);
return;
}
current_bitrate = bitrate;
}
V4LEncoder::~V4LEncoder() {
encoder_close();
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
+3
View File
@@ -13,6 +13,7 @@ public:
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
void encoder_open();
void encoder_close();
void set_bitrate(int bitrate);
private:
int fd;
@@ -20,6 +21,8 @@ private:
bool is_open = false;
int segment_num = -1;
int counter = 0;
int current_bitrate = -1;
bool adaptive_bitrate;
SafeQueue<VisionIpcBufExtra> extras;
+15
View File
@@ -44,11 +44,24 @@ bool sync_encoders(EncoderdState *s, VisionStreamType cam_type, uint32_t frame_i
}
}
void apply_bitrate(std::vector<std::unique_ptr<Encoder>> &encoders) {
static Params params;
std::string val = params.get("LivestreamEncoderBitrate");
if (val.empty()) return;
int bitrate = std::stoi(val);
for (auto &e : encoders) {
e->set_bitrate(bitrate);
}
}
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
util::set_thread_name(cam_info.thread_name);
std::vector<std::unique_ptr<Encoder>> encoders;
bool has_adaptive = std::any_of(cam_info.encoder_infos.begin(), cam_info.encoder_infos.end(),
[](const auto &ei) { return ei.adaptive_bitrate; });
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
std::unique_ptr<JpegEncoder> jpeg_encoder;
@@ -108,6 +121,8 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
++cur_seg;
}
if (has_adaptive) apply_bitrate(encoders);
// encode a frame
for (int i = 0; i < encoders.size(); ++i) {
int out_id = encoders[i]->encode_frame(buf, &extra);
+9 -5
View File
@@ -47,8 +47,8 @@ struct EncoderSettings {
}
static EncoderSettings StreamEncoderSettings() {
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 5'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5};
}
};
@@ -59,6 +59,7 @@ public:
const char *filename = NULL;
bool record = true;
bool include_audio = false;
bool adaptive_bitrate = false;
int frame_width = -1;
int frame_height = -1;
int fps = MAIN_FPS;
@@ -104,6 +105,7 @@ const EncoderInfo stream_road_encoder_info = {
.publish_name = "livestreamRoadEncodeData",
//.thumbnail_name = "thumbnail",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
};
@@ -111,6 +113,7 @@ const EncoderInfo stream_road_encoder_info = {
const EncoderInfo stream_wide_road_encoder_info = {
.publish_name = "livestreamWideRoadEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
};
@@ -118,6 +121,7 @@ const EncoderInfo stream_wide_road_encoder_info = {
const EncoderInfo stream_driver_encoder_info = {
.publish_name = "livestreamDriverEncodeData",
.record = false,
.adaptive_bitrate = true,
.get_settings = [](int){return EncoderSettings::StreamEncoderSettings();},
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
};
@@ -153,19 +157,19 @@ const LogCameraInfo driver_camera_info{
const LogCameraInfo stream_road_camera_info{
.thread_name = "road_cam_encoder",
.stream_type = VISION_STREAM_ROAD,
.encoder_infos = {stream_road_encoder_info}
.encoder_infos = {stream_road_encoder_info},
};
const LogCameraInfo stream_wide_road_camera_info{
.thread_name = "wide_road_cam_encoder",
.stream_type = VISION_STREAM_WIDE_ROAD,
.encoder_infos = {stream_wide_road_encoder_info}
.encoder_infos = {stream_wide_road_encoder_info},
};
const LogCameraInfo stream_driver_camera_info{
.thread_name = "driver_cam_encoder",
.stream_type = VISION_STREAM_DRIVER,
.encoder_infos = {stream_driver_encoder_info}
.encoder_infos = {stream_driver_encoder_info},
};
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
+31 -4
View File
@@ -1,4 +1,5 @@
import asyncio
import struct
import time
import av
@@ -7,6 +8,13 @@ from teleoprtc.tracks import TiciVideoStreamTrack
from cereal import messaging
from openpilot.common.realtime import DT_MDL, DT_DMON
# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
TIMING_SEI_UUID = bytes([
0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
])
_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID
class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
camera_to_sock_mapping = {
@@ -19,9 +27,30 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
dt = DT_DMON if camera_type == "driver" else DT_MDL
super().__init__(camera_type, dt)
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
self._sock = self._make_sock(camera_type)
self._pts = 0
self._t0_ns = time.monotonic_ns()
self.timing_sei_enabled = False
def _make_sock(self, camera_type: str) -> messaging.SubSocket:
return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
def switch_camera(self, camera_type: str) -> None:
self._sock = self._make_sock(camera_type)
def _build_frame_data(self, msg) -> bytes:
encode_data = getattr(msg, msg.which())
if not self.timing_sei_enabled:
return encode_data.header + encode_data.data
idx = encode_data.idx
sei_nal = _SEI_PREFIX + struct.pack('>4d',
(idx.timestampEof - idx.timestampSof) / 1e6,
(msg.logMonoTime - idx.timestampEof) / 1e6,
(time.monotonic_ns() - msg.logMonoTime) / 1e6,
time.time() * 1000, # noqa: TID251
) + b'\x80'
return encode_data.header + sei_nal + encode_data.data
async def recv(self):
while True:
@@ -30,9 +59,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
break
await asyncio.sleep(0.005)
evta = getattr(msg, msg.which())
packet = av.Packet(evta.header + evta.data)
packet = av.Packet(self._build_frame_data(msg))
packet.time_base = self._time_base
self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
+204 -62
View File
@@ -1,7 +1,11 @@
#!/usr/bin/env python3
from abc import abstractmethod
import os
import time
import argparse
import asyncio
import contextlib
import json
import uuid
import logging
@@ -19,11 +23,39 @@ if TYPE_CHECKING:
from aiortc.rtcdatachannel import RTCDataChannel
from openpilot.system.webrtc.schema import generate_field
from openpilot.common.params import Params
from cereal import messaging, log
class CerealOutgoingMessageProxy:
class AsyncTaskRunner:
def __init__(self):
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
async def stop(self):
if self.task is None:
return
task = self.task
self.task = None
if task.done():
return
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
@abstractmethod
async def run(self):
pass
class CerealOutgoingMessageProxy(AsyncTaskRunner):
def __init__(self, sm: messaging.SubMaster):
super().__init__()
self.sm = sm
self.channels: list[RTCDataChannel] = []
@@ -55,6 +87,19 @@ class CerealOutgoingMessageProxy:
for channel in self.channels:
channel.send(encoded_msg)
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class CerealIncomingMessageProxy:
def __init__(self, pm: messaging.PubMaster):
@@ -72,37 +117,6 @@ class CerealIncomingMessageProxy:
self.pm.send(msg_type, msg)
class CerealProxyRunner:
def __init__(self, proxy: CerealOutgoingMessageProxy):
self.proxy = proxy
self.is_running = False
self.task = None
self.logger = logging.getLogger("webrtcd")
def start(self):
assert self.task is None
self.task = asyncio.create_task(self.run())
def stop(self):
if self.task is None or self.task.done():
return
self.task.cancel()
self.task = None
async def run(self):
from aiortc.exceptions import InvalidStateError
while True:
try:
self.proxy.update()
except InvalidStateError:
self.logger.warning("Cereal outgoing proxy invalid state (connection closed)")
break
except Exception:
self.logger.exception("Cereal outgoing proxy failure")
await asyncio.sleep(0.01)
class DynamicPubMaster(messaging.PubMaster):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -115,21 +129,90 @@ class DynamicPubMaster(messaging.PubMaster):
self.sock[service] = messaging.pub_sock(service)
class LivestreamBitrateController(AsyncTaskRunner):
bitrates = [500_000, 1_500_000, int(os.environ.get("STREAM_BITRATE", 5_000_000))]
label_to_bitrate = { "high": bitrates[2], "med": bitrates[1], "low": bitrates[0]}
sample_interval = 0.2
high_level = 0.1 # drop immediately
med_level = 0.05 # drop after # of samples
low_level = 0 # raise after # of samples
down_samples = 5 # 1s
param_name = "LivestreamEncoderBitrate"
def __init__(self, peer_connection: Any):
super().__init__()
self.pc = peer_connection
self.params = Params()
self.level = 0
self.prev_lost, self.prev_sent = None, None
self.counter = 0
self.up_samples = 5 # 1s
self._auto = True
async def run(self):
while True:
await asyncio.sleep(self.sample_interval)
if not self._auto:
continue
loss_rate = await self._sample()
if loss_rate is None:
continue
if loss_rate >= self.med_level and self.level > 0:
self.counter += 1
if self.counter >= self.down_samples or loss_rate >= self.high_level:
self.level -= 1
self.up_samples *= 2 # exponential backoff before raising again
self.counter = 0
self._publish(self.bitrates[self.level])
elif loss_rate <= self.low_level and self.level < len(self.bitrates) - 1:
self.counter -= 1
if -self.counter >= self.up_samples:
self.level += 1
self.counter = 0
self._publish(self.bitrates[self.level])
async def _sample(self) -> float | None:
report = await self.pc.getStats()
packets_lost = packets_sent = 0
for s in report.values():
if s.type == "remote-inbound-rtp":
packets_lost += s.packetsLost
elif s.type == "outbound-rtp":
packets_sent += s.packetsSent
if self.prev_lost is None:
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return None
lost_delta = max(0, packets_lost - self.prev_lost)
sent_delta = max(0, packets_sent - self.prev_sent)
self.prev_lost, self.prev_sent = packets_lost, packets_sent
return lost_delta / sent_delta if sent_delta else 0.0
def _publish(self, bitrate: float):
self.params.put(self.param_name, bitrate)
def set_quality(self, quality):
if quality in self.label_to_bitrate:
self._publish(self.label_to_bitrate[quality])
self._auto = False
elif quality == "auto":
self._auto = True
class StreamSession:
shared_pub_master = DynamicPubMaster([])
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False):
from aiortc.mediastreams import VideoStreamTrack
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from teleoprtc import WebRTCAnswerBuilder
from teleoprtc.info import parse_info_from_offer
config = parse_info_from_offer(sdp)
builder = WebRTCAnswerBuilder(sdp)
assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks"
for cam in cameras:
builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())
self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()
builder.add_video_stream(init_camera, self.video_track)
self.stream = builder.stream()
self.identifier = str(uuid.uuid4())
@@ -137,35 +220,60 @@ class StreamSession:
self.incoming_bridge: CerealIncomingMessageProxy | None = None
self.incoming_bridge_services = incoming_services
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None
self.outgoing_bridge_runner: CerealProxyRunner | None = None
self.bitrate_controller: LivestreamBitrateController | None = None
if len(incoming_services) > 0:
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master)
if len(outgoing_services) > 0:
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services))
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge)
self.bitrate_controller = LivestreamBitrateController(self.stream.peer_connection)
self.run_task: asyncio.Task | None = None
self._cleanup_lock = asyncio.Lock()
self._cleanup_done = False
self.logger = logging.getLogger("webrtcd")
self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s",
self.identifier, cameras, incoming_services, outgoing_services)
self.logger.info(
"New stream session (%s), init camera %s, incoming services %s, outgoing services %s",
self.identifier, init_camera, incoming_services, outgoing_services,
)
def start(self):
self.run_task = asyncio.create_task(self.run())
def stop(self):
if self.run_task.done():
return
self.run_task.cancel()
async def stop(self):
if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task():
self.run_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.run_task
self.run_task = None
asyncio.run(self.post_run_cleanup())
await self.post_run_cleanup()
async def get_answer(self):
return await self.stream.start()
async def message_handler(self, message: bytes):
def message_handler(self, message: bytes):
assert self.incoming_bridge is not None
try:
self.incoming_bridge.send(message)
payload = json.loads(message) if isinstance(message, (bytes, str)) else None
if isinstance(payload, dict):
msg_type = payload.get("type")
match msg_type:
case "livestreamCameraSwitch":
self.video_track.switch_camera(payload["data"]["camera"])
case "livestreamSettings":
self.bitrate_controller.set_quality(payload["data"]["quality"])
case "clockSync":
pong = json.dumps({"type": "clockSync", "data": {
"action": "pong", "browserSendTime": payload["data"]["browserSendTime"], "deviceTime": time.time() * 1000, # noqa: TID251
}})
self.stream.get_messaging_channel().send(pong)
case "enableTimingSei":
if hasattr(self.video_track, 'timing_sei_enabled'):
self.video_track.timing_sei_enabled = bool(payload["data"]["enabled"])
case _:
if payload.get("type") not in self.incoming_bridge_services:
return
self.incoming_bridge.send(message)
except Exception:
self.logger.exception("Cereal incoming proxy failure")
@@ -176,29 +284,38 @@ class StreamSession:
if self.incoming_bridge is not None:
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services)
self.stream.set_message_handler(self.message_handler)
if self.outgoing_bridge_runner is not None:
if self.outgoing_bridge is not None:
channel = self.stream.get_messaging_channel()
self.outgoing_bridge_runner.proxy.add_channel(channel)
self.outgoing_bridge_runner.start()
self.outgoing_bridge.add_channel(channel)
self.outgoing_bridge.start()
self.bitrate_controller.start()
self.logger.info("Stream session (%s) connected", self.identifier)
await self.stream.wait_for_disconnection()
await self.post_run_cleanup()
self.logger.info("Stream session (%s) ended", self.identifier)
except Exception:
self.logger.exception("Stream session failure")
finally:
await self.post_run_cleanup()
async def post_run_cleanup(self):
await self.stream.stop()
if self.outgoing_bridge is not None:
self.outgoing_bridge_runner.stop()
async with self._cleanup_lock:
if self._cleanup_done:
return
self._cleanup_done = True
if self.bitrate_controller is not None:
await self.bitrate_controller.stop()
if self.outgoing_bridge is not None:
await self.outgoing_bridge.stop()
await self.stream.stop()
@dataclass
class StreamRequestBody:
sdp: str
cameras: list[str]
initCamera: str
bridge_services_in: list[str] = field(default_factory=list)
bridge_services_out: list[str] = field(default_factory=list)
@@ -208,11 +325,33 @@ async def get_stream(request: 'web.Request'):
raw_body = await request.json()
body = StreamRequestBody(**raw_body)
session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode)
answer = await session.get_answer()
session.start()
async with request.app['stream_lock']:
# Fully disconnect any other active stream before starting the replacement.
for sid, s in list(stream_dict.items()):
if s.run_task and not s.run_task.done():
try:
ch = s.stream.get_messaging_channel()
ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."}))
except Exception:
pass
await s.stop()
del stream_dict[sid]
stream_dict[session.identifier] = session
session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode)
try:
answer = await session.get_answer()
except ValueError as e:
await session.stop()
raise web.HTTPBadRequest(
text=json.dumps({"error": "invalid_sdp", "message": str(e)}),
content_type="application/json",
) from e
except Exception:
await session.stop()
raise
session.start()
stream_dict[session.identifier] = session
return web.json_response({"sdp": answer.sdp, "type": answer.type})
@@ -224,6 +363,7 @@ async def get_schema(request: 'web.Request'):
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
return web.json_response(schema_dict)
async def post_notify(request: 'web.Request'):
try:
payload = await request.json()
@@ -239,9 +379,10 @@ async def post_notify(request: 'web.Request'):
return web.Response(status=200, text="OK")
async def on_shutdown(app: 'web.Application'):
for session in app['streams'].values():
session.stop()
await session.stop()
del app['streams']
@@ -254,6 +395,7 @@ def webrtcd_thread(host: str, port: int, debug: bool):
app = web.Application()
app['streams'] = dict()
app['stream_lock'] = asyncio.Lock()
app['debug'] = debug
app.on_shutdown.append(on_shutdown)
app.router.add_post("/stream", get_stream)
+1 -1
View File
@@ -56,7 +56,7 @@ async def ping(request: 'web.Request'):
async def offer(request: 'web.Request'):
params = await request.json()
body = StreamRequestBody(params["sdp"], ["driver"], ["testJoystick"], ["carState"])
body = StreamRequestBody(params["sdp"], "driver", ["testJoystick"], ["carState"])
body_json = json.dumps(dataclasses.asdict(body))
logger.info("Sending offer to webrtcd...")