#!/bin/sh
""":"
REPO_ROOT="$(CDPATH= cd -- "$(dirname "$0")" && pwd)"
PYTHON_BIN="$REPO_ROOT/.venv/bin/python"
if [ ! -x "$PYTHON_BIN" ]; then
  echo "Missing $PYTHON_BIN. Run 'uv sync' from $REPO_ROOT first." >&2
  exit 1
fi
exec "$PYTHON_BIN" "$0" "$@"
":"""
# The block above is an sh/Python polyglot: /bin/sh sees `:` (a no-op)
# followed by a launcher that re-executes this file with the repository's
# virtualenv interpreter, while Python sees a single triple-quoted string.
#
# The script itself validates a public comma route (required streams uploaded,
# vision speed-limit params enabled, offline map tiles loaded) and, unless
# --validate-only is given, downloads the qlog/rlog/fcamera files into a bundle
# folder plus a zip under the chosen destination root.

from __future__ import annotations

import argparse
import bz2
import io
import json
import os
import re
import shutil
import sys
import zipfile
from dataclasses import dataclass
from datetime import UTC, datetime
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any
from urllib.parse import quote, urlparse

import requests
import zstandard
from cereal import log

# Base URL of the route API; overridable through the environment.
API_HOST = os.getenv("COMMA_API_HOST", "https://api.commadotai.com").rstrip("/")
# "<16 hex chars>/<anything without a slash>" -- dongle id, then log id (+ optional segment).
ROUTE_ID_RE = re.compile(r"([0-9a-f]{16})/([^/]+)")


@dataclass(frozen=True)
class StreamSpec:
    """Pair the key used in the files payload with the name shown to the user."""

    api_key: str
    display_name: str


@dataclass(frozen=True)
class RouteId:
    """A route identifier split into its dongle id and log id."""

    dongle_id: str
    log_id: str

    @property
    def canonical_name(self) -> str:
        """Return the pipe-separated form used when building API URLs."""
        return f"{self.dongle_id}|{self.log_id}"

    @property
    def cli_name(self) -> str:
        """Return the slash-separated form used in messages and the manifest."""
        return f"{self.dongle_id}/{self.log_id}"

    @property
    def safe_name(self) -> str:
        """Return the underscore-separated form used in bundle file names."""
        return f"{self.dongle_id}_{self.log_id}"


@dataclass
class DownloadedFile:
    """Record of one file written into the bundle."""

    segment: int
    stream: str
    filename: str
    relative_path: str  # POSIX path relative to the bundle root
    mtime_epoch: int | None  # from the Last-Modified header, when usable


@dataclass
class MapTileSummary:
    """Counters collected while scanning qlogs for `mapdOut` messages."""

    qlogs_scanned: int = 0
    mapd_messages: int = 0
    tile_loaded_messages: int = 0
    first_tile_segment: int | None = None

    @property
    def tile_loaded_any(self) -> bool:
        """Return True once at least one message reported a loaded tile."""
        return self.tile_loaded_messages > 0


class ValidationError(Exception):
    """Raised when a route fails one or more validation checks."""

    def __init__(self, route: RouteId, reasons: list[str]):
        self.route = route
        self.reasons = reasons
        super().__init__(self._render())

    def _render(self) -> str:
        """Format the route name followed by one bullet line per reason."""
        lines = [f"Validation failed for {self.route.cli_name}:"]
        lines.extend(f"- {reason}" for reason in self.reasons)
        return "\n".join(lines)


