mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-15 03:54:49 +08:00
574 lines
19 KiB
Bash
Executable File
574 lines
19 KiB
Bash
Executable File
#!/bin/sh
# Polyglot launcher: this file is valid both as a POSIX shell script and as Python.
# Run directly, /bin/sh reads `""":"` as `""` followed by `":"`, i.e. the no-op command `:`,
# then executes the shell lines below. They re-exec this same file ("$0") with the
# interpreter from the `.venv` next to the script. Because of `exec`, the shell never
# reaches the Python code.
# Python sees everything from `""":"` to `":"""` as one triple-quoted string (the module
# docstring) and skips it.
""":"
REPO_ROOT="$(CDPATH= cd -- "$(dirname "$0")" && pwd)"
PYTHON_BIN="$REPO_ROOT/.venv/bin/python"

if [ ! -x "$PYTHON_BIN" ]; then
  echo "Missing $PYTHON_BIN. Run 'uv sync' from $REPO_ROOT first." >&2
  exit 1
fi

exec "$PYTHON_BIN" "$0" "$@"
":"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import bz2
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sys
|
|
import zipfile
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
from email.utils import parsedate_to_datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import quote, urlparse
|
|
|
|
import requests
|
|
import zstandard
|
|
|
|
from cereal import log
|
|
|
|
|
|
# Base URL of the comma API. It can be overridden through COMMA_API_HOST; trailing slashes
# are dropped so paths can be appended with a leading "/".
API_HOST = os.environ.get("COMMA_API_HOST", "https://api.commadotai.com").rstrip("/")
# "<16 lowercase hex chars>/<tail without slashes>". Group 1 is the dongle id. Group 2 is the
# log id, possibly followed by a "--<segment>" suffix (split off by parse_route_id).
ROUTE_ID_RE = re.compile(r"([0-9a-f]{16})/([^/]+)")
|
|
|
|
|
|
@dataclass(frozen=True)
class StreamSpec:
    """Immutable description of one per-segment file stream a bundle must contain."""

    # Key of this stream's URL list in the comma route ``/files`` payload (e.g. "qlogs").
    api_key: str
    # Label used in validation messages and stored as ``DownloadedFile.stream``.
    display_name: str
|
|
|
|
|
|
@dataclass(frozen=True)
class RouteId:
    """Immutable (dongle id, log id) pair naming a comma route, plus its string renderings."""

    dongle_id: str  # device id: the part before "/" in a route string
    log_id: str  # route log id, without any "--<segment>" suffix

    def _join(self, separator: str) -> str:
        # The three renderings below differ only in the separator between the two ids.
        return f"{self.dongle_id}{separator}{self.log_id}"

    @property
    def canonical_name(self) -> str:
        """``dongle|logid``: the form embedded (URL-quoted) in comma API route URLs."""
        return self._join("|")

    @property
    def cli_name(self) -> str:
        """``dongle/logid``: the form printed to the user."""
        return self._join("/")

    @property
    def safe_name(self) -> str:
        """``dongle_logid``: a filesystem-friendly form used in bundle folder names."""
        return self._join("_")
|
|
|
|
|
|
@dataclass
class DownloadedFile:
    """One file saved into the bundle; the manifests are generated from these records."""

    segment: int  # segment index within the route
    stream: str  # display name of the stream the file belongs to
    filename: str  # file name taken from the download URL's path
    relative_path: str  # POSIX path of the saved file relative to the bundle root
    mtime_epoch: int | None  # mtime applied from Last-Modified, or None if unavailable
|
|
|
|
|
|
@dataclass
class MapTileSummary:
    """Counters collected while scanning a route's qlogs for ``mapdOut`` events."""

    qlogs_scanned: int = 0  # qlog files fetched and decoded
    mapd_messages: int = 0  # mapdOut events seen across those qlogs
    tile_loaded_messages: int = 0  # mapdOut events whose tileLoaded flag was set
    first_tile_segment: int | None = None  # segment of the first tileLoaded event, if any

    @property
    def tile_loaded_any(self) -> bool:
        """Whether at least one ``tileLoaded`` event was counted."""
        return 0 < self.tile_loaded_messages
