Lateral maneuver report (#37562)

* lateral report

* mutually exclude buttons

* gating

* set maneuver

* add timer

* timer text

* fix plot

* use curvature

* more curves

* fix gating

* rm delay

* highway speed only

* msg

* add sine

* add step-down

* use relative

* text

* stabilize

* tuning

* windup

* text

* winddown

* no windup

* tuning

* more tuning

* more

* formatting

* test faster

* extend sine

* report crossings

* add readme

* clean report

* fix lint

* gating

* fix

* straighter

* compensate roll

* rm abs roll

* len

* Revert "rm abs roll"

This reverts commit a22d6bb136f90d2bf997e6b9aeee2f784398ef42.

* Revert "compensate roll"

This reverts commit dfda52119cc4a2e29ac2854b9154c08459086fea.

* print actuators

* show curve and roll

* tune roll

* text

* slower

* timer

* too much banked streets in US

* readme

* filter incomplete

* plot jerk

* plot angle jerk

* lil edits

* fix lint

* apply suggestions

* better table

* apply comments

* clean

* shane comments

* deflicker

---------

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
This commit is contained in:
Daniel Koepping
2026-03-27 13:31:00 -07:00
committed by GitHub
parent b706673e1c
commit 6b94c47c6a
14 changed files with 577 additions and 23 deletions

View File

@@ -88,6 +88,7 @@ struct OnroadEvent @0xc4fa6047f024e718 {
lowMemory @51;
stockAeb @52;
stockLkas @98;
lateralManeuver @99;
ldw @53;
carUnrecognized @54;
invalidLkasSetting @55;
@@ -1241,6 +1242,10 @@ struct DriverAssistance {
# FCW, AEB, etc. will go here
}
struct LateralManeuverPlan {
desiredCurvature @0 :Float32; # 1/m
}
struct LongitudinalPlan @0xe00b5b3eba12876c {
modelMonoTime @9 :UInt64;
hasLead @7 :Bool;
@@ -2612,6 +2617,8 @@ struct Event {
bookmarkButton @148 :UserBookmark;
audioFeedback @149 :AudioFeedback;
lateralManeuverPlan @150 :LateralManeuverPlan;
# *********** debug ***********
testJoystick @52 :Joystick;
roadEncodeData @86 :EncodeData;

View File

@@ -49,6 +49,7 @@ _services: dict[str, tuple] = {
"carControl": (True, 100., 10),
"carOutput": (True, 100., 10),
"longitudinalPlan": (True, 20., 10),
"lateralManeuverPlan": (True, 20.),
"driverAssistance": (True, 20., 20),
"procLog": (True, 0.5, 15, QueueSize.BIG),
"gpsLocationExternal": (True, 10., 10),

View File

@@ -82,6 +82,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LiveParametersV2", {PERSISTENT, BYTES}},
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
{"LongitudinalManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
{"LongitudinalPersonality", {PERSISTENT, INT, std::to_string(static_cast<int>(cereal::LongitudinalPersonality::STANDARD))}},
{"NetworkMetered", {PERSISTENT, BOOL}},

View File

@@ -37,7 +37,7 @@ class Controls:
self.CI = interfaces[self.CP.carFingerprint](self.CP)
self.sm = messaging.SubMaster(['liveDelay', 'liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
'liveCalibration', 'livePose', 'longitudinalPlan', 'lateralManeuverPlan', 'carState', 'carOutput',
'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState')
self.pm = messaging.PubMaster(['carControl', 'controlsState'])
@@ -116,7 +116,10 @@ class Controls:
# Steering PID loop and lateral MPC
# Reset desired curvature to current to avoid violating the limits on engage
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
if self.sm.valid['lateralManeuverPlan']:
new_desired_curvature = self.sm['lateralManeuverPlan'].desiredCurvature if CC.latActive else self.curvature
else:
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
lat_delay = self.sm["liveDelay"].lateralDelay + LAT_SMOOTH_SECONDS

View File

@@ -409,6 +409,11 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
"Ensure road ahead is clear"),
},
EventName.lateralManeuver: {
ET.WARNING: longitudinal_maneuver_alert,
ET.PERMANENT: NormalPermanentAlert("Lateral Maneuver Mode"),
},
EventName.selfdriveInitializing: {
ET.NO_ENTRY: NoEntryAlert("System Initializing"),
},

View File

@@ -74,7 +74,7 @@ class SelfdriveD:
# TODO: de-couple selfdrived with card/conflate on carState without introducing controls mismatches
self.car_state_sock = messaging.sub_sock('carState', timeout=20)
ignore = self.sensor_packets + self.gps_packets + ['alertDebug']
ignore = self.sensor_packets + self.gps_packets + ['alertDebug', 'lateralManeuverPlan']
if SIMULATION:
ignore += ['driverCameraState', 'managerState']
if REPLAY:
@@ -83,7 +83,8 @@ class SelfdriveD:
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'livePose', 'liveDelay',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userBookmark', 'audioFeedback'] + \
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userBookmark', 'audioFeedback',
'lateralManeuverPlan'] + \
self.camera_packets + self.sensor_packets + self.gps_packets,
ignore_alive=ignore, ignore_avg_freq=ignore,
ignore_valid=ignore, frequency=int(1/DT_CTRL))
@@ -148,7 +149,10 @@ class SelfdriveD:
self.events.add(EventName.joystickDebug)
self.startup_event = None
if self.sm.recv_frame['alertDebug'] > 0:
if self.sm.recv_frame['lateralManeuverPlan'] > 0:
self.events.add(EventName.lateralManeuver)
self.startup_event = None
elif self.sm.recv_frame['alertDebug'] > 0:
self.events.add(EventName.longitudinalManeuver)
self.startup_event = None

View File

@@ -67,6 +67,13 @@ class DeveloperLayout(Widget):
callback=self._on_long_maneuver_mode,
)
self._lat_maneuver_toggle = toggle_item(
lambda: tr("Lateral Maneuver Mode"),
description="",
initial_state=self._params.get_bool("LateralManeuverMode"),
callback=self._on_lat_maneuver_mode,
)
self._alpha_long_toggle = toggle_item(
lambda: tr("openpilot Longitudinal Control (Alpha)"),
description=lambda: tr(DESCRIPTIONS["alpha_longitudinal"]),
@@ -89,6 +96,7 @@ class DeveloperLayout(Widget):
self._ssh_keys,
self._joystick_toggle,
self._long_maneuver_toggle,
self._lat_maneuver_toggle,
self._alpha_long_toggle,
self._ui_debug_toggle,
], line_separator=True, spacing=0)
@@ -109,7 +117,7 @@ class DeveloperLayout(Widget):
# Hide non-release toggles on release builds
# TODO: we can do an onroad cycle, but alpha long toggle requires a deinit function to re-enable radar and not fault
for item in (self._joystick_toggle, self._long_maneuver_toggle, self._alpha_long_toggle):
for item in (self._joystick_toggle, self._long_maneuver_toggle, self._lat_maneuver_toggle, self._alpha_long_toggle):
item.set_visible(not self._is_release)
# CP gating
@@ -126,8 +134,12 @@ class DeveloperLayout(Widget):
if not long_man_enabled:
self._long_maneuver_toggle.action_item.set_state(False)
self._params.put_bool("LongitudinalManeuverMode", False)
lat_man_enabled = ui_state.is_offroad()
self._lat_maneuver_toggle.action_item.set_enabled(lat_man_enabled)
else:
self._long_maneuver_toggle.action_item.set_enabled(False)
self._lat_maneuver_toggle.action_item.set_enabled(False)
self._alpha_long_toggle.set_visible(False)
# TODO: make a param control list item so we don't need to manage internal state as much here
@@ -137,6 +149,7 @@ class DeveloperLayout(Widget):
("SshEnabled", self._ssh_toggle),
("JoystickDebugMode", self._joystick_toggle),
("LongitudinalManeuverMode", self._long_maneuver_toggle),
("LateralManeuverMode", self._lat_maneuver_toggle),
("AlphaLongitudinalEnabled", self._alpha_long_toggle),
("ShowDebugInfo", self._ui_debug_toggle),
):
@@ -157,11 +170,23 @@ class DeveloperLayout(Widget):
self._params.put_bool("JoystickDebugMode", state)
self._params.put_bool("LongitudinalManeuverMode", False)
self._long_maneuver_toggle.action_item.set_state(False)
self._params.put_bool("LateralManeuverMode", False)
self._lat_maneuver_toggle.action_item.set_state(False)
def _on_long_maneuver_mode(self, state: bool):
self._params.put_bool("LongitudinalManeuverMode", state)
self._params.put_bool("JoystickDebugMode", False)
self._joystick_toggle.action_item.set_state(False)
self._params.put_bool("LateralManeuverMode", False)
self._lat_maneuver_toggle.action_item.set_state(False)
def _on_lat_maneuver_mode(self, state: bool):
self._params.put_bool("LateralManeuverMode", state)
self._params.put_bool("ExperimentalMode", False)
self._params.put_bool("JoystickDebugMode", False)
self._joystick_toggle.action_item.set_state(False)
self._params.put_bool("LongitudinalManeuverMode", False)
self._long_maneuver_toggle.action_item.set_state(False)
def _on_alpha_long_enabled(self, state: bool):
if state:

