Files
firestar5683 10cab1ceff SLC
2026-04-04 10:58:40 -05:00

285 lines
7.7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import csv
import json
from pathlib import Path
# Repository root: two levels above the directory that contains this file.
REPO_ROOT = Path(__file__).resolve().parents[2]

# Default training workspace, kept under the repository's ".tmp" directory.
DEFAULT_WORKSPACE = REPO_ROOT.joinpath(".tmp", "speed_limit_training")

# Default location of the vision speed-limit debug sessions.
DEFAULT_DEBUG_BASE = Path("/data/media/0/vision_speed_limit_debug")

# Directory inside the repository holding vision model assets.
REPO_ASSET_DIR = REPO_ROOT.joinpath("starpilot", "assets", "vision_models")

# Local (in-repo) fallbacks used by the preferred_*() helpers when no external
# analysis root is available.
DEFAULT_LOCAL_CLIP_ROOT = REPO_ROOT.joinpath(
    ".tmp",
    "live_route_clips",
    "bookmark_windows",
    "data",
    "media",
    "0",
    "realdata",
)
DEFAULT_LOCAL_QLOG_MTIMES = REPO_ROOT.joinpath(".tmp", "live_routes_meta", "qlog_mtimes.txt")
DEFAULT_LOCAL_FILES_MANIFEST = REPO_ROOT.joinpath(".tmp", "live_routes_meta", "files.txt")
DEFAULT_LOCAL_SESSION_ROUTE_MAP = REPO_ROOT.joinpath(
    ".tmp", "live_routes_meta", "session_route_map.json"
)

# External data root; preferred_external_root() only uses it when the
# directory actually exists.
DEFAULT_EXTERNAL_ROOT = Path("/Volumes/T5/starpilot_speed_limit")
# Detector class names. detector_dataset_yaml() numbers the classes by their
# position in this tuple, so the order is significant.
DETECTOR_CLASS_NAMES = (
    "regulatory_speed_limit",
    "advisory_speed_limit",
    "school_zone_speed_limit",
)

# Default speed values: 15 through 75 in steps of 5.
DEFAULT_SPEED_VALUES = tuple(range(15, 80, 5))

# File names (with .onnx suffix) for the detector and value-classifier exports.
DETECTOR_EXPORT_NAME = "speed_limit_us_detector.onnx"
CLASSIFIER_EXPORT_NAME = "speed_limit_us_value_classifier.onnx"
def resolve_workspace(path: str | Path | None) -> Path:
return Path(path).expanduser().resolve() if path else DEFAULT_WORKSPACE
def preferred_external_root() -> Path | None:
    """Return ``DEFAULT_EXTERNAL_ROOT`` if it is an existing directory, else ``None``."""
    if DEFAULT_EXTERNAL_ROOT.is_dir():
        return DEFAULT_EXTERNAL_ROOT
    return None
def preferred_analysis_root() -> Path | None:
    """Return the directory under which external analysis data is looked up.

    Returns:
        ``None`` when no external root is available. Otherwise the external
        root's ``analysis`` subdirectory if that directory exists, falling
        back to the external root itself.
    """
    root = preferred_external_root()
    if root is None:
        return None
    candidate = root / "analysis"
    if candidate.is_dir():
        return candidate
    return root
def preferred_clip_root() -> Path:
    """Return the clip root, preferring the external analysis root.

    Falls back to ``DEFAULT_LOCAL_CLIP_ROOT`` when ``preferred_analysis_root()``
    yields ``None``.
    """
    base = preferred_analysis_root()
    if base is None:
        return DEFAULT_LOCAL_CLIP_ROOT
    return base.joinpath(
        "live_route_clips",
        "bookmark_windows",
        "data",
        "media",
        "0",
        "realdata",
    )
def preferred_qlog_mtimes_path() -> Path:
    """Return the path of ``qlog_mtimes.txt``, preferring the external analysis root.

    Falls back to ``DEFAULT_LOCAL_QLOG_MTIMES`` when ``preferred_analysis_root()``
    yields ``None``.
    """
    base = preferred_analysis_root()
    if base is None:
        return DEFAULT_LOCAL_QLOG_MTIMES
    return base.joinpath("live_routes_meta", "qlog_mtimes.txt")
def preferred_files_manifest_path() -> Path:
    """Return the path of the ``files.txt`` manifest, preferring the external analysis root.

    Falls back to ``DEFAULT_LOCAL_FILES_MANIFEST`` when
    ``preferred_analysis_root()`` yields ``None``.
    """
    base = preferred_analysis_root()
    if base is None:
        return DEFAULT_LOCAL_FILES_MANIFEST
    return base.joinpath("live_routes_meta", "files.txt")
