Files
StarPilot/logs
firestar5683 25633be976 log grabber
2026-05-10 20:17:17 -05:00

574 lines
19 KiB
Bash
Executable File

#!/bin/sh
""":"
REPO_ROOT="$(CDPATH= cd -- "$(dirname "$0")" && pwd)"
PYTHON_BIN="$REPO_ROOT/.venv/bin/python"
if [ ! -x "$PYTHON_BIN" ]; then
echo "Missing $PYTHON_BIN. Run 'uv sync' from $REPO_ROOT first." >&2
exit 1
fi
exec "$PYTHON_BIN" "$0" "$@"
":"""
# The header above is an sh/Python polyglot. Run by /bin/sh, the `""":"` line is
# the no-op `:` command, after which the shell looks for `.venv/bin/python` next
# to this script and re-executes the same file with it (or exits 1 if missing).
# Read by Python, everything between the triple quotes is a single string
# literal, so the shell commands are never executed by the interpreter.
from __future__ import annotations
import argparse
import bz2
import io
import json
import os
import re
import shutil
import sys
import zipfile
from dataclasses import dataclass
from datetime import UTC, datetime
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any
from urllib.parse import quote, urlparse
import requests
import zstandard
from cereal import log
# Base URL for API requests; taken from the COMMA_API_HOST environment variable
# when set, with any trailing slashes removed.
API_HOST = os.getenv("COMMA_API_HOST", "https://api.commadotai.com").rstrip("/")
# "<16 lowercase hex chars>/<tail without slashes>": group 1 is used as the
# dongle id and group 2 as the log-id tail by the route-id parsing code.
ROUTE_ID_RE = re.compile(r"([0-9a-f]{16})/([^/]+)")
@dataclass(frozen=True)
class StreamSpec:
    """Immutable pairing of a route-files payload key with a display name."""

    # Key used to look the stream up in the route-files payload.
    api_key: str
    # Name used for this stream in messages and in the bundle manifest.
    display_name: str
@dataclass(frozen=True)
class RouteId:
    """Immutable (dongle id, log id) pair with the string forms used by this script."""

    dongle_id: str
    log_id: str

    @property
    def canonical_name(self) -> str:
        """Return ``<dongle_id>|<log_id>``."""
        return "{}|{}".format(self.dongle_id, self.log_id)

    @property
    def cli_name(self) -> str:
        """Return ``<dongle_id>/<log_id>``."""
        return "{}/{}".format(self.dongle_id, self.log_id)

    @property
    def safe_name(self) -> str:
        """Return ``<dongle_id>_<log_id>``."""
        return "{}_{}".format(self.dongle_id, self.log_id)
@dataclass
class DownloadedFile:
segment: int
stream: str
filename: str
relative_path: str
mtime_epoch: int | None
@dataclass
class MapTileSummary:
qlogs_scanned: int = 0
mapd_messages: int = 0
tile_loaded_messages: int = 0
first_tile_segment: int | None = None
@property
def tile_loaded_any(self) -> bool:
return self.tile_loaded_messages > 0
class ValidationError(Exception):
    """Raised when a route fails validation; carries every failure reason."""

    def __init__(self, route: RouteId, reasons: list[str]):
        self.route = route
        self.reasons = reasons
        super().__init__(self._render())

    def _render(self) -> str:
        """Build the message: a header line followed by one ``- reason`` bullet per reason."""
        header = f"Validation failed for {self.route.cli_name}:"
        bullets = [f"- {reason}" for reason in self.reasons]
        return "\n".join([header, *bullets])