|
|
|
|
|
|
class ValidationError(Exception):
    """Raised when a route fails one or more bundle-readiness checks.

    Attributes:
        route: the route that was validated (its ``cli_name`` appears in the message).
        reasons: human-readable failure descriptions, one per failed check.
    """

    def __init__(self, route: RouteId, reasons: list[str]):
        self.route = route
        self.reasons = reasons
        super().__init__(self._render())

    def _render(self) -> str:
        # Header line followed by one "- reason" bullet per failure.
        header = f"Validation failed for {self.route.cli_name}:"
        bullets = [f"- {reason}" for reason in self.reasons]
        return "\n".join([header, *bullets])
|
|
|
|
|
|
# Streams that must be uploaded for every segment before a bundle is built, as
# (key in the comma /files payload, display name used in messages and the manifest).
REQUIRED_STREAMS = (
    StreamSpec(api_key="qlogs", display_name="qlog"),
    StreamSpec(api_key="logs", display_name="rlog"),
    StreamSpec(api_key="cameras", display_name="fcamera.hevc"),
)
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command line.

    If the first argument is a bare route id, optionally prefixed with dashes (e.g.
    ``--<dongle>/<logid>``), it is rewritten to ``--routeid <route>`` so argparse does not
    mistake it for an option.

    Args:
        argv: arguments to parse; defaults to ``sys.argv[1:]``.

    Returns:
        The parsed namespace. ``routeid`` is always set, from ``--routeid`` or the
        positional argument; the parser exits with a usage error if neither was given.
    """
    arg_list = list(sys.argv[1:] if argv is None else argv)
    if arg_list:
        bare_route = arg_list[0].lstrip("-")
        if ROUTE_ID_RE.fullmatch(bare_route):
            # Strip *all* leading dashes. Stripping only an exact "--" left "-<route>" with a
            # dash, which argparse then treated as an unknown option.
            arg_list = ["--routeid", bare_route, *arg_list[1:]]

    parser = argparse.ArgumentParser(
        description="Validate and bundle a public comma route for speed-limit vision review."
    )
    parser.add_argument("routeid_positional", nargs="?", help="Route id like 'dongle/logid' or 'dongle/logid--7'.")
    parser.add_argument("--routeid", help="Route id like 'dongle/logid' or 'dongle/logid--7'.")
    parser.add_argument("--desktop-root", type=Path, default=Path.home() / "Desktop", help="Destination root for the bundle folder and zip.")
    parser.add_argument("--timeout", type=float, default=60.0, help="HTTP timeout in seconds.")
    parser.add_argument("--overwrite", action="store_true", help="Replace an existing Desktop bundle for this route.")
    parser.add_argument("--validate-only", action="store_true", help="Run validation only and skip downloads/zipping.")
    args = parser.parse_args(arg_list)

    args.routeid = args.routeid or args.routeid_positional
    if not args.routeid:
        parser.error("the following arguments are required: --routeid")
    return args
|
|
|
|
|
|
def parse_route_id(raw: str) -> RouteId:
    """Parse user input into a RouteId.

    Accepts ``dongle/logid`` or ``dongle|logid``, optionally wrapped in quotes and optionally
    followed by a ``--<segment>`` suffix, which is discarded.

    Raises:
        ValueError: if the text does not match ROUTE_ID_RE, or the resulting log id is
            not exactly 20 characters long.
    """
    normalized = raw.strip().strip("'\"").replace("|", "/")
    found = ROUTE_ID_RE.fullmatch(normalized)
    if found is None:
        raise ValueError(f"Unrecognized route id: {raw}")

    dongle_id, tail = found.groups()
    pieces = tail.split("--")
    # The log id itself contains one "--", so a trailing numeric segment suffix
    # shows up as a third piece.
    has_segment_suffix = len(pieces) == 3 and pieces[-1].isdigit()
    log_id = "--".join(pieces[:2]) if has_segment_suffix else tail

    if len(log_id) != 20:
        raise ValueError(f"Invalid log id in route: {raw}")
    return RouteId(dongle_id=dongle_id, log_id=log_id)
|
|
|
|
|
|
def route_url(route: RouteId) -> str:
    """Return the comma API URL of a route's metadata, with the ``dongle|logid`` name fully URL-quoted."""
    encoded_name = quote(route.canonical_name, safe="")
    return API_HOST + "/v1/route/" + encoded_name + "/"
