Files
StarPilot/tools/longitudinal/score_route_longitudinal.py
T
firestar5683 eebfd509d7 LongyLongNeck
2026-05-09 19:28:03 -05:00

442 lines
18 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import math
import os
from collections import defaultdict
from dataclasses import asdict, dataclass
from types import SimpleNamespace
import numpy as np
os.environ["DEBUG"] = "0"
os.environ.setdefault("FILEREADER_CACHE", "1")
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.controls.lib import longitudinal_planner as longitudinal_planner_mod
from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import STOP_DISTANCE
from openpilot.tools.lib.logreader import LogReader
for level_name in ("info", "warning", "error", "exception", "event"):
if hasattr(cloudlog, level_name):
setattr(cloudlog, level_name, lambda *args, **kwargs: None)
NEAR_SPEED_HIGHWAY_MIN_SPEED = 22.0
NEAR_SPEED_LEAD_DELTA = 1.5
BRAKE_STAB_START = -0.35
BRAKE_STAB_END = -0.10
BRAKE_STAB_MAX_DURATION = 1.0
VISION_SLOW_LEAD_MIN_DELTA = 3.0
VISION_SLOW_LEAD_MIN_DIST = 35.0
VISION_SLOW_LEAD_MAX_DIST = 120.0
VISION_SLOW_LEAD_MIN_MODEL_PROB = 0.9
SLOW_LEAD_DECEL_TRIGGER = -0.2
REQUIRED_SERVICES = {
"carControl",
"carState",
"controlsState",
"liveParameters",
"radarState",
"selfdriveState",
"starpilotPlan",
}
@dataclass
class RouteMetrics:
route: str
replayed_samples: int = 0
near_speed_samples: int = 0
highway_follow_samples: int = 0
vision_slow_lead_samples: int = 0
brake_stab_count: int = 0
false_far_lead_brake_count: int = 0
highway_accel_std: float = 0.0
highway_rms_jerk: float = 0.0
near_speed_accel_std: float = 0.0
near_speed_rms_jerk: float = 0.0
slower_lead_event_count: int = 0
slower_lead_mean_response_delay: float | None = None
slower_lead_worst_response_delay: float | None = None
slower_lead_min_a_target: float | None = None
slower_lead_min_ttc: float | None = None
slower_lead_min_gap: float | None = None
def configure_planner_profile(profile: str) -> None:
if profile == "current":
return
if profile == "legacy_panic_bypass":
longitudinal_planner_mod.UNCERT_PANIC_MIN_CLOSING_SPEED = 0.5
longitudinal_planner_mod.UNCERT_PANIC_MIN_CLOSING_SPEED_GAIN = 0.0
longitudinal_planner_mod.UNCERT_PANIC_MAX_GAP_BUFFER_MIN = 1e6
longitudinal_planner_mod.UNCERT_PANIC_MAX_GAP_BUFFER_GAIN = 0.0
return
raise ValueError(f"Unsupported planner profile: {profile}")
def default_toggles() -> SimpleNamespace:
return SimpleNamespace(
taco_tune=False,
classic_model=False,
tinygrad_model=True,
model_version="v11",
stop_distance=float(STOP_DISTANCE),
vEgoStopping=0.5,
)
def parse_toggles(serialized: str, previous: SimpleNamespace | None) -> SimpleNamespace:
if not serialized:
return previous if previous is not None else default_toggles()
payload = vars(default_toggles())
try:
payload.update(json.loads(serialized))
except Exception:
return previous if previous is not None else default_toggles()
return SimpleNamespace(**payload)
def finite_min(current: float | None, value: float | None) -> float | None:
if value is None or not math.isfinite(value):
return current
if current is None:
return value
return min(current, value)
def finite_mean(values: list[float]) -> float | None:
if not values:
return None
return float(np.mean(values))
def finite_max(values: list[float]) -> float | None:
if not values:
return None
return float(np.max(values))
def is_near_speed_follow(v_ego: float, lead_status: bool, v_lead: float) -> bool:
return lead_status and v_ego > NEAR_SPEED_HIGHWAY_MIN_SPEED and abs(v_ego - v_lead) <= NEAR_SPEED_LEAD_DELTA
def is_highway_follow(v_ego: float, lead_status: bool) -> bool:
return lead_status and v_ego > NEAR_SPEED_HIGHWAY_MIN_SPEED
def is_vision_slow_lead(v_ego: float, lead_status: bool, lead_radar: bool, lead_prob: float, lead_dist: float, v_lead: float) -> bool:
return (
lead_status and
not lead_radar and
lead_prob >= VISION_SLOW_LEAD_MIN_MODEL_PROB and
(v_ego - v_lead) >= VISION_SLOW_LEAD_MIN_DELTA and
VISION_SLOW_LEAD_MIN_DIST <= lead_dist <= VISION_SLOW_LEAD_MAX_DIST
)
def score_route(route: str, capture_events: bool = False, max_events: int = 200) -> tuple[RouteMetrics, list[dict]]:
metrics = RouteMetrics(route=route)
planner = None
toggles = default_toggles()
state: dict[str, object] = {}
events: list[dict] = []
highway_accels: list[float] = []
highway_jerks: list[float] = []
near_speed_accels: list[float] = []
near_speed_jerks: list[float] = []
slower_response_delays: list[float] = []
active_slow_event: dict | None = None
active_brake_stab: dict | None = None
active_false_far_brake: dict | None = None
prev_plan_t: int | None = None
prev_a_target: float | None = None
for msg in LogReader(route, sort_by_time=True):
which = msg.which()
if which == "carParams" and planner is None:
planner = LongitudinalPlanner(msg.carParams)
continue
if which not in REQUIRED_SERVICES and which != "modelV2":
continue
state[which] = getattr(msg, which)
if which == "starpilotPlan":
toggles = parse_toggles(state["starpilotPlan"].starpilotToggles, toggles)
if which != "modelV2" or planner is None or not REQUIRED_SERVICES.issubset(state):
continue
planner.update(state, toggles)
metrics.replayed_samples += 1
car_state = state["carState"]
radar_state = state["radarState"]
lead = radar_state.leadOne
v_ego = float(car_state.vEgo)
lead_status = bool(lead.status)
lead_dist = float(lead.dRel) if lead_status else float("inf")
v_lead = float(lead.vLead) if lead_status else v_ego
lead_prob = float(getattr(lead, "modelProb", 0.0))
lead_radar = bool(getattr(lead, "radar", False))
a_target = float(planner.output_a_target)
mono_time = int(msg.logMonoTime)
dt = None if prev_plan_t is None else max((mono_time - prev_plan_t) / 1e9, 1e-3)
jerk = None if dt is None or prev_a_target is None else (a_target - prev_a_target) / dt
near_speed_follow = is_near_speed_follow(v_ego, lead_status, v_lead)
highway_follow = is_highway_follow(v_ego, lead_status)
vision_slow_lead = is_vision_slow_lead(v_ego, lead_status, lead_radar, lead_prob, lead_dist, v_lead)
if near_speed_follow:
metrics.near_speed_samples += 1
near_speed_accels.append(a_target)
if jerk is not None:
near_speed_jerks.append(jerk)
if highway_follow:
metrics.highway_follow_samples += 1
highway_accels.append(a_target)
if jerk is not None:
highway_jerks.append(jerk)
if vision_slow_lead:
metrics.vision_slow_lead_samples += 1
if near_speed_follow:
if active_brake_stab is None and a_target < BRAKE_STAB_START:
active_brake_stab = {
"start_t": mono_time / 1e9,
"start_mono_time": mono_time,
"min_a_target": a_target,
"start_lead_dist": lead_dist,
"start_v_ego": v_ego,
"start_v_lead": v_lead,
"source": str(planner.mpc.source),
"filter_time_factor": float(getattr(planner.mpc, "filter_time_factor", 1.0)),
}
elif active_brake_stab is not None and a_target > BRAKE_STAB_END:
duration = mono_time / 1e9 - active_brake_stab["start_t"]
if duration < BRAKE_STAB_MAX_DURATION:
metrics.brake_stab_count += 1
if capture_events and len(events) < max_events:
events.append({
"route": route,
"kind": "brake_stab",
"start_mono_time": active_brake_stab["start_mono_time"],
"duration": duration,
"min_a_target": active_brake_stab["min_a_target"],
"start_lead_dist": active_brake_stab["start_lead_dist"],
"start_v_ego": active_brake_stab["start_v_ego"],
"start_v_lead": active_brake_stab["start_v_lead"],
"source": active_brake_stab["source"],
"filter_time_factor": active_brake_stab["filter_time_factor"],
})
active_brake_stab = None
elif active_brake_stab is not None:
active_brake_stab["min_a_target"] = min(active_brake_stab["min_a_target"], a_target)
else:
active_brake_stab = None
lead_source_active = str(planner.mpc.source) in ("lead0", "lead1") or bool(getattr(state["starpilotPlan"], "trackingLead", False))
false_far_brake = (
highway_follow and
lead_source_active and
lead_status and
not lead_radar and
lead_dist > 80.0 and
0.1 < (v_ego - v_lead) < 2.0 and
a_target < -0.2
)
if false_far_brake:
if active_false_far_brake is None:
active_false_far_brake = {
"start_mono_time": mono_time,
"start_t": mono_time / 1e9,
"min_a_target": a_target,
"min_lead_dist": lead_dist,
"max_closing_speed": max(0.0, v_ego - v_lead),
"start_v_ego": v_ego,
"start_v_lead": v_lead,
"lead_prob": lead_prob,
"source": str(planner.mpc.source),
"filter_time_factor": float(getattr(planner.mpc, "filter_time_factor", 1.0)),
}
else:
active_false_far_brake["min_a_target"] = min(active_false_far_brake["min_a_target"], a_target)
active_false_far_brake["min_lead_dist"] = min(active_false_far_brake["min_lead_dist"], lead_dist)
active_false_far_brake["max_closing_speed"] = max(active_false_far_brake["max_closing_speed"], max(0.0, v_ego - v_lead))
elif active_false_far_brake is not None:
if capture_events and len(events) < max_events:
events.append({
"route": route,
"kind": "false_far_lead_brake",
"start_mono_time": active_false_far_brake["start_mono_time"],
"duration": mono_time / 1e9 - active_false_far_brake["start_t"],
"min_a_target": active_false_far_brake["min_a_target"],
"min_lead_dist": active_false_far_brake["min_lead_dist"],
"max_closing_speed": active_false_far_brake["max_closing_speed"],
"start_v_ego": active_false_far_brake["start_v_ego"],
"start_v_lead": active_false_far_brake["start_v_lead"],
"lead_prob": active_false_far_brake["lead_prob"],
"source": active_false_far_brake["source"],
"filter_time_factor": active_false_far_brake["filter_time_factor"],
})
metrics.false_far_lead_brake_count += 1
active_false_far_brake = None
if vision_slow_lead:
ttc = lead_dist / max(v_ego - v_lead, 0.1)
if active_slow_event is None:
active_slow_event = {
"start_t": mono_time / 1e9,
"response_delay": None,
"min_a_target": a_target,
"min_ttc": ttc,
"min_gap": lead_dist,
}
if active_slow_event["response_delay"] is None and a_target <= SLOW_LEAD_DECEL_TRIGGER:
active_slow_event["response_delay"] = mono_time / 1e9 - active_slow_event["start_t"]
active_slow_event["min_a_target"] = min(active_slow_event["min_a_target"], a_target)
active_slow_event["min_ttc"] = min(active_slow_event["min_ttc"], ttc)
active_slow_event["min_gap"] = min(active_slow_event["min_gap"], lead_dist)
elif active_slow_event is not None:
metrics.slower_lead_event_count += 1
if active_slow_event["response_delay"] is not None:
slower_response_delays.append(active_slow_event["response_delay"])
metrics.slower_lead_min_a_target = finite_min(metrics.slower_lead_min_a_target, active_slow_event["min_a_target"])
metrics.slower_lead_min_ttc = finite_min(metrics.slower_lead_min_ttc, active_slow_event["min_ttc"])
metrics.slower_lead_min_gap = finite_min(metrics.slower_lead_min_gap, active_slow_event["min_gap"])
active_slow_event = None
prev_plan_t = mono_time
prev_a_target = a_target
if active_slow_event is not None:
metrics.slower_lead_event_count += 1
if active_slow_event["response_delay"] is not None:
slower_response_delays.append(active_slow_event["response_delay"])
metrics.slower_lead_min_a_target = finite_min(metrics.slower_lead_min_a_target, active_slow_event["min_a_target"])
metrics.slower_lead_min_ttc = finite_min(metrics.slower_lead_min_ttc, active_slow_event["min_ttc"])
metrics.slower_lead_min_gap = finite_min(metrics.slower_lead_min_gap, active_slow_event["min_gap"])
if active_false_far_brake is not None and capture_events and len(events) < max_events:
events.append({
"route": route,
"kind": "false_far_lead_brake",
"start_mono_time": active_false_far_brake["start_mono_time"],
"duration": prev_plan_t / 1e9 - active_false_far_brake["start_t"] if prev_plan_t is not None else 0.0,
"min_a_target": active_false_far_brake["min_a_target"],
"min_lead_dist": active_false_far_brake["min_lead_dist"],
"max_closing_speed": active_false_far_brake["max_closing_speed"],
"start_v_ego": active_false_far_brake["start_v_ego"],
"start_v_lead": active_false_far_brake["start_v_lead"],
"lead_prob": active_false_far_brake["lead_prob"],
"source": active_false_far_brake["source"],
"filter_time_factor": active_false_far_brake["filter_time_factor"],
})
if active_false_far_brake is not None:
metrics.false_far_lead_brake_count += 1
if highway_accels:
metrics.highway_accel_std = float(np.std(highway_accels))
if highway_jerks:
metrics.highway_rms_jerk = float(np.sqrt(np.mean(np.square(highway_jerks))))
if near_speed_accels:
metrics.near_speed_accel_std = float(np.std(near_speed_accels))
if near_speed_jerks:
metrics.near_speed_rms_jerk = float(np.sqrt(np.mean(np.square(near_speed_jerks))))
metrics.slower_lead_mean_response_delay = finite_mean(slower_response_delays)
metrics.slower_lead_worst_response_delay = finite_max(slower_response_delays)
return metrics, events
def route_weight(route: str, priority_routes: set[str], priority_weight: float, normal_count: int, priority_count: int) -> float:
if priority_count == 0 or normal_count == 0:
return 1.0 / max(priority_count + normal_count, 1)
if route in priority_routes and priority_count > 0:
return priority_weight / priority_count
if route not in priority_routes and normal_count > 0:
return (1.0 - priority_weight) / normal_count
return 0.0
def aggregate_summary(results: list[RouteMetrics], priority_routes: set[str], priority_weight: float) -> dict:
priority_count = sum(1 for result in results if result.route in priority_routes)
normal_count = len(results) - priority_count
weighted = defaultdict(float)
for result in results:
weight = route_weight(result.route, priority_routes, priority_weight, normal_count, priority_count)
weighted["route_count"] += weight
weighted["brake_stab_count"] += weight * result.brake_stab_count
weighted["false_far_lead_brake_count"] += weight * result.false_far_lead_brake_count
weighted["highway_accel_std"] += weight * result.highway_accel_std
weighted["highway_rms_jerk"] += weight * result.highway_rms_jerk
weighted["near_speed_accel_std"] += weight * result.near_speed_accel_std
weighted["near_speed_rms_jerk"] += weight * result.near_speed_rms_jerk
if result.slower_lead_mean_response_delay is not None:
weighted["slower_lead_mean_response_delay"] += weight * result.slower_lead_mean_response_delay
if result.slower_lead_worst_response_delay is not None:
weighted["slower_lead_worst_response_delay"] += weight * result.slower_lead_worst_response_delay
return dict(weighted)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Replay the longitudinal planner directly on local route logs and score planner behavior. Route identifiers are runtime-only inputs."
)
parser.add_argument("routes", nargs="+", help="Route or segment identifiers to replay")
parser.add_argument("--priority-route", action="append", default=[],
help="Route identifier to weight in the priority subset")
parser.add_argument("--priority-weight", type=float, default=0.60,
help="Total weight assigned to the priority subset")
parser.add_argument("--planner-profile", choices=("current", "legacy_panic_bypass"), default="current",
help="Planner behavior profile used for local comparison runs")
parser.add_argument("--json-out", type=str,
help="Optional local output path for JSON results")
parser.add_argument("--events-out", type=str,
help="Optional local output path for captured debug events")
parser.add_argument("--max-events-per-route", type=int, default=200,
help="Maximum number of debug events to capture per route")
return parser.parse_args()
def main() -> None:
args = parse_args()
configure_planner_profile(args.planner_profile)
scored = [score_route(route, capture_events=bool(args.events_out), max_events=args.max_events_per_route) for route in args.routes]
results = [result for result, _ in scored]
events = {route: route_events for (result, route_events), route in zip(scored, args.routes)}
payload = {
"summary": aggregate_summary(results, set(args.priority_route), args.priority_weight),
"routes": [asdict(result) for result in results],
}
if args.json_out:
out_dir = os.path.dirname(args.json_out)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
with open(args.json_out, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2, sort_keys=True)
if args.events_out:
out_dir = os.path.dirname(args.events_out)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
with open(args.events_out, "w", encoding="utf-8") as f:
json.dump(events, f, indent=2, sort_keys=True)
print(json.dumps(payload, indent=2, sort_keys=True))
if __name__ == "__main__":
main()