# Streams that validation requires for every segment and that the bundler
# downloads: (key in the route-files payload, display name).
REQUIRED_STREAMS = (
    StreamSpec("qlogs", "qlog"),
    StreamSpec("logs", "rlog"),
    StreamSpec("cameras", "fcamera.hevc"),
)
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the populated namespace.

    A first argument that looks like a route id (optionally prefixed with
    dashes, e.g. ``--dongle/logid``) is rewritten to ``--routeid <id>`` before
    argparse sees it. After parsing, ``args.routeid`` holds either the
    ``--routeid`` value or the positional fallback; the parser exits with an
    error when neither was supplied.
    """
    argv = list(sys.argv[1:])
    if argv:
        # Strip *all* leading dashes, matching what the regex test looks at.
        # Removing only a "--" prefix would hand argparse a value such as
        # "-dongle/logid", which it rejects as an unknown option.
        bare_first = argv[0].lstrip("-")
        if ROUTE_ID_RE.fullmatch(bare_first):
            argv = ["--routeid", bare_first, *argv[1:]]

    parser = argparse.ArgumentParser(
        description="Validate and bundle a public comma route for speed-limit vision review."
    )
    parser.add_argument("routeid_positional", nargs="?", help="Route id like 'dongle/logid' or 'dongle/logid--7'.")
    parser.add_argument("--routeid", help="Route id like 'dongle/logid' or 'dongle/logid--7'.")
    parser.add_argument("--desktop-root", type=Path, default=Path.home() / "Desktop", help="Destination root for the bundle folder and zip.")
    parser.add_argument("--timeout", type=float, default=60.0, help="HTTP timeout in seconds.")
    parser.add_argument("--overwrite", action="store_true", help="Replace an existing Desktop bundle for this route.")
    parser.add_argument("--validate-only", action="store_true", help="Run validation only and skip downloads/zipping.")

    args = parser.parse_args(argv)
    # Prefer the explicit flag; fall back to the positional form.
    args.routeid = args.routeid or args.routeid_positional
    if not args.routeid:
        parser.error("the following arguments are required: --routeid")
    return args
def parse_route_id(raw: str) -> RouteId:
    """Turn a user-supplied route string into a ``RouteId``.

    Surrounding whitespace and quote characters are removed and ``|`` is
    treated like ``/``. When the part after the slash has three ``--``-separated
    pieces and the last one is numeric (as in ``dongle/logid--7``), that last
    piece is dropped.

    Raises:
        ValueError: if the text does not match the route pattern or the
            resulting log id is not exactly 20 characters long.
    """
    cleaned = raw.strip().strip("'\"").replace("|", "/")
    parsed = ROUTE_ID_RE.fullmatch(cleaned)
    if parsed is None:
        raise ValueError(f"Unrecognized route id: {raw}")

    dongle_id, tail = parsed.group(1), parsed.group(2)
    pieces = tail.split("--")
    has_numeric_suffix = len(pieces) == 3 and pieces[-1].isdigit()
    log_id = "--".join(pieces[:2]) if has_numeric_suffix else tail

    if len(log_id) != 20:
        raise ValueError(f"Invalid log id in route: {raw}")
    return RouteId(dongle_id=dongle_id, log_id=log_id)
def route_url(route: RouteId) -> str:
    """Return the route-metadata URL for ``route`` (canonical name fully percent-encoded)."""
    encoded_name = quote(route.canonical_name, safe="")
    return f"{API_HOST}/v1/route/{encoded_name}/"
def route_files_url(route: RouteId) -> str:
    """Return the route-files URL for ``route`` (canonical name fully percent-encoded)."""
    encoded_name = quote(route.canonical_name, safe="")
    return f"{API_HOST}/v1/route/{encoded_name}/files"
def format_segments(segments: list[int]) -> str:
    """Render segment numbers compactly, collapsing consecutive runs.

    Each run of values that increase by exactly one becomes ``start-end``;
    isolated values are printed on their own. Runs are joined with ``", "``.
    An empty list yields ``"none"``. The input is processed in the order given.
    """
    if not segments:
        return "none"

    runs: list[tuple[int, int]] = []
    run_start = run_end = segments[0]
    for current in segments[1:]:
        if current != run_end + 1:
            # The run is broken: record it and start a new one at `current`.
            runs.append((run_start, run_end))
            run_start = current
        run_end = current
    runs.append((run_start, run_end))

    return ", ".join(str(low) if low == high else f"{low}-{high}" for low, high in runs)
def as_int(value: Any) -> int | None:
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
if isinstance(value, str) and value.isdigit():
return int(value)
return None
def relative_posix(path: Path, root: Path) -> str:
    """Return ``path`` relative to ``root`` as a forward-slash string.

    Raises ValueError (from ``Path.relative_to``) when ``path`` is not under ``root``.
    """
    relative = path.relative_to(root)
    return relative.as_posix()
def fetch_json(session: requests.Session, url: str, timeout: float) -> Any:
    """GET ``url`` (following redirects) and return the decoded JSON body.

    ``raise_for_status`` is called first, so HTTP error statuses surface as
    ``requests.HTTPError``.
    """
    reply = session.get(url, timeout=timeout, allow_redirects=True)
    reply.raise_for_status()
    decoded = reply.json()
    return decoded
def fetch_bytes(session: requests.Session, url: str, timeout: float) -> bytes:
    """GET ``url`` (following redirects) and return the raw response body.

    ``raise_for_status`` is called first, so HTTP error statuses surface as
    ``requests.HTTPError``.
    """
    reply = session.get(url, timeout=timeout, allow_redirects=True)
    reply.raise_for_status()
    body = reply.content
    return body
def fetch_stream_to_path(session: requests.Session, url: str, dest_path: Path, timeout: float) -> int | None:
    """Stream ``url`` to ``dest_path`` and mirror the server's modification time.

    The body is written to a sibling ``<name>.part`` file in 1 MiB chunks and
    renamed onto ``dest_path`` only once the download finished, so a partial
    download never appears under the final name.

    Returns:
        The ``Last-Modified`` header as epoch seconds (also applied to the
        file via ``os.utime``), or None when the header is absent or cannot
        be parsed.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = dest_path.with_suffix(dest_path.suffix + ".part")

    # Use the response as a context manager: with stream=True the connection
    # is only handed back to the pool once the response is closed.
    with session.get(url, stream=True, timeout=timeout, allow_redirects=True) as response:
        response.raise_for_status()
        last_modified = response.headers.get("Last-Modified")
        try:
            with temp_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    if chunk:
                        handle.write(chunk)
        except BaseException:
            # Do not leave a truncated ".part" file behind (also on Ctrl-C);
            # the original error is re-raised unchanged.
            temp_path.unlink(missing_ok=True)
            raise

    temp_path.replace(dest_path)

    if not last_modified:
        return None
    try:
        epoch = int(parsedate_to_datetime(last_modified).timestamp())
    except (TypeError, ValueError):
        # A malformed header must not fail a download that already succeeded.
        return None
    os.utime(dest_path, (epoch, epoch))
    return epoch