|
|
|
|
|
|
def route_files_url(route: RouteId) -> str:
    """Return the comma API URL listing a route's uploaded files (``.../files``)."""
    encoded_name = quote(route.canonical_name, safe="")
    return API_HOST + "/v1/route/" + encoded_name + "/files"
|
|
|
|
|
|
def format_segments(segments: list[int]) -> str:
    """Render segment numbers as comma-separated runs, e.g. ``[0, 1, 2, 5]`` -> ``"0-2, 5"``.

    A run continues while each value is exactly one more than the previous one, so input
    is expected to be sorted. An empty list renders as ``"none"``.
    """
    if not segments:
        return "none"

    runs: list[list[int]] = []  # each run is [first, last]
    for value in segments:
        if runs and value == runs[-1][1] + 1:
            runs[-1][1] = value
        else:
            runs.append([value, value])
    return ", ".join(str(first) if first == last else f"{first}-{last}" for first, last in runs)
|
|
|
|
|
|
def as_int(value: Any) -> int | None:
    """Coerce a JSON-style value to an int, or return None if it does not represent one.

    Accepts ints (returned as-is), bools (converted to a plain 0/1 int) and strings made
    only of decimal digits. Anything else, including negative-number strings and floats,
    yields None.
    """
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    # str.isdigit() is also true for characters such as "²" that int() rejects with
    # ValueError. isdecimal() only accepts characters int() can parse.
    if isinstance(value, str) and value.isdecimal():
        return int(value)
    return None
|
|
|
|
|
|
def relative_posix(path: Path, root: Path) -> str:
    """Return ``path`` relative to ``root`` with forward slashes.

    Raises ValueError (from ``Path.relative_to``) when ``path`` is not under ``root``.
    """
    relative = path.relative_to(root)
    return relative.as_posix()
|
|
|
|
|
|
def fetch_json(session: requests.Session, url: str, timeout: float) -> Any:
    """GET ``url`` (following redirects) and return its decoded JSON body.

    Raises:
        requests.HTTPError: for 4xx/5xx responses, via ``raise_for_status``.
    """
    reply = session.get(url, allow_redirects=True, timeout=timeout)
    reply.raise_for_status()
    payload = reply.json()
    return payload
|
|
|
|
|
|
def fetch_bytes(session: requests.Session, url: str, timeout: float) -> bytes:
    """GET ``url`` (following redirects) and return the whole response body in memory.

    Raises:
        requests.HTTPError: for 4xx/5xx responses, via ``raise_for_status``.
    """
    reply = session.get(url, allow_redirects=True, timeout=timeout)
    reply.raise_for_status()
    body = reply.content
    return body
|
|
|
|
|
|
def _last_modified_epoch(header: str | None) -> int | None:
    """Convert an HTTP ``Last-Modified`` value to Unix epoch seconds; None if missing or unparseable."""
    if not header:
        return None
    try:
        stamp = parsedate_to_datetime(header)
    except (TypeError, ValueError):
        # Malformed header: the download already succeeded, so skip the mtime instead of failing.
        return None
    if stamp.tzinfo is None:
        # parsedate_to_datetime returns a naive datetime for a "-0000" zone. Without this,
        # .timestamp() would interpret it in the machine's local timezone.
        stamp = stamp.replace(tzinfo=UTC)
    return int(stamp.timestamp())