# Streams that must be present for every segment and that get downloaded.
REQUIRED_STREAMS = (
    StreamSpec("qlogs", "qlog"),
    StreamSpec("logs", "rlog"),
    StreamSpec("cameras", "fcamera.hevc"),
)


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    The route id may be given positionally or through ``--routeid``. When the
    first argument looks like a route id (optionally prefixed with dashes) it
    is rewritten to ``--routeid <id>`` before argparse sees it.

    Returns:
        The parsed namespace; ``routeid`` is guaranteed to be non-empty.
    """
    argv = list(sys.argv[1:])
    if argv and ROUTE_ID_RE.fullmatch(argv[0].lstrip("-")):
        if argv[0].startswith("--"):
            argv[0] = argv[0][2:]
        argv = ["--routeid", argv[0], *argv[1:]]

    parser = argparse.ArgumentParser(
        description="Validate and bundle a public comma route for speed-limit vision review."
    )
    parser.add_argument("routeid_positional", nargs="?", help="Route id like 'dongle/logid' or 'dongle/logid--7'.")
    parser.add_argument("--routeid", help="Route id like 'dongle/logid' or 'dongle/logid--7'.")
    parser.add_argument("--desktop-root", type=Path, default=Path.home() / "Desktop", help="Destination root for the bundle folder and zip.")
    parser.add_argument("--timeout", type=float, default=60.0, help="HTTP timeout in seconds.")
    parser.add_argument("--overwrite", action="store_true", help="Replace an existing Desktop bundle for this route.")
    parser.add_argument("--validate-only", action="store_true", help="Run validation only and skip downloads/zipping.")

    args = parser.parse_args(argv)
    args.routeid = args.routeid or args.routeid_positional
    if not args.routeid:
        parser.error("the following arguments are required: --routeid")
    return args


def parse_route_id(raw: str) -> RouteId:
    """Parse a user-supplied route id into a :class:`RouteId`.

    Surrounding whitespace and quotes are stripped, ``|`` is accepted in place
    of ``/``, and a trailing ``--<digits>`` segment suffix is dropped.

    Raises:
        ValueError: if the text does not match the route pattern or the log id
            is not exactly 20 characters long.
    """
    text = raw.strip().strip("'\"").replace("|", "/")
    match = ROUTE_ID_RE.fullmatch(text)
    if match is None:
        raise ValueError(f"Unrecognized route id: {raw}")

    dongle_id = match.group(1)
    tail = match.group(2)
    parts = tail.split("--")
    # Three "--"-separated parts ending in digits means "<logid>--<segment>".
    if len(parts) == 3 and parts[-1].isdigit():
        log_id = "--".join(parts[:2])
    else:
        log_id = tail

    if len(log_id) != 20:
        raise ValueError(f"Invalid log id in route: {raw}")
    return RouteId(dongle_id=dongle_id, log_id=log_id)


def route_url(route: RouteId) -> str:
    """Return the route metadata URL for *route*."""
    return f"{API_HOST}/v1/route/{quote(route.canonical_name, safe='')}/"


def route_files_url(route: RouteId) -> str:
    """Return the route file-listing URL for *route*."""
    return f"{API_HOST}/v1/route/{quote(route.canonical_name, safe='')}/files"


def format_segments(segments: list[int]) -> str:
    """Render segment numbers compactly, collapsing consecutive runs.

    For example ``[0, 1, 2, 5]`` becomes ``"0-2, 5"``; an empty list becomes
    ``"none"``. Runs are detected in the order given.
    """
    if not segments:
        return "none"

    ranges: list[str] = []
    start = prev = segments[0]
    for segment in segments[1:]:
        if segment == prev + 1:
            prev = segment
            continue
        ranges.append(f"{start}-{prev}" if start != prev else str(start))
        start = prev = segment
    ranges.append(f"{start}-{prev}" if start != prev else str(start))
    return ", ".join(ranges)


def as_int(value: Any) -> int | None:
    """Coerce ints, bools and all-digit strings to ``int``; anything else gives ``None``."""
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, str) and value.isdigit():
        return int(value)
    return None


def relative_posix(path: Path, root: Path) -> str:
    """Return *path* relative to *root* using forward slashes."""
    return path.relative_to(root).as_posix()


def fetch_json(session: requests.Session, url: str, timeout: float) -> Any:
    """GET *url* and return the decoded JSON body; raise on an HTTP error status."""
    response = session.get(url, timeout=timeout, allow_redirects=True)
    response.raise_for_status()
    return response.json()


def fetch_bytes(session: requests.Session, url: str, timeout: float) -> bytes:
    """GET *url* and return the raw body; raise on an HTTP error status."""
    response = session.get(url, timeout=timeout, allow_redirects=True)
    response.raise_for_status()
    return response.content


def fetch_stream_to_path(session: requests.Session, url: str, dest_path: Path, timeout: float) -> int | None:
    """Stream *url* to *dest_path* and mirror the server's Last-Modified time.

    The body is written to a sibling ``*.part`` file and renamed into place
    only after the download completes, so *dest_path* never holds a partial
    file.

    Returns:
        The Last-Modified time as epoch seconds (also applied to the file's
        atime/mtime), or ``None`` when the header is absent or unparseable.

    Raises:
        requests.HTTPError: on an HTTP error status.
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = dest_path.with_suffix(dest_path.suffix + ".part")

    # The context manager releases the streamed connection even on failure.
    with session.get(url, stream=True, timeout=timeout, allow_redirects=True) as response:
        response.raise_for_status()
        try:
            with temp_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    if chunk:
                        handle.write(chunk)
        except BaseException:
            # Do not leave a stale partial download behind.
            temp_path.unlink(missing_ok=True)
            raise
        last_modified = response.headers.get("Last-Modified")

    temp_path.replace(dest_path)

    if not last_modified:
        return None
    try:
        epoch = int(parsedate_to_datetime(last_modified).timestamp())
    except (TypeError, ValueError):
        # A malformed header must not fail a download that already succeeded.
        return None
    os.utime(dest_path, (epoch, epoch))
    return epoch