def decompress_log_bytes(payload: bytes) -> bytes:
    """Decompress ``payload`` based on its leading magic bytes.

    Data starting with ``BZh`` is decompressed with bz2, data starting with
    the zstd frame magic ``28 b5 2f fd`` with zstandard, and anything else is
    returned unchanged.
    """
    bz2_magic = b"BZh"
    zstd_magic = b"\x28\xb5\x2f\xfd"

    if payload.startswith(bz2_magic):
        return bz2.decompress(payload)
    if not payload.startswith(zstd_magic):
        return payload

    source = io.BytesIO(payload)
    reader = zstandard.ZstdDecompressor().stream_reader(source)
    try:
        return reader.read()
    finally:
        reader.close()
def load_init_params(qlog_bytes: bytes) -> dict[str, str]:
    """Return the params recorded in the first ``initData`` event of a qlog.

    The payload is decompressed first. Entries without a key are skipped;
    byte values are decoded as UTF-8 with replacement, ``None`` becomes an
    empty string, and anything else is passed through ``str``.

    Raises:
        RuntimeError: if the log contains no ``initData`` event.
    """
    events = log.Event.read_multiple_bytes(decompress_log_bytes(qlog_bytes))
    for event in events:
        if event.which() == "initData":
            collected: dict[str, str] = {}
            for item in event.initData.params.to_dict().get("entries", []):
                name = item.get("key")
                if not name:
                    continue
                raw_value = item.get("value", b"")
                if isinstance(raw_value, bytes):
                    text = raw_value.decode("utf-8", "replace")
                elif raw_value is None:
                    text = ""
                else:
                    text = str(raw_value)
                collected[str(name)] = text
            # Only the first initData event is considered.
            return collected
    raise RuntimeError("No initData params found in qlog.")