def fetch_stream_to_path(session: requests.Session, url: str, dest_path: Path, timeout: float) -> int | None:
    """Stream ``url`` into ``dest_path`` and stamp the file with the server's Last-Modified time.

    The body goes to ``<dest_path>.part`` first and is renamed into place only after the
    response has been fully read, so ``dest_path`` never holds a truncated download. If
    writing fails, the partial file is removed and the exception propagates.

    Returns:
        The mtime applied (Unix epoch seconds), or None when the response carried no usable
        ``Last-Modified`` header. In that case the file keeps its write time.

    Raises:
        requests.HTTPError: for 4xx/5xx responses.
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = dest_path.with_suffix(dest_path.suffix + ".part")

    # Using the response as a context manager closes the streamed connection on every path,
    # including raise_for_status() failures and errors mid-download.
    with session.get(url, stream=True, timeout=timeout, allow_redirects=True) as response:
        response.raise_for_status()
        try:
            with temp_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    if chunk:
                        handle.write(chunk)
        except BaseException:
            temp_path.unlink(missing_ok=True)
            raise
        last_modified = response.headers.get("Last-Modified")

    temp_path.replace(dest_path)

    epoch = _last_modified_epoch(last_modified)
    if epoch is not None:
        os.utime(dest_path, (epoch, epoch))
    return epoch
|
|
|
|
|
|
def decompress_log_bytes(payload: bytes) -> bytes:
    """Return the decompressed contents of a log file.

    bzip2 payloads (magic ``BZh``) and zstd payloads (magic ``28 b5 2f fd``) are detected by
    their leading bytes. Anything else is assumed to be uncompressed and returned unchanged.
    """
    if payload[:3] == b"BZh":
        # bz2.decompress also decodes concatenated (multi-stream) bzip2 data.
        return bz2.decompress(payload)
    if payload[:4] == b"\x28\xb5\x2f\xfd":
        # By default stream_reader stops at the end of the first zstd frame, which would
        # silently truncate a log written as several frames.
        with zstandard.ZstdDecompressor().stream_reader(io.BytesIO(payload), read_across_frames=True) as reader:
            return reader.read()
    return payload
|
|
|
|
|
|
def load_init_params(qlog_bytes: bytes) -> dict[str, str]:
    """Return the params recorded in the first ``initData`` event of a (possibly compressed) qlog.

    Bytes values are decoded as UTF-8 with invalid bytes replaced. ``None`` becomes "" and
    any other value goes through ``str()``. Entries with a missing or empty key are skipped.

    Raises:
        RuntimeError: if the log contains no ``initData`` event.
    """
    events = log.Event.read_multiple_bytes(decompress_log_bytes(qlog_bytes))
    init_event = next((event for event in events if event.which() == "initData"), None)
    if init_event is None:
        raise RuntimeError("No initData params found in qlog.")

    decoded: dict[str, str] = {}
    for entry in init_event.initData.params.to_dict().get("entries", []):
        key = entry.get("key")
        if not key:
            continue
        raw_value = entry.get("value", b"")
        if isinstance(raw_value, bytes):
            text = raw_value.decode("utf-8", "replace")
        elif raw_value is None:
            text = ""
        else:
            text = str(raw_value)
        decoded[str(key)] = text
    return decoded
|
|
|
|
|
|
def scan_qlogs_for_map_tiles(session: requests.Session, qlog_urls: list[str | None], timeout: float) -> MapTileSummary:
    """Count ``mapdOut`` events in a route's qlogs, stopping after the first segment with a loaded tile.

    ``qlog_urls`` is indexed by segment number. ``None`` or empty entries are skipped and not
    counted as scanned. Download and decode errors propagate to the caller.
    """
    summary = MapTileSummary()
    for segment_index, qlog_url in enumerate(qlog_urls):
        if not (isinstance(qlog_url, str) and qlog_url):
            continue

        summary.qlogs_scanned += 1
        raw_log = decompress_log_bytes(fetch_bytes(session, qlog_url, timeout))
        events = log.Event.read_multiple_bytes(raw_log)
        for mapd in (event.mapdOut for event in events if event.which() == "mapdOut"):
            summary.mapd_messages += 1
            if not mapd.tileLoaded:
                continue
            summary.tile_loaded_messages += 1
            if summary.first_tile_segment is None:
                summary.first_tile_segment = segment_index

        # Once offline tile use is confirmed, later qlogs do not need to be downloaded.
        if summary.tile_loaded_any:
            break
    return summary
|
|
|
|
|
|
def expected_segment_count(route_meta: dict[str, Any], files_payload: dict[str, Any]) -> int:
    """Estimate how many segments a route has.

    Returns the largest of:
    - ``maxqlog + 1`` and ``maxlog + 1`` from the route metadata (when they coerce to ints);
    - the lengths of the ``qlogs`` / ``logs`` / ``cameras`` URL lists in the files payload.

    Returns 0 when none of these are available.
    """
    max_indices = (as_int(route_meta.get(name)) for name in ("maxqlog", "maxlog"))
    counts_from_meta = [index + 1 for index in max_indices if index is not None]

    url_lists = (files_payload.get(name) for name in ("qlogs", "logs", "cameras"))
    counts_from_files = [len(urls) for urls in url_lists if isinstance(urls, list)]

    return max(counts_from_meta + counts_from_files, default=0)
|
|
|
|
|
|
def collect_missing_segments(urls: list[Any], expected_segments: int) -> list[int]:
    """Return the segment indices in ``range(expected_segments)`` without a non-empty URL string.

    Indices beyond the end of ``urls`` count as missing.
    """
    available = len(urls)
    return [
        index
        for index in range(expected_segments)
        if index >= available or not (isinstance(urls[index], str) and urls[index])
    ]
|
|
|
|
|
|
def first_available_url(urls: list[Any]) -> str | None:
    """Return the first non-empty string in ``urls``, or None if there is none."""
    return next((candidate for candidate in urls if isinstance(candidate, str) and candidate), None)
|
|
|
|
|
|
def filename_from_url(url: str) -> str:
    """Return the final path component of ``url``, ignoring its query string and fragment."""
    url_path = urlparse(url).path
    return Path(url_path).name
|
|
|
|
|
|
def _fetch_route_payload(
    session: requests.Session,
    url: str,
    timeout: float,
    route: RouteId,
    http_failure: str,
) -> dict[str, Any]:
    """Fetch a comma API JSON object, converting HTTP errors and non-object bodies into ValidationError."""
    try:
        payload = fetch_json(session, url, timeout)
    except requests.HTTPError as exc:
        status_code = exc.response.status_code if exc.response is not None else "unknown"
        raise ValidationError(route, [f"{http_failure} (HTTP {status_code})."]) from exc
    if not isinstance(payload, dict):
        # Callers use .get() on the payload. A list/null body would otherwise crash with
        # AttributeError and surface only as an opaque generic error.
        raise ValidationError(route, [f"comma connect returned an unexpected {type(payload).__name__} payload for {url}."])
    return payload


def _check_required_streams(files_payload: dict[str, Any], expected_segments: int, failures: list[str]) -> dict[str, list[Any]]:
    """Record missing uploads for each required stream and return each stream's URL list."""
    stream_urls: dict[str, list[Any]] = {}
    for spec in REQUIRED_STREAMS:
        urls = files_payload.get(spec.api_key)
        if not isinstance(urls, list):
            urls = []
        stream_urls[spec.api_key] = urls
        missing_segments = collect_missing_segments(urls, expected_segments)
        if missing_segments:
            failures.append(f"missing uploaded {spec.display_name} files for segment(s): {format_segments(missing_segments)}.")
    return stream_urls


