Lateral maneuvers

This commit is contained in:
firestar5683
2026-04-07 00:20:20 -05:00
parent 088e4a6e56
commit 4beb1f6e2e
19 changed files with 1022 additions and 35 deletions
+6
View File
@@ -130,6 +130,7 @@ struct OnroadEvent @0xc4fa6047f024e718 {
userBookmark @95;
excessiveActuation @96;
audioFeedback @97;
lateralManeuver @98;
soundsUnavailableDEPRECATED @47;
}
@@ -1238,6 +1239,10 @@ struct DriverAssistance {
# FCW, AEB, etc. will go here
}
struct LateralManeuverPlan {
desiredCurvature @0 :Float32; # 1/m
}
struct LongitudinalPlan @0xe00b5b3eba12876c {
modelMonoTime @9 :UInt64;
hasLead @7 :Bool;
@@ -2603,6 +2608,7 @@ struct Event {
userBookmark @93 :UserBookmark;
bookmarkButton @148 :UserBookmark;
audioFeedback @149 :AudioFeedback;
lateralManeuverPlan @150 :LateralManeuverPlan;
# *********** debug ***********
testJoystick @52 :Joystick;
+1
View File
@@ -32,6 +32,7 @@ static std::map<std::string, service> services = {
{ "carControl", {"carControl", true, 100.000000, 10, 256000}},
{ "carOutput", {"carOutput", true, 100.000000, 10, 256000}},
{ "longitudinalPlan", {"longitudinalPlan", true, 20.000000, 10, 256000}},
{ "lateralManeuverPlan", {"lateralManeuverPlan", true, 20.000000, -1, 256000}},
{ "driverAssistance", {"driverAssistance", true, 20.000000, 20, 256000}},
{ "procLog", {"procLog", true, 0.500000, 15, 10485760}},
{ "gpsLocationExternal", {"gpsLocationExternal", true, 10.000000, 10, 256000}},
+1
View File
@@ -49,6 +49,7 @@ _services: dict[str, tuple] = {
"carControl": (True, 100., 10),
"carOutput": (True, 100., 10),
"longitudinalPlan": (True, 20., 10),
"lateralManeuverPlan": (True, 20.),
"driverAssistance": (True, 20., 20),
"procLog": (True, 0.5, 15, QueueSize.BIG),
"gpsLocationExternal": (True, 10., 10),
+2
View File
@@ -83,6 +83,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LiveParameters", {PERSISTENT, JSON}},
{"LiveParametersV2", {PERSISTENT, BYTES}},
{"LiveTorqueParameters", {PERSISTENT | DONT_LOG, BYTES}},
{"LateralManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
{"LocationFilterInitialState", {PERSISTENT, BYTES}},
{"LongitudinalManeuverMode", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, BOOL}},
{"LongitudinalPersonality", {PERSISTENT, INT, std::to_string(static_cast<int>(cereal::LongitudinalPersonality::STANDARD))}},
@@ -323,6 +324,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"LongDistanceButtonControl", {PERSISTENT, INT, "5", "0", 2}},
{"LongitudinalActuatorDelay", {PERSISTENT, FLOAT, "0.0", "0.0", 3}},
{"LongitudinalActuatorDelayStock", {PERSISTENT, FLOAT, "0.0", "0.0", 3}},
{"LateralManeuverStatus", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, JSON, "{}", "{}"}},
{"LongitudinalManeuverPaddleMode", {PERSISTENT, STRING, "auto", "auto"}},
{"LongitudinalManeuverStatus", {CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION, JSON, "{}", "{}"}},
{"LongitudinalTune", {PERSISTENT, BOOL, "1", "0", 0}},
+5 -2
View File
@@ -48,7 +48,7 @@ class Controls:
self.CI = interfaces[self.CP.carFingerprint](self.CP, self.FPCP)
self.sm = messaging.SubMaster(['liveDelay', 'liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
'liveCalibration', 'livePose', 'longitudinalPlan', 'lateralManeuverPlan', 'carState', 'carOutput',
'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState')
self.pm = messaging.PubMaster(['carControl', 'controlsState'])
@@ -147,7 +147,10 @@ class Controls:
# Steering PID loop and lateral MPC
# Reset desired curvature to current to avoid violating the limits on engage
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
if self.sm.valid['lateralManeuverPlan']:
new_desired_curvature = self.sm['lateralManeuverPlan'].desiredCurvature if CC.latActive else self.curvature
else:
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
lat_delay = self.sm["liveDelay"].lateralDelay + LAT_SMOOTH_SECONDS
+6
View File
@@ -504,6 +504,12 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
"Ensure road ahead is clear"),
},
EventName.lateralManeuver: {
ET.WARNING: longitudinal_maneuver_alert,
ET.PERMANENT: NormalPermanentAlert("Lateral Maneuver Mode",
"Ensure road ahead is clear"),
},
EventName.selfdriveInitializing: {
ET.NO_ENTRY: NoEntryAlert("System Initializing"),
},
+7 -3
View File
@@ -82,7 +82,7 @@ class SelfdriveD:
# TODO: de-couple selfdrived with card/conflate on carState without introducing controls mismatches
self.car_state_sock = messaging.sub_sock('carState', timeout=20)
ignore = self.sensor_packets + self.gps_packets + ['alertDebug']
ignore = self.sensor_packets + self.gps_packets + ['alertDebug', 'lateralManeuverPlan']
if SIMULATION:
ignore += ['driverCameraState', 'managerState']
if REPLAY:
@@ -91,7 +91,8 @@ class SelfdriveD:
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'livePose', 'liveDelay',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userBookmark', 'audioFeedback'] + \
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userBookmark', 'audioFeedback',
'lateralManeuverPlan'] + \
self.camera_packets + self.sensor_packets + self.gps_packets,
ignore_alive=ignore, ignore_avg_freq=ignore,
ignore_valid=ignore, frequency=int(1/DT_CTRL))
@@ -185,7 +186,10 @@ class SelfdriveD:
self.events.add(EventName.joystickDebug)
self.startup_event = None
if self.sm.recv_frame['alertDebug'] > 0:
if self.sm.recv_frame['lateralManeuverPlan'] > 0:
self.events.add(EventName.lateralManeuver)
self.startup_event = None
elif self.sm.recv_frame['alertDebug'] > 0:
self.events.add(EventName.longitudinalManeuver)
self.startup_event = None
@@ -419,6 +419,7 @@ CONFIGS = [
pubs=[
"carState", "deviceState", "pandaStates", "peripheralState", "liveCalibration", "driverMonitoringState",
"longitudinalPlan", "livePose", "liveDelay", "liveParameters", "radarState", "modelV2",
"lateralManeuverPlan",
"driverCameraState", "roadCameraState", "wideRoadCameraState", "managerState", "liveTorqueParameters",
"accelerometer", "gyroscope", "carOutput", "gpsLocationExternal", "gpsLocation", "controlsState",
"carControl", "driverAssistance", "alertDebug", "audioFeedback",
@@ -434,7 +435,7 @@ CONFIGS = [
ProcessConfig(
proc_name="controlsd",
pubs=["liveParameters", "liveTorqueParameters", "modelV2", "selfdriveState",
"liveCalibration", "livePose", "longitudinalPlan", "carState", "carOutput",
"liveCalibration", "livePose", "longitudinalPlan", "lateralManeuverPlan", "carState", "carOutput",
"driverMonitoringState", "onroadEvents", "driverAssistance"],
subs=["carControl", "controlsState"],
ignore=["logMonoTime", ],
+5 -3
View File
@@ -204,9 +204,11 @@ def pond_thread():
parked = sm["starpilotCarState"].isParked
started = sm["deviceState"].started
long_maneuver_mode = params.get_bool("LongitudinalManeuverMode")
lateral_maneuver_mode = params.get_bool("LateralManeuverMode")
maneuver_mode_active = long_maneuver_mode or lateral_maneuver_mode
state_changed = started != previous_started or parked != previous_parked
if params.get_bool("PondPaired") and not long_maneuver_mode:
if params.get_bool("PondPaired") and not maneuver_mode_active:
presence_interval = POND_PRESENCE_INTERVAL_ACTIVE if started or pond_active else POND_PRESENCE_INTERVAL_IDLE
ping_pond_presence(presence_interval, parked, started, state_changed)
@@ -220,7 +222,7 @@ def pond_thread():
if state_changed and parked:
next_toggle_check_at = 0.0
if long_maneuver_mode:
if maneuver_mode_active:
next_toggle_check_at = max(next_toggle_check_at, now + REMOTE_TOGGLE_CHECK_INTERVAL_IDLE)
elif boot_sync_complete and now >= next_toggle_check_at:
latest_pond_active = check_toggles(started, params, sm)
@@ -228,7 +230,7 @@ def pond_thread():
pond_active = latest_pond_active
next_toggle_check_at = now + REMOTE_TOGGLE_CHECK_INTERVAL_ACTIVE if pond_active else REMOTE_TOGGLE_CHECK_INTERVAL_IDLE
if params.get_bool("PondUploadPending") and not long_maneuver_mode:
if params.get_bool("PondUploadPending") and not maneuver_mode_active:
if not params.get_bool("PondPaired"):
params.put_bool("PondUploadPending", False)
elif upload_toggles(params):
@@ -6,6 +6,7 @@ import { ErrorLogs } from "/assets/components/tools/error_logs.js"
import { VehicleFeatures } from "/assets/components/tools/vehicle_features.js"
import { GalaxyPairing } from "/assets/components/tools/galaxy.js"
import { Home } from "/assets/components/home/home.js"
import { LateralManeuvers } from "/assets/components/tools/lateral_maneuvers.js"
import { LongitudinalManeuvers } from "/assets/components/tools/longitudinal_maneuvers.js"
import { MapsManager } from "/assets/components/tools/maps.js"
import { NavDestination } from "/assets/components/navigation/navigation_destination.js"
@@ -71,6 +72,7 @@ function Root() {
createRoute("settings", "/settings/:section/:subsection?", SettingsView),
createRoute("speed_limits", "/download_speed_limits", SpeedLimits),
createRoute("model_manager", "/manage_models", ModelManager),
createRoute("lateral_maneuvers", "/lateral_maneuvers", LateralManeuvers),
createRoute("longitudinal_maneuvers", "/longitudinal_maneuvers", LongitudinalManeuvers),
createRoute("maps", "/manage_maps", MapsManager),
createRoute("plots", "/plots", LivePlots),
@@ -14,6 +14,7 @@ const MENU_ITEMS = {
{ name: "Download Speed Limits", link: "/download_speed_limits", icon: "bi-download" },
{ name: "Error Logs", link: "/manage_error_logs", icon: "bi-exclamation-triangle" },
{ name: "Galaxy", link: "/galaxy", icon: "bi-globe2" },
{ name: "Lateral Maneuvers", link: "/lateral_maneuvers", icon: "bi-sign-turn-right" },
{ name: "Long Maneuvers", link: "/longitudinal_maneuvers", icon: "bi-signpost-split" },
{ name: "Maps", link: "/manage_maps", icon: "bi-map" },
{ name: "Navigation Keys", link: "/manage_navigation_keys", icon: "bi-key-fill" },
@@ -0,0 +1,188 @@
import { html, reactive } from "/assets/vendor/arrow-core.js"
const state = reactive({
loading: true,
busy: false,
error: "",
data: null,
})
let initialized = false
let pollHandle = null
const POLL_INTERVAL_MS = 3000
function isLateralManeuversRouteActive() {
return window.location.pathname === "/lateral_maneuvers"
}
function safeNumber(value, fallback = 0) {
const n = Number(value)
return Number.isFinite(n) ? n : fallback
}
function formatAgeSeconds(value) {
const sec = safeNumber(value, -1)
if (sec < 0) return "unknown"
if (sec < 1) return "just now"
if (sec < 60) return `${Math.round(sec)}s ago`
const min = sec / 60
if (min < 60) return `${Math.round(min)}m ago`
const hr = min / 60
return `${Math.round(hr)}h ago`
}
function normalizeStatusPayload(payload) {
const normalized = payload && typeof payload === "object" ? { ...payload } : {}
normalized.history = Array.isArray(normalized.history) ? normalized.history : []
return normalized
}
async function fetchStatus() {
try {
const response = await fetch("/api/lateral_maneuvers/status")
const payload = await response.json()
if (!response.ok) {
throw new Error(payload.error || response.statusText || "Failed to load maneuver status")
}
state.data = normalizeStatusPayload(payload)
state.error = ""
} catch (error) {
state.error = error?.message || "Failed to load maneuver status"
} finally {
state.loading = false
}
}
function stopPolling() {
if (!pollHandle) return
clearTimeout(pollHandle)
pollHandle = null
}
function ensurePolling() {
if (pollHandle) return
const poll = async () => {
if (!isLateralManeuversRouteActive()) {
stopPolling()
return
}
if (document.visibilityState !== "visible") {
pollHandle = setTimeout(poll, POLL_INTERVAL_MS)
return
}
await fetchStatus()
pollHandle = setTimeout(poll, POLL_INTERVAL_MS)
}
pollHandle = setTimeout(poll, POLL_INTERVAL_MS)
}
async function runAction(action) {
if (state.busy) return
state.busy = true
try {
const response = await fetch(`/api/lateral_maneuvers/${action}`, { method: "POST" })
const payload = await response.json()
if (!response.ok) {
throw new Error(payload.error || response.statusText || `Failed to ${action} maneuvers`)
}
state.data = normalizeStatusPayload(payload)
state.error = ""
showSnackbar(payload.message || "Action complete.")
} catch (error) {
const message = error?.message || `Failed to ${action} maneuvers`
state.error = message
showSnackbar(message, "error")
} finally {
state.busy = false
}
}
function initialize() {
if (initialized) return
initialized = true
fetchStatus()
ensurePolling()
}
function statusLine(label, value) {
return html`<p><strong>${label}:</strong> ${value}</p>`
}
export function LateralManeuvers() {
initialize()
return html`
<div class="longManeuverPage">
<h2>Lateral Maneuvers</h2>
<div class="longManeuverCard">
<p class="longManeuverIntro">
Arm the lateral maneuver suite from your phone and monitor progress live.
</p>
<div class="longManeuverActions">
<button
class="longManeuverButton"
disabled="${() => state.busy || false}"
@click="${() => runAction("start")}">
Start / Arm
</button>
<button
class="longManeuverButton danger"
disabled="${() => state.busy || false}"
@click="${() => runAction("stop")}">
Stop
</button>
</div>
${() => state.loading ? html`<p class="longManeuverMuted">Loading status...</p>` : ""}
${() => state.error ? html`<p class="longManeuverError">${state.error}</p>` : ""}
${() => state.data ? html`
<div class="longManeuverStatusGrid">
${statusLine("Mode Enabled", state.data.modeEnabled ? "Yes" : "No")}
${statusLine("State", state.data.state || "idle")}
${statusLine("Onroad", state.data.isOnroad ? "Yes" : "No")}
${statusLine("Engaged", state.data.isEngaged ? "Yes" : "No")}
${statusLine("Phase", state.data.phase || "n/a")}
${statusLine("Step", `${safeNumber(state.data.stepIndex, 0)}/${safeNumber(state.data.stepTotal, 0)}`)}
${statusLine("Run", `${safeNumber(state.data.runIndex, 0)}/${safeNumber(state.data.runTotal, 0)}`)}
${statusLine("Updated", formatAgeSeconds(state.data.updatedAgeSec))}
</div>
<div class="longManeuverCurrent">
<p><strong>Current Maneuver:</strong> ${state.data.maneuver || "n/a"}</p>
<p><strong>Popup Text:</strong> ${state.data.uiText1 || ""} ${state.data.uiText2 ? `| ${state.data.uiText2}` : ""}</p>
</div>
<div class="longManeuverInstructions">
<h3>Quick Guide</h3>
<ol>
<li>Find a large, empty, straight, flat road or lot with no traffic.</li>
<li>Press <strong>Start / Arm</strong> here, then engage openpilot and set the requested speed manually.</li>
<li>Keep full supervision and be ready to steer or disengage immediately at all times.</li>
<li>Do not touch the wheel during a run. Steering input or speed drift can reset the current maneuver.</li>
<li>When the status says complete, collect logs and generate the HTML report if needed.</li>
</ol>
</div>
<div class="longManeuverHistory">
<h3>Progress Chain</h3>
${() => (state.data.history || []).length ? html`
<ol>
${(state.data.history || []).slice().reverse().map((line) => html`<li>${line}</li>`)}
</ol>
` : html`<p class="longManeuverMuted">No steps logged yet.</p>`}
</div>
` : ""}
</div>
</div>
`
}
+142 -4
View File
@@ -2864,7 +2864,7 @@ def _save_longitudinal_maneuver_status(status):
if not isinstance(history, list):
history = []
status_copy["history"] = [str(line) for line in history if str(line).strip()][-120:]
status_copy["updatedAtSec"] = float(status_copy.get("updatedAtSec") or time.time())
status_copy["updatedAtSec"] = float(status_copy.get("updatedAtSec") or time.monotonic())
params.put("LongitudinalManeuverStatus", json.dumps(status_copy, separators=(",", ":")))
return status_copy
@@ -2889,7 +2889,7 @@ def _get_longitudinal_maneuver_support_snapshot():
def _serialize_longitudinal_maneuver_status(status):
updated_at = _safe_float(status.get("updatedAtSec"), 0.0)
age_seconds = max(0.0, time.time() - updated_at) if updated_at > 0 else None
age_seconds = max(0.0, time.monotonic() - updated_at) if updated_at > 0 else None
mode_enabled = params.get_bool("LongitudinalManeuverMode")
paddle_mode = params.get("LongitudinalManeuverPaddleMode", encoding="utf-8") or str(status.get("paddleMode") or "auto")
support = _get_longitudinal_maneuver_support_snapshot() or status.get("support") or {}
@@ -2924,7 +2924,7 @@ def _set_longitudinal_maneuver_mode(enabled):
"uiSize": "small",
"uiText1": "Long Maneuvers Armed",
"uiText2": "Engage with SET to start the test suite.",
"updatedAtSec": time.time(),
"updatedAtSec": time.monotonic(),
})
_append_longitudinal_maneuver_history(status, "Armed from The Pond. Engage with SET to start.")
else:
@@ -2936,12 +2936,125 @@ def _set_longitudinal_maneuver_mode(enabled):
"uiSize": "small",
"uiText1": "Long Maneuvers Stopped",
"uiText2": "Test mode disabled.",
"updatedAtSec": time.time(),
"updatedAtSec": time.monotonic(),
})
_append_longitudinal_maneuver_history(status, "Stopped from The Pond.")
return _save_longitudinal_maneuver_status(status)
def _default_lateral_maneuver_status():
return {
"state": "idle",
"phase": "",
"maneuver": "",
"runIndex": 0,
"runTotal": 0,
"stepIndex": 0,
"stepTotal": 0,
"phaseStepIndex": 0,
"phaseStepTotal": 0,
"uiShow": False,
"uiSize": "small",
"uiText1": "Lateral Maneuvers",
"uiText2": "",
"updatedAtSec": 0.0,
"history": [],
}
def _load_lateral_maneuver_status():
status = _default_lateral_maneuver_status()
raw = params.get("LateralManeuverStatus", encoding="utf-8") or ""
if raw:
try:
payload = json.loads(raw)
if isinstance(payload, dict):
status.update(payload)
except Exception:
pass
history = status.get("history")
if not isinstance(history, list):
history = []
status["history"] = [str(line) for line in history if str(line).strip()][-120:]
try:
status["updatedAtSec"] = float(status.get("updatedAtSec") or 0.0)
except Exception:
status["updatedAtSec"] = 0.0
return status
def _save_lateral_maneuver_status(status):
status_copy = dict(status)
history = status_copy.get("history")
if not isinstance(history, list):
history = []
status_copy["history"] = [str(line) for line in history if str(line).strip()][-120:]
status_copy["updatedAtSec"] = float(status_copy.get("updatedAtSec") or time.monotonic())
params.put("LateralManeuverStatus", json.dumps(status_copy, separators=(",", ":")))
return status_copy
def _append_lateral_maneuver_history(status, line):
if not line:
return status
history = list(status.get("history", []))
history.append(str(line))
status["history"] = history[-120:]
return status
def _serialize_lateral_maneuver_status(status):
updated_at = _safe_float(status.get("updatedAtSec"), 0.0)
age_seconds = max(0.0, time.monotonic() - updated_at) if updated_at > 0 else None
return {
**status,
"modeEnabled": params.get_bool("LateralManeuverMode"),
"isOnroad": params.get_bool("IsOnroad"),
"isEngaged": params.get_bool("IsEngaged"),
"updatedAgeSec": age_seconds,
}
def _set_lateral_maneuver_mode(enabled):
status = _load_lateral_maneuver_status()
if enabled:
params.put_bool("LateralManeuverMode", True)
params.put_bool("LongitudinalManeuverMode", False)
status.update({
"state": "armed",
"phase": "",
"maneuver": "",
"runIndex": 0,
"runTotal": 0,
"stepIndex": 0,
"stepTotal": 0,
"phaseStepIndex": 0,
"phaseStepTotal": 0,
"uiShow": True,
"uiSize": "small",
"uiText1": "Lateral Maneuvers Armed",
"uiText2": "Stabilize on a straight, flat road to start.",
"updatedAtSec": time.monotonic(),
})
_append_lateral_maneuver_history(status, "Armed from The Pond. Stabilize on a straight, flat road to start.")
else:
params.put_bool("LateralManeuverMode", False)
status.update({
"state": "stopped",
"uiShow": True,
"uiSize": "small",
"uiText1": "Lateral Maneuvers Stopped",
"uiText2": "Test mode disabled.",
"updatedAtSec": time.monotonic(),
})
_append_lateral_maneuver_history(status, "Stopped from The Pond.")
return _save_lateral_maneuver_status(status)
def setup(app):
model_status_debug = {
"last_signature": None,
@@ -4518,6 +4631,8 @@ def setup(app):
@app.route("/api/longitudinal_maneuvers/start", methods=["POST"])
def start_longitudinal_maneuvers():
if params.get_bool("LateralManeuverMode"):
_set_lateral_maneuver_mode(False)
status = _set_longitudinal_maneuver_mode(True)
return jsonify({
"message": "Longitudinal maneuver mode armed. Engage with SET to start.",
@@ -4532,6 +4647,29 @@ def setup(app):
**_serialize_longitudinal_maneuver_status(status),
}), 200
@app.route("/api/lateral_maneuvers/status", methods=["GET"])
def get_lateral_maneuvers_status():
status = _load_lateral_maneuver_status()
return jsonify(_serialize_lateral_maneuver_status(status)), 200
@app.route("/api/lateral_maneuvers/start", methods=["POST"])
def start_lateral_maneuvers():
if params.get_bool("LongitudinalManeuverMode"):
_set_longitudinal_maneuver_mode(False)
status = _set_lateral_maneuver_mode(True)
return jsonify({
"message": "Lateral maneuver mode armed. Stabilize on a straight, flat road to start.",
**_serialize_lateral_maneuver_status(status),
}), 200
@app.route("/api/lateral_maneuvers/stop", methods=["POST"])
def stop_lateral_maneuvers():
status = _set_lateral_maneuver_mode(False)
return jsonify({
"message": "Lateral maneuver mode disabled.",
**_serialize_lateral_maneuver_status(status),
}), 200
@app.route("/api/update/fast/status", methods=["GET"])
def get_fast_update_status():
state_data = _get_fast_update_state()
+5 -1
View File
@@ -41,7 +41,10 @@ def not_joystick(started: bool, params: Params, CP: car.CarParams, starpilot_tog
return started and not params.get_bool("JoystickDebugMode")
def long_maneuver(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and params.get_bool("LongitudinalManeuverMode")
return started and params.get_bool("LongitudinalManeuverMode") and not params.get_bool("LateralManeuverMode")
def lat_maneuver(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and params.get_bool("LateralManeuverMode") and not params.get_bool("LongitudinalManeuverMode")
def not_long_maneuver(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and not params.get_bool("LongitudinalManeuverMode")
@@ -115,6 +118,7 @@ procs = [
PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
PythonProcess("lateral_maneuversd", "tools.lateral_maneuvers.lateral_maneuversd", lat_maneuver),
PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
PythonProcess("hardwared", "system.hardware.hardwared", always_run),
PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),
+1
View File
@@ -0,0 +1 @@
/lateral_reports/
+30
View File
@@ -0,0 +1,30 @@
# Lateral Maneuvers Testing Tool
> [!WARNING]
> Use caution when using this tool.
Test your vehicle's lateral control tuning with this tool. The tool will test the vehicle's ability to follow a few lateral maneuvers and includes a tool to generate a report from the route.
## Instructions
1. Check out a development branch such as `master` on your comma device.
2. The full maneuver suite runs at 20 and 30 mph.
3. Enable "Lateral Maneuver Mode" in Galaxy or set the parameter manually:
```sh
echo -n 1 > /data/params/d/LateralManeuverMode
```
4. Turn your vehicle back on. You will see "Lateral Maneuver Mode".
5. Ensure the area ahead is clear, as openpilot will command lateral acceleration steps in this mode. Once you are ready, set ACC manually to the target speed shown on screen and let openpilot stabilize lateral. After 2 seconds of steady straight driving, the maneuver will begin automatically. openpilot lateral control stays engaged between maneuvers while waiting for the next maneuver's readiness conditions. The maneuver will be aborted and repeated if speed is out of range, steering is touched or openpilot disengages.
6. When the testing is complete, you'll see an alert that says "Maneuvers Finished." Complete the route by pulling over and turning off the vehicle.
7. Locate the route and ensure all logs are uploaded.
8. Gather the route ID and then run the report generator:
```sh
python tools/lateral_maneuvers/generate_report.py 98395b7c5b27882e/000001cc--5a73bde686
```
+239
View File
@@ -0,0 +1,239 @@
#!/usr/bin/env python3
import argparse
import base64
import io
import math
import os
import webbrowser
from collections import defaultdict
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from tabulate import tabulate
from cereal import car
from openpilot.common.constants import CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.controls.lib.latcontrol_torque import LP_FILTER_CUTOFF_HZ
from openpilot.system.hardware.hw import Paths
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.longitudinal_maneuvers.generate_report import format_car_params
def lat_accel(curvature, v_ego):
return curvature * max(v_ego, 1.0) ** 2
def report(platform, route, description_override, CP, ID, maneuvers):
output_path = Path(__file__).resolve().parent / "lateral_reports"
output_fn = output_path / f"{platform}_{route.replace('/', '_')}.html"
output_path.mkdir(exist_ok=True)
target_cross_times = defaultdict(list)
builder = [
"<style>summary { cursor: pointer; }\n td, th { padding: 8px; } </style>\n",
"<h1>Lateral maneuver report</h1>\n",
f"<h3>{platform}</h3>\n",
f"<h3>{route}</h3>\n",
f"<h3>{ID.gitCommit}, {ID.gitBranch}, {ID.gitRemote}</h3>\n",
]
if description_override is not None:
builder.append(f"<h3>Description: {description_override}</h3>\n")
builder.append(f"<details><summary><h3 style='display: inline-block;'>CarParams</h3></summary><pre>{format_car_params(CP)}</pre></details>\n")
builder.append("{ summary }")
for description, runs in maneuvers:
completed_runs = [msgs for msgs in runs if any(m.alertDebug.alertText1 == "Complete" for m in msgs if m.which() == "alertDebug")]
print(f"plotting maneuver: {description}, runs: {len(completed_runs)}")
if not completed_runs:
continue
builder.append("<div style='border-top: 1px solid #000; margin: 20px 0;'></div>\n")
builder.append(f"<h2>{description}</h2>\n")
for run, msgs in enumerate(completed_runs, start=1):
t_car_control, car_control = zip(*[(m.logMonoTime, m.carControl) for m in msgs if m.which() == "carControl"], strict=True)
t_car_state, car_state = zip(*[(m.logMonoTime, m.carState) for m in msgs if m.which() == "carState"], strict=True)
t_controls_state, controls_state = zip(*[(m.logMonoTime, m.controlsState) for m in msgs if m.which() == "controlsState"], strict=True)
t_lateral_plan, lateral_plan = zip(*[(m.logMonoTime, m.lateralManeuverPlan) for m in msgs if m.which() == "lateralManeuverPlan" and m.valid], strict=True)
t_car_output, car_output = zip(*[(m.logMonoTime, m.carOutput) for m in msgs if m.which() == "carOutput"], strict=True)
t_car_control = [(t - t_car_control[0]) / 1e9 for t in t_car_control]
t_car_state = [(t - t_car_state[0]) / 1e9 for t in t_car_state]
t_controls_state = [(t - t_controls_state[0]) / 1e9 for t in t_controls_state]
t_lateral_plan = [(t - t_lateral_plan[0]) / 1e9 for t in t_lateral_plan]
t_car_output = [(t - t_car_output[0]) / 1e9 for t in t_car_output]
lat_active = [m.latActive for m in car_control]
maneuver_valid = all(lat_active) and not any(cs.steeringPressed for cs in car_state)
details_open = "open" if maneuver_valid else ""
title = f"Run #{run}" + (" <span style='color: red'>(invalid maneuver!)</span>" if not maneuver_valid else "")
builder.append(f"<details {details_open}><summary><h3 style='display: inline-block;'>{title}</h3></summary>\n")
baseline_accel = lat_accel(controls_state[0].curvature, car_state[0].vEgo)
v_ego = [m.vEgo for m in car_state]
cross_markers = []
if description.startswith("sine"):
amplitude = max(abs(lat_accel(lp.desiredCurvature, v) - baseline_accel) for lp, v in zip(lateral_plan, v_ego, strict=False))
threshold = amplitude * 0.5
builder.append("<h3 style='font-weight: normal'>50% peak")
for t, cs, v in zip(t_controls_state, controls_state, v_ego, strict=False):
actual = lat_accel(cs.curvature, v) - baseline_accel
if abs(actual) > threshold:
builder.append(f", <strong>crossed in {t:.3f}s</strong>")
cross_markers.append((t, actual + baseline_accel))
if maneuver_valid:
target_cross_times[description].append(t)
break
else:
builder.append(", <strong>not crossed</strong>")
builder.append("</h3>")
else:
action_targets = [(0, lat_accel(lateral_plan[0].desiredCurvature, v_ego[0]) - baseline_accel)]
for i in range(1, min(len(lateral_plan), len(v_ego))):
if abs(lateral_plan[i].desiredCurvature - lateral_plan[i - 1].desiredCurvature) > 0.001:
desired = lat_accel(lateral_plan[i].desiredCurvature, v_ego[i]) - baseline_accel
action_targets.append((i, desired))
for action_idx, (start_idx, action_target) in enumerate(action_targets):
start_time = t_lateral_plan[start_idx]
end_time = t_lateral_plan[action_targets[action_idx + 1][0]] if action_idx + 1 < len(action_targets) else t_controls_state[-1]
builder.append(f"<h3 style='font-weight: normal'>aTarget: {round(action_target, 1)} m/s^2")
prev_crossed = False
for t, cs, v in zip(t_controls_state, controls_state, v_ego, strict=False):
if not (start_time <= t <= end_time):
continue
actual_accel = lat_accel(cs.curvature, v) - baseline_accel
crossed = (0 < action_target < actual_accel) or (0 > action_target > actual_accel)
if crossed and prev_crossed:
cross_time = t - start_time
builder.append(f", <strong>crossed in {cross_time:.3f}s</strong>")
cross_markers.append((t, action_target + baseline_accel))
if maneuver_valid:
target_cross_times[description].append(cross_time)
break
prev_crossed = crossed
else:
builder.append(", <strong>not crossed</strong>")
builder.append("</h3>")
plt.rcParams["font.size"] = 40
fig = plt.figure(figsize=(30, 30))
ax = fig.subplots(4, 1, sharex=True, gridspec_kw={"height_ratios": [5, 3, 3, 3]})
ax[0].grid(linewidth=4)
desired_lat_accel = [lat_accel(m.desiredCurvature, v) for m, v in zip(lateral_plan, v_ego, strict=False)]
if description.startswith("sine"):
ax[0].plot(t_lateral_plan[:len(desired_lat_accel)], desired_lat_accel, label="desired lat accel", linewidth=6)
else:
t_desired = [t_lateral_plan[0]] + t_lateral_plan[:len(desired_lat_accel)]
desired_lat_accel = [baseline_accel] + desired_lat_accel
ax[0].step(t_desired, desired_lat_accel, label="desired lat accel", linewidth=6, where="post")
actual_lat_accel = [lat_accel(cs.curvature, v) for cs, v in zip(controls_state, v_ego, strict=False)]
ax[0].plot(t_controls_state[:len(actual_lat_accel)], actual_lat_accel, label="actual lat accel", linewidth=6)
ax[0].set_ylabel("Lateral Accel (m/s^2)")
for cross_time, cross_value in cross_markers:
ax[0].plot(cross_time, cross_value, marker="o", markersize=50, markeredgewidth=7, markeredgecolor="black", markerfacecolor="None")
ax2 = ax[0].twinx()
if CP.steerControlType == car.CarParams.SteerControlType.angle:
ax2.plot(t_car_output, [-m.actuatorsOutput.steeringAngleDeg for m in car_output], "C2", label="steer angle", linewidth=6)
else:
ax2.plot(t_car_output, [-m.actuatorsOutput.torque for m in car_output], "C2", label="steer torque", linewidth=6)
h1, l1 = ax[0].get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
ax[0].legend(h1 + h2, l1 + l2, prop={"size": 30})
ax[1].grid(linewidth=4)
ax[1].plot(t_car_state, [v * CV.MS_TO_MPH for v in v_ego], label="vEgo", linewidth=6)
ax[1].set_ylabel("Velocity (mph)")
ax[1].yaxis.set_major_formatter(plt.FormatStrFormatter("%.1f"))
ax[1].legend()
t_accel = np.array(t_controls_state[:len(actual_lat_accel)])
raw_jerk = np.gradient(actual_lat_accel, t_accel)
dt_avg = np.mean(np.diff(t_accel))
jerk_filter = FirstOrderFilter(0.0, 1 / (2 * np.pi * LP_FILTER_CUTOFF_HZ), dt_avg)
filtered_jerk = [jerk_filter.update(j) for j in raw_jerk]
ax[2].grid(linewidth=4)
ax[2].plot(t_accel, filtered_jerk, label="actual jerk", linewidth=6)
if CP.steerControlType == car.CarParams.SteerControlType.torque:
desired_jerk = [cs.lateralControlState.torqueState.desiredLateralJerk for cs in controls_state]
ax[2].plot(t_controls_state[:len(controls_state)], desired_jerk, label="desired jerk", linewidth=6)
ax[2].set_ylabel("Jerk (m/s^3)")
ax[2].legend()
ax[3].grid(linewidth=4)
ax[3].plot(t_car_control, [math.degrees(m.orientationNED[0]) for m in car_control], label="roll", linewidth=6)
ax[3].set_ylabel("Roll (deg)")
ax[3].legend()
ax[-1].set_xlabel("Time (s)")
fig.tight_layout()
buffer = io.BytesIO()
fig.savefig(buffer, format="webp")
plt.close(fig)
buffer.seek(0)
builder.append(f"<img src='data:image/webp;base64,{base64.b64encode(buffer.getvalue()).decode()}' style='width:100%; max-width:800px;'>\n")
builder.append("</details>\n")
summary = ["<h2>Summary</h2>\n"]
rows = []
for description, times in target_cross_times.items():
row = [description, len(times)]
if times:
row.extend([round(sum(times) / len(times), 2), round(min(times), 2), round(max(times), 2)])
rows.append(row)
summary.append(tabulate(rows, headers=["maneuver", "crossed", "mean", "min", "max"], tablefmt="html", numalign="left") + "\n")
summary_index = builder.index("{ summary }")
builder[summary_index:summary_index + 1] = summary
with open(output_fn, "w") as f:
f.write("".join(builder))
print(f"\nOpening report: {output_fn}\n")
webbrowser.open_new_tab(str(output_fn))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Generate lateral maneuver report from route")
parser.add_argument("route", type=str, help="Route name (e.g. 00000000--5f742174be)")
parser.add_argument("description", type=str, nargs="?")
args = parser.parse_args()
if "/" in args.route or "|" in args.route:
lr = LogReader(args.route, only_union_types=True)
else:
segs = [seg for seg in os.listdir(Paths.log_root()) if args.route in seg]
lr = LogReader([os.path.join(Paths.log_root(), seg, "rlog.zst") for seg in segs], only_union_types=True)
CP = lr.first("carParams")
ID = lr.first("initData")
platform = CP.carFingerprint
print("processing report for", platform)
maneuvers = []
active_prev = False
description_prev = None
for msg in lr:
if msg.which() == "alertDebug":
active = "Active" in msg.alertDebug.alertText1 or msg.alertDebug.alertText1 == "Complete"
if active and not active_prev:
if msg.alertDebug.alertText2 == description_prev:
maneuvers[-1][1].append([])
else:
maneuvers.append((msg.alertDebug.alertText2, [[]]))
description_prev = maneuvers[-1][0]
active_prev = active
if active_prev:
maneuvers[-1][1][-1].append(msg)
report(platform, args.route, args.description, CP, ID, maneuvers)
+348
View File
@@ -0,0 +1,348 @@
#!/usr/bin/env python3
import json
import time
from dataclasses import dataclass
import numpy as np
from cereal import messaging, car
from openpilot.common.constants import CV
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL, Priority, config_realtime_process
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.controls.lib.drive_helpers import MIN_SPEED
from openpilot.tools.longitudinal_maneuvers.maneuversd import Action, Maneuver as _Maneuver
STATUS_PARAM = "LateralManeuverStatus"
# thresholds for starting maneuvers
MAX_SPEED_DEV = 0.7 # deviation in m/s
MAX_CURV = 0.002 # 500 m radius
MAX_ROLL = 0.08 # 4.56 deg
TIMER = 2.0 # sec stable conditions before starting maneuver
def _default_status():
return {
"state": "idle",
"phase": "",
"maneuver": "",
"runIndex": 0,
"runTotal": 0,
"stepIndex": 0,
"stepTotal": 0,
"phaseStepIndex": 0,
"phaseStepTotal": 0,
"uiShow": False,
"uiSize": "small",
"uiText1": "Lateral Maneuvers",
"uiText2": "",
"updatedAtSec": 0.0,
"history": [],
}
def _load_status(params: Params):
status = _default_status()
raw = params.get(STATUS_PARAM, encoding="utf-8") or ""
if raw:
try:
payload = json.loads(raw)
if isinstance(payload, dict):
status.update(payload)
except Exception:
pass
history = status.get("history")
if not isinstance(history, list):
history = []
status["history"] = [str(line) for line in history if str(line).strip()][-120:]
try:
status["updatedAtSec"] = float(status.get("updatedAtSec") or 0.0)
except Exception:
status["updatedAtSec"] = 0.0
return status
def _save_status(params: Params, status):
status_copy = dict(status)
history = status_copy.get("history")
if not isinstance(history, list):
history = []
status_copy["history"] = [str(line) for line in history if str(line).strip()][-120:]
status_copy["updatedAtSec"] = float(status_copy.get("updatedAtSec") or time.monotonic())
params.put_nonblocking(STATUS_PARAM, json.dumps(status_copy, separators=(",", ":")))
return status_copy
def _append_history(status, line):
if not line:
return status
history = list(status.get("history", []))
history.append(str(line))
status["history"] = history[-120:]
return status
def _status_signature(status) -> str:
status_copy = dict(status)
status_copy.pop("updatedAtSec", None)
return json.dumps(status_copy, sort_keys=True, separators=(",", ":"))
@dataclass
class Maneuver(_Maneuver):
_baseline_curvature: float = 0.0
def get_accel(self, v_ego: float, lat_active: bool, curvature: float, roll: float) -> float:
self._run_completed = False
ready = abs(v_ego - self.initial_speed) < MAX_SPEED_DEV and lat_active and abs(curvature) < MAX_CURV and abs(roll) < MAX_ROLL
self._ready_cnt = (self._ready_cnt + 1) if ready else max(self._ready_cnt - 1, 0)
if self._ready_cnt > (TIMER / DT_MDL):
if not self._active:
self._baseline_curvature = curvature
self._active = True
if not self._active:
return 0.0
return self._step()
def reset(self):
super().reset()
self._ready_cnt = 0
def _sine_action(amplitude, period, duration):
t = np.linspace(0, duration, int(duration / DT_MDL) + 1)
accel = amplitude * np.sin(2 * np.pi * t / period)
return Action(accel.tolist(), t.tolist())
MANEUVERS = [
Maneuver(
"step right 20mph",
[Action([0.5], [1.0]), Action([-0.5], [1.5])],
repeat=2,
initial_speed=20. * CV.MPH_TO_MS,
),
Maneuver(
"step left 20mph",
[Action([-0.5], [1.0]), Action([0.5], [1.5])],
repeat=2,
initial_speed=20. * CV.MPH_TO_MS,
),
Maneuver(
"sine 0.5Hz 20mph",
[_sine_action(1.0, 2.0, 2.0), Action([0.0], [0.5])],
repeat=2,
initial_speed=20. * CV.MPH_TO_MS,
),
Maneuver(
"step right 30mph",
[Action([0.5], [1.0]), Action([-0.5], [1.5])],
repeat=2,
initial_speed=30. * CV.MPH_TO_MS,
),
Maneuver(
"step left 30mph",
[Action([-0.5], [1.0]), Action([0.5], [1.5])],
repeat=2,
initial_speed=30. * CV.MPH_TO_MS,
),
Maneuver(
"sine 0.5Hz 30mph",
[_sine_action(1.0, 2.0, 2.0), Action([0.0], [0.5])],
repeat=2,
initial_speed=30. * CV.MPH_TO_MS,
),
]
def main():
config_realtime_process(5, Priority.CTRL_LOW)
params = Params()
cloudlog.info("lateral_maneuversd is waiting for CarParams")
messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'selfdriveState', 'modelV2'], poll='modelV2')
pm = messaging.PubMaster(['lateralManeuverPlan', 'alertDebug'])
maneuvers = iter(MANEUVERS)
maneuver = None
complete_cnt = 0
display_holdoff = 0
prev_text = ""
last_started_run = None
last_completed_run = None
all_finished_logged = False
status = _load_status(params)
if status.get("state") in ("idle", "stopped"):
status.update({
"state": "armed",
"phase": "",
"maneuver": "",
"runIndex": 0,
"runTotal": 0,
"stepIndex": 0,
"stepTotal": 0,
"phaseStepIndex": 0,
"phaseStepTotal": 0,
"uiShow": True,
"uiSize": "small",
"uiText1": "Lateral Maneuvers Armed",
"uiText2": "Set target speed and let lateral stabilize.",
"updatedAtSec": time.monotonic(),
})
_append_history(status, "Armed from The Pond. Stabilize on a straight, flat road to start.")
last_status_signature = ""
last_status_write = 0.0
while True:
sm.update()
if maneuver is None:
maneuver = next(maneuvers, None)
alert_msg = messaging.new_message('alertDebug')
alert_msg.valid = True
plan_send = messaging.new_message('lateralManeuverPlan')
accel = 0.0
v_ego = max(sm['carState'].vEgo, 0.0)
curvature = sm['controlsState'].desiredCurvature
state = "idle"
phase = ""
maneuver_desc = maneuver.description if maneuver is not None else ""
run_total = maneuver.repeat + 1 if maneuver is not None else 0
run_index = maneuver._repeated + 1 if maneuver is not None else 0
step_total = len(maneuver.actions) if maneuver is not None else 0
step_index = maneuver._action_index + 1 if maneuver is not None and maneuver.active else 0
if complete_cnt > 0:
complete_cnt -= 1
alert_msg.alertDebug.alertText1 = "Completed"
alert_msg.alertDebug.alertText2 = maneuver_desc
state = "completed"
phase = "holdoff"
elif maneuver is not None:
if sm['carState'].steeringPressed or (maneuver.active and abs(v_ego - maneuver.initial_speed) > MAX_SPEED_DEV):
maneuver.reset()
roll = sm['carControl'].orientationNED[0] if len(sm['carControl'].orientationNED) == 3 else 0.0
speed_ready = abs(v_ego - maneuver.initial_speed) < MAX_SPEED_DEV
lat_ready = sm['carControl'].latActive
curv_ok = abs(curvature) < MAX_CURV
roll_ok = abs(roll) < MAX_ROLL
accel = maneuver.get_accel(v_ego, lat_ready, curvature, roll)
run_index = maneuver._repeated + 1 if not maneuver.finished else maneuver.repeat + 1
step_index = maneuver._action_index + 1 if maneuver.active else 0
if maneuver._run_completed:
completed_run = maneuver._repeated + 1 if maneuver.finished else max(maneuver._repeated, 1)
completed_key = (maneuver.description, completed_run)
if last_completed_run != completed_key:
_append_history(status, f"Completed {maneuver.description} run {completed_run}/{run_total}.")
last_completed_run = completed_key
complete_cnt = int(1.0 / DT_MDL)
alert_msg.alertDebug.alertText1 = "Complete"
alert_msg.alertDebug.alertText2 = maneuver.description
state = "completed"
phase = "run_complete"
elif maneuver.active:
started_key = (maneuver.description, maneuver._repeated + 1)
if last_started_run != started_key:
_append_history(status, f"Started {maneuver.description} run {maneuver._repeated + 1}/{run_total}.")
last_started_run = started_key
action_remaining = maneuver.actions[maneuver._action_index].time_bp[-1] - maneuver._action_frames * DT_MDL
if maneuver.description.startswith("sine"):
freq = maneuver.description.split()[1]
alert_msg.alertDebug.alertText1 = f"Active sine {freq} {max(action_remaining, 0):.1f}s"
else:
alert_msg.alertDebug.alertText1 = f"Active {accel:+.1f}m/s² {max(action_remaining, 0):.1f}s"
alert_msg.alertDebug.alertText2 = maneuver.description
state = "running"
phase = "active"
elif not (speed_ready and lat_ready):
alert_msg.alertDebug.alertText1 = f"Set speed to {maneuver.initial_speed * CV.MS_TO_MPH:0.0f} mph"
alert_msg.alertDebug.alertText2 = maneuver.description
state = "setup"
phase = "set_speed"
elif maneuver._ready_cnt > 0:
ready_time = max(TIMER - maneuver._ready_cnt * DT_MDL, 0)
alert_msg.alertDebug.alertText1 = f"Starting: {int(ready_time) + 1}"
alert_msg.alertDebug.alertText2 = maneuver.description
state = "countdown"
phase = "stabilizing"
else:
reason = "road not straight" if not curv_ok else "road not flat"
if curv_ok and roll_ok:
reason = "lateral not engaged"
alert_msg.alertDebug.alertText1 = f"Waiting: {reason}"
alert_msg.alertDebug.alertText2 = maneuver.description
state = "waiting"
phase = "road_check"
else:
alert_msg.alertDebug.alertText1 = "Maneuvers Finished"
state = "finished"
phase = "done"
if not all_finished_logged:
_append_history(status, "Finished all lateral maneuvers.")
all_finished_logged = True
setup_prefixes = ("Set speed", "Starting", "Waiting")
text = alert_msg.alertDebug.alertText1
same_text = text == prev_text or (text.startswith("Starting") and prev_text.startswith("Starting"))
if not same_text and text.startswith(setup_prefixes) and prev_text.startswith(setup_prefixes) and display_holdoff > 0:
alert_msg.alertDebug.alertText1 = prev_text
display_holdoff -= 1
else:
prev_text = text
display_holdoff = int(0.5 / DT_MDL) if text.startswith(setup_prefixes) else 0
pm.send('alertDebug', alert_msg)
plan_send.valid = maneuver is not None and maneuver.active and complete_cnt == 0
if plan_send.valid:
plan_send.lateralManeuverPlan.desiredCurvature = maneuver._baseline_curvature + accel / max(v_ego, MIN_SPEED) ** 2
pm.send('lateralManeuverPlan', plan_send)
status.update({
"state": state,
"phase": phase,
"maneuver": alert_msg.alertDebug.alertText2 or maneuver_desc,
"runIndex": run_index if maneuver is not None else 0,
"runTotal": run_total,
"stepIndex": step_index,
"stepTotal": step_total,
"phaseStepIndex": step_index,
"phaseStepTotal": step_total,
"uiShow": True,
"uiSize": "mid" if alert_msg.alertDebug.alertText2 else "small",
"uiText1": alert_msg.alertDebug.alertText1,
"uiText2": alert_msg.alertDebug.alertText2,
})
status_signature = _status_signature(status)
now = time.monotonic()
if status_signature != last_status_signature or (now - last_status_write) >= 1.0:
status["updatedAtSec"] = time.monotonic()
_save_status(params, status)
last_status_signature = status_signature
last_status_write = now
if maneuver is not None and maneuver.finished and complete_cnt == 0:
maneuver = None
if __name__ == "__main__":
main()
+31 -21
View File
@@ -31,11 +31,37 @@ class Maneuver:
_active: bool = False
_finished: bool = False
_run_completed: bool = False
_action_index: int = 0
_action_frames: int = 0
_ready_cnt: int = 0
_repeated: int = 0
def _step(self) -> float:
self._run_completed = False
action = self.actions[self._action_index]
action_accel = np.interp(self._action_frames * DT_MDL, action.time_bp, action.accel_bp)
self._action_frames += 1
# reached duration of action
if self._action_frames > (action.time_bp[-1] / DT_MDL):
# next action
if self._action_index < len(self.actions) - 1:
self._action_index += 1
self._action_frames = 0
# repeat maneuver
elif self._repeated < self.repeat:
self._repeated += 1
self._run_completed = True
self.reset()
# finish maneuver
else:
self._run_completed = True
self._finished = True
return float(action_accel)
def get_accel(self, v_ego: float, long_active: bool, standstill: bool, cruise_standstill: bool) -> float:
ready = abs(v_ego - self.initial_speed) < 0.3 and long_active
if self.initial_speed < 0.01:
@@ -50,28 +76,12 @@ class Maneuver:
if not self._active:
return min(max(self.initial_speed - v_ego, -2.), 2.)
action = self.actions[self._action_index]
action_accel = np.interp(self._action_frames * DT_MDL, action.time_bp, action.accel_bp)
return self._step()
self._action_frames += 1
# reached duration of action
if self._action_frames > (action.time_bp[-1] / DT_MDL):
# next action
if self._action_index < len(self.actions) - 1:
self._action_index += 1
self._action_frames = 0
# repeat maneuver
elif self._repeated < self.repeat:
self._repeated += 1
self._action_index = 0
self._action_frames = 0
self._active = False
# finish maneuver
else:
self._finished = True
return float(action_accel)
def reset(self):
self._active = False
self._action_frames = 0
self._action_index = 0
@property
def finished(self):