def scan_qlogs_for_map_tiles(session: requests.Session, qlog_urls: list[str | None], timeout: float) -> MapTileSummary:
    """Scan qlogs in segment order for ``mapdOut`` messages that report a loaded tile.

    Entries that are not non-empty strings are skipped. Each remaining qlog is
    downloaded, decompressed and read in full; scanning stops after the first
    qlog in which at least one tile-loaded message was counted.
    """
    result = MapTileSummary()
    for index, qlog_url in enumerate(qlog_urls):
        if not (isinstance(qlog_url, str) and qlog_url):
            continue
        result.qlogs_scanned += 1
        raw = fetch_bytes(session, qlog_url, timeout)
        for event in log.Event.read_multiple_bytes(decompress_log_bytes(raw)):
            if event.which() == "mapdOut":
                result.mapd_messages += 1
                if event.mapdOut.tileLoaded:
                    result.tile_loaded_messages += 1
                    if result.first_tile_segment is None:
                        result.first_tile_segment = index
        # The current qlog is always read to the end before deciding to stop.
        if result.tile_loaded_any:
            break
    return result
def expected_segment_count(route_meta: dict[str, Any], files_payload: dict[str, Any]) -> int:
    """Return the largest segment count implied by the metadata and file lists.

    Candidates are ``maxqlog + 1`` and ``maxlog + 1`` from ``route_meta`` (when
    they are integer-like) and the lengths of the ``qlogs``, ``logs`` and
    ``cameras`` lists in ``files_payload``. Returns 0 when there are none.
    """
    meta_values = (as_int(route_meta.get(field)) for field in ("maxqlog", "maxlog"))
    counts: list[int] = [highest + 1 for highest in meta_values if highest is not None]

    listings = (files_payload.get(field) for field in ("qlogs", "logs", "cameras"))
    counts.extend(len(listing) for listing in listings if isinstance(listing, list))

    return max(counts, default=0)
def collect_missing_segments(urls: list[Any], expected_segments: int) -> list[int]:
    """Return the segment indices below ``expected_segments`` that lack a usable URL.

    A segment counts as missing when its index is beyond the end of ``urls``
    or its entry is not a non-empty string.
    """
    available = len(urls)
    return [
        index
        for index in range(expected_segments)
        if index >= available or not isinstance(urls[index], str) or not urls[index]
    ]
def first_available_url(urls: list[Any]) -> str | None:
for url in urls:
if isinstance(url, str) and url:
return url
return None
def filename_from_url(url: str) -> str:
    """Return the last component of the URL's path (query string and fragment ignored)."""
    url_path = urlparse(url).path
    return Path(url_path).name
