mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-14 19:44:37 +08:00
285 lines
7.7 KiB
Python
285 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import json
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[2]
|
|
DEFAULT_WORKSPACE = REPO_ROOT / ".tmp" / "speed_limit_training"
|
|
DEFAULT_DEBUG_BASE = Path("/data/media/0/vision_speed_limit_debug")
|
|
REPO_ASSET_DIR = REPO_ROOT / "starpilot" / "assets" / "vision_models"
|
|
DEFAULT_LOCAL_CLIP_ROOT = REPO_ROOT / ".tmp" / "live_route_clips" / "bookmark_windows" / "data" / "media" / "0" / "realdata"
|
|
DEFAULT_LOCAL_QLOG_MTIMES = REPO_ROOT / ".tmp" / "live_routes_meta" / "qlog_mtimes.txt"
|
|
DEFAULT_LOCAL_FILES_MANIFEST = REPO_ROOT / ".tmp" / "live_routes_meta" / "files.txt"
|
|
DEFAULT_LOCAL_SESSION_ROUTE_MAP = REPO_ROOT / ".tmp" / "live_routes_meta" / "session_route_map.json"
|
|
DEFAULT_EXTERNAL_ROOT = Path("/Volumes/T5/starpilot_speed_limit")
|
|
|
|
DETECTOR_CLASS_NAMES = (
|
|
"regulatory_speed_limit",
|
|
"advisory_speed_limit",
|
|
"school_zone_speed_limit",
|
|
)
|
|
DEFAULT_SPEED_VALUES = (15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75)
|
|
|
|
DETECTOR_EXPORT_NAME = "speed_limit_us_detector.onnx"
|
|
CLASSIFIER_EXPORT_NAME = "speed_limit_us_value_classifier.onnx"
|
|
|
|
|
|
def resolve_workspace(path: str | Path | None) -> Path:
|
|
return Path(path).expanduser().resolve() if path else DEFAULT_WORKSPACE
|
|
|
|
|
|
def preferred_external_root() -> Path | None:
|
|
return DEFAULT_EXTERNAL_ROOT if DEFAULT_EXTERNAL_ROOT.is_dir() else None
|
|
|
|
|
|
def preferred_analysis_root() -> Path | None:
|
|
external_root = preferred_external_root()
|
|
if external_root is None:
|
|
return None
|
|
analysis_root = external_root / "analysis"
|
|
return analysis_root if analysis_root.is_dir() else external_root
|
|
|
|
|
|
def preferred_clip_root() -> Path:
|
|
external_root = preferred_analysis_root()
|
|
if external_root is not None:
|
|
return external_root / "live_route_clips" / "bookmark_windows" / "data" / "media" / "0" / "realdata"
|
|
return DEFAULT_LOCAL_CLIP_ROOT
|
|
|
|
|
|
def preferred_qlog_mtimes_path() -> Path:
|
|
external_root = preferred_analysis_root()
|
|
if external_root is not None:
|
|
return external_root / "live_routes_meta" / "qlog_mtimes.txt"
|
|
return DEFAULT_LOCAL_QLOG_MTIMES
|
|
|
|
|
|
def preferred_files_manifest_path() -> Path:
|
|
external_root = preferred_analysis_root()
|
|
if external_root is not None:
|
|
return external_root / "live_routes_meta" / "files.txt"
|
|
return DEFAULT_LOCAL_FILES_MANIFEST
|
|
|
|
|
|
def preferred_session_route_map_path() -> Path:
|
|
external_root = preferred_analysis_root()
|
|
if external_root is not None:
|
|
return external_root / "live_routes_meta" / "session_route_map.json"
|
|
return DEFAULT_LOCAL_SESSION_ROUTE_MAP
|
|
|
|
|
|
def load_session_route_map(path: str | Path | None = None) -> dict[str, str]:
|
|
route_map_path = Path(path).expanduser().resolve() if path else preferred_session_route_map_path()
|
|
if not route_map_path.is_file():
|
|
return {}
|
|
data = json.loads(route_map_path.read_text(encoding="utf-8"))
|
|
return {str(key): str(value) for key, value in data.items() if key and value}
|
|
|
|
|
|
def default_raw_root(workspace: str | Path | None = None) -> Path:
|
|
resolved = resolve_workspace(workspace)
|
|
if resolved.parent.name == "workspace":
|
|
return resolved.parent.parent / "raw"
|
|
return resolved / "raw"
|
|
|
|
|
|
def ensure_dir(path: Path) -> Path:
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def write_text(path: Path, text: str, force: bool = False) -> None:
|
|
if path.exists() and not force:
|
|
return
|
|
ensure_dir(path.parent)
|
|
path.write_text(text, encoding="utf-8")
|
|
|
|
|
|
def write_csv_header(path: Path, fieldnames: list[str], force: bool = False) -> None:
|
|
if path.exists() and not force:
|
|
return
|
|
ensure_dir(path.parent)
|
|
with path.open("w", encoding="utf-8", newline="") as csv_file:
|
|
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
|
|
|
|
def read_jsonl(path: Path) -> list[dict]:
|
|
records: list[dict] = []
|
|
with path.open("r", encoding="utf-8") as jsonl_file:
|
|
for line in jsonl_file:
|
|
line = line.strip()
|
|
if line:
|
|
records.append(json.loads(line))
|
|
return records
|
|
|
|
|
|
def latest_debug_sessions(debug_base: Path, count: int = 1) -> list[Path]:
|
|
if not debug_base.is_dir():
|
|
return []
|
|
sessions = sorted((path for path in debug_base.iterdir() if path.is_dir()), reverse=True)
|
|
return sessions[:max(count, 0)]
|
|
|
|
|
|
def detector_dataset_yaml(workspace: Path) -> str:
|
|
detector_root = workspace / "detector"
|
|
lines = [
|
|
f"path: {detector_root}",
|
|
"train: images/train",
|
|
"val: images/val",
|
|
"names:",
|
|
]
|
|
for index, class_name in enumerate(DETECTOR_CLASS_NAMES):
|
|
lines.append(f" {index}: {class_name}")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def workspace_readme(speed_values: tuple[int, ...]) -> str:
|
|
supported_values = ", ".join(str(value) for value in speed_values)
|
|
return f"""# Speed Limit Vision Training Workspace
|
|
|
|
This workspace is generated by `scripts/speed_limit_vision/init_workspace.py`.
|
|
|
|
Directory layout:
|
|
|
|
- `detector/images/train` and `detector/images/val`: detector training images
|
|
- `detector/labels/train` and `detector/labels/val`: YOLO detector labels
|
|
- `classifier/value_labels.csv`: value labels used to crop detector boxes into classifier folders
|
|
- `classifier/train` and `classifier/val`: classifier-ready crop folders
|
|
- `review/images`: imported snapshots from live debug sessions
|
|
- `review/bookmarks.csv`: bookmark/publish/candidate manifest built from debug sessions
|
|
- `review/leadins/frames` and `review/leadins/contact_sheets`: sampled frames from 5-second pre-bookmark route windows
|
|
- `review/bookmark_leadins.csv`: manifest describing sampled pre-bookmark review frames
|
|
- `exports`: exported ONNX models
|
|
- `runs`: training outputs
|
|
|
|
Suggested detector classes:
|
|
|
|
- `regulatory_speed_limit`
|
|
- `advisory_speed_limit`
|
|
- `school_zone_speed_limit`
|
|
|
|
Suggested classifier values:
|
|
|
|
- `{supported_values}`
|
|
|
|
Generated manifests:
|
|
|
|
- `manifests/raw_sources.csv`: raw public/comma source provenance
|
|
- `manifests/public_detector_samples.csv`: imported detector samples and sign metadata
|
|
- `manifests/public_classifier_samples.csv`: imported classifier-ready value samples
|
|
"""
|
|
|
|
|
|
BOOKMARK_MANIFEST_FIELDS = [
|
|
"record_key",
|
|
"source_name",
|
|
"source_region",
|
|
"source_device",
|
|
"source_driver",
|
|
"session_id",
|
|
"event_index",
|
|
"event",
|
|
"session_seconds",
|
|
"wall_time",
|
|
"road_name",
|
|
"stream",
|
|
"status",
|
|
"candidate_speed_limit_mph",
|
|
"candidate_confidence",
|
|
"speed_limit_mph",
|
|
"confidence",
|
|
"source_confidence",
|
|
"source_event",
|
|
"published_speed_limit_mph",
|
|
"published_confidence",
|
|
"map_source",
|
|
"map_current_speed_limit_mph",
|
|
"map_next_speed_limit_mph",
|
|
"map_next_speed_limit_distance_m",
|
|
"map_expected_speed_limit_mph",
|
|
"map_relation",
|
|
"previous_map_speed_limit_mph",
|
|
"review_bucket",
|
|
"bookmark_count",
|
|
"snapshot_path",
|
|
"source_session_path",
|
|
]
|
|
|
|
|
|
BOOKMARK_LEADIN_MANIFEST_FIELDS = [
|
|
"record_key",
|
|
"source_name",
|
|
"source_region",
|
|
"source_device",
|
|
"source_driver",
|
|
"session_id",
|
|
"bookmark_number",
|
|
"route",
|
|
"segment",
|
|
"segment_offset_s",
|
|
"leadin_start_s",
|
|
"sample_offset_s",
|
|
"window_result",
|
|
"published_values",
|
|
"candidate_values",
|
|
"frame_path",
|
|
"contact_sheet_path",
|
|
"source_video_path",
|
|
]
|
|
|
|
|
|
VALUE_LABEL_FIELDS = [
|
|
"image_path",
|
|
"split",
|
|
"speed_limit_mph",
|
|
"bbox_index",
|
|
"padding",
|
|
"label_path",
|
|
]
|
|
|
|
|
|
RAW_SOURCE_FIELDS = [
|
|
"source_name",
|
|
"source_version",
|
|
"source_license",
|
|
"source_type",
|
|
"raw_path",
|
|
"notes",
|
|
]
|
|
|
|
|
|
PUBLIC_DETECTOR_SAMPLE_FIELDS = [
|
|
"record_key",
|
|
"source_name",
|
|
"split",
|
|
"image_path",
|
|
"label_path",
|
|
"annotation_path",
|
|
"source_image_id",
|
|
"class_name",
|
|
"speed_limit_mph",
|
|
"sign_code",
|
|
"bbox_left",
|
|
"bbox_top",
|
|
"bbox_right",
|
|
"bbox_bottom",
|
|
]
|
|
|
|
|
|
PUBLIC_CLASSIFIER_SAMPLE_FIELDS = [
|
|
"record_key",
|
|
"source_name",
|
|
"split",
|
|
"image_path",
|
|
"speed_limit_mph",
|
|
"bbox_index",
|
|
"label_path",
|
|
"source_image_id",
|
|
"sign_code",
|
|
]
|