def preferred_session_route_map_path() -> Path:
    """Return the path of ``session_route_map.json``, preferring the external analysis root.

    Falls back to ``DEFAULT_LOCAL_SESSION_ROUTE_MAP`` when
    ``preferred_analysis_root()`` yields ``None``.
    """
    base = preferred_analysis_root()
    if base is None:
        return DEFAULT_LOCAL_SESSION_ROUTE_MAP
    return base.joinpath("live_routes_meta", "session_route_map.json")
def load_session_route_map(path: str | Path | None = None) -> dict[str, str]:
route_map_path = Path(path).expanduser().resolve() if path else preferred_session_route_map_path()
if not route_map_path.is_file():
return {}
data = json.loads(route_map_path.read_text(encoding="utf-8"))
return {str(key): str(value) for key, value in data.items() if key and value}
def default_raw_root(workspace: str | Path | None = None) -> Path:
    """Return the ``raw`` data directory that belongs to a workspace.

    Args:
        workspace: Workspace location, interpreted by ``resolve_workspace()``.

    Returns:
        If the resolved workspace sits directly inside a directory named
        ``workspace``, the ``raw`` directory next to that ``workspace``
        directory; otherwise ``raw`` inside the resolved workspace itself.
    """
    root = resolve_workspace(workspace)
    container = root.parent
    if container.name == "workspace":
        return container.parent / "raw"
    return root / "raw"
def ensure_dir(path: Path) -> Path:
    """Create ``path`` and any missing parents, then return ``path``.

    An already existing directory is not an error.
    """
    path.mkdir(exist_ok=True, parents=True)
    return path
def write_text(path: Path, text: str, force: bool = False) -> None:
    """Write ``text`` to ``path`` as UTF-8, creating parent directories.

    Args:
        path: Destination file.
        text: Content to write.
        force: When false (the default) an existing ``path`` is left
            untouched; when true it is overwritten.
    """
    if not path.exists() or force:
        ensure_dir(path.parent)
        path.write_text(text, encoding="utf-8")
def write_csv_header(path: Path, fieldnames: list[str], force: bool = False) -> None:
    """Create a CSV file at ``path`` that contains only a header row.

    Args:
        path: Destination file; parent directories are created as needed.
        fieldnames: Column names written as the header row.
        force: When false (the default) an existing ``path`` is left
            untouched; when true it is truncated and rewritten.
    """
    if not path.exists() or force:
        ensure_dir(path.parent)
        # newline="" lets the csv module control line endings itself.
        with path.open("w", encoding="utf-8", newline="") as handle:
            csv.DictWriter(handle, fieldnames=fieldnames).writeheader()
def read_jsonl(path: Path) -> list[dict]:
    """Parse a UTF-8 JSON Lines file into a list of decoded records.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are skipped, every other line is decoded with
    ``json.loads``. Records are returned in file order.
    """
    with path.open("r", encoding="utf-8") as handle:
        return [
            json.loads(stripped)
            for stripped in (raw_line.strip() for raw_line in handle)
            if stripped
        ]
def latest_debug_sessions(debug_base: Path, count: int = 1) -> list[Path]:
    """Return up to ``count`` session directories found directly in ``debug_base``.

    Subdirectories are ordered by descending path, so "latest" holds only if
    session directory names sort chronologically (presumably timestamped
    names; verify against the code that creates them).

    Args:
        debug_base: Directory whose immediate subdirectories are the sessions.
        count: Maximum number of sessions to return; negative values are
            treated as zero.

    Returns:
        The selected directories, or an empty list when ``debug_base`` is not
        a directory.
    """
    if not debug_base.is_dir():
        return []
    session_dirs = [entry for entry in debug_base.iterdir() if entry.is_dir()]
    session_dirs.sort(reverse=True)
    limit = max(count, 0)
    return session_dirs[:limit]
def detector_dataset_yaml(workspace: Path) -> str:
    """Build the text of the detector dataset YAML for ``workspace``.

    The document has ``path`` (the workspace's ``detector`` directory),
    ``train``, ``val`` and a ``names`` mapping that numbers the entries of
    ``DETECTOR_CLASS_NAMES`` from zero in tuple order.

    Returns:
        The YAML text, terminated by a single newline.
    """
    dataset_dir = workspace / "detector"
    header = [
        f"path: {dataset_dir}",
        "train: images/train",
        "val: images/val",
        "names:",
    ]
    class_entries = [
        f" {index}: {class_name}"
        for index, class_name in enumerate(DETECTOR_CLASS_NAMES)
    ]
    return "\n".join(header + class_entries) + "\n"
