#!/usr/bin/env python3 import os os.environ['GMMU'] = '0' # for usbgpu fast loading, noop for qcom from tinygrad.tensor import Tensor import time import pickle import numpy as np import cereal.messaging as messaging from cereal import car, log from cereal.messaging import PubMaster, SubMaster from msgq.visionipc import VisionIpcClient, VisionStreamType, VisionBuf from opendbc.car.car_helpers import get_demo_car_params from openpilot.common.swaglog import cloudlog from openpilot.common.params import Params from openpilot.common.filter_simple import FirstOrderFilter from openpilot.common.realtime import config_realtime_process, DT_MDL from openpilot.common.transformations.camera import DEVICE_CAMERAS from openpilot.system.camerad.cameras.nv12_info import get_nv12_info from openpilot.common.transformations.model import get_warp_matrix from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper from openpilot.selfdrive.controls.lib.drive_helpers import get_accel_from_plan, smooth_value, get_curvature_from_plan from openpilot.selfdrive.modeld.parse_model_outputs import Parser from openpilot.selfdrive.modeld.compile_modeld import make_input_queues, WARP_INPUTS, POLICY_INPUTS from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState from openpilot.common.file_chunker import read_file_chunked, get_manifest_path from openpilot.selfdrive.modeld.constants import ModelConstants, Plan from openpilot.selfdrive.modeld.helpers import usbgpu_present, modeld_pkl_path, get_tg_input_devices from dragonpilot.selfdrive.controls.lib.road_edge_detector import RoadEdgeDetector LITE = os.getenv("LITE") is not None PROCESS_NAME = "selfdrive.modeld.modeld" SEND_RAW_PRED = os.getenv('SEND_RAW_PRED') LAT_SMOOTH_SECONDS = 0.0 LONG_SMOOTH_SECONDS = 0.3 MIN_LAT_CONTROL_SPEED = 0.3 def get_action_from_model(model_output: dict[str, np.ndarray], prev_action: log.ModelDataV2.Action, lat_action_t: float, long_action_t: float, v_ego: float, dp_lat_offset_cm: int) -> log.ModelDataV2.Action: plan = model_output['plan'][0] desired_accel, should_stop = get_accel_from_plan(plan[:,Plan.VELOCITY][:,0], plan[:,Plan.ACCELERATION][:,0], ModelConstants.T_IDXS, action_t=long_action_t) desired_accel = smooth_value(desired_accel, prev_action.desiredAcceleration, LONG_SMOOTH_SECONDS) desired_curvature = get_curvature_from_plan(plan[:,Plan.T_FROM_CURRENT_EULER][:,2], plan[:,Plan.ORIENTATION_RATE][:,2], ModelConstants.T_IDXS, v_ego, lat_action_t) if v_ego > MIN_LAT_CONTROL_SPEED: desired_curvature = smooth_value(desired_curvature, prev_action.desiredCurvature, LAT_SMOOTH_SECONDS) else: desired_curvature = prev_action.desiredCurvature # Apply lateral offset (driving style adjustment) if dp_lat_offset_cm != 0: lat_offset_m = dp_lat_offset_cm / 100.0 curvature_offset = 2.0 * lat_offset_m / 900.0 desired_curvature += curvature_offset return log.ModelDataV2.Action(desiredCurvature=float(desired_curvature), desiredAcceleration=float(desired_accel), shouldStop=bool(should_stop)) class FrameMeta: frame_id: int = 0 timestamp_sof: int = 0 timestamp_eof: int = 0 def __init__(self, vipc=None): if vipc is not None: self.frame_id, self.timestamp_sof, self.timestamp_eof = vipc.frame_id, vipc.timestamp_sof, vipc.timestamp_eof class ModelState: prev_desire: np.ndarray # for tracking the rising edge of the pulse def __init__(self, cam_w: int, cam_h: int, usbgpu: bool): input_devices = get_tg_input_devices(PROCESS_NAME, usbgpu) self.WARP_DEV, self.QUEUE_DEV = input_devices['WARP_DEV'], input_devices['QUEUE_DEV'] jits = pickle.loads(read_file_chunked(modeld_pkl_path(usbgpu))) vision_metadata = jits['metadata']['vision'] self.vision_input_shapes = vision_metadata['input_shapes'] self.vision_input_names = list(self.vision_input_shapes.keys()) self.vision_output_slices = vision_metadata['output_slices'] policy_metadata = jits['metadata']['policy'] self.policy_input_shapes = policy_metadata['input_shapes'] self.policy_output_slices = policy_metadata['output_slices'] self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32) self.frame_skip = ModelConstants.MODEL_RUN_FREQ // ModelConstants.MODEL_CONTEXT_FREQ self.input_queues, self.npy = make_input_queues(self.vision_input_shapes, self.policy_input_shapes, self.frame_skip, device=self.QUEUE_DEV) self.full_frames : dict[str, Tensor] = {} self._blob_cache : dict[int, Tensor] = {} self.parser = Parser() self.frame_buf_params = {k: get_nv12_info(cam_w, cam_h) for k in ('img', 'big_img')} self.run_policy = jits['run_policy'] self.warp_enqueue = jits[(cam_w,cam_h)] def slice_outputs(self, model_outputs: np.ndarray, output_slices: dict[str, slice]) -> dict[str, np.ndarray]: parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in output_slices.items()} return parsed_model_outputs def run(self, bufs: dict[str, VisionBuf], transforms: dict[str, np.ndarray], inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None: for key in bufs.keys(): ptr = np.frombuffer(bufs[key].data, dtype=np.uint8).ctypes.data yuv_size = self.frame_buf_params[key][3] # There is a ringbuffer of imgs, just cache tensors pointing to all of them cache_key = (key, ptr) if cache_key not in self._blob_cache: self._blob_cache[cache_key] = Tensor.from_blob(ptr, (yuv_size,), dtype='uint8', device=self.WARP_DEV) self.full_frames[key] = self._blob_cache[cache_key] # Model decides when action is completed, so desire input is just a pulse triggered on rising edge inputs['desire_pulse'][0] = 0 self.npy['desire'][:] = np.where(inputs['desire_pulse'] - self.prev_desire > .99, inputs['desire_pulse'], 0) self.prev_desire[:] = inputs['desire_pulse'] self.npy['traffic_convention'][:] = inputs['traffic_convention'] self.npy['tfm'][:,:] = transforms['img'][:,:] self.npy['big_tfm'][:,:] = transforms['big_img'][:,:] img, big_img = self.warp_enqueue(**{k: self.input_queues[k] for k in WARP_INPUTS}, frame=self.full_frames['img'], big_frame=self.full_frames['big_img']) if prepare_only: return None vision_output, policy_output = self.run_policy( **{k: self.input_queues[k] for k in POLICY_INPUTS}, img=img, big_img=big_img ) vision_output = vision_output.numpy().flatten() policy_output = policy_output.numpy().flatten() vision_outputs_dict = self.parser.parse_vision_outputs(self.slice_outputs(vision_output, self.vision_output_slices)) policy_outputs_dict = self.parser.parse_policy_outputs(self.slice_outputs(policy_output, self.policy_output_slices)) combined_outputs_dict = {**vision_outputs_dict, **policy_outputs_dict} if SEND_RAW_PRED: combined_outputs_dict['raw_pred'] = np.concatenate([vision_output.copy(), policy_output.copy()]) return combined_outputs_dict def main(demo=False): cloudlog.warning("modeld init") _present = usbgpu_present() _compiled = os.path.isfile(get_manifest_path(modeld_pkl_path(usbgpu=True))) USBGPU = _present and _compiled params = Params() params.put_bool("UsbGpuPresent", _present) params.put_bool("UsbGpuCompiled", _compiled) if not USBGPU: # USB GPU currently saturates a core so can't do this yet, # also need to move the aux USB interrupts for good timings config_realtime_process(7, 54) # visionipc clients while True: available_streams = VisionIpcClient.available_streams("camerad", block=False) if available_streams: use_extra_client = VisionStreamType.VISION_STREAM_WIDE_ROAD in available_streams and VisionStreamType.VISION_STREAM_ROAD in available_streams main_wide_camera = VisionStreamType.VISION_STREAM_ROAD not in available_streams break time.sleep(.1) vipc_client_main_stream = VisionStreamType.VISION_STREAM_WIDE_ROAD if main_wide_camera else VisionStreamType.VISION_STREAM_ROAD vipc_client_main = VisionIpcClient("camerad", vipc_client_main_stream, True) vipc_client_extra = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_WIDE_ROAD, False) cloudlog.warning(f"vision stream set up, main_wide_camera: {main_wide_camera}, use_extra_client: {use_extra_client}") while not vipc_client_main.connect(False): time.sleep(0.1) while use_extra_client and not vipc_client_extra.connect(False): time.sleep(0.1) cloudlog.warning(f"connected main cam with buffer size: {vipc_client_main.buffer_len} ({vipc_client_main.width} x {vipc_client_main.height})") if use_extra_client: cloudlog.warning(f"connected extra cam with buffer size: {vipc_client_extra.buffer_len} ({vipc_client_extra.width} x {vipc_client_extra.height})") st = time.monotonic() cloudlog.warning("loading model") model = ModelState(vipc_client_main.width, vipc_client_main.height, USBGPU) cloudlog.warning(f"models loaded in {time.monotonic() - st:.1f}s, modeld starting") # messaging pm = PubMaster(["modelV2", "drivingModelData", "cameraOdometry", "modelExt"]) sm = SubMaster(["deviceState", "carState", "roadCameraState", "liveCalibration", "driverMonitoringState", "carControl", "liveDelay"]) publish_state = PublishState() params = Params() # setup filter to track dropped frames frame_dropped_filter = FirstOrderFilter(0., 10., 1. / ModelConstants.MODEL_RUN_FREQ) frame_id = 0 last_vipc_frame_id = 0 run_count = 0 model_transform_main = np.zeros((3, 3), dtype=np.float32) model_transform_extra = np.zeros((3, 3), dtype=np.float32) live_calib_seen = False buf_main, buf_extra = None, None meta_main = FrameMeta() meta_extra = FrameMeta() if demo: CP = get_demo_car_params() else: CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams) cloudlog.info("modeld got CarParams: %s", CP.brand) # TODO this needs more thought, use .2s extra for now to estimate other delays # TODO Move smooth seconds to action function long_delay = CP.longitudinalActuatorDelay + LONG_SMOOTH_SECONDS prev_action = log.ModelDataV2.Action() dp_lat_lca_speed = int(params.get("dp_lat_lca_speed")) dp_lat_lca_auto_sec = float(params.get("dp_lat_lca_auto_sec")) DH = DesireHelper(dp_lat_lca_speed=dp_lat_lca_speed, dp_lat_lca_auto_sec=dp_lat_lca_auto_sec) dp_dev_is_rhd = params.get_bool("dp_dev_is_rhd") RED = RoadEdgeDetector(params.get_bool("dp_lat_road_edge_detection")) dp_lat_offset_cm = int(params.get("dp_lat_offset_cm") or 0) while True: # Keep receiving frames until we are at least 1 frame ahead of previous extra frame while meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000: buf_main = vipc_client_main.recv() meta_main = FrameMeta(vipc_client_main) if buf_main is None: break if buf_main is None: cloudlog.debug("vipc_client_main no frame") continue if use_extra_client: # Keep receiving extra frames until frame id matches main camera while True: buf_extra = vipc_client_extra.recv() meta_extra = FrameMeta(vipc_client_extra) if buf_extra is None or meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000: break if buf_extra is None: cloudlog.debug("vipc_client_extra no frame") continue if abs(meta_main.timestamp_sof - meta_extra.timestamp_sof) > 10000000: cloudlog.error(f"frames out of sync! main: {meta_main.frame_id} ({meta_main.timestamp_sof / 1e9:.5f}),\ extra: {meta_extra.frame_id} ({meta_extra.timestamp_sof / 1e9:.5f})") else: # Use single camera buf_extra = buf_main meta_extra = meta_main sm.update(0) desire = DH.desire is_rhd = dp_dev_is_rhd if LITE else sm["driverMonitoringState"].isRHD frame_id = sm["roadCameraState"].frameId v_ego = max(sm["carState"].vEgo, 0.) lat_delay = sm["liveDelay"].lateralDelay + LAT_SMOOTH_SECONDS if sm.updated["liveCalibration"] and sm.seen['roadCameraState'] and sm.seen['deviceState']: device_from_calib_euler = np.array(sm["liveCalibration"].rpyCalib, dtype=np.float32) dc = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))] model_transform_main = get_warp_matrix(device_from_calib_euler, dc.ecam.intrinsics if main_wide_camera else dc.fcam.intrinsics, False).astype(np.float32) model_transform_extra = get_warp_matrix(device_from_calib_euler, dc.ecam.intrinsics, True).astype(np.float32) live_calib_seen = True traffic_convention = np.zeros(2) traffic_convention[int(is_rhd)] = 1 vec_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32) if desire >= 0 and desire < ModelConstants.DESIRE_LEN: vec_desire[desire] = 1 # tracked dropped frames vipc_dropped_frames = max(0, meta_main.frame_id - last_vipc_frame_id - 1) frames_dropped = frame_dropped_filter.update(min(vipc_dropped_frames, 10)) if run_count < 10: # let frame drops warm up frame_dropped_filter.x = 0. frames_dropped = 0. run_count = run_count + 1 frame_drop_ratio = frames_dropped / (1 + frames_dropped) prepare_only = vipc_dropped_frames > 0 if prepare_only: cloudlog.error(f"skipping model eval. Dropped {vipc_dropped_frames} frames") bufs = {name: buf_extra if 'big' in name else buf_main for name in model.vision_input_names} transforms = {name: model_transform_extra if 'big' in name else model_transform_main for name in model.vision_input_names} inputs:dict[str, np.ndarray] = { 'desire_pulse': vec_desire, 'traffic_convention': traffic_convention, } mt1 = time.perf_counter() model_output = model.run(bufs, transforms, inputs, prepare_only) mt2 = time.perf_counter() model_execution_time = mt2 - mt1 if model_output is not None: modelv2_send = messaging.new_message('modelV2') drivingdata_send = messaging.new_message('drivingModelData') posenet_send = messaging.new_message('cameraOdometry') model_ext_send = messaging.new_message('modelExt') frame_delay = DT_MDL # compensate for time passed since the frame was captured: current_time - timestamp_eof is 50ms on average action_delay = DT_MDL / 2 # middle of the interval between model output (current state) and next frame (expected state) action = get_action_from_model(model_output, prev_action, lat_delay + frame_delay + action_delay, long_delay + frame_delay + action_delay, v_ego, dp_lat_offset_cm) prev_action = action fill_model_msg(drivingdata_send, modelv2_send, model_output, action, publish_state, meta_main.frame_id, meta_extra.frame_id, frame_id, frame_drop_ratio, meta_main.timestamp_eof, model_execution_time, live_calib_seen) desire_state = modelv2_send.modelV2.meta.desireState l_lane_change_prob = desire_state[log.Desire.laneChangeLeft] r_lane_change_prob = desire_state[log.Desire.laneChangeRight] lane_change_prob = l_lane_change_prob + r_lane_change_prob RED.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs) model_ext_send.modelExt.leftEdgeDetected = RED.left_edge_detected model_ext_send.modelExt.rightEdgeDetected = RED.right_edge_detected DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RED.left_edge_detected, RED.right_edge_detected) modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction drivingdata_send.drivingModelData.meta.laneChangeState = DH.lane_change_state drivingdata_send.drivingModelData.meta.laneChangeDirection = DH.lane_change_direction fill_pose_msg(posenet_send, model_output, meta_main.frame_id, vipc_dropped_frames, meta_main.timestamp_eof, live_calib_seen) pm.send('modelV2', modelv2_send) pm.send('drivingModelData', drivingdata_send) pm.send('cameraOdometry', posenet_send) pm.send('modelExt', model_ext_send) last_vipc_frame_id = meta_main.frame_id if __name__ == "__main__": try: import argparse parser = argparse.ArgumentParser() parser.add_argument('--demo', action='store_true', help='A boolean for demo mode.') args = parser.parse_args() main(demo=args.demo) except KeyboardInterrupt: cloudlog.warning("got SIGINT")