mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-03 12:32:06 +08:00
@@ -1,7 +1,6 @@
|
||||
import math
|
||||
import numpy as np
|
||||
from collections import deque
|
||||
import os
|
||||
|
||||
from cereal import log
|
||||
from openpilot.selfdrive.car.interfaces import FRICTION_THRESHOLD, get_friction_threshold
|
||||
@@ -10,8 +9,6 @@ from openpilot.common.filter_simple import FirstOrderFilter
|
||||
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
|
||||
from openpilot.selfdrive.controls.lib.pid import PIDController
|
||||
from openpilot.selfdrive.controls.lib.vehicle_model import ACCELERATION_DUE_TO_GRAVITY
|
||||
from openpilot.common.basedir import BASEDIR
|
||||
from openpilot.selfdrive.controls.lib.torque_ml_model import TorqueMLModel
|
||||
|
||||
# At higher speeds (25+mph) we can assume:
|
||||
# Lateral acceleration achieved by a specific car correlates to
|
||||
@@ -40,19 +37,6 @@ JERK_GAIN = 0.22
|
||||
LAT_ACCEL_REQUEST_BUFFER_SECONDS = 1.0
|
||||
VERSION = 2
|
||||
|
||||
# ==== ML torque override config (hardcoded globals) ====
|
||||
# Toggle this to enable/disable steering on the ML model.
|
||||
TORQUE_ML_ENABLED = True
|
||||
|
||||
# Trained model artifacts vendored into openpilot (so the device only needs openpilot)
|
||||
_ARTIFACTS_DIR = os.path.join(BASEDIR, "selfdrive", "controls", "lib", "torque_ml_artifacts")
|
||||
TORQUE_ML_WEIGHTS_PATH = os.path.join(_ARTIFACTS_DIR, "weights.npz")
|
||||
TORQUE_ML_NORM_PATH = os.path.join(_ARTIFACTS_DIR, "norm.pkl")
|
||||
|
||||
# 1.0 = full ML torque, 0.0 = stock torque controller
|
||||
TORQUE_ML_BLEND = 1.0
|
||||
# ======================================================
|
||||
|
||||
class LatControlTorque(LatControl):
|
||||
def __init__(self, CP, CI, dt):
|
||||
super().__init__(CP, CI, dt)
|
||||
@@ -70,15 +54,6 @@ class LatControlTorque(LatControl):
|
||||
self.measurement_rate_filter = FirstOrderFilter(0.0, 1 / (2 * np.pi * (MAX_LAT_JERK_UP - 0.5)), self.dt)
|
||||
self.low_speed_reset_threshold = max(CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED)
|
||||
|
||||
# ML torque override (enabled via env vars)
|
||||
self._ml = None
|
||||
if TORQUE_ML_ENABLED:
|
||||
self._ml = TorqueMLModel(TORQUE_ML_WEIGHTS_PATH, TORQUE_ML_NORM_PATH)
|
||||
self._ml_blend = float(TORQUE_ML_BLEND)
|
||||
self._prev_des_lat_accel = 0.0
|
||||
self._prev_meas_lat_accel = 0.0
|
||||
self._prev_ml_torque = 0.0
|
||||
|
||||
def update_live_torque_params(self, latAccelFactor, latAccelOffset, friction):
|
||||
self.torque_params.latAccelFactor = latAccelFactor
|
||||
self.torque_params.latAccelOffset = latAccelOffset
|
||||
@@ -149,48 +124,5 @@ class LatControlTorque(LatControl):
|
||||
pid_log.desiredLateralJerk = float(desired_lateral_jerk)
|
||||
pid_log.saturated = bool(self._check_saturation(self.steer_max - abs(output_torque) < 1e-3, CS, steer_limited_by_safety, curvature_limited))
|
||||
|
||||
# ML override uses a different "desired" definition than the PID loop:
|
||||
# it matches the dataset (controlsState.desiredCurvature * v^2), not the delayed setpoint.
|
||||
steer_cmd = -output_torque
|
||||
if self._ml is not None and active:
|
||||
desired_lat_accel = float(desired_curvature * CS.vEgo ** 2)
|
||||
meas_lat_accel = float(measurement)
|
||||
lat_accel_error = desired_lat_accel - meas_lat_accel
|
||||
|
||||
desired_lat_jerk = (desired_lat_accel - self._prev_des_lat_accel) / self.dt
|
||||
meas_lat_jerk = (meas_lat_accel - self._prev_meas_lat_accel) / self.dt
|
||||
|
||||
prev_des_mag = abs(self._prev_des_lat_accel)
|
||||
cur_des_mag = abs(desired_lat_accel)
|
||||
unwind = float((cur_des_mag < prev_des_mag) and (abs(meas_lat_accel) > 0.5))
|
||||
|
||||
# Build features in the same order as training (cmd_torque_dataset.py)
|
||||
features = [
|
||||
desired_lat_accel,
|
||||
meas_lat_accel,
|
||||
lat_accel_error,
|
||||
float(desired_lat_jerk),
|
||||
float(meas_lat_jerk),
|
||||
float(CS.vEgo),
|
||||
float(CS.aEgo),
|
||||
float(CS.steeringAngleDeg),
|
||||
float(CS.steeringRateDeg),
|
||||
unwind,
|
||||
float(self._prev_ml_torque),
|
||||
]
|
||||
|
||||
ml_torque = float(self._ml.predict(features))
|
||||
ml_torque = float(np.clip(ml_torque, -self.steer_max, self.steer_max))
|
||||
|
||||
# Optional blend (default 1.0 = full ML)
|
||||
a = float(np.clip(self._ml_blend, 0.0, 1.0))
|
||||
steer_cmd = (1.0 - a) * steer_cmd + a * ml_torque
|
||||
|
||||
self._prev_des_lat_accel = desired_lat_accel
|
||||
self._prev_meas_lat_accel = meas_lat_accel
|
||||
self._prev_ml_torque = steer_cmd
|
||||
|
||||
|
||||
# TODO left is positive in this convention
|
||||
return steer_cmd, 0.0, pid_log
|
||||
|
||||
return -output_torque, 0.0, pid_log
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -1,45 +0,0 @@
|
||||
import pickle
|
||||
import numpy as np
|
||||
|
||||
|
||||
def _leaky_relu(x: np.ndarray, negative_slope: float = 0.3) -> np.ndarray:
|
||||
return np.where(x > 0, x, x * negative_slope)
|
||||
|
||||
|
||||
class TorqueMLModel:
|
||||
"""
|
||||
Minimal Konverter-style Dense+LeakyReLU(0.3) inference.
|
||||
|
||||
Expects:
|
||||
- weights.npz containing wb = [w_list, b_list]
|
||||
- norm.pkl containing feature_names/x_center/x_scale/y_center/y_scale
|
||||
"""
|
||||
|
||||
def __init__(self, weights_npz: str, norm_pkl: str):
|
||||
wb = np.load(weights_npz, allow_pickle=True)
|
||||
self.w, self.b = wb["wb"]
|
||||
|
||||
with open(norm_pkl, "rb") as f:
|
||||
payload = pickle.load(f)
|
||||
|
||||
self.feature_names = list(payload["feature_names"])
|
||||
self.x_center = np.asarray(payload["x_center"], dtype=np.float32)
|
||||
self.x_scale = np.asarray(payload["x_scale"], dtype=np.float32)
|
||||
self.y_center = float(payload["y_center"])
|
||||
self.y_scale = float(payload["y_scale"])
|
||||
|
||||
# Sanity
|
||||
if len(self.feature_names) != len(self.x_center) or len(self.x_center) != len(self.x_scale):
|
||||
raise ValueError("Normalization shape mismatch")
|
||||
|
||||
def predict(self, features: list[float]) -> float:
|
||||
x = np.asarray(features, dtype=np.float32).reshape((1, -1))
|
||||
x = (x - self.x_center) / self.x_scale
|
||||
|
||||
l = x
|
||||
# Dense, LeakyReLU, Dense, LeakyReLU, Dense(1)
|
||||
l = _leaky_relu(l @ self.w[0] + self.b[0])
|
||||
l = _leaky_relu(l @ self.w[1] + self.b[1])
|
||||
y = l @ self.w[2] + self.b[2]
|
||||
return float(y.reshape(-1)[0] * self.y_scale + self.y_center)
|
||||
|
||||
Reference in New Issue
Block a user