def _check_init_params(session: requests.Session, qlog_urls: list[Any], timeout: float, failures: list[str]) -> dict[str, str]:
    """Load initData params from the first available qlog and record required flags that are not '1'."""
    first_qlog_url = first_available_url(qlog_urls)
    if first_qlog_url is None:
        failures.append("no qlog was available to inspect route params.")
        return {}
    try:
        params = load_init_params(fetch_bytes(session, first_qlog_url, timeout))
    except Exception as exc:
        failures.append(f"could not read `initData.params` from the first qlog: {exc}.")
        return {}

    if params:
        auto_bookmark = params.get("VisionSpeedLimitAutoBookmark", "")
        if auto_bookmark != "1":
            failures.append(f"`VisionSpeedLimitAutoBookmark` was {auto_bookmark!r}; it must be '1'.")

        vision_detection = params.get("VisionSpeedLimitDetection", "")
        if vision_detection != "1":
            failures.append(f"`VisionSpeedLimitDetection` was {vision_detection!r}; it must be '1' for useful vision-model collection.")
    return params


def _check_map_tiles(
    session: requests.Session,
    qlog_urls: list[Any],
    params: dict[str, str],
    timeout: float,
    failures: list[str],
) -> MapTileSummary:
    """Scan qlogs for offline map tile loads; record a failure if none loaded or the scan failed."""
    try:
        usable_urls = [url if isinstance(url, str) and url else None for url in qlog_urls]
        map_tiles = scan_qlogs_for_map_tiles(session, usable_urls, timeout)
    except Exception as exc:
        failures.append(f"could not inspect qlogs for offline map usage: {exc}.")
        return MapTileSummary()

    if not map_tiles.tile_loaded_any:
        maps_selected = params.get("MapsSelected", "")
        if maps_selected:
            failures.append(
                "offline map tiles never loaded during this route even though "
                f"`MapsSelected` was {maps_selected!r}; tell the user to download local maps for the driven area so collection trains faster."
            )
        else:
            failures.append(
                "offline map tiles never loaded during this route and `MapsSelected` was empty; "
                "tell the user to download local maps for the driven area so collection trains faster."
            )
    return map_tiles