def decompress_log_bytes(payload: bytes) -> bytes:
    """Decompress a bz2 or zstd payload, detected by magic bytes.

    Payloads that carry neither magic are returned unchanged.
    """
    if payload[:3] == b"BZh":
        return bz2.decompress(payload)
    if payload[:4] == b"\x28\xb5\x2f\xfd":
        reader = zstandard.ZstdDecompressor().stream_reader(io.BytesIO(payload))
        try:
            return reader.read()
        finally:
            reader.close()
    return payload


def load_init_params(qlog_bytes: bytes) -> dict[str, str]:
    """Return the params recorded in the first ``initData`` message of a qlog.

    Byte values are decoded as UTF-8 with replacement, ``None`` becomes an
    empty string, and anything else is passed through ``str``.

    Raises:
        RuntimeError: if the log contains no ``initData`` message.
    """
    payload = decompress_log_bytes(qlog_bytes)
    for msg in log.Event.read_multiple_bytes(payload):
        if msg.which() != "initData":
            continue

        params: dict[str, str] = {}
        entries = msg.initData.params.to_dict().get("entries", [])
        for entry in entries:
            key = entry.get("key")
            if not key:
                continue
            value = entry.get("value", b"")
            if isinstance(value, bytes):
                value = value.decode("utf-8", "replace")
            elif value is None:
                value = ""
            else:
                value = str(value)
            params[str(key)] = value
        return params

    raise RuntimeError("No initData params found in qlog.")


def scan_qlogs_for_map_tiles(session: requests.Session, qlog_urls: list[str | None], timeout: float) -> MapTileSummary:
    """Download qlogs in segment order and count ``mapdOut`` messages.

    Entries that are not non-empty strings are skipped. Scanning stops after
    the first qlog in which a message reported ``tileLoaded``.
    """
    summary = MapTileSummary()
    for segment, url in enumerate(qlog_urls):
        if not isinstance(url, str) or not url:
            continue

        summary.qlogs_scanned += 1
        payload = decompress_log_bytes(fetch_bytes(session, url, timeout))
        for msg in log.Event.read_multiple_bytes(payload):
            if msg.which() != "mapdOut":
                continue
            summary.mapd_messages += 1
            if msg.mapdOut.tileLoaded:
                summary.tile_loaded_messages += 1
                if summary.first_tile_segment is None:
                    summary.first_tile_segment = segment

        # One positive is enough; avoid downloading the remaining qlogs.
        if summary.tile_loaded_any:
            break
    return summary


def expected_segment_count(route_meta: dict[str, Any], files_payload: dict[str, Any]) -> int:
    """Return the largest segment count implied by the metadata or file lists.

    Candidates are ``maxqlog + 1`` and ``maxlog + 1`` from the route metadata
    and the lengths of the ``qlogs``/``logs``/``cameras`` lists; 0 is returned
    when none of them is available.
    """
    candidates: list[int] = []
    for key in ("maxqlog", "maxlog"):
        value = as_int(route_meta.get(key))
        if value is not None:
            candidates.append(value + 1)
    for key in ("qlogs", "logs", "cameras"):
        value = files_payload.get(key)
        if isinstance(value, list):
            candidates.append(len(value))
    return max(candidates, default=0)


def collect_missing_segments(urls: list[Any], expected_segments: int) -> list[int]:
    """Return the segment indices below *expected_segments* that have no usable URL."""
    missing: list[int] = []
    for segment in range(expected_segments):
        url = urls[segment] if segment < len(urls) else None
        if not isinstance(url, str) or not url:
            missing.append(segment)
    return missing


def first_available_url(urls: list[Any]) -> str | None:
    """Return the first non-empty string in *urls*, or ``None``."""
    for url in urls:
        if isinstance(url, str) and url:
            return url
    return None


def filename_from_url(url: str) -> str:
    """Return the final path component of *url*, ignoring query and fragment."""
    return Path(urlparse(url).path).name


