mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-02 12:02:09 +08:00
New Lateral Changes
Revert "New Lateral Changes" This reverts commit 33da93ba70e7aafd8ebb171c1347d8a2a6e363f6. Reapply "New Lateral Changes" This reverts commit ce56770c167a032867d3e885a3bcea11cb455624. Update neural_network_feedforward.py
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
# Twilsonco's Lateral Neural Network Feedforward
|
||||
from collections import deque
|
||||
from difflib import SequenceMatcher
|
||||
from typing import NamedTuple
|
||||
|
||||
import json
|
||||
import math
|
||||
@@ -30,10 +29,9 @@ from openpilot.frogpilot.common.frogpilot_variables import NNFF_MODELS_PATH, get
|
||||
# dict used to rename activation functions whose names aren't valid python identifiers
|
||||
ACTIVATION_FUNCTION_NAMES = {'σ': 'sigmoid'}
|
||||
|
||||
LOW_SPEED_X = [0, 10, 20, 30]
|
||||
LOW_SPEED_Y = [15, 13, 10, 5]
|
||||
LOW_SPEED_Y_NN = [12, 3, 1, 0]
|
||||
|
||||
|
||||
LAT_PLAN_MIN_IDX = 5
|
||||
|
||||
class FluxModel:
|
||||
@@ -42,7 +40,6 @@ class FluxModel:
|
||||
params = json.load(f)
|
||||
|
||||
self.input_size = params["input_size"]
|
||||
self.output_size = params["output_size"]
|
||||
|
||||
self.input_mean = np.array(params["input_mean"], dtype=np.float32).T
|
||||
self.input_std = np.array(params["input_std"], dtype=np.float32).T
|
||||
@@ -152,11 +149,6 @@ def sign(x):
|
||||
def similarity(s1: str, s2: str) -> float:
|
||||
return SequenceMatcher(None, s1, s2).ratio()
|
||||
|
||||
class LatControlInputs(NamedTuple):
|
||||
lateral_acceleration: float
|
||||
roll_compensation: float
|
||||
vego: float
|
||||
aego: float
|
||||
|
||||
class NeuralNetworkFeedforward:
|
||||
def __init__(self, CP, LatControlTorque):
|
||||
@@ -212,7 +204,7 @@ class NeuralNetworkFeedforward:
|
||||
self.nn_future_times = [time + self.lateral_delay for time in self.future_times]
|
||||
self.past_future_len = len(self.past_times) + len(self.nn_future_times)
|
||||
|
||||
def compute_nnff(self, CS, VM, actual_lateral_accel, desired_lateral_accel, gravity_adjusted_lateral_accel, lateral_accel_deadzone, llk, measurement, model_data, params, pid_log, roll_compensation, setpoint, frogpilot_toggles):
|
||||
def compute_nnff(self, CS, VM, actual_lateral_accel, future_desired_lateral_accel, error, gravity_adjusted_future_lateral_accel, llk, measurement, model_data, params, pid_log, setpoint, frogpilot_toggles):
|
||||
if self.use_steering_angle:
|
||||
actual_curvature_rate = -VM.calc_curvature(math.radians(CS.steeringRateDeg), CS.vEgo, 0.0)
|
||||
actual_lateral_jerk = actual_curvature_rate * CS.vEgo ** 2
|
||||
@@ -227,7 +219,7 @@ class NeuralNetworkFeedforward:
|
||||
friction_upper_idx = next((idxs for idxs, value in enumerate(ModelConstants.T_IDXS) if value > lookahead), 16)
|
||||
|
||||
predicted_lateral_jerk = get_predicted_lateral_jerk(model_data.acceleration.y, self.t_diffs)
|
||||
desired_lateral_jerk = (interp(self.lateral_delay, ModelConstants.T_IDXS, model_data.acceleration.y) - desired_lateral_accel) / self.lateral_delay
|
||||
desired_lateral_jerk = (interp(self.lateral_delay, ModelConstants.T_IDXS, model_data.acceleration.y) - future_desired_lateral_accel) / self.lateral_delay
|
||||
|
||||
lookahead_lateral_jerk = get_lookahead_value(predicted_lateral_jerk[LAT_PLAN_MIN_IDX:friction_upper_idx], desired_lateral_jerk)
|
||||
|
||||
@@ -252,7 +244,7 @@ class NeuralNetworkFeedforward:
|
||||
roll = roll_pitch_adjust(roll, pitch)
|
||||
|
||||
self.roll_deque.append(roll)
|
||||
self.lateral_accel_desired_deque.append(desired_lateral_accel)
|
||||
self.lateral_accel_desired_deque.append(future_desired_lateral_accel)
|
||||
|
||||
# prepare past and future values
|
||||
# adjust future times to account for longitudinal acceleration
|
||||
@@ -275,16 +267,15 @@ class NeuralNetworkFeedforward:
|
||||
|
||||
pid_log.error = torque_from_setpoint - torque_from_measurement
|
||||
|
||||
error_blend = interp(abs(desired_lateral_accel), [1.0, 2.0], [0.0, 1.0])
|
||||
error_blend = interp(abs(future_desired_lateral_accel), [1.0, 2.0], [0.0, 1.0])
|
||||
if error_blend > 0.0: # blend in stronger error response when in high lat accel
|
||||
torque_from_error = self.lat_torque_nn_model.evaluate([CS.vEgo, setpoint - measurement, lateral_jerk_setpoint - lateral_jerk_measurement, 0.0])
|
||||
if sign(pid_log.error) == sign(torque_from_error) and abs(pid_log.error) < abs(torque_from_error):
|
||||
pid_log.error = pid_log.error * (1.0 - error_blend) + torque_from_error * error_blend
|
||||
|
||||
# compute feedforward (same as nn setpoint output)
|
||||
error = setpoint - measurement
|
||||
friction_input = self.lat_accel_friction_factor * error + self.lat_jerk_friction_factor * lookahead_lateral_jerk
|
||||
nn_input = [CS.vEgo, desired_lateral_accel, friction_input, roll] + past_lateral_accels_desired + future_lateral_accels + nnff_common
|
||||
nn_input = [CS.vEgo, future_desired_lateral_accel, friction_input, roll] + past_lateral_accels_desired + future_lateral_accels + nnff_common
|
||||
ff = self.lat_torque_nn_model.evaluate(nn_input)
|
||||
|
||||
# apply friction override for cars with low NN friction response
|
||||
@@ -296,8 +287,7 @@ class NeuralNetworkFeedforward:
|
||||
|
||||
pid_log.error = float(torque_from_setpoint - torque_from_measurement)
|
||||
|
||||
error = desired_lateral_accel - actual_lateral_accel
|
||||
friction_input = self.lat_accel_friction_factor * error + self.lat_jerk_friction_factor * lookahead_lateral_jerk
|
||||
ff = self.torque_from_lateral_accel(gravity_adjusted_lateral_accel, self.lat_control_torque.torque_params)
|
||||
ff = self.torque_from_lateral_accel(gravity_adjusted_future_lateral_accel, self.lat_control_torque.torque_params)
|
||||
|
||||
return pid_log, ff
|
||||
|
||||
@@ -97,6 +97,19 @@ def get_city_center(latitude, longitude):
|
||||
print(f"Falling back to (0, 0) for {latitude}, {longitude}")
|
||||
return float(0.0), float(0.0), "N/A", "N/A", "N/A"
|
||||
|
||||
def update_branch_commits(now):
|
||||
points = []
|
||||
for branch in ["FrogPilot", "FrogPilot-Staging", "FrogPilot-Testing"]:
|
||||
try:
|
||||
response = requests.get(f"https://api.github.com/repos/FrogAi/FrogPilot/commits/{branch}")
|
||||
response.raise_for_status()
|
||||
sha = response.json()["sha"]
|
||||
points.append(Point("branch_commits").field("commit", sha).tag("branch", branch).time(now))
|
||||
except Exception as e:
|
||||
print(f"Failed to fetch commit for {branch}: {e}")
|
||||
|
||||
return points
|
||||
|
||||
def is_up_to_date(build_metadata):
|
||||
remote_commit = run_cmd(["git", "ls-remote", "origin", build_metadata.channel], f"Fetched remote commit", "Failed to fetch remote commit", report=False)
|
||||
|
||||
@@ -144,7 +157,10 @@ def send_stats():
|
||||
|
||||
selected_theme = random.choice([item for item, count in most_common if count == max_count]).replace("-user_created", "").replace("_", " ")
|
||||
|
||||
point = (Point("user_stats")
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
user_point = (
|
||||
Point("user_stats")
|
||||
.field("blocked_user", frogpilot_toggles.block_user)
|
||||
.field("car_make", "GM" if frogpilot_toggles.car_make == "gm" else frogpilot_toggles.car_make.title())
|
||||
.field("car_model", frogpilot_toggles.car_model)
|
||||
@@ -180,10 +196,13 @@ def send_stats():
|
||||
.tag("branch", build_metadata.channel)
|
||||
.tag("dongle_id", params.get("FrogPilotDongleId", encoding="utf-8"))
|
||||
|
||||
.time(datetime.now(timezone.utc))
|
||||
.time(now)
|
||||
)
|
||||
|
||||
InfluxDBClient(org=org_ID, token=token, url=url).write_api(write_options=SYNCHRONOUS).write(bucket=bucket, org=org_ID, record=point)
|
||||
all_points = [user_point] + update_branch_commits(now)
|
||||
|
||||
client = InfluxDBClient(org=org_ID, token=token, url=url)
|
||||
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket, org=org_ID, record=all_points)
|
||||
print("Successfully sent FrogPilot stats!")
|
||||
except Exception as exception:
|
||||
print(f"Failed to send FrogPilot stats: {exception}")
|
||||
|
||||
@@ -29,6 +29,7 @@ from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, S
|
||||
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
|
||||
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
|
||||
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
|
||||
from openpilot.frogpilot.tinygrad_modeld.tinygrad_modeld import LAT_SMOOTH_SECONDS
|
||||
|
||||
from openpilot.system.hardware import HARDWARE
|
||||
|
||||
@@ -135,11 +136,11 @@ class Controls:
|
||||
|
||||
self.LaC: LatControl
|
||||
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
|
||||
self.LaC = LatControlAngle(self.CP, self.CI)
|
||||
self.LaC = LatControlAngle(self.CP, self.CI, DT_CTRL)
|
||||
elif self.FPCP.lateralTuning.which() == 'pid':
|
||||
self.LaC = LatControlPID(self.CP, self.CI)
|
||||
self.LaC = LatControlPID(self.CP, self.CI, DT_CTRL)
|
||||
elif self.FPCP.lateralTuning.which() == 'torque':
|
||||
self.LaC = LatControlTorque(self.CP, self.FPCP, self.CI)
|
||||
self.LaC = LatControlTorque(self.CP, self.FPCP, self.CI, DT_CTRL)
|
||||
|
||||
self.initialized = False
|
||||
self.state = State.disabled
|
||||
@@ -673,11 +674,12 @@ class Controls:
|
||||
# Reset desired curvature to current to avoid violating the limits on engage
|
||||
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
|
||||
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
|
||||
lat_delay = self.sm["liveDelay"].lateralDelay + LAT_SMOOTH_SECONDS
|
||||
|
||||
actuators.curvature = self.desired_curvature
|
||||
steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
|
||||
self.steer_limited_by_safety, self.desired_curvature,
|
||||
curvature_limited,
|
||||
curvature_limited, lat_delay,
|
||||
self.sm['liveLocationKalman'],
|
||||
self.sm['modelV2'],
|
||||
self.frogpilot_toggles)
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
import numpy as np
|
||||
from abc import abstractmethod, ABC
|
||||
|
||||
from openpilot.common.realtime import DT_CTRL
|
||||
|
||||
MIN_LATERAL_CONTROL_SPEED = 0.3 # m/s
|
||||
|
||||
|
||||
class LatControl(ABC):
|
||||
def __init__(self, CP, CI):
|
||||
self.sat_count_rate = 1.0 * DT_CTRL
|
||||
def __init__(self, CP, CI, dt):
|
||||
self.dt = dt
|
||||
self.sat_count_rate = 1.0 * self.dt
|
||||
self.sat_limit = CP.steerLimitTimer
|
||||
self.sat_count = 0.
|
||||
self.sat_check_min_speed = 10.
|
||||
@@ -17,7 +16,7 @@ class LatControl(ABC):
|
||||
self.steer_max = 1.0
|
||||
|
||||
@abstractmethod
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, llk, model_data, frogpilot_toggles):
|
||||
def update(self, active: bool, CS, VM, params, steer_limited_by_safety: bool, desired_curvature: float, curvature_limited: bool, lat_delay: float, llk, model_data, frogpilot_toggles):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
|
||||
@@ -8,12 +8,12 @@ STEER_ANGLE_SATURATION_THRESHOLD = 2.5 # Degrees
|
||||
|
||||
|
||||
class LatControlAngle(LatControl):
|
||||
def __init__(self, CP, CI):
|
||||
super().__init__(CP, CI)
|
||||
def __init__(self, CP, CI, dt):
|
||||
super().__init__(CP, CI, dt)
|
||||
self.sat_check_min_speed = 5.
|
||||
self.use_steer_limited_by_safety = CP.carName == "tesla"
|
||||
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, llk, model_data, frogpilot_toggles):
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, lat_delay, llk, model_data, frogpilot_toggles):
|
||||
angle_log = log.ControlsState.LateralAngleState.new_message()
|
||||
|
||||
if not active:
|
||||
|
||||
@@ -6,14 +6,14 @@ from openpilot.selfdrive.controls.lib.pid import PIDController
|
||||
|
||||
|
||||
class LatControlPID(LatControl):
|
||||
def __init__(self, CP, CI):
|
||||
super().__init__(CP, CI)
|
||||
def __init__(self, CP, CI, dt):
|
||||
super().__init__(CP, CI, dt)
|
||||
self.pid = PIDController((CP.lateralTuning.pid.kpBP, CP.lateralTuning.pid.kpV),
|
||||
(CP.lateralTuning.pid.kiBP, CP.lateralTuning.pid.kiV),
|
||||
k_f=CP.lateralTuning.pid.kf, pos_limit=self.steer_max, neg_limit=-self.steer_max)
|
||||
self.get_steer_feedforward = CI.get_steer_feedforward_function()
|
||||
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, llk, model_data, frogpilot_toggles):
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, lat_delay, llk, model_data, frogpilot_toggles):
|
||||
pid_log = log.ControlsState.LateralPIDState.new_message()
|
||||
pid_log.steeringAngleDeg = float(CS.steeringAngleDeg)
|
||||
pid_log.steeringRateDeg = float(CS.steeringRateDeg)
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import math
|
||||
import numpy as np
|
||||
from collections import deque
|
||||
|
||||
from cereal import log
|
||||
from openpilot.selfdrive.controls.lib.drive_helpers import get_friction
|
||||
from openpilot.selfdrive.controls.lib.drive_helpers import MIN_SPEED, get_friction
|
||||
from openpilot.selfdrive.car.interfaces import FRICTION_THRESHOLD
|
||||
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
|
||||
from openpilot.selfdrive.controls.lib.pid import PIDController
|
||||
@@ -26,15 +27,17 @@ LOW_SPEED_Y = [15, 13, 10, 5]
|
||||
|
||||
|
||||
class LatControlTorque(LatControl):
|
||||
def __init__(self, CP, FPCP, CI):
|
||||
super().__init__(CP, CI)
|
||||
def __init__(self, CP, FPCP, CI, dt):
|
||||
super().__init__(CP, CI, dt)
|
||||
self.torque_params = FPCP.lateralTuning.torque
|
||||
self.torque_from_lateral_accel = CI.torque_from_lateral_accel()
|
||||
self.lateral_accel_from_torque = CI.lateral_accel_from_torque()
|
||||
self.pid = PIDController(self.torque_params.kp, self.torque_params.ki,
|
||||
k_f=self.torque_params.kf)
|
||||
k_f=self.torque_params.kf, rate=1/self.dt)
|
||||
self.update_limits()
|
||||
self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg
|
||||
self.LATACCEL_REQUEST_BUFFER_NUM_FRAMES = int(1 / self.dt)
|
||||
self.requested_lateral_accel_buffer = deque([0.] * self.LATACCEL_REQUEST_BUFFER_NUM_FRAMES , maxlen=self.LATACCEL_REQUEST_BUFFER_NUM_FRAMES)
|
||||
|
||||
# FrogPilot variables
|
||||
self.nnff = NeuralNetworkFeedforward(CP, self)
|
||||
@@ -51,7 +54,7 @@ class LatControlTorque(LatControl):
|
||||
self.pid.set_limits(self.lateral_accel_from_torque(self.steer_max, self.torque_params),
|
||||
self.lateral_accel_from_torque(-self.steer_max, self.torque_params))
|
||||
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, llk, model_data, frogpilot_toggles):
|
||||
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, curvature_limited, lat_delay, llk, model_data, frogpilot_toggles):
|
||||
pid_log = log.ControlsState.LateralTorqueState.new_message()
|
||||
if not active:
|
||||
output_torque = 0.0
|
||||
@@ -61,19 +64,26 @@ class LatControlTorque(LatControl):
|
||||
roll_compensation = params.roll * ACCELERATION_DUE_TO_GRAVITY
|
||||
curvature_deadzone = abs(VM.calc_curvature(math.radians(self.steering_angle_deadzone_deg), CS.vEgo, 0.0))
|
||||
|
||||
desired_lateral_accel = desired_curvature * CS.vEgo ** 2
|
||||
delay_frames = int(np.clip(lat_delay / self.dt, 1, self.LATACCEL_REQUEST_BUFFER_NUM_FRAMES))
|
||||
expected_lateral_accel = self.requested_lateral_accel_buffer[-delay_frames]
|
||||
# TODO factor out lateral jerk from error to later replace it with delay independent alternative
|
||||
future_desired_lateral_accel = desired_curvature * CS.vEgo ** 2
|
||||
self.requested_lateral_accel_buffer.append(future_desired_lateral_accel)
|
||||
gravity_adjusted_future_lateral_accel = future_desired_lateral_accel - roll_compensation
|
||||
desired_lateral_jerk = (future_desired_lateral_accel - expected_lateral_accel) / lat_delay
|
||||
actual_lateral_accel = actual_curvature * CS.vEgo ** 2
|
||||
lateral_accel_deadzone = curvature_deadzone * CS.vEgo ** 2
|
||||
|
||||
low_speed_factor = np.interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y_NN if frogpilot_toggles.nnff else LOW_SPEED_Y)**2
|
||||
setpoint = desired_lateral_accel + low_speed_factor * desired_curvature
|
||||
measurement = actual_lateral_accel + low_speed_factor * actual_curvature
|
||||
gravity_adjusted_lateral_accel = desired_lateral_accel - roll_compensation
|
||||
low_speed_factor = np.interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y_NN if frogpilot_toggles.nnff else LOW_SPEED_Y)**2 / (np.clip(CS.vEgo, MIN_SPEED, np.inf) ** 2)
|
||||
setpoint = lat_delay * desired_lateral_jerk + expected_lateral_accel
|
||||
measurement = actual_lateral_accel
|
||||
error = setpoint - measurement
|
||||
error_lsf = error + low_speed_factor * error
|
||||
|
||||
if self.nnff_loaded and frogpilot_toggles.nnff or frogpilot_toggles.nnff_lite:
|
||||
pid_log, ff = self.nnff.compute_nnff(
|
||||
CS, VM, actual_lateral_accel, desired_lateral_accel, gravity_adjusted_lateral_accel, lateral_accel_deadzone,
|
||||
llk, measurement, model_data, params, pid_log, roll_compensation, setpoint, frogpilot_toggles
|
||||
CS, VM, actual_lateral_accel, error, future_desired_lateral_accel, gravity_adjusted_future_lateral_accel,
|
||||
llk, measurement, model_data, params, pid_log, setpoint, frogpilot_toggles
|
||||
)
|
||||
|
||||
freeze_integrator = steer_limited_by_safety or CS.steeringPressed or CS.vEgo < 5
|
||||
@@ -83,11 +93,12 @@ class LatControlTorque(LatControl):
|
||||
freeze_integrator=freeze_integrator)
|
||||
else:
|
||||
# do error correction in lateral acceleration space, convert at end to handle non-linear torque responses correctly
|
||||
pid_log.error = float(setpoint - measurement)
|
||||
ff = gravity_adjusted_lateral_accel
|
||||
pid_log.error = float(error_lsf)
|
||||
ff = gravity_adjusted_future_lateral_accel
|
||||
# latAccelOffset corrects roll compensation bias from device roll misalignment relative to car roll
|
||||
ff -= self.torque_params.latAccelOffset
|
||||
ff += get_friction(desired_lateral_accel - actual_lateral_accel, lateral_accel_deadzone, FRICTION_THRESHOLD, self.torque_params)
|
||||
# TODO jerk is weighted by lat_delay for legacy reasons, but should be made independent of it
|
||||
ff += get_friction(error, lateral_accel_deadzone, FRICTION_THRESHOLD, self.torque_params)
|
||||
|
||||
freeze_integrator = steer_limited_by_safety or CS.steeringPressed or CS.vEgo < 5
|
||||
output_lataccel = self.pid.update(pid_log.error,
|
||||
@@ -103,7 +114,7 @@ class LatControlTorque(LatControl):
|
||||
pid_log.f = float(self.pid.f)
|
||||
pid_log.output = float(-output_torque) # TODO: log lat accel?
|
||||
pid_log.actualLateralAccel = float(actual_lateral_accel)
|
||||
pid_log.desiredLateralAccel = float(desired_lateral_accel)
|
||||
pid_log.desiredLateralAccel = float(expected_lateral_accel)
|
||||
pid_log.saturated = bool(self._check_saturation(self.steer_max - abs(output_torque) < 1e-3, CS, steer_limited_by_safety, curvature_limited))
|
||||
|
||||
# TODO left is positive in this convention
|
||||
|
||||
@@ -16,6 +16,7 @@ from openpilot.common.swaglog import cloudlog
|
||||
from openpilot.common.simple_kalman import KF1D
|
||||
|
||||
from openpilot.frogpilot.common.frogpilot_variables import THRESHOLD, get_frogpilot_toggles
|
||||
from openpilot.selfdrive.controls.controlsd import LaneChangeDirection, LaneChangeState
|
||||
|
||||
# Default lead acceleration decay set to 50% at 1s
|
||||
_LEAD_ACCEL_TAU = 0.6
|
||||
@@ -149,7 +150,15 @@ def laplacian_pdf(x: float, mu: float, b: float):
|
||||
return math.exp(-abs(x-mu)/b)
|
||||
|
||||
|
||||
def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks: dict[int, Track]):
|
||||
def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, model_data: capnp._DynamicStructReader, tracks: dict[int, Track], frogpilot_toggles: SimpleNamespace):
|
||||
if model_data.meta.laneChangeState == LaneChangeState.laneChangeStarting and frogpilot_toggles.human_lane_changes:
|
||||
direction = model_data.meta.laneChangeDirection
|
||||
|
||||
if direction == LaneChangeDirection.left:
|
||||
tracks = {k: v for k, v in tracks.items() if v.yRel > 0}
|
||||
elif direction == LaneChangeDirection.right:
|
||||
tracks = {k: v for k, v in tracks.items() if v.yRel < 0}
|
||||
|
||||
offset_vision_dist = lead.x[0] - RADAR_TO_CAMERA
|
||||
|
||||
def prob(c):
|
||||
@@ -195,11 +204,11 @@ def get_RadarState_from_vision(lead_msg: capnp._DynamicStructReader, v_ego: floa
|
||||
|
||||
def get_lead(v_ego: float, ready: bool, tracks: dict[int, Track], lead_msg: capnp._DynamicStructReader,
|
||||
model_v_ego: float, model_data: capnp._DynamicStructReader, standstill: bool,
|
||||
frogpilot_toggles: SimpleNamespace, frogpilotPlan: capnp._DynamicStructReader,
|
||||
frogpilotPlan: capnp._DynamicStructReader, frogpilot_toggles: SimpleNamespace,
|
||||
low_speed_override: bool = True) -> dict[str, Any]:
|
||||
# Determine leads, this is where the essential logic happens
|
||||
if len(tracks) > 0 and ready and lead_msg.prob > frogpilot_toggles.lead_detection_probability:
|
||||
track = match_vision_to_track(v_ego, lead_msg, tracks)
|
||||
track = match_vision_to_track(v_ego, lead_msg, model_data, tracks, frogpilot_toggles)
|
||||
else:
|
||||
track = None
|
||||
|
||||
@@ -316,8 +325,8 @@ class RadarD:
|
||||
model_v_ego = self.v_ego
|
||||
leads_v3 = sm['modelV2'].leadsV3
|
||||
if len(leads_v3) > 1:
|
||||
self.radar_state.leadOne = get_lead(self.v_ego, self.ready, self.tracks, leads_v3[0], model_v_ego, sm['modelV2'], sm['carState'].standstill, self.frogpilot_toggles, sm['frogpilotPlan'], low_speed_override=True)
|
||||
self.radar_state.leadTwo = get_lead(self.v_ego, self.ready, self.tracks, leads_v3[1], model_v_ego, sm['modelV2'], sm['carState'].standstill, self.frogpilot_toggles, sm['frogpilotPlan'], low_speed_override=False)
|
||||
self.radar_state.leadOne = get_lead(self.v_ego, self.ready, self.tracks, leads_v3[0], model_v_ego, sm['modelV2'], sm['carState'].standstill, sm['frogpilotPlan'], self.frogpilot_toggles, low_speed_override=True)
|
||||
self.radar_state.leadTwo = get_lead(self.v_ego, self.ready, self.tracks, leads_v3[1], model_v_ego, sm['modelV2'], sm['carState'].standstill, sm['frogpilotPlan'], self.frogpilot_toggles, low_speed_override=False)
|
||||
|
||||
if self.frogpilot_toggles.adjacent_lead_tracking and self.ready:
|
||||
self.frogpilot_radar_state.leadLeft = get_adjacent_lead(self.tracks, sm['carState'].standstill, sm['modelV2'], left=True)
|
||||
|
||||
Reference in New Issue
Block a user