def validate_route(route: RouteId, session: requests.Session, timeout: float) -> dict[str, Any]:
    """Check that a public route is ready to be bundled.

    Metadata and file-listing problems abort immediately. All remaining checks are
    collected and reported together:
    - every required stream is uploaded for every segment;
    - the VisionSpeedLimit* params are '1';
    - offline map tiles loaded at some point during the route.

    Returns:
        A dict with ``route_meta``, ``files_payload``, ``expected_segments``, ``params`` and
        ``map_tiles`` (a MapTileSummary).

    Raises:
        ValidationError: listing every failed check.
    """
    route_meta = _fetch_route_payload(
        session, route_url(route), timeout, route, "route is not publicly accessible from comma connect"
    )
    if not route_meta.get("is_public", False):
        raise ValidationError(route, ["route metadata loaded, but `is_public` was false."])

    files_payload = _fetch_route_payload(
        session, route_files_url(route), timeout, route, "public route files could not be fetched from comma connect"
    )

    expected_segments = expected_segment_count(route_meta, files_payload)
    if expected_segments <= 0:
        raise ValidationError(route, ["could not determine any route segments from comma connect."])

    failures: list[str] = []
    stream_urls = _check_required_streams(files_payload, expected_segments, failures)
    params = _check_init_params(session, stream_urls["qlogs"], timeout, failures)
    map_tiles = _check_map_tiles(session, stream_urls["qlogs"], params, timeout, failures)

    if failures:
        raise ValidationError(route, failures)

    return {
        "route_meta": route_meta,
        "files_payload": files_payload,
        "expected_segments": expected_segments,
        "params": params,
        "map_tiles": map_tiles,
    }
|
|
|
|
|
|
def prepare_output_paths(route: RouteId, desktop_root: Path, overwrite: bool) -> tuple[Path, Path]:
    """Return the bundle folder and zip paths under ``desktop_root``, ensuring they are free to use.

    ``desktop_root`` is expanded, resolved and created if needed. With ``overwrite``, any
    existing bundle folder (removed recursively) or zip is deleted. Otherwise an existing
    one raises FileExistsError.
    """
    root = desktop_root.expanduser().resolve()
    root.mkdir(parents=True, exist_ok=True)

    bundle_root = root / f"speed_limit_route_bundle_{route.safe_name}"
    zip_path = root / f"{bundle_root.name}.zip"

    if not overwrite:
        if bundle_root.exists():
            raise FileExistsError(f"Bundle folder already exists: {bundle_root}")
        if zip_path.exists():
            raise FileExistsError(f"Bundle zip already exists: {zip_path}")
        return bundle_root, zip_path

    if bundle_root.exists():
        shutil.rmtree(bundle_root)
    if zip_path.exists():
        zip_path.unlink()
    return bundle_root, zip_path