View File

@@ -55,6 +55,9 @@ class DeveloperLayoutMici(NavScroller):
self._long_maneuver_toggle = BigToggle("longitudinal maneuver mode",
initial_state=ui_state.params.get_bool("LongitudinalManeuverMode"),
toggle_callback=self._on_long_maneuver_mode)
self._lat_maneuver_toggle = BigToggle("lateral maneuver mode",
initial_state=ui_state.params.get_bool("LateralManeuverMode"),
toggle_callback=self._on_lat_maneuver_mode)
self._alpha_long_toggle = BigToggle("alpha longitudinal",
initial_state=ui_state.params.get_bool("AlphaLongitudinalEnabled"),
toggle_callback=self._on_alpha_long_enabled)
@@ -68,6 +71,7 @@ class DeveloperLayoutMici(NavScroller):
self._ssh_keys_btn,
self._joystick_toggle,
self._long_maneuver_toggle,
self._lat_maneuver_toggle,
self._alpha_long_toggle,
self._debug_mode_toggle,
])
@@ -78,12 +82,13 @@ class DeveloperLayoutMici(NavScroller):
("SshEnabled", self._ssh_toggle),
("JoystickDebugMode", self._joystick_toggle),
("LongitudinalManeuverMode", self._long_maneuver_toggle),
("LateralManeuverMode", self._lat_maneuver_toggle),
("AlphaLongitudinalEnabled", self._alpha_long_toggle),
("ShowDebugInfo", self._debug_mode_toggle),
)
onroad_blocked_toggles = (self._adb_toggle, self._joystick_toggle)
release_blocked_toggles = (self._joystick_toggle, self._long_maneuver_toggle, self._alpha_long_toggle)
engaged_blocked_toggles = (self._long_maneuver_toggle, self._alpha_long_toggle)
release_blocked_toggles = (self._joystick_toggle, self._long_maneuver_toggle, self._lat_maneuver_toggle, self._alpha_long_toggle)
engaged_blocked_toggles = (self._long_maneuver_toggle, self._lat_maneuver_toggle, self._alpha_long_toggle)
# Hide non-release toggles on release builds
for item in release_blocked_toggles:
@@ -129,8 +134,12 @@ class DeveloperLayoutMici(NavScroller):
if not long_man_enabled:
self._long_maneuver_toggle.set_checked(False)
ui_state.params.put_bool("LongitudinalManeuverMode", False)
lat_man_enabled = ui_state.is_offroad()
self._lat_maneuver_toggle.set_enabled(lat_man_enabled)
else:
self._long_maneuver_toggle.set_enabled(False)
self._lat_maneuver_toggle.set_enabled(False)
self._alpha_long_toggle.set_visible(False)
# Refresh toggles from params to mirror external changes
@@ -141,11 +150,24 @@ class DeveloperLayoutMici(NavScroller):
ui_state.params.put_bool("JoystickDebugMode", state)
ui_state.params.put_bool("LongitudinalManeuverMode", False)
self._long_maneuver_toggle.set_checked(False)
ui_state.params.put_bool("LateralManeuverMode", False)
self._lat_maneuver_toggle.set_checked(False)
def _on_long_maneuver_mode(self, state: bool):
ui_state.params.put_bool("LongitudinalManeuverMode", state)
ui_state.params.put_bool("JoystickDebugMode", False)
self._joystick_toggle.set_checked(False)
ui_state.params.put_bool("LateralManeuverMode", False)
self._lat_maneuver_toggle.set_checked(False)
restart_needed_callback(state)
def _on_lat_maneuver_mode(self, state: bool):
ui_state.params.put_bool("LateralManeuverMode", state)
ui_state.params.put_bool("ExperimentalMode", False)
ui_state.params.put_bool("JoystickDebugMode", False)
self._joystick_toggle.set_checked(False)
ui_state.params.put_bool("LongitudinalManeuverMode", False)
self._long_maneuver_toggle.set_checked(False)
restart_needed_callback(state)
def _on_alpha_long_enabled(self, state: bool):