def workspace_readme(speed_values: tuple[int, ...]) -> str:
    """Render the README text for a training workspace.

    Args:
        speed_values: Classifier values to list; they are joined with ", "
            into the single "Suggested classifier values" bullet.

    Returns:
        The README text. Everything except the joined values is fixed
        template text.
    """
    value_list = ", ".join(map(str, speed_values))
    return f"""# Speed Limit Vision Training Workspace
This workspace is generated by `scripts/speed_limit_vision/init_workspace.py`.
Directory layout:
- `detector/images/train` and `detector/images/val`: detector training images
- `detector/labels/train` and `detector/labels/val`: YOLO detector labels
- `classifier/value_labels.csv`: value labels used to crop detector boxes into classifier folders
- `classifier/train` and `classifier/val`: classifier-ready crop folders
- `review/images`: imported snapshots from live debug sessions
- `review/bookmarks.csv`: bookmark/publish/candidate manifest built from debug sessions
- `review/leadins/frames` and `review/leadins/contact_sheets`: sampled frames from 5-second pre-bookmark route windows
- `review/bookmark_leadins.csv`: manifest describing sampled pre-bookmark review frames
- `exports`: exported ONNX models
- `runs`: training outputs
Suggested detector classes:
- `regulatory_speed_limit`
- `advisory_speed_limit`
- `school_zone_speed_limit`
Suggested classifier values:
- `{value_list}`
Generated manifests:
- `manifests/raw_sources.csv`: raw public/comma source provenance
- `manifests/public_detector_samples.csv`: imported detector samples and sign metadata
- `manifests/public_classifier_samples.csv`: imported classifier-ready value samples
"""
# Column names, in order, for the bookmark manifest (presumably the
# `review/bookmarks.csv` file named in the workspace README; confirm against
# the writer).
BOOKMARK_MANIFEST_FIELDS = [
    # Record identity and provenance.
    "record_key",
    "source_name",
    "source_region",
    "source_device",
    "source_driver",
    "session_id",
    # Event details.
    "event_index",
    "event",
    "session_seconds",
    "wall_time",
    "road_name",
    "stream",
    "status",
    # Speed-limit values and confidences.
    "candidate_speed_limit_mph",
    "candidate_confidence",
    "speed_limit_mph",
    "confidence",
    "source_confidence",
    "source_event",
    "published_speed_limit_mph",
    "published_confidence",
    # Map-related columns.
    "map_source",
    "map_current_speed_limit_mph",
    "map_next_speed_limit_mph",
    "map_next_speed_limit_distance_m",
    "map_expected_speed_limit_mph",
    "map_relation",
    "previous_map_speed_limit_mph",
    # Review bookkeeping and file locations.
    "review_bucket",
    "bookmark_count",
    "snapshot_path",
    "source_session_path",
]
# Column names, in order, for the bookmark lead-in manifest (presumably the
# `review/bookmark_leadins.csv` file named in the workspace README; confirm
# against the writer).
BOOKMARK_LEADIN_MANIFEST_FIELDS = [
    # Record identity and provenance.
    "record_key",
    "source_name",
    "source_region",
    "source_device",
    "source_driver",
    "session_id",
    # Bookmark position within the route.
    "bookmark_number",
    "route",
    "segment",
    "segment_offset_s",
    "leadin_start_s",
    "sample_offset_s",
    # Window outcome and values.
    "window_result",
    "published_values",
    "candidate_values",
    # File locations.
    "frame_path",
    "contact_sheet_path",
    "source_video_path",
]
# Column names, in order, for classifier value labels (presumably the
# `classifier/value_labels.csv` file named in the workspace README; confirm
# against the writer).
VALUE_LABEL_FIELDS = [
    "image_path",
    "split",
    "speed_limit_mph",
    "bbox_index",
    "padding",
    "label_path",
]
# Column names, in order, for raw-source provenance records (presumably the
# `manifests/raw_sources.csv` file named in the workspace README; confirm
# against the writer).
RAW_SOURCE_FIELDS = [
    "source_name",
    "source_version",
    "source_license",
    "source_type",
    "raw_path",
    "notes",
]
# Column names, in order, for imported public detector samples (presumably the
# `manifests/public_detector_samples.csv` file named in the workspace README;
# confirm against the writer).
PUBLIC_DETECTOR_SAMPLE_FIELDS = [
    # Record identity and file locations.
    "record_key",
    "source_name",
    "split",
    "image_path",
    "label_path",
    "annotation_path",
    "source_image_id",
    # Sign description.
    "class_name",
    "speed_limit_mph",
    "sign_code",
    # Bounding-box edges.
    "bbox_left",
    "bbox_top",
    "bbox_right",
    "bbox_bottom",
]
# Column names, in order, for imported public classifier samples (presumably
# the `manifests/public_classifier_samples.csv` file named in the workspace
# README; confirm against the writer).
PUBLIC_CLASSIFIER_SAMPLE_FIELDS = [
    "record_key",
    "source_name",
    "split",
    "image_path",
    "speed_limit_mph",
    "bbox_index",
    "label_path",
    "source_image_id",
    "sign_code",
]