|
|
|
|
|
|
def write_bundle_manifests(
    bundle_root: Path,
    route: RouteId,
    validation: dict[str, Any],
    downloaded_files: list[DownloadedFile],
) -> None:
    """Write the bundle's metadata files.

    - ``live_routes_meta/files.txt``: ``<logid>--<segment> <relative path>`` per file.
    - ``live_routes_meta/qlog_mtimes.txt``: ``<relative path> <epoch>`` for qlog files with a
      known mtime.
    - ``bundle_manifest.json``: route identity, validation results, selected route metadata
      and the file list.

    Files are listed ordered by (segment, stream, filename).
    """
    meta_dir = bundle_root / "live_routes_meta"
    meta_dir.mkdir(parents=True, exist_ok=True)

    ordered = sorted(downloaded_files, key=lambda rec: (rec.segment, rec.stream, rec.filename))

    def write_lines(target: Path, lines: list[str]) -> None:
        # Newline-terminate every entry; an empty list produces an empty file.
        target.write_text("".join(f"{line}\n" for line in lines), encoding="utf-8")

    write_lines(
        meta_dir / "files.txt",
        [f"{route.log_id}--{rec.segment} {rec.relative_path}" for rec in ordered],
    )
    write_lines(
        meta_dir / "qlog_mtimes.txt",
        [
            f"{rec.relative_path} {rec.mtime_epoch}"
            for rec in ordered
            if rec.filename.startswith("qlog") and rec.mtime_epoch is not None
        ],
    )

    params = validation["params"]
    route_meta = validation["route_meta"]
    map_tiles: MapTileSummary = validation["map_tiles"]

    # Manifest key -> key in the comma route metadata.
    route_meta_keys = {
        "startTime": "start_time",
        "endTime": "end_time",
        "distance": "distance",
        "startLat": "start_lat",
        "startLng": "start_lng",
        "endLat": "end_lat",
        "endLng": "end_lng",
        "gitBranch": "git_branch",
        "gitCommit": "git_commit",
        "platform": "platform",
        "version": "version",
        "make": "make",
    }
    validation_section = {
        "isPublic": bool(route_meta.get("is_public")),
        "visionSpeedLimitAutoBookmark": params.get("VisionSpeedLimitAutoBookmark", ""),
        "visionSpeedLimitDetection": params.get("VisionSpeedLimitDetection", ""),
        "mapsSelected": params.get("MapsSelected", ""),
        "lastMapsUpdate": params.get("LastMapsUpdate", ""),
        "mapdMessagesScanned": map_tiles.mapd_messages,
        "offlineTileLoaded": map_tiles.tile_loaded_any,
        "firstTileSegment": map_tiles.first_tile_segment,
    }
    file_entries = [
        {
            "segment": rec.segment,
            "stream": rec.stream,
            "filename": rec.filename,
            "relativePath": rec.relative_path,
            "mtimeEpoch": rec.mtime_epoch,
        }
        for rec in ordered
    ]

    manifest = {
        "bundleCreatedAt": datetime.now(UTC).isoformat(),
        "routeId": route.cli_name,
        "routeFullname": route.canonical_name,
        "expectedSegments": validation["expected_segments"],
        "downloadedStreams": [spec.display_name for spec in REQUIRED_STREAMS],
        "validation": validation_section,
        "routeMeta": {out_key: route_meta.get(src_key) for out_key, src_key in route_meta_keys.items()},
        "files": file_entries,
    }
    manifest_text = json.dumps(manifest, indent=2, sort_keys=True) + "\n"
    (bundle_root / "bundle_manifest.json").write_text(manifest_text, encoding="utf-8")
|
|
|
|
|
|
def zip_bundle(bundle_root: Path, zip_path: Path) -> None:
    """Archive every file under ``bundle_root`` into ``zip_path`` without compression.

    Entries are added in sorted path order under a top-level folder named after
    ``bundle_root``. Directories themselves, including empty ones, get no entries.
    ZIP_STORED is used; presumably because the payloads (.bz2/.hevc) are already
    compressed — confirm.
    """
    top_level = Path(bundle_root.name)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED) as archive:
        members = sorted(candidate for candidate in bundle_root.rglob("*") if candidate.is_file())
        for member in members:
            archive.write(member, arcname=top_level / member.relative_to(bundle_root))