def validate_route(route: RouteId, session: requests.Session, timeout: float) -> dict[str, Any]:
    """Check that ``route`` is public, fully uploaded and recorded with the required params.

    Hard failures (metadata or file list cannot be fetched, route not public,
    no segments) raise immediately. All other problems are accumulated and
    reported together in a single ``ValidationError``.

    Returns:
        A dict with the keys ``route_meta``, ``files_payload``,
        ``expected_segments``, ``params`` and ``map_tiles``.

    Raises:
        ValidationError: with one reason per detected problem.
    """
    # --- Route metadata: must be fetchable and flagged public. ---
    try:
        route_meta = fetch_json(session, route_url(route), timeout)
    except requests.HTTPError as exc:
        status_code = exc.response.status_code if exc.response is not None else "unknown"
        raise ValidationError(route, [f"route is not publicly accessible from comma connect (HTTP {status_code})."]) from exc
    if not route_meta.get("is_public", False):
        raise ValidationError(route, ["route metadata loaded, but `is_public` was false."])
    # --- Per-stream file listing. ---
    try:
        files_payload = fetch_json(session, route_files_url(route), timeout)
    except requests.HTTPError as exc:
        status_code = exc.response.status_code if exc.response is not None else "unknown"
        raise ValidationError(route, [f"public route files could not be fetched from comma connect (HTTP {status_code})."]) from exc
    expected_segments = expected_segment_count(route_meta, files_payload)
    if expected_segments <= 0:
        raise ValidationError(route, ["could not determine any route segments from comma connect."])
    # From here on problems are collected rather than raised one at a time.
    failures: list[str] = []
    stream_urls: dict[str, list[Any]] = {}
    for spec in REQUIRED_STREAMS:
        urls = files_payload.get(spec.api_key)
        # A missing or malformed entry is treated as "no files uploaded".
        if not isinstance(urls, list):
            urls = []
        stream_urls[spec.api_key] = urls
        missing_segments = collect_missing_segments(urls, expected_segments)
        if missing_segments:
            failures.append(f"missing uploaded {spec.display_name} files for segment(s): {format_segments(missing_segments)}.")
    # --- Params recorded in the first available qlog. ---
    first_qlog_url = first_available_url(stream_urls["qlogs"])
    params: dict[str, str] = {}
    map_tiles = MapTileSummary()
    if first_qlog_url is None:
        failures.append("no qlog was available to inspect route params.")
    else:
        try:
            params = load_init_params(fetch_bytes(session, first_qlog_url, timeout))
        except Exception as exc:
            failures.append(f"could not read `initData.params` from the first qlog: {exc}.")
    if params:
        auto_bookmark = params.get("VisionSpeedLimitAutoBookmark", "")
        if auto_bookmark != "1":
            failures.append(f"`VisionSpeedLimitAutoBookmark` was {auto_bookmark!r}; it must be '1'.")
        vision_detection = params.get("VisionSpeedLimitDetection", "")
        if vision_detection != "1":
            failures.append(f"`VisionSpeedLimitDetection` was {vision_detection!r}; it must be '1' for useful vision-model collection.")
        # NOTE(review): the map-tile scan is nested under `if params:` here, so it
        # is skipped when the params could not be read - confirm this is intended.
        try:
            # Normalise unusable entries to None while keeping segment positions.
            qlog_urls = [url if isinstance(url, str) and url else None for url in stream_urls["qlogs"]]
            map_tiles = scan_qlogs_for_map_tiles(session, qlog_urls, timeout)
            if not map_tiles.tile_loaded_any:
                maps_selected = params.get("MapsSelected", "")
                if maps_selected:
                    failures.append(
                        "offline map tiles never loaded during this route even though "
                        f"`MapsSelected` was {maps_selected!r}; tell the user to download local maps for the driven area so collection trains faster."
                    )
                else:
                    failures.append(
                        "offline map tiles never loaded during this route and `MapsSelected` was empty; "
                        "tell the user to download local maps for the driven area so collection trains faster."
                    )
        except Exception as exc:
            # A failed scan is reported as a validation failure, not a crash.
            failures.append(f"could not inspect qlogs for offline map usage: {exc}.")
    if failures:
        raise ValidationError(route, failures)
    return {
        "route_meta": route_meta,
        "files_payload": files_payload,
        "expected_segments": expected_segments,
        "params": params,
        "map_tiles": map_tiles,
    }
def prepare_output_paths(route: RouteId, desktop_root: Path, overwrite: bool) -> tuple[Path, Path]:
    """Resolve the bundle folder and zip paths under ``desktop_root``.

    The root directory is created if needed. Without ``overwrite`` an existing
    folder or zip raises ``FileExistsError``; with it, both are deleted.

    Returns:
        ``(bundle_root, zip_path)``.
    """
    desktop_root = desktop_root.expanduser().resolve()
    desktop_root.mkdir(parents=True, exist_ok=True)

    bundle_root = desktop_root / f"speed_limit_route_bundle_{route.safe_name}"
    zip_path = desktop_root / f"{bundle_root.name}.zip"

    if not overwrite:
        if bundle_root.exists():
            raise FileExistsError(f"Bundle folder already exists: {bundle_root}")
        if zip_path.exists():
            raise FileExistsError(f"Bundle zip already exists: {zip_path}")
        return bundle_root, zip_path

    # Overwrite requested: clear out whatever a previous run left behind.
    if bundle_root.exists():
        shutil.rmtree(bundle_root)
    if zip_path.exists():
        zip_path.unlink()
    return bundle_root, zip_path