def _http_status(exc: requests.HTTPError) -> int | str:
    """Return the status code attached to *exc*, or ``"unknown"`` if it has no response."""
    return exc.response.status_code if exc.response is not None else "unknown"


def validate_route(route: RouteId, session: requests.Session, timeout: float) -> dict[str, Any]:
    """Check that *route* is public, fully uploaded and configured as required.

    Checks performed: route metadata is reachable and ``is_public``; every
    required stream has a URL for every expected segment; the first qlog's
    params have ``VisionSpeedLimitAutoBookmark`` and
    ``VisionSpeedLimitDetection`` set to ``"1"``; and at least one qlog
    reports a loaded offline map tile. The map-tile scan is only attempted
    when the params could be read.

    Returns:
        A dict with keys ``route_meta``, ``files_payload``,
        ``expected_segments``, ``params`` and ``map_tiles``.

    Raises:
        ValidationError: listing every failed check. Access and segment-count
            problems raise immediately; the remaining checks are accumulated.
    """
    try:
        route_meta = fetch_json(session, route_url(route), timeout)
    except requests.HTTPError as exc:
        raise ValidationError(route, [f"route is not publicly accessible from comma connect (HTTP {_http_status(exc)})."]) from exc

    if not route_meta.get("is_public", False):
        raise ValidationError(route, ["route metadata loaded, but `is_public` was false."])

    try:
        files_payload = fetch_json(session, route_files_url(route), timeout)
    except requests.HTTPError as exc:
        raise ValidationError(route, [f"public route files could not be fetched from comma connect (HTTP {_http_status(exc)})."]) from exc

    expected_segments = expected_segment_count(route_meta, files_payload)
    if expected_segments <= 0:
        raise ValidationError(route, ["could not determine any route segments from comma connect."])

    failures: list[str] = []
    stream_urls: dict[str, list[Any]] = {}
    for spec in REQUIRED_STREAMS:
        urls = files_payload.get(spec.api_key)
        if not isinstance(urls, list):
            urls = []
        stream_urls[spec.api_key] = urls
        missing_segments = collect_missing_segments(urls, expected_segments)
        if missing_segments:
            failures.append(f"missing uploaded {spec.display_name} files for segment(s): {format_segments(missing_segments)}.")

    first_qlog_url = first_available_url(stream_urls["qlogs"])
    params: dict[str, str] = {}
    map_tiles = MapTileSummary()

    if first_qlog_url is None:
        failures.append("no qlog was available to inspect route params.")
    else:
        try:
            params = load_init_params(fetch_bytes(session, first_qlog_url, timeout))
        except Exception as exc:
            # Any download/parse problem becomes a validation reason, not a crash.
            failures.append(f"could not read `initData.params` from the first qlog: {exc}.")

    if params:
        auto_bookmark = params.get("VisionSpeedLimitAutoBookmark", "")
        if auto_bookmark != "1":
            failures.append(f"`VisionSpeedLimitAutoBookmark` was {auto_bookmark!r}; it must be '1'.")

        vision_detection = params.get("VisionSpeedLimitDetection", "")
        if vision_detection != "1":
            failures.append(f"`VisionSpeedLimitDetection` was {vision_detection!r}; it must be '1' for useful vision-model collection.")

        try:
            qlog_urls = [url if isinstance(url, str) and url else None for url in stream_urls["qlogs"]]
            map_tiles = scan_qlogs_for_map_tiles(session, qlog_urls, timeout)
            if not map_tiles.tile_loaded_any:
                maps_selected = params.get("MapsSelected", "")
                if maps_selected:
                    failures.append(
                        "offline map tiles never loaded during this route even though "
                        f"`MapsSelected` was {maps_selected!r}; tell the user to download local maps for the driven area so collection trains faster."
                    )
                else:
                    failures.append(
                        "offline map tiles never loaded during this route and `MapsSelected` was empty; "
                        "tell the user to download local maps for the driven area so collection trains faster."
                    )
        except Exception as exc:
            failures.append(f"could not inspect qlogs for offline map usage: {exc}.")

    if failures:
        raise ValidationError(route, failures)

    return {
        "route_meta": route_meta,
        "files_payload": files_payload,
        "expected_segments": expected_segments,
        "params": params,
        "map_tiles": map_tiles,
    }