View File

@@ -40,6 +40,9 @@ def not_joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
def long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("LongitudinalManeuverMode")
def lat_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("LateralManeuverMode")
def not_long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("LongitudinalManeuverMode")
@@ -100,6 +103,7 @@ procs = [
PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
PythonProcess("lateral_maneuversd", "tools.lateral_maneuvers.lateral_maneuversd", lat_maneuver),
PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
PythonProcess("hardwared", "system.hardware.hardwared", always_run),
PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),

1
tools/lateral_maneuvers/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/lateral_reports/

View File

@@ -0,0 +1,42 @@
# Lateral Maneuvers Testing Tool
> [!WARNING]
> Use caution when using this tool.
Test your vehicle's lateral control tuning with this tool. The tool will test the vehicle's ability to follow a few lateral maneuvers and includes a tool to generate a report from the route.
## Instructions
1. Check out a development branch such as `master` on your comma device.
2. The full maneuver suite runs at 20 and 30 mph.
3. Enable "Lateral Maneuver Mode" in Settings > Developer on the device while offroad. Alternatively, set the parameter manually:
```sh
echo -n 1 > /data/params/d/LateralManeuverMode
```
4. Turn your vehicle back on. You will see "Lateral Maneuver Mode".
5. Ensure the area ahead is clear, as openpilot will command lateral acceleration steps in this mode. Once you are ready, set ACC manually to the target speed shown on screen and let openpilot stabilize lateral. After 1 seconds of steady straight driving, the maneuver will begin automatically. openpilot lateral control stays engaged between maneuvers normally while waiting for the next maneuver's readiness conditions. The maneuver will be aborted and repeated if speed is out of range, steering is touched or openpilot disengages.
6. When the testing is complete, you'll see an alert that says "Maneuvers Finished." Complete the route by pulling over and turning off the vehicle.
7. Visit https://connect.comma.ai and locate the route(s). They will stand out with lots of orange intervals in their timeline. Ensure "All logs" show as "uploaded."
![image](https://github.com/user-attachments/assets/cfe4c6d9-752f-4b24-b421-4b90a01933dc)
8. Gather the route ID and then run the report generator. The file will be exported to the same directory:
```sh
$ python tools/lateral_maneuvers/generate_report.py 98395b7c5b27882e/000001cc--5a73bde686
processing report for KIA_EV6
plotting maneuver: step right 20mph, runs: 3
plotting maneuver: step left 20mph, runs: 3
plotting maneuver: sine 0.5Hz 20mph, runs: 3
plotting maneuver: step right 30mph, runs: 3
Opening report: /home/batman/openpilot/tools/lateral_maneuvers/lateral_reports/KIA_EV6_98395b7c5b27882e_000001cc--5a73bde686.html
```
You can reach out on [Discord](https://discord.comma.ai) if you have any questions about these instructions or the tool itself.

View File

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
import argparse
import base64
import io
import math
import numpy as np
import os
import webbrowser
from collections import defaultdict
from pathlib import Path
import matplotlib.pyplot as plt
from openpilot.common.utils import tabulate
from cereal import car
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.controls.lib.latcontrol_torque import LP_FILTER_CUTOFF_HZ
from openpilot.tools.lib.logreader import LogReader
from openpilot.system.hardware.hw import Paths
from openpilot.common.constants import CV
from openpilot.tools.longitudinal_maneuvers.generate_report import format_car_params
def lat_accel(curvature, v):
return curvature * max(v, 1.0) ** 2
def report(platform, route, _description, CP, ID, maneuvers):
output_path = Path(__file__).resolve().parent / "lateral_reports"
output_fn = output_path / f"{platform}_{route.replace('/', '_')}.html"
output_path.mkdir(exist_ok=True)
target_cross_times = defaultdict(list)
builder = [
"<style>summary { cursor: pointer; }\n td, th { padding: 8px; } </style>\n",
"<h1>Lateral maneuver report</h1>\n",
f"<h3>{platform}</h3>\n",
f"<h3>{route}</h3>\n",
f"<h3>{ID.gitCommit}, {ID.gitBranch}, {ID.gitRemote}</h3>\n",
]
if _description is not None:
builder.append(f"<h3>Description: {_description}</h3>\n")
builder.append(f"<details><summary><h3 style='display: inline-block;'>CarParams</h3></summary><pre>{format_car_params(CP)}</pre></details>\n")
builder.append('{ summary }') # to be replaced below
for description, runs in maneuvers:
# filter incomplete runs
completed_runs = [msgs for msgs in runs
if any(m.alertDebug.alertText1 == 'Complete' for m in msgs if m.which() == 'alertDebug')]
print(f'plotting maneuver: {description}')
if not completed_runs:
continue
builder.append("<div style='border-top: 1px solid #000; margin: 20px 0;'></div>\n")
builder.append(f"<h2>{description}</h2>\n")
for run, msgs in enumerate(completed_runs):
t_carControl, carControl = zip(*[(m.logMonoTime, m.carControl) for m in msgs if m.which() == 'carControl'], strict=True)
t_carState, carState = zip(*[(m.logMonoTime, m.carState) for m in msgs if m.which() == 'carState'], strict=True)
t_controlsState, controlsState = zip(*[(m.logMonoTime, m.controlsState) for m in msgs if m.which() == 'controlsState'], strict=True)
t_lateralPlan, lateralPlan = zip(*[(m.logMonoTime, m.lateralManeuverPlan) for m in msgs if m.which() == 'lateralManeuverPlan' and m.valid], strict=True)
t_carOutput, carOutput = zip(*[(m.logMonoTime, m.carOutput) for m in msgs if m.which() == 'carOutput'], strict=True)
# make time relative seconds
t_carControl = [(t - t_carControl[0]) / 1e9 for t in t_carControl]
t_carState = [(t - t_carState[0]) / 1e9 for t in t_carState]
t_controlsState = [(t - t_controlsState[0]) / 1e9 for t in t_controlsState]
t_lateralPlan = [(t - t_lateralPlan[0]) / 1e9 for t in t_lateralPlan]
t_carOutput = [(t - t_carOutput[0]) / 1e9 for t in t_carOutput]
# maneuver validity
latActive = [m.latActive for m in carControl]
maneuver_valid = all(latActive) and not any(cs.steeringPressed for cs in carState)
_open = 'open' if maneuver_valid else ''
title = f'Run #{int(run)+1}' + (' <span style="color: red">(invalid maneuver!)</span>' if not maneuver_valid else '')
builder.append(f"<details {_open}><summary><h3 style='display: inline-block;'>{title}</h3></summary>\n")
baseline_accel = lat_accel(controlsState[0].curvature, carState[0].vEgo)
v_ego = [m.vEgo for m in carState]
cross_markers = []
if description.startswith('sine'):
amplitude = max(abs(lat_accel(lp.desiredCurvature, v) - baseline_accel)
for lp, v in zip(lateralPlan, v_ego, strict=False))
threshold = amplitude * 0.5
builder.append('<h3 style="font-weight: normal">50% peak')
for t, cs, v in zip(t_controlsState, controlsState, v_ego, strict=False):
actual = lat_accel(cs.curvature, v) - baseline_accel
if abs(actual) > threshold:
builder.append(f', <strong>crossed in {t:.3f}s</strong>')
cross_markers.append((t, actual + baseline_accel))
if maneuver_valid:
target_cross_times[description].append(t)
break
else:
builder.append(', <strong>not crossed</strong>')
builder.append('</h3>')
if maneuver_valid:
target_cross_times.setdefault(description, [])
else:
action_targets = [(0, lat_accel(lateralPlan[0].desiredCurvature, v_ego[0]) - baseline_accel)]
for i in range(1, min(len(lateralPlan), len(v_ego))):
if abs(lateralPlan[i].desiredCurvature - lateralPlan[i - 1].desiredCurvature) > 0.001:
desired = lat_accel(lateralPlan[i].desiredCurvature, v_ego[i]) - baseline_accel
action_targets.append((i, desired))
for j, (start_i, act_target) in enumerate(action_targets):
start_time = t_lateralPlan[start_i]
end_time = t_lateralPlan[action_targets[j + 1][0]] if j + 1 < len(action_targets) else t_controlsState[-1]
builder.append(f'<h3 style="font-weight: normal">aTarget: {round(act_target, 1)} m/s^2')
prev_crossed = False
for t, cs, v in zip(t_controlsState, controlsState, v_ego, strict=False):
if not (start_time <= t <= end_time):
continue
actual_accel = lat_accel(cs.curvature, v) - baseline_accel
crossed = (0 < act_target < actual_accel) or (0 > act_target > actual_accel)
if crossed and prev_crossed:
cross_time = t - start_time
builder.append(f', <strong>crossed in {cross_time:.3f}s</strong>')
cross_markers.append((t, act_target + baseline_accel))
if maneuver_valid:
target_cross_times[description].append(cross_time)
break
prev_crossed = crossed
else:
builder.append(', <strong>not crossed</strong>')
builder.append('</h3>')
if maneuver_valid:
target_cross_times.setdefault(description, [])
plt.rcParams['font.size'] = 40
fig = plt.figure(figsize=(30, 30))
ax = fig.subplots(4, 1, sharex=True, gridspec_kw={'height_ratios': [5, 3, 3, 3]})
ax[0].grid(linewidth=4)
desired_lat_accel = [lat_accel(m.desiredCurvature, v) for m, v in zip(lateralPlan, v_ego, strict=False)]
if description.startswith('sine'):
ax[0].plot(t_lateralPlan[:len(desired_lat_accel)], desired_lat_accel, label='desired lat accel', linewidth=6)
else:
t_desired = [t_lateralPlan[0]] + t_lateralPlan[:len(desired_lat_accel)]
desired_lat_accel = [baseline_accel] + desired_lat_accel
ax[0].step(t_desired, desired_lat_accel, label='desired lat accel', linewidth=6, where='post')
actual_lat_accel = [lat_accel(cs.curvature, v) for cs, v in zip(controlsState, v_ego, strict=False)]
ax[0].plot(t_controlsState[:len(actual_lat_accel)], actual_lat_accel, label='actual lat accel', linewidth=6)
ax[0].set_ylabel('Lateral Accel (m/s^2)')
for ct, cv in cross_markers:
ax[0].plot(ct, cv, marker='o', markersize=50, markeredgewidth=7, markeredgecolor='black', markerfacecolor='None')
ax2 = ax[0].twinx()
if CP.steerControlType == car.CarParams.SteerControlType.angle:
ax2.plot(t_carOutput, [-m.actuatorsOutput.steeringAngleDeg for m in carOutput], 'C2', label='steer angle', linewidth=6)
else:
ax2.plot(t_carOutput, [-m.actuatorsOutput.torque for m in carOutput], 'C2', label='steer torque', linewidth=6)
h1, l1 = ax[0].get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
ax[0].legend(h1 + h2, l1 + l2, prop={'size': 30})
ax[1].grid(linewidth=4)
ax[1].plot(t_carState, [v * CV.MS_TO_MPH for v in v_ego], label='vEgo', linewidth=6)
ax[1].set_ylabel('Velocity (mph)')
ax[1].yaxis.set_major_formatter(plt.FormatStrFormatter('%.1f'))
ax[1].legend()
t_accel = np.array(t_controlsState[:len(actual_lat_accel)])
raw_jerk = np.gradient(actual_lat_accel, t_accel)
dt_avg = np.mean(np.diff(t_accel))
jerk_filter = FirstOrderFilter(0.0, 1 / (2 * np.pi * LP_FILTER_CUTOFF_HZ), dt_avg)
filtered_jerk = [jerk_filter.update(j) for j in raw_jerk]
ax[2].grid(linewidth=4)
ax[2].plot(t_accel, filtered_jerk, label='actual jerk', linewidth=6)
if CP.steerControlType == car.CarParams.SteerControlType.torque:
desired_jerk = [cs.lateralControlState.torqueState.desiredLateralJerk for cs in controlsState]
ax[2].plot(t_controlsState[:len(controlsState)], desired_jerk, label='desired jerk', linewidth=6)
ax[2].set_ylabel('Jerk (m/s^3)')
ax[2].legend()
ax[3].grid(linewidth=4)
ax[3].plot(t_carControl, [math.degrees(m.orientationNED[0]) for m in carControl], label='roll', linewidth=6)
ax[3].set_ylabel('Roll (deg)')
ax[3].legend()
ax[-1].set_xlabel("Time (s)")
fig.tight_layout()
buffer = io.BytesIO()
fig.savefig(buffer, format='webp')
plt.close(fig)
buffer.seek(0)
builder.append(f"<img src='data:image/webp;base64,{base64.b64encode(buffer.getvalue()).decode()}' style='width:100%; max-width:800px;'>\n")
builder.append("</details>\n")
summary = ["<h2>Summary</h2>\n"]
cols = ['maneuver', 'crossed', 'mean', 'min', 'max']
table = []
for description, times in target_cross_times.items():
l = [description, len(times)]
if len(times):
l.extend([round(sum(times) / len(times), 2), round(min(times), 2), round(max(times), 2)])
table.append(l)
summary.append(tabulate(table, headers=cols, tablefmt='html', numalign='left') + '\n')
sum_idx = builder.index('{ summary }')
builder[sum_idx:sum_idx + 1] = summary
with open(output_fn, "w") as f:
f.write(''.join(builder))
print(f"\nOpening report: {output_fn}\n")
webbrowser.open_new_tab(str(output_fn))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Generate lateral maneuver report from route')
parser.add_argument('route', type=str, help='Route name (e.g. 00000000--5f742174be)')
parser.add_argument('description', type=str, nargs='?')
args = parser.parse_args()
if '/' in args.route or '|' in args.route:
lr = LogReader(args.route, only_union_types=True)
else:
segs = [seg for seg in os.listdir(Paths.log_root()) if args.route in seg]
lr = LogReader([os.path.join(Paths.log_root(), seg, 'rlog.zst') for seg in segs], only_union_types=True)
CP = lr.first('carParams')
ID = lr.first('initData')
platform = CP.carFingerprint
print('processing report for', platform)
maneuvers: list[tuple[str, list[list]]] = []
active_prev = False
description_prev = None
for msg in lr:
if msg.which() == 'alertDebug':
active = 'Active' in msg.alertDebug.alertText1 or msg.alertDebug.alertText1 == 'Complete'
if active and not active_prev:
if msg.alertDebug.alertText2 == description_prev:
maneuvers[-1][1].append([])
else:
maneuvers.append((msg.alertDebug.alertText2, [[]]))
description_prev = maneuvers[-1][0]
active_prev = active
if active_prev:
maneuvers[-1][1][-1].append(msg)
report(platform, args.route, args.description, CP, ID, maneuvers)

View File

@@ -0,0 +1,180 @@
#!/usr/bin/env python3
import numpy as np
from dataclasses import dataclass
from cereal import messaging, car
from openpilot.common.constants import CV
from openpilot.common.realtime import DT_MDL
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.controls.lib.drive_helpers import MIN_SPEED
from openpilot.tools.longitudinal_maneuvers.maneuversd import Action, Maneuver as _Maneuver
# thresholds for starting maneuvers
MAX_SPEED_DEV = 0.7 # deviation in m/s
MAX_CURV = 0.002 # 500 m radius
MAX_ROLL = 0.08 # 4.56°
TIMER = 2.0 # sec stable conditions before starting maneuver
@dataclass
class Maneuver(_Maneuver):
_baseline_curvature: float = 0.0
def get_accel(self, v_ego: float, lat_active: bool, curvature: float, roll: float) -> float:
self._run_completed = False
# only start maneuver on straight, flat roads
ready = abs(v_ego - self.initial_speed) < MAX_SPEED_DEV and lat_active and abs(curvature) < MAX_CURV and abs(roll) < MAX_ROLL
self._ready_cnt = (self._ready_cnt + 1) if ready else max(self._ready_cnt - 1, 0)
if self._ready_cnt > (TIMER / DT_MDL):
if not self._active:
self._baseline_curvature = curvature
self._active = True
if not self._active:
return 0.0
return self._step()
def reset(self):
super().reset()
self._ready_cnt = 0
def _sine_action(amplitude, period, duration):
t = np.linspace(0, duration, int(duration / DT_MDL) + 1)
a = amplitude * np.sin(2 * np.pi * t / period)
return Action(a.tolist(), t.tolist())
MANEUVERS = [
Maneuver(
"step right 20mph",
[Action([0.5], [1.0]), Action([-0.5], [1.5])],
repeat=2,
initial_speed=20. * CV.MPH_TO_MS,
),
Maneuver(
"step left 20mph",
[Action([-0.5], [1.0]), Action([0.5], [1.5])],
repeat=2,
initial_speed=20. * CV.MPH_TO_MS,
),
Maneuver(
"sine 0.5Hz 20mph",
[_sine_action(1.0, 2.0, 2.0), Action([0.0], [0.5])],
repeat=2,
initial_speed=20. * CV.MPH_TO_MS,
),
Maneuver(
"step right 30mph",
[Action([0.5], [1.0]), Action([-0.5], [1.5])],
repeat=2,
initial_speed=30. * CV.MPH_TO_MS,
),
Maneuver(
"step left 30mph",
[Action([-0.5], [1.0]), Action([0.5], [1.5])],
repeat=2,
initial_speed=30. * CV.MPH_TO_MS,
),
Maneuver(
"sine 0.5Hz 30mph",
[_sine_action(1.0, 2.0, 2.0), Action([0.0], [0.5])],
repeat=2,
initial_speed=30. * CV.MPH_TO_MS,
),
]
def main():
params = Params()
cloudlog.info("lateral_maneuversd is waiting for CarParams")
messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'selfdriveState', 'modelV2'], poll='modelV2')
pm = messaging.PubMaster(['lateralManeuverPlan', 'alertDebug'])
maneuvers = iter(MANEUVERS)
maneuver = None
complete_cnt = 0
display_holdoff = 0
prev_text = ''
while True:
sm.update()
if maneuver is None:
maneuver = next(maneuvers, None)
alert_msg = messaging.new_message('alertDebug')
alert_msg.valid = True
plan_send = messaging.new_message('lateralManeuverPlan')
accel = 0
v_ego = max(sm['carState'].vEgo, 0)
curvature = sm['controlsState'].desiredCurvature
if complete_cnt > 0:
complete_cnt -= 1
alert_msg.alertDebug.alertText1 = 'Completed'
alert_msg.alertDebug.alertText2 = maneuver.description
elif maneuver is not None:
# reset maneuver on steering override or out of range speed
if sm['carState'].steeringPressed or (maneuver.active and abs(v_ego - maneuver.initial_speed) > MAX_SPEED_DEV):
maneuver.reset()
roll = sm['carControl'].orientationNED[0] if len(sm['carControl'].orientationNED) == 3 else 0.0
accel = maneuver.get_accel(v_ego, sm['carControl'].latActive, curvature, roll)
if maneuver._run_completed:
complete_cnt = int(1.0 / DT_MDL)
alert_msg.alertDebug.alertText1 = 'Complete'
alert_msg.alertDebug.alertText2 = maneuver.description
elif maneuver.active:
action_remaining = maneuver.actions[maneuver._action_index].time_bp[-1] - maneuver._action_frames * DT_MDL
if maneuver.description.startswith('sine'):
freq = maneuver.description.split()[1]
alert_msg.alertDebug.alertText1 = f'Active sine {freq} {max(action_remaining, 0):.1f}s'
else:
alert_msg.alertDebug.alertText1 = f'Active {accel:+.1f}m/s² {max(action_remaining, 0):.1f}s'
alert_msg.alertDebug.alertText2 = maneuver.description
elif not (abs(v_ego - maneuver.initial_speed) < MAX_SPEED_DEV and sm['carControl'].latActive):
alert_msg.alertDebug.alertText1 = f'Set speed to {maneuver.initial_speed * CV.MS_TO_MPH:0.0f} mph'
elif maneuver._ready_cnt > 0:
ready_time = max(TIMER - maneuver._ready_cnt * DT_MDL, 0)
alert_msg.alertDebug.alertText1 = f'Starting: {int(ready_time) + 1}'
alert_msg.alertDebug.alertText2 = maneuver.description
else:
curv_ok = abs(curvature) < MAX_CURV
reason = 'road not straight' if not curv_ok else 'road not flat'
alert_msg.alertDebug.alertText1 = f'Waiting: {reason}'
alert_msg.alertDebug.alertText2 = maneuver.description
else:
alert_msg.alertDebug.alertText1 = 'Maneuvers Finished'
# prevent flickering text
setup = ('Set speed', 'Starting', 'Waiting')
text = alert_msg.alertDebug.alertText1
same = text == prev_text or (text.startswith('Starting') and prev_text.startswith('Starting'))
if not same and text.startswith(setup) and prev_text.startswith(setup) and display_holdoff > 0:
alert_msg.alertDebug.alertText1 = prev_text
display_holdoff -= 1
else:
prev_text = text
display_holdoff = int(0.5 / DT_MDL) if text.startswith(setup) else 0
pm.send('alertDebug', alert_msg)
plan_send.valid = maneuver is not None and maneuver.active and complete_cnt == 0
if plan_send.valid:
plan_send.lateralManeuverPlan.desiredCurvature = maneuver._baseline_curvature + accel / max(v_ego, MIN_SPEED) ** 2
pm.send('lateralManeuverPlan', plan_send)
if maneuver is not None and maneuver.finished and complete_cnt == 0:
maneuver = None
if __name__ == "__main__":
main()