|
|
|
|
|
|
def download_bundle(
    route: RouteId,
    validation: dict[str, Any],
    session: requests.Session,
    desktop_root: Path,
    timeout: float,
    overwrite: bool,
) -> tuple[Path, Path]:
    """Download every required stream for every segment, write the manifests and zip the bundle.

    Files are saved as ``<bundle>/data/media/0/realdata/<logid>--<segment>/<filename>``.

    Returns:
        (bundle folder, zip path).

    Raises:
        FileExistsError: from prepare_output_paths when outputs exist and ``overwrite`` is False.
        RuntimeError: if a required URL is missing (validate_route should have rejected this).
    """
    bundle_root, zip_path = prepare_output_paths(route, desktop_root, overwrite)
    clip_root = bundle_root / "data" / "media" / "0" / "realdata"

    expected_segments = validation["expected_segments"]
    files_payload = validation["files_payload"]
    downloaded_files: list[DownloadedFile] = []

    for spec in REQUIRED_STREAMS:
        urls = files_payload.get(spec.api_key)
        if not isinstance(urls, list):
            urls = []
        for segment in range(expected_segments):
            # Bounds-check instead of indexing blindly, so a short or absent URL list reports the
            # intended RuntimeError rather than a bare IndexError.
            url = urls[segment] if segment < len(urls) else None
            if not isinstance(url, str) or not url:
                raise RuntimeError(f"Unexpected missing {spec.display_name} URL for segment {segment} after validation.")

            filename = filename_from_url(url)
            segment_name = f"{route.log_id}--{segment}"
            dest_path = clip_root / segment_name / filename
            print(f"Downloading {segment_name}/{filename}")
            mtime_epoch = fetch_stream_to_path(session, url, dest_path, timeout)
            downloaded_files.append(
                DownloadedFile(
                    segment=segment,
                    stream=spec.display_name,
                    filename=filename,
                    relative_path=relative_posix(dest_path, bundle_root),
                    mtime_epoch=mtime_epoch,
                )
            )

    write_bundle_manifests(bundle_root, route, validation, downloaded_files)
    print(f"Creating {zip_path.name}")
    zip_bundle(bundle_root, zip_path)
    return bundle_root, zip_path
|
|
|
|
|
|
def print_validation_summary(route: RouteId, validation: dict[str, Any]) -> None:
    """Print a short human-readable report of a successful validation to stdout."""
    params = validation["params"]
    map_tiles: MapTileSummary = validation["map_tiles"]
    maps_selected = params.get("MapsSelected", "") or "(empty)"
    tile_state = "yes" if map_tiles.tile_loaded_any else "no"

    report = [
        f"Validated {route.cli_name}",
        " public route: yes",
        f" segments: {validation['expected_segments']}",
        f" VisionSpeedLimitDetection: {params.get('VisionSpeedLimitDetection', '')}",
        f" VisionSpeedLimitAutoBookmark: {params.get('VisionSpeedLimitAutoBookmark', '')}",
        f" MapsSelected: {maps_selected}",
        f" offline tile loaded: {tile_state}",
    ]
    for line in report:
        print(line)
|
|
|
|
|
|
def main() -> int:
    """Validate a route and, unless --validate-only is given, build its Desktop bundle.

    Returns the process exit code:
    - 0 on success;
    - 2 when validation fails;
    - 3 when the bundle already exists and --overwrite was not given;
    - 1 for any other error, including an unparseable route id.
    """
    args = parse_args()

    try:
        # Parsed inside the try so a malformed route id is reported like other errors
        # instead of escaping as a traceback.
        route = parse_route_id(args.routeid)

        # The context manager closes the session's pooled connections on every exit path.
        with requests.Session() as session:
            session.headers.update({"User-Agent": "starpilot-speed-limit-route-bundler/1.0"})

            validation = validate_route(route, session, args.timeout)
            print_validation_summary(route, validation)

            if args.validate_only:
                return 0

            bundle_root, zip_path = download_bundle(route, validation, session, args.desktop_root, args.timeout, args.overwrite)

        print(f"Bundle folder: {bundle_root}")
        print(f"Bundle zip: {zip_path}")
        return 0
    except ValidationError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    except FileExistsError as exc:
        print(f"{exc}\nUse --overwrite to replace the existing Desktop bundle, or --validate-only to skip packaging.", file=sys.stderr)
        return 3
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|