mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-15 12:05:01 +08:00
249 lines
12 KiB
Python
249 lines
12 KiB
Python
import os
|
|
import capnp
|
|
import numpy as np
|
|
from cereal import log
|
|
from openpilot.selfdrive.modeld.constants import ModelConstants, Plan, Meta
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import get_curvature_from_plan
|
|
|
|
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
|
|
|
|
ConfidenceClass = log.ModelDataV2.ConfidenceClass
|
|
|
|
# Return curvature for lateral action. If the model outputs desired_curvature and we're not in mlsim mode,
|
|
# use it directly; otherwise derive from the plan using yaw and yaw-rate.
|
|
def get_curvature_from_output(output: dict, plan: np.ndarray, v_ego: float, lat_action_t: float, mlsim: bool) -> float:
|
|
if not mlsim:
|
|
desired = output.get('desired_curvature')
|
|
if desired is not None:
|
|
return float(desired[0, 0])
|
|
|
|
# Use yaw (index 2) and yaw_rate (index 2)
|
|
theta = plan[:, Plan.T_FROM_CURRENT_EULER][:, 2]
|
|
theta_dot = plan[:, Plan.ORIENTATION_RATE][:, 2]
|
|
return float(get_curvature_from_plan(theta, theta_dot, ModelConstants.T_IDXS, v_ego, lat_action_t))
|
|
|
|
|
|
class PublishState:
|
|
def __init__(self):
|
|
self.disengage_buffer = np.zeros(ModelConstants.CONFIDENCE_BUFFER_LEN*ModelConstants.DISENGAGE_WIDTH, dtype=np.float32)
|
|
self.prev_brake_5ms2_probs = np.zeros(ModelConstants.FCW_5MS2_PROBS_WIDTH, dtype=np.float32)
|
|
self.prev_brake_3ms2_probs = np.zeros(ModelConstants.FCW_3MS2_PROBS_WIDTH, dtype=np.float32)
|
|
|
|
def fill_xyzt(builder, t, x, y, z, x_std=None, y_std=None, z_std=None):
|
|
builder.t = t
|
|
builder.x = x.tolist()
|
|
builder.y = y.tolist()
|
|
builder.z = z.tolist()
|
|
if x_std is not None:
|
|
builder.xStd = x_std.tolist()
|
|
if y_std is not None:
|
|
builder.yStd = y_std.tolist()
|
|
if z_std is not None:
|
|
builder.zStd = z_std.tolist()
|
|
|
|
def fill_xyvat(builder, t, x, y, v, a, x_std=None, y_std=None, v_std=None, a_std=None):
|
|
builder.t = t
|
|
builder.x = x.tolist()
|
|
builder.y = y.tolist()
|
|
builder.v = v.tolist()
|
|
builder.a = a.tolist()
|
|
if x_std is not None:
|
|
builder.xStd = x_std.tolist()
|
|
if y_std is not None:
|
|
builder.yStd = y_std.tolist()
|
|
if v_std is not None:
|
|
builder.vStd = v_std.tolist()
|
|
if a_std is not None:
|
|
builder.aStd = a_std.tolist()
|
|
|
|
def fill_xyz_poly(builder, degree, x, y, z):
|
|
xyz = np.stack([x, y, z], axis=1)
|
|
coeffs = np.polynomial.polynomial.polyfit(ModelConstants.T_IDXS, xyz, deg=degree)
|
|
builder.xCoefficients = coeffs[:, 0].tolist()
|
|
builder.yCoefficients = coeffs[:, 1].tolist()
|
|
builder.zCoefficients = coeffs[:, 2].tolist()
|
|
|
|
def fill_lane_line_meta(builder, lane_lines, lane_line_probs):
|
|
builder.leftY = lane_lines[1].y[0]
|
|
builder.leftProb = lane_line_probs[1]
|
|
builder.rightY = lane_lines[2].y[0]
|
|
builder.rightProb = lane_line_probs[2]
|
|
|
|
def fill_model_msg(base_msg: capnp._DynamicStructBuilder, extended_msg: capnp._DynamicStructBuilder,
|
|
net_output_data: dict[str, np.ndarray], action: log.ModelDataV2.Action,
|
|
publish_state: PublishState, vipc_frame_id: int, vipc_frame_id_extra: int,
|
|
frame_id: int, frame_drop: float, timestamp_eof: int, model_execution_time: float,
|
|
valid: bool) -> None:
|
|
frame_age = frame_id - vipc_frame_id if frame_id > vipc_frame_id else 0
|
|
frame_drop_perc = frame_drop * 100
|
|
extended_msg.valid = valid
|
|
base_msg.valid = valid
|
|
|
|
driving_model_data = base_msg.drivingModelData
|
|
|
|
driving_model_data.frameId = vipc_frame_id
|
|
driving_model_data.frameIdExtra = vipc_frame_id_extra
|
|
driving_model_data.frameDropPerc = frame_drop_perc
|
|
driving_model_data.modelExecutionTime = model_execution_time
|
|
|
|
driving_model_data.action = action
|
|
|
|
modelV2 = extended_msg.modelV2
|
|
modelV2.frameId = vipc_frame_id
|
|
modelV2.frameIdExtra = vipc_frame_id_extra
|
|
modelV2.frameAge = frame_age
|
|
modelV2.frameDropPerc = frame_drop_perc
|
|
modelV2.timestampEof = timestamp_eof
|
|
modelV2.modelExecutionTime = model_execution_time
|
|
|
|
# normalize plan tensors to (IDX_N, WIDTH)
|
|
plan_arr = net_output_data['plan'][0]
|
|
plan_stds_arr = net_output_data['plan_stds'][0]
|
|
|
|
# plan
|
|
fill_xyzt(modelV2.position, ModelConstants.T_IDXS, *plan_arr[:,Plan.POSITION].T, *plan_stds_arr[:,Plan.POSITION].T)
|
|
fill_xyzt(modelV2.velocity, ModelConstants.T_IDXS, *plan_arr[:,Plan.VELOCITY].T)
|
|
fill_xyzt(modelV2.acceleration, ModelConstants.T_IDXS, *plan_arr[:,Plan.ACCELERATION].T)
|
|
fill_xyzt(modelV2.orientation, ModelConstants.T_IDXS, *plan_arr[:,Plan.T_FROM_CURRENT_EULER].T)
|
|
fill_xyzt(modelV2.orientationRate, ModelConstants.T_IDXS, *plan_arr[:,Plan.ORIENTATION_RATE].T)
|
|
|
|
# temporal pose
|
|
try:
|
|
temporal_pose = modelV2.temporalPose
|
|
if 'sim_pose' in net_output_data:
|
|
temporal_pose.trans = net_output_data['sim_pose'][0,:ModelConstants.POSE_WIDTH//2].tolist()
|
|
temporal_pose.transStd = net_output_data['sim_pose_stds'][0,:ModelConstants.POSE_WIDTH//2].tolist()
|
|
temporal_pose.rot = net_output_data['sim_pose'][0,ModelConstants.POSE_WIDTH//2:].tolist()
|
|
temporal_pose.rotStd = net_output_data['sim_pose_stds'][0,ModelConstants.POSE_WIDTH//2:].tolist()
|
|
else:
|
|
temporal_pose.trans = plan_arr[0,Plan.VELOCITY].tolist()
|
|
temporal_pose.transStd = plan_stds_arr[0,Plan.VELOCITY].tolist()
|
|
temporal_pose.rot = plan_arr[0,Plan.ORIENTATION_RATE].tolist()
|
|
temporal_pose.rotStd = plan_stds_arr[0,Plan.ORIENTATION_RATE].tolist()
|
|
except AttributeError:
|
|
pass
|
|
|
|
# poly path
|
|
fill_xyz_poly(driving_model_data.path, ModelConstants.POLY_PATH_DEGREE, *plan_arr[:,Plan.POSITION].T)
|
|
|
|
# action
|
|
modelV2.action = action
|
|
|
|
# times at X_IDXS of edges and lines aren't used
|
|
LINE_T_IDXS: list[float] = []
|
|
|
|
# lane lines
|
|
has_lane_lines = all(k in net_output_data for k in ("lane_lines", "lane_lines_stds", "lane_lines_prob"))
|
|
modelV2.init('laneLines', 6)
|
|
if has_lane_lines:
|
|
for i in range(6):
|
|
lane_line = modelV2.laneLines[i]
|
|
if i < 4:
|
|
fill_xyzt(lane_line, LINE_T_IDXS, np.array(ModelConstants.X_IDXS), net_output_data['lane_lines'][0,i,:,0], net_output_data['lane_lines'][0,i,:,1])
|
|
elif i == 4:
|
|
leftLane_x = 0.5 * (net_output_data['lane_lines'][0,0,:,0] + net_output_data['lane_lines'][0,1,:,0])
|
|
leftLane_y = 0.5 * (net_output_data['lane_lines'][0,0,:,1] + net_output_data['lane_lines'][0,1,:,1])
|
|
fill_xyzt(lane_line, LINE_T_IDXS, np.array(ModelConstants.X_IDXS), leftLane_x, leftLane_y)
|
|
elif i == 5:
|
|
rightLane_x = 0.5 * (net_output_data['lane_lines'][0,2,:,0] + net_output_data['lane_lines'][0,3,:,0])
|
|
rightLane_y = 0.5 * (net_output_data['lane_lines'][0,2,:,1] + net_output_data['lane_lines'][0,3,:,1])
|
|
fill_xyzt(lane_line, LINE_T_IDXS, np.array(ModelConstants.X_IDXS), rightLane_x, rightLane_y)
|
|
modelV2.laneLineStds = net_output_data['lane_lines_stds'][0,:,0,0].tolist()
|
|
modelV2.laneLineProbs = net_output_data['lane_lines_prob'][0,1::2].tolist()
|
|
else:
|
|
x_vals = np.array(ModelConstants.X_IDXS)
|
|
zeros = np.zeros_like(x_vals, dtype=np.float32)
|
|
for i in range(6):
|
|
fill_xyzt(modelV2.laneLines[i], LINE_T_IDXS, x_vals, zeros, zeros)
|
|
modelV2.laneLineStds = [0.0] * 4
|
|
modelV2.laneLineProbs = [0.0] * 4
|
|
|
|
fill_lane_line_meta(driving_model_data.laneLineMeta, modelV2.laneLines, modelV2.laneLineProbs)
|
|
|
|
# road edges
|
|
modelV2.init('roadEdges', 2)
|
|
for i in range(2):
|
|
road_edge = modelV2.roadEdges[i]
|
|
fill_xyzt(road_edge, LINE_T_IDXS, np.array(ModelConstants.X_IDXS), net_output_data['road_edges'][0,i,:,0], net_output_data['road_edges'][0,i,:,1])
|
|
modelV2.roadEdgeStds = net_output_data['road_edges_stds'][0,:,0,0].tolist()
|
|
|
|
# leads
|
|
modelV2.init('leadsV3', 3)
|
|
for i in range(3):
|
|
lead = modelV2.leadsV3[i]
|
|
fill_xyvat(lead, ModelConstants.LEAD_T_IDXS, *net_output_data['lead'][0,i].T, *net_output_data['lead_stds'][0,i].T)
|
|
lead.prob = net_output_data['lead_prob'][0,i].tolist()
|
|
lead.probTime = ModelConstants.LEAD_T_OFFSETS[i]
|
|
|
|
# meta
|
|
meta = modelV2.meta
|
|
if 'desire_state' in net_output_data:
|
|
meta.desireState = net_output_data['desire_state'][0].reshape(-1).tolist()
|
|
else:
|
|
meta.desireState = [0.0] * ModelConstants.DESIRE_PRED_WIDTH
|
|
meta.desirePrediction = net_output_data['desire_pred'][0].reshape(-1).tolist()
|
|
meta.engagedProb = net_output_data['meta'][0,Meta.ENGAGED].item()
|
|
meta.init('disengagePredictions')
|
|
disengage_predictions = meta.disengagePredictions
|
|
disengage_predictions.t = ModelConstants.META_T_IDXS
|
|
disengage_predictions.brakeDisengageProbs = net_output_data['meta'][0,Meta.BRAKE_DISENGAGE].tolist()
|
|
disengage_predictions.gasDisengageProbs = net_output_data['meta'][0,Meta.GAS_DISENGAGE].tolist()
|
|
disengage_predictions.steerOverrideProbs = net_output_data['meta'][0,Meta.STEER_OVERRIDE].tolist()
|
|
disengage_predictions.brake3MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_3].tolist()
|
|
disengage_predictions.brake4MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_4].tolist()
|
|
disengage_predictions.brake5MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_5].tolist()
|
|
disengage_predictions.gasPressProbs = net_output_data['meta'][0,Meta.GAS_PRESS].tolist()
|
|
disengage_predictions.brakePressProbs = net_output_data['meta'][0,Meta.BRAKE_PRESS].tolist()
|
|
|
|
publish_state.prev_brake_5ms2_probs[:-1] = publish_state.prev_brake_5ms2_probs[1:]
|
|
publish_state.prev_brake_5ms2_probs[-1] = net_output_data['meta'][0,Meta.HARD_BRAKE_5][0]
|
|
publish_state.prev_brake_3ms2_probs[:-1] = publish_state.prev_brake_3ms2_probs[1:]
|
|
publish_state.prev_brake_3ms2_probs[-1] = net_output_data['meta'][0,Meta.HARD_BRAKE_3][0]
|
|
hard_brake_predicted = (publish_state.prev_brake_5ms2_probs > ModelConstants.FCW_THRESHOLDS_5MS2).all() and \
|
|
(publish_state.prev_brake_3ms2_probs > ModelConstants.FCW_THRESHOLDS_3MS2).all()
|
|
meta.hardBrakePredicted = hard_brake_predicted.item()
|
|
|
|
# confidence
|
|
if vipc_frame_id % (2*ModelConstants.MODEL_RUN_FREQ) == 0:
|
|
# any disengage prob
|
|
brake_disengage_probs = net_output_data['meta'][0,Meta.BRAKE_DISENGAGE]
|
|
gas_disengage_probs = net_output_data['meta'][0,Meta.GAS_DISENGAGE]
|
|
steer_override_probs = net_output_data['meta'][0,Meta.STEER_OVERRIDE]
|
|
any_disengage_probs = 1-((1-brake_disengage_probs)*(1-gas_disengage_probs)*(1-steer_override_probs))
|
|
# independent disengage prob for each 2s slice
|
|
ind_disengage_probs = np.r_[any_disengage_probs[0], np.diff(any_disengage_probs) / (1 - any_disengage_probs[:-1])]
|
|
# rolling buf for 2, 4, 6, 8, 10s
|
|
publish_state.disengage_buffer[:-ModelConstants.DISENGAGE_WIDTH] = publish_state.disengage_buffer[ModelConstants.DISENGAGE_WIDTH:]
|
|
publish_state.disengage_buffer[-ModelConstants.DISENGAGE_WIDTH:] = ind_disengage_probs
|
|
|
|
score = 0.
|
|
for i in range(ModelConstants.DISENGAGE_WIDTH):
|
|
score += publish_state.disengage_buffer[i*ModelConstants.DISENGAGE_WIDTH+ModelConstants.DISENGAGE_WIDTH-1-i].item() / ModelConstants.DISENGAGE_WIDTH
|
|
if score < ModelConstants.RYG_GREEN:
|
|
modelV2.confidence = ConfidenceClass.green
|
|
elif score < ModelConstants.RYG_YELLOW:
|
|
modelV2.confidence = ConfidenceClass.yellow
|
|
else:
|
|
modelV2.confidence = ConfidenceClass.red
|
|
|
|
# raw prediction if enabled
|
|
if SEND_RAW_PRED:
|
|
modelV2.rawPredictions = net_output_data['raw_pred'].tobytes()
|
|
|
|
def fill_pose_msg(msg: capnp._DynamicStructBuilder, net_output_data: dict[str, np.ndarray],
|
|
vipc_frame_id: int, vipc_dropped_frames: int, timestamp_eof: int, live_calib_seen: bool) -> None:
|
|
msg.valid = live_calib_seen & (vipc_dropped_frames < 1)
|
|
cameraOdometry = msg.cameraOdometry
|
|
|
|
cameraOdometry.frameId = vipc_frame_id
|
|
cameraOdometry.timestampEof = timestamp_eof
|
|
|
|
cameraOdometry.trans = net_output_data['pose'][0,:3].tolist()
|
|
cameraOdometry.rot = net_output_data['pose'][0,3:].tolist()
|
|
cameraOdometry.wideFromDeviceEuler = net_output_data['wide_from_device_euler'][0,:].tolist()
|
|
cameraOdometry.roadTransformTrans = net_output_data['road_transform'][0,:3].tolist()
|
|
cameraOdometry.transStd = net_output_data['pose_stds'][0,:3].tolist()
|
|
cameraOdometry.rotStd = net_output_data['pose_stds'][0,3:].tolist()
|
|
cameraOdometry.wideFromDeviceEulerStd = net_output_data['wide_from_device_euler_stds'][0,:].tolist()
|
|
cameraOdometry.roadTransformTransStd = net_output_data['road_transform_stds'][0,:3].tolist()
|