View File

@@ -27,23 +27,14 @@ class Maneuver:
_active: bool = False
_finished: bool = False
_run_completed: bool = False
_action_index: int = 0
_action_frames: int = 0
_ready_cnt: int = 0
_repeated: int = 0
def get_accel(self, v_ego: float, long_active: bool, standstill: bool, cruise_standstill: bool) -> float:
ready = abs(v_ego - self.initial_speed) < 0.3 and long_active and not cruise_standstill
if self.initial_speed < 0.01:
ready = ready and standstill
self._ready_cnt = (self._ready_cnt + 1) if ready else 0
if self._ready_cnt > (3. / DT_MDL):
self._active = True
if not self._active:
return min(max(self.initial_speed - v_ego, -2.), 2.)
def _step(self) -> float:
self._run_completed = False
action = self.actions[self._action_index]
action_accel = np.interp(self._action_frames * DT_MDL, action.time_bp, action.accel_bp)
@@ -58,15 +49,34 @@ class Maneuver:
# repeat maneuver
elif self._repeated < self.repeat:
self._repeated += 1
self._action_index = 0
self._action_frames = 0
self._active = False
self._run_completed = True
self.reset()
# finish maneuver
else:
self._run_completed = True
self._finished = True
return float(action_accel)
def get_accel(self, v_ego: float, long_active: bool, standstill: bool, cruise_standstill: bool) -> float:
ready = abs(v_ego - self.initial_speed) < 0.3 and long_active and not cruise_standstill
if self.initial_speed < 0.01:
ready = ready and standstill
self._ready_cnt = (self._ready_cnt + 1) if ready else 0
if self._ready_cnt > (3. / DT_MDL):
self._active = True
if not self._active:
return min(max(self.initial_speed - v_ego, -2.), 2.)
return self._step()
def reset(self):
self._active = False
self._action_frames = 0
self._action_index = 0
@property
def finished(self):
return self._finished