def prepare_output_paths(route: RouteId, desktop_root: Path, overwrite: bool) -> tuple[Path, Path]:
    """Resolve the bundle folder and zip paths, clearing or rejecting leftovers.

    The destination root is created if needed. With *overwrite* an existing
    bundle folder and zip are deleted; without it their presence is an error.

    Returns:
        ``(bundle_root, zip_path)``.

    Raises:
        FileExistsError: if either output exists and *overwrite* is false.
    """
    desktop_root = desktop_root.expanduser().resolve()
    desktop_root.mkdir(parents=True, exist_ok=True)
    bundle_root = desktop_root / f"speed_limit_route_bundle_{route.safe_name}"
    zip_path = desktop_root / f"{bundle_root.name}.zip"

    if overwrite:
        if bundle_root.exists():
            shutil.rmtree(bundle_root)
        if zip_path.exists():
            zip_path.unlink()
    else:
        if bundle_root.exists():
            raise FileExistsError(f"Bundle folder already exists: {bundle_root}")
        if zip_path.exists():
            raise FileExistsError(f"Bundle zip already exists: {zip_path}")
    return bundle_root, zip_path


def write_bundle_manifests(
    bundle_root: Path,
    route: RouteId,
    validation: dict[str, Any],
    downloaded_files: list[DownloadedFile],
) -> None:
    """Write the manifest files describing the bundle.

    Produces ``live_routes_meta/files.txt`` (segment name and relative path
    per file), ``live_routes_meta/qlog_mtimes.txt`` (relative path and mtime
    for qlogs whose mtime is known) and ``bundle_manifest.json`` at the bundle
    root. Entries are ordered by segment, stream, then filename.
    """
    live_routes_meta = bundle_root / "live_routes_meta"
    live_routes_meta.mkdir(parents=True, exist_ok=True)

    # Sort once; the text listings and the JSON manifest share this order.
    ordered_files = sorted(downloaded_files, key=lambda item: (item.segment, item.stream, item.filename))

    files_txt_lines: list[str] = []
    qlog_mtimes_lines: list[str] = []
    for record in ordered_files:
        segment_name = f"{route.log_id}--{record.segment}"
        files_txt_lines.append(f"{segment_name} {record.relative_path}")
        if record.filename.startswith("qlog") and record.mtime_epoch is not None:
            qlog_mtimes_lines.append(f"{record.relative_path} {record.mtime_epoch}")

    (live_routes_meta / "files.txt").write_text("\n".join(files_txt_lines) + ("\n" if files_txt_lines else ""), encoding="utf-8")
    (live_routes_meta / "qlog_mtimes.txt").write_text("\n".join(qlog_mtimes_lines) + ("\n" if qlog_mtimes_lines else ""), encoding="utf-8")

    params = validation["params"]
    route_meta = validation["route_meta"]
    map_tiles: MapTileSummary = validation["map_tiles"]
    manifest = {
        "bundleCreatedAt": datetime.now(UTC).isoformat(),
        "routeId": route.cli_name,
        "routeFullname": route.canonical_name,
        "expectedSegments": validation["expected_segments"],
        "downloadedStreams": [spec.display_name for spec in REQUIRED_STREAMS],
        "validation": {
            "isPublic": bool(route_meta.get("is_public")),
            "visionSpeedLimitAutoBookmark": params.get("VisionSpeedLimitAutoBookmark", ""),
            "visionSpeedLimitDetection": params.get("VisionSpeedLimitDetection", ""),
            "mapsSelected": params.get("MapsSelected", ""),
            "lastMapsUpdate": params.get("LastMapsUpdate", ""),
            "mapdMessagesScanned": map_tiles.mapd_messages,
            "offlineTileLoaded": map_tiles.tile_loaded_any,
            "firstTileSegment": map_tiles.first_tile_segment,
        },
        "routeMeta": {
            "startTime": route_meta.get("start_time"),
            "endTime": route_meta.get("end_time"),
            "distance": route_meta.get("distance"),
            "startLat": route_meta.get("start_lat"),
            "startLng": route_meta.get("start_lng"),
            "endLat": route_meta.get("end_lat"),
            "endLng": route_meta.get("end_lng"),
            "gitBranch": route_meta.get("git_branch"),
            "gitCommit": route_meta.get("git_commit"),
            "platform": route_meta.get("platform"),
            "version": route_meta.get("version"),
            "make": route_meta.get("make"),
        },
        "files": [
            {
                "segment": record.segment,
                "stream": record.stream,
                "filename": record.filename,
                "relativePath": record.relative_path,
                "mtimeEpoch": record.mtime_epoch,
            }
            for record in ordered_files
        ],
    }
    (bundle_root / "bundle_manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8")