def write_bundle_manifests(
    bundle_root: Path,
    route: RouteId,
    validation: dict[str, Any],
    downloaded_files: list[DownloadedFile],
) -> None:
    """Write the bundle's metadata files.

    Creates ``live_routes_meta/files.txt`` (``<log_id>--<segment> <path>`` per
    file), ``live_routes_meta/qlog_mtimes.txt`` (``<path> <epoch>`` for files
    whose name starts with ``qlog`` and that have a known mtime) and
    ``bundle_manifest.json`` in the bundle root. Records are ordered by
    (segment, stream, filename).
    """
    meta_dir = bundle_root / "live_routes_meta"
    meta_dir.mkdir(parents=True, exist_ok=True)

    ordered = sorted(downloaded_files, key=lambda item: (item.segment, item.stream, item.filename))

    files_listing = [f"{route.log_id}--{entry.segment} {entry.relative_path}" for entry in ordered]
    mtime_listing = [
        f"{entry.relative_path} {entry.mtime_epoch}"
        for entry in ordered
        if entry.filename.startswith("qlog") and entry.mtime_epoch is not None
    ]
    # One line per entry, each newline-terminated; an empty listing gives an empty file.
    (meta_dir / "files.txt").write_text("".join(f"{line}\n" for line in files_listing), encoding="utf-8")
    (meta_dir / "qlog_mtimes.txt").write_text("".join(f"{line}\n" for line in mtime_listing), encoding="utf-8")

    params = validation["params"]
    route_meta = validation["route_meta"]
    map_tiles: MapTileSummary = validation["map_tiles"]

    validation_section = {
        "isPublic": bool(route_meta.get("is_public")),
        "visionSpeedLimitAutoBookmark": params.get("VisionSpeedLimitAutoBookmark", ""),
        "visionSpeedLimitDetection": params.get("VisionSpeedLimitDetection", ""),
        "mapsSelected": params.get("MapsSelected", ""),
        "lastMapsUpdate": params.get("LastMapsUpdate", ""),
        "mapdMessagesScanned": map_tiles.mapd_messages,
        "offlineTileLoaded": map_tiles.tile_loaded_any,
        "firstTileSegment": map_tiles.first_tile_segment,
    }
    # Manifest key -> key in the route metadata payload.
    route_meta_fields = {
        "startTime": "start_time",
        "endTime": "end_time",
        "distance": "distance",
        "startLat": "start_lat",
        "startLng": "start_lng",
        "endLat": "end_lat",
        "endLng": "end_lng",
        "gitBranch": "git_branch",
        "gitCommit": "git_commit",
        "platform": "platform",
        "version": "version",
        "make": "make",
    }
    file_section = [
        {
            "segment": entry.segment,
            "stream": entry.stream,
            "filename": entry.filename,
            "relativePath": entry.relative_path,
            "mtimeEpoch": entry.mtime_epoch,
        }
        for entry in ordered
    ]

    manifest = {
        "bundleCreatedAt": datetime.now(UTC).isoformat(),
        "routeId": route.cli_name,
        "routeFullname": route.canonical_name,
        "expectedSegments": validation["expected_segments"],
        "downloadedStreams": [spec.display_name for spec in REQUIRED_STREAMS],
        "validation": validation_section,
        "routeMeta": {out_key: route_meta.get(src_key) for out_key, src_key in route_meta_fields.items()},
        "files": file_section,
    }
    serialized = json.dumps(manifest, indent=2, sort_keys=True) + "\n"
    (bundle_root / "bundle_manifest.json").write_text(serialized, encoding="utf-8")
def zip_bundle(bundle_root: Path, zip_path: Path) -> None:
    """Archive every file under ``bundle_root`` into ``zip_path`` without compression.

    Archive names are prefixed with the bundle folder's own name, and files
    are added in sorted path order. Directories get no entries of their own.
    """
    top_level = Path(bundle_root.name)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED) as archive:
        members = (entry for entry in sorted(bundle_root.rglob("*")) if entry.is_file())
        for member in members:
            archive.write(member, arcname=top_level / member.relative_to(bundle_root))