def zip_bundle(bundle_root: Path, zip_path: Path) -> None:
    """Archive every file under *bundle_root* into *zip_path* without compression.

    Archive names are prefixed with the bundle folder's own name.
    """
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED) as archive:
        for path in sorted(bundle_root.rglob("*")):
            if path.is_file():
                archive.write(path, arcname=Path(bundle_root.name) / path.relative_to(bundle_root))


def download_bundle(
    route: RouteId,
    validation: dict[str, Any],
    session: requests.Session,
    desktop_root: Path,
    timeout: float,
    overwrite: bool,
) -> tuple[Path, Path]:
    """Download every required stream for every segment and package the result.

    Files are placed under ``data/media/0/realdata/<logid>--<segment>/`` inside
    the bundle folder; the manifests are then written and the folder zipped.

    Returns:
        ``(bundle_root, zip_path)``.

    Raises:
        FileExistsError: propagated from :func:`prepare_output_paths`.
        RuntimeError: if a URL that validation reported as present is missing.
    """
    bundle_root, zip_path = prepare_output_paths(route, desktop_root, overwrite)
    clip_root = bundle_root / "data" / "media" / "0" / "realdata"
    expected_segments = validation["expected_segments"]
    files_payload = validation["files_payload"]

    downloaded_files: list[DownloadedFile] = []
    for spec in REQUIRED_STREAMS:
        urls = files_payload.get(spec.api_key, [])
        for segment in range(expected_segments):
            url = urls[segment]
            if not isinstance(url, str) or not url:
                raise RuntimeError(f"Unexpected missing {spec.display_name} URL for segment {segment} after validation.")

            filename = filename_from_url(url)
            segment_name = f"{route.log_id}--{segment}"
            dest_path = clip_root / segment_name / filename
            print(f"Downloading {segment_name}/{filename}")
            mtime_epoch = fetch_stream_to_path(session, url, dest_path, timeout)
            downloaded_files.append(
                DownloadedFile(
                    segment=segment,
                    stream=spec.display_name,
                    filename=filename,
                    relative_path=relative_posix(dest_path, bundle_root),
                    mtime_epoch=mtime_epoch,
                )
            )

    write_bundle_manifests(bundle_root, route, validation, downloaded_files)
    print(f"Creating {zip_path.name}")
    zip_bundle(bundle_root, zip_path)
    return bundle_root, zip_path


def print_validation_summary(route: RouteId, validation: dict[str, Any]) -> None:
    """Print the key facts established by a successful validation."""
    params = validation["params"]
    map_tiles: MapTileSummary = validation["map_tiles"]
    print(f"Validated {route.cli_name}")
    print(" public route: yes")
    print(f" segments: {validation['expected_segments']}")
    print(f" VisionSpeedLimitDetection: {params.get('VisionSpeedLimitDetection', '')}")
    print(f" VisionSpeedLimitAutoBookmark: {params.get('VisionSpeedLimitAutoBookmark', '')}")
    print(f" MapsSelected: {params.get('MapsSelected', '') or '(empty)'}")
    print(f" offline tile loaded: {'yes' if map_tiles.tile_loaded_any else 'no'}")


def main() -> int:
    """Run the command-line tool and return its exit status.

    Returns:
        0 on success, 2 when validation fails, 3 when the bundle already
        exists and ``--overwrite`` was not given, 1 for any other error
        (including an unparseable route id).
    """
    args = parse_args()
    try:
        route = parse_route_id(args.routeid)
    except ValueError as exc:
        # Report a bad route id like every other error instead of a traceback.
        print(f"Error: {exc}", file=sys.stderr)
        return 1

    # The context manager closes pooled connections on every exit path.
    with requests.Session() as session:
        session.headers.update({"User-Agent": "starpilot-speed-limit-route-bundler/1.0"})
        try:
            validation = validate_route(route, session, args.timeout)
            print_validation_summary(route, validation)
            if args.validate_only:
                return 0
            bundle_root, zip_path = download_bundle(route, validation, session, args.desktop_root, args.timeout, args.overwrite)
            print(f"Bundle folder: {bundle_root}")
            print(f"Bundle zip: {zip_path}")
            return 0
        except ValidationError as exc:
            print(str(exc), file=sys.stderr)
            return 2
        except FileExistsError as exc:
            print(f"{exc}\nUse --overwrite to replace the existing Desktop bundle, or --validate-only to skip packaging.", file=sys.stderr)
            return 3
        except Exception as exc:
            print(f"Error: {exc}", file=sys.stderr)
            return 1


if __name__ == "__main__":
    raise SystemExit(main())