def download_bundle(
    route: RouteId,
    validation: dict[str, Any],
    session: requests.Session,
    desktop_root: Path,
    timeout: float,
    overwrite: bool,
) -> tuple[Path, Path]:
    """Download every required stream for every segment, then write manifests and the zip.

    Files are stored under
    ``<bundle_root>/data/media/0/realdata/<log_id>--<segment>/<filename>``.

    Args:
        validation: the dict returned by ``validate_route``; its
            ``expected_segments`` and ``files_payload`` entries drive the download.

    Returns:
        ``(bundle_root, zip_path)``.

    Raises:
        FileExistsError: if the bundle already exists and ``overwrite`` is false.
        RuntimeError: if a URL that validation should have guaranteed is missing.
    """
    bundle_root, zip_path = prepare_output_paths(route, desktop_root, overwrite)
    clip_root = bundle_root / "data" / "media" / "0" / "realdata"
    expected_segments = validation["expected_segments"]
    files_payload = validation["files_payload"]

    downloaded_files: list[DownloadedFile] = []
    for spec in REQUIRED_STREAMS:
        urls = files_payload.get(spec.api_key)
        # Same tolerance as validation: a missing/malformed entry is an empty list.
        if not isinstance(urls, list):
            urls = []
        for segment in range(expected_segments):
            # Bounds-check so a short list produces the explicit error below
            # rather than a bare IndexError.
            url = urls[segment] if segment < len(urls) else None
            if not isinstance(url, str) or not url:
                raise RuntimeError(f"Unexpected missing {spec.display_name} URL for segment {segment} after validation.")

            filename = filename_from_url(url)
            segment_name = f"{route.log_id}--{segment}"
            dest_path = clip_root / segment_name / filename
            print(f"Downloading {segment_name}/{filename}")
            mtime_epoch = fetch_stream_to_path(session, url, dest_path, timeout)
            downloaded_files.append(
                DownloadedFile(
                    segment=segment,
                    stream=spec.display_name,
                    filename=filename,
                    relative_path=relative_posix(dest_path, bundle_root),
                    mtime_epoch=mtime_epoch,
                )
            )

    write_bundle_manifests(bundle_root, route, validation, downloaded_files)
    print(f"Creating {zip_path.name}")
    zip_bundle(bundle_root, zip_path)
    return bundle_root, zip_path
def print_validation_summary(route: RouteId, validation: dict[str, Any]) -> None:
    """Print a short human-readable summary of a successful validation to stdout."""
    params = validation["params"]
    map_tiles: MapTileSummary = validation["map_tiles"]

    print(f"Validated {route.cli_name}")
    print(" public route: yes")
    print(f" segments: {validation['expected_segments']}")

    detection = params.get("VisionSpeedLimitDetection", "")
    print(f" VisionSpeedLimitDetection: {detection}")

    auto_bookmark = params.get("VisionSpeedLimitAutoBookmark", "")
    print(f" VisionSpeedLimitAutoBookmark: {auto_bookmark}")

    maps_selected = params.get("MapsSelected", "") or "(empty)"
    print(f" MapsSelected: {maps_selected}")

    if map_tiles.tile_loaded_any:
        tile_state = "yes"
    else:
        tile_state = "no"
    print(f" offline tile loaded: {tile_state}")
def main() -> int:
    """Run the CLI and return the process exit code.

    Exit codes: 0 on success (including ``--validate-only``), 2 when the route
    fails validation, 3 when the bundle already exists and ``--overwrite`` was
    not given, 1 for any other error (including an unparsable route id).
    """
    args = parse_args()
    try:
        # Parsed inside the try so a malformed route id is reported through the
        # generic "Error: ..." path instead of an uncaught traceback.
        route = parse_route_id(args.routeid)
        # Context manager so the session's pooled connections are closed on exit.
        with requests.Session() as session:
            session.headers.update({"User-Agent": "starpilot-speed-limit-route-bundler/1.0"})
            validation = validate_route(route, session, args.timeout)
            print_validation_summary(route, validation)
            if args.validate_only:
                return 0
            bundle_root, zip_path = download_bundle(route, validation, session, args.desktop_root, args.timeout, args.overwrite)
            print(f"Bundle folder: {bundle_root}")
            print(f"Bundle zip: {zip_path}")
            return 0
    except ValidationError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    except FileExistsError as exc:
        print(f"{exc}\nUse --overwrite to replace the existing Desktop bundle, or --validate-only to skip packaging.", file=sys.stderr)
        return 3
    except Exception as exc:
        # Top-level boundary: report any remaining failure as a one-line error.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())