Compare commits

...

10 Commits

Author SHA1 Message Date
DevTekVE
240044c907 Condensing a little 2025-01-08 10:59:56 +01:00
Jason Wen
1b256f6204 remove this for now 2025-01-07 16:18:17 -05:00
Jason Wen
4597350df7 Merge branch 'master-new' into master-new-model-selector-again 2025-01-07 10:37:43 -05:00
Jason Wen
b8a5448148 do something 2025-01-07 10:37:36 -05:00
DevTekVE
6b0b74def1 Add support for detecting 20Hz models during operation
This commit introduces functionality to detect if the active model operates at 20Hz. A new helper function `is_active_model_20hz` was added, alongside updates to model initialization and configuration handling. Additionally, a new `is20hz` field was added to the model bundle for improved introspection and performance adaptation.
2025-01-07 09:45:51 +01:00
DevTekVE
7cad216a01 Add support for dynamic frame buffer length in DrivingModelFrame
Introduce an `is_20hz` flag to dynamically adjust buffer length and improve flexibility for different frame rates. This ensures better handling of 20Hz configurations by modifying initialization and processing logic. Updated related method signatures and implementation to accommodate the new behavior.
2025-01-07 09:32:50 +01:00
DevTekVE
f45a8f8334 Clean 2025-01-07 09:04:46 +01:00
DevTekVE
a17853e52b Add support for custom model loading via paths and metadata.
Introduced helper functions for retrieving and loading custom models in `run_helpers.py`. Updated model initialization logic in `model_runner.py` to dynamically choose between default and custom models based on environment variables. This enhances flexibility for using alternative model configurations.
2025-01-07 09:04:03 +01:00
DevTekVE
c80021ac48 Refactor model output parsing and cleanup unused code.
Added `input_keys` parameter to enhance output parsing logic in `parse_outputs`. Removed commented-out code and unnecessary variable in `modeld.py` for better clarity and maintainability. Simplified import statements in `modeld.py` to reduce redundancy.
2025-01-07 08:13:55 +01:00
DevTekVE
2ce950ac25 i think this might work? 2025-01-07 07:59:13 +01:00
11 changed files with 288 additions and 77 deletions

View File

@@ -73,6 +73,7 @@ struct ModelManagerSP @0xaedffd8f31e7b55d {
status @4 :DownloadStatus;
generation @5 :UInt32;
environment @6 :Text;
is20hz @7 :Bool;
}
}

View File

@@ -1,21 +1,15 @@
#!/usr/bin/env python3
import os
from openpilot.system.hardware import TICI
from openpilot.selfdrive.modeld.runners.model_runner import ONNXRunner, TinygradRunner
from openpilot.sunnypilot.models.helpers import is_active_model_20hz
#
if TICI:
from tinygrad.tensor import Tensor
from tinygrad.dtype import dtypes
from openpilot.selfdrive.modeld.runners.tinygrad_helpers import qcom_tensor_from_opencl_address
os.environ['QCOM'] = '1'
else:
from openpilot.selfdrive.modeld.runners.ort_helpers import make_onnx_cpu_runner
import os
import time
import pickle
import numpy as np
import cereal.messaging as messaging
from cereal import car, log
from pathlib import Path
from setproctitle import setproctitle
from cereal.messaging import PubMaster, SubMaster
from msgq.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
@@ -27,19 +21,16 @@ from openpilot.common.realtime import config_realtime_process
from openpilot.common.transformations.camera import DEVICE_CAMERAS
from openpilot.common.transformations.model import get_warp_matrix
from openpilot.system import sentry
from openpilot.system.hardware import PC
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.modeld.models.commonmodel_pyx import DrivingModelFrame, CLContext
PROCESS_NAME = "selfdrive.modeld.modeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODEL_PATH = Path(__file__).parent / 'models/supercombo.onnx'
MODEL_PKL_PATH = Path(__file__).parent / 'models/supercombo_tinygrad.pkl'
METADATA_PATH = Path(__file__).parent / 'models/supercombo_metadata.pkl'
USE_ONNX = bool(os.getenv('USE_ONNX', PC))
IS_20HZ_MODEL_DEFAULT = False
class FrameMeta:
frame_id: int = 0
@@ -57,81 +48,86 @@ class ModelState:
prev_desire: np.ndarray # for tracking the rising edge of the pulse
def __init__(self, context: CLContext):
self.frames = {'input_imgs': DrivingModelFrame(context), 'big_input_imgs': DrivingModelFrame(context)}
self.is_20hz = IS_20HZ_MODEL_DEFAULT
if (active_20hz := is_active_model_20hz(None)) is not None:
self.is_20hz = active_20hz
self.frames = {'input_imgs': DrivingModelFrame(context, self.is_20hz), 'big_input_imgs': DrivingModelFrame(context, self.is_20hz)}
self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
self.full_features_20Hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32)
self.desire_20Hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN + 1, ModelConstants.DESIRE_LEN), dtype=np.float32)
# Initialize model runner
self.model_runner = ONNXRunner(self.frames) if (not TICI) and USE_ONNX else TinygradRunner(self.frames)
# img buffers are managed in openCL transform code
self.numpy_inputs = {
'desire': np.zeros((1, (ModelConstants.FULL_HISTORY_BUFFER_LEN+1), ModelConstants.DESIRE_LEN), dtype=np.float32),
'traffic_convention': np.zeros((1, ModelConstants.TRAFFIC_CONVENTION_LEN), dtype=np.float32),
'lateral_control_params': np.zeros((1, ModelConstants.LATERAL_CONTROL_PARAMS_LEN), dtype=np.float32),
'prev_desired_curv': np.zeros((1, (ModelConstants.FULL_HISTORY_BUFFER_LEN+1), ModelConstants.PREV_DESIRED_CURV_LEN), dtype=np.float32),
'features_buffer': np.zeros((1, ModelConstants.FULL_HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32),
}
self.numpy_inputs = {}
with open(METADATA_PATH, 'rb') as f:
model_metadata = pickle.load(f)
self.input_shapes = model_metadata['input_shapes']
for key, shape in self.model_runner.input_shapes.items():
if key not in self.frames: # Managed by opencl
self.numpy_inputs[key] = np.zeros(shape, dtype=np.float32)
self.output_slices = model_metadata['output_slices']
net_output_size = model_metadata['output_shapes']['outputs'][1]
self.output = np.zeros(net_output_size, dtype=np.float32)
self.parser = Parser()
if TICI:
self.tensor_inputs = {k: Tensor(v, device='NPY').realize() for k,v in self.numpy_inputs.items()}
with open(MODEL_PKL_PATH, "rb") as f:
self.model_run = pickle.load(f)
else:
self.onnx_cpu_runner = make_onnx_cpu_runner(MODEL_PATH)
net_output_size = self.model_runner.model_metadata['output_shapes']['outputs'][1]
self.output = np.zeros(net_output_size, dtype=np.float32)
def slice_outputs(self, model_outputs: np.ndarray) -> dict[str, np.ndarray]:
parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in self.output_slices.items()}
if SEND_RAW_PRED:
parsed_model_outputs['raw_pred'] = model_outputs.copy()
return parsed_model_outputs
num_elements = self.numpy_inputs['features_buffer'].shape[1]
step_size = int(-100 / num_elements)
self.full_features_20Hz_idxs = np.arange(step_size, step_size * (num_elements + 1), step_size)[::-1]
self.desire_reshape_dims = (self.numpy_inputs['desire'].shape[0], self.numpy_inputs['desire'].shape[1], -1, self.numpy_inputs['desire'].shape[2])
def run(self, buf: VisionBuf, wbuf: VisionBuf, transform: np.ndarray, transform_wide: np.ndarray,
inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
# Model decides when action is completed, so desire input is just a pulse triggered on rising edge
inputs['desire'][0] = 0
new_desire = np.where(inputs['desire'] - self.prev_desire > .99, inputs['desire'], 0)
self.prev_desire[:] = inputs['desire']
self.numpy_inputs['desire'][0,:-1] = self.numpy_inputs['desire'][0,1:]
self.numpy_inputs['desire'][0,-1] = new_desire
if self.is_20hz:
self.desire_20Hz[:-1] = self.desire_20Hz[1:]
self.desire_20Hz[-1] = new_desire
self.numpy_inputs['desire'][:] = self.desire_20Hz.reshape(self.desire_reshape_dims).max(axis=2)
else:
self.numpy_inputs['desire'][0,:-1] = self.numpy_inputs['desire'][0,1:]
self.numpy_inputs['desire'][0,-1] = new_desire
for key in self.numpy_inputs:
if key in inputs and key not in ['desire']:
self.numpy_inputs[key][:] = inputs[key]
self.numpy_inputs['traffic_convention'][:] = inputs['traffic_convention']
self.numpy_inputs['lateral_control_params'][:] = inputs['lateral_control_params']
imgs_cl = {'input_imgs': self.frames['input_imgs'].prepare(buf, transform.flatten()),
'big_input_imgs': self.frames['big_input_imgs'].prepare(wbuf, transform_wide.flatten())}
if TICI:
# The imgs tensors are backed by opencl memory, only need init once
for key in imgs_cl:
if key not in self.tensor_inputs:
self.tensor_inputs[key] = qcom_tensor_from_opencl_address(imgs_cl[key].mem_address, self.input_shapes[key], dtype=dtypes.uint8)
else:
for key in imgs_cl:
self.numpy_inputs[key] = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key]).astype(dtype=np.float32)
# Prepare inputs using the model runner
self.model_runner.prepare_inputs(imgs_cl, self.numpy_inputs)
if prepare_only:
return None
if TICI:
self.output = self.model_run(**self.tensor_inputs).numpy().flatten()
# Run model inference
self.output = self.model_runner.run_model()
outputs = self.parser.parse_outputs(self.model_runner.slice_outputs(self.output))
if self.is_20hz:
self.full_features_20Hz[:-1] = self.full_features_20Hz[1:]
self.full_features_20Hz[-1] = outputs['hidden_state'][0, :]
self.numpy_inputs['features_buffer'][:] = self.full_features_20Hz[self.full_features_20Hz_idxs]
else:
self.output = self.onnx_cpu_runner.run(None, self.numpy_inputs)[0].flatten()
self.numpy_inputs['features_buffer'][0,:-1] = self.numpy_inputs['features_buffer'][0,1:]
self.numpy_inputs['features_buffer'][0,-1] = outputs['hidden_state'][0, :]
outputs = self.parser.parse_outputs(self.slice_outputs(self.output))
if "desired_curvature" in outputs:
input_name_prev = None
self.numpy_inputs['features_buffer'][0,:-1] = self.numpy_inputs['features_buffer'][0,1:]
self.numpy_inputs['features_buffer'][0,-1] = outputs['hidden_state'][0, :]
if "prev_desired_curvs" in self.numpy_inputs.keys():
input_name_prev = 'prev_desired_curvs'
elif "prev_desired_curv" in self.numpy_inputs.keys():
input_name_prev = 'prev_desired_curv'
# TODO model only uses last value now
self.numpy_inputs['prev_desired_curv'][0,:-1] = self.numpy_inputs['prev_desired_curv'][0,1:]
self.numpy_inputs['prev_desired_curv'][0,-1,:] = outputs['desired_curvature'][0, :]
if input_name_prev is not None:
len = outputs['desired_curvature'][0].size
self.numpy_inputs['prev_desired_curv'][0,:-len] = self.numpy_inputs['prev_desired_curv'][0,len:]
self.numpy_inputs['prev_desired_curv'][0,-len,:] = outputs['desired_curvature'][0, :]
return outputs
@@ -242,7 +238,6 @@ def main(demo=False):
is_rhd = sm["driverMonitoringState"].isRHD
frame_id = sm["roadCameraState"].frameId
v_ego = max(sm["carState"].vEgo, 0.)
lateral_control_params = np.array([v_ego, steer_delay], dtype=np.float32)
if sm.updated["liveCalibration"] and sm.seen['roadCameraState'] and sm.seen['deviceState']:
device_from_calib_euler = np.array(sm["liveCalibration"].rpyCalib, dtype=np.float32)
dc = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]
@@ -273,8 +268,10 @@ def main(demo=False):
inputs:dict[str, np.ndarray] = {
'desire': vec_desire,
'traffic_convention': traffic_convention,
'lateral_control_params': lateral_control_params,
}
}
if "lateral_control_params" in model.numpy_inputs.keys():
inputs['lateral_control_params'] = np.array([v_ego, steer_delay], dtype=np.float32)
mt1 = time.perf_counter()
model_output = model.run(buf_main, buf_extra, model_transform_main, model_transform_extra, inputs, prepare_only)

View File

@@ -5,11 +5,11 @@
#include "common/clutil.h"
DrivingModelFrame::DrivingModelFrame(cl_device_id device_id, cl_context context) : ModelFrame(device_id, context) {
DrivingModelFrame::DrivingModelFrame(cl_device_id device_id, cl_context context, bool is_20hz) : ModelFrame(device_id, context), is_20hz(is_20hz) {
input_frames = std::make_unique<uint8_t[]>(buf_size);
input_frames_cl = CL_CHECK_ERR(clCreateBuffer(context, CL_MEM_READ_WRITE, buf_size, NULL, &err));
img_buffer_20hz_cl = CL_CHECK_ERR(clCreateBuffer(context, CL_MEM_READ_WRITE, 2*frame_size_bytes, NULL, &err));
region.origin = 1 * frame_size_bytes;
img_buffer_20hz_cl = CL_CHECK_ERR(clCreateBuffer(context, CL_MEM_READ_WRITE, buf_len*frame_size_bytes, NULL, &err));
region.origin = (buf_len - 1) * frame_size_bytes;
region.size = frame_size_bytes;
last_img_cl = CL_CHECK_ERR(clCreateSubBuffer(img_buffer_20hz_cl, CL_MEM_READ_WRITE, CL_BUFFER_CREATE_TYPE_REGION, &region, &err));
@@ -20,7 +20,7 @@ DrivingModelFrame::DrivingModelFrame(cl_device_id device_id, cl_context context)
cl_mem* DrivingModelFrame::prepare(cl_mem yuv_cl, int frame_width, int frame_height, int frame_stride, int frame_uv_offset, const mat3& projection) {
run_transform(yuv_cl, MODEL_WIDTH, MODEL_HEIGHT, frame_width, frame_height, frame_stride, frame_uv_offset, projection);
for (int i = 0; i < 1; i++) {
for (int i = 0; i < (buf_len - 1); i++) {
CL_CHECK(clEnqueueCopyBuffer(q, img_buffer_20hz_cl, img_buffer_20hz_cl, (i+1)*frame_size_bytes, i*frame_size_bytes, frame_size_bytes, 0, nullptr, nullptr));
}
loadyuv_queue(&loadyuv, q, y_cl, u_cl, v_cl, last_img_cl);

View File

@@ -64,7 +64,7 @@ protected:
class DrivingModelFrame : public ModelFrame {
public:
DrivingModelFrame(cl_device_id device_id, cl_context context);
DrivingModelFrame(cl_device_id device_id, cl_context context, bool is_20hz = false);
~DrivingModelFrame();
cl_mem* prepare(cl_mem yuv_cl, int frame_width, int frame_height, int frame_stride, int frame_uv_offset, const mat3& projection);
@@ -74,6 +74,9 @@ public:
const int buf_size = MODEL_FRAME_SIZE * 2;
const size_t frame_size_bytes = MODEL_FRAME_SIZE * sizeof(uint8_t);
const bool is_20hz;
const int buf_len = is_20hz ? 5 : 2;
private:
LoadYUVState loadyuv;
cl_mem img_buffer_20hz_cl, last_img_cl, input_frames_cl;

View File

@@ -19,7 +19,7 @@ cdef extern from "selfdrive/modeld/models/commonmodel.h":
cppclass DrivingModelFrame:
int buf_size
DrivingModelFrame(cl_device_id, cl_context)
DrivingModelFrame(cl_device_id, cl_context, bint)
cppclass MonitoringModelFrame:
int buf_size

View File

@@ -55,8 +55,8 @@ cdef class ModelFrame:
cdef class DrivingModelFrame(ModelFrame):
cdef cppDrivingModelFrame * _frame
def __cinit__(self, CLContext context):
self._frame = new cppDrivingModelFrame(context.device_id, context.context)
def __cinit__(self, CLContext context, bint is_20hz=False):
self._frame = new cppDrivingModelFrame(context.device_id, context.context, is_20hz)
self.frame = <cppModelFrame*>(self._frame)
self.buf_size = self._frame.buf_size

View File

@@ -85,6 +85,7 @@ class Parser:
outs[name + '_stds'] = pred_std_final.reshape(final_shape)
def parse_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
""" Parse the model outputs into a dictionary of numpy arrays. The input_keys are used to determine how the output should be parsed. """
self.parse_mdn('plan', outs, in_N=ModelConstants.PLAN_MHP_N, out_N=ModelConstants.PLAN_MHP_SELECTION,
out_shape=(ModelConstants.IDX_N,ModelConstants.PLAN_WIDTH))
self.parse_mdn('lane_lines', outs, in_N=0, out_N=0, out_shape=(ModelConstants.NUM_LANE_LINES,ModelConstants.IDX_N,ModelConstants.LANE_LINES_WIDTH))

View File

@@ -0,0 +1,120 @@
import os
from openpilot.system.hardware import TICI
from openpilot.sunnypilot.modeld.run_helpers import get_custom_model_paths
#
from tinygrad.tensor import Tensor, dtypes
from openpilot.selfdrive.modeld.runners.tinygrad_helpers import qcom_tensor_from_opencl_address
from openpilot.selfdrive.modeld.runners.ort_helpers import make_onnx_cpu_runner, ORT_TYPES_TO_NP_TYPES
import pickle
import numpy as np
from pathlib import Path
from abc import ABC, abstractmethod
from openpilot.selfdrive.modeld.models.commonmodel_pyx import DrivingModelFrame, CLMem
from openpilot.system.hardware import PC
if TICI:
os.environ['QCOM'] = '1'
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODEL_PATH = Path(__file__).parent / '../models/supercombo.onnx'
MODEL_PKL_PATH = Path(__file__).parent / '../models/supercombo_tinygrad.pkl'
METADATA_PATH = Path(__file__).parent / '../models/supercombo_metadata.pkl'
USE_ONNX = os.getenv('USE_ONNX', PC)
class ModelRunner(ABC):
"""Abstract base class for model runners that defines the interface for running ML models."""
def __init__(self):
"""Initialize the model runner with paths to model and metadata files."""
self.model_paths = ({"model": MODEL_PATH, "metadata": METADATA_PATH} if USE_ONNX else
get_custom_model_paths() or {"model": MODEL_PKL_PATH, "metadata": METADATA_PATH})
with open(self.model_paths["metadata"], 'rb') as f:
self.model_metadata = pickle.load(f)
self.input_shapes = self.model_metadata['input_shapes']
self.output_slices = self.model_metadata['output_slices']
self.inputs: dict = {}
@abstractmethod
def prepare_inputs(self, imgs_cl: dict[str, CLMem], numpy_inputs: dict[str, np.ndarray]) -> dict:
"""Prepare inputs for model inference."""
@abstractmethod
def run_model(self):
"""Run model inference with prepared inputs."""
def slice_outputs(self, model_outputs: np.ndarray) -> dict:
"""Slice model outputs according to metadata configuration."""
parsed_outputs = {k: model_outputs[np.newaxis, v] for k, v in self.output_slices.items()}
if SEND_RAW_PRED:
parsed_outputs['raw_pred'] = model_outputs.copy()
return parsed_outputs
class TinygradRunner(ModelRunner):
"""Tinygrad implementation of model runner for TICI hardware."""
def __init__(self, frames: dict[str, DrivingModelFrame] | None = None):
super().__init__()
if not str(self.model_paths["model"]).endswith("_tinygrad.pkl"):
raise ValueError(f"Tinygrad model must be a _tinygrad.pkl file, we got {self.model_paths['model']}")
# Load Tinygrad model
with open(self.model_paths["model"], "rb") as f:
self.model_run = pickle.load(f)
self.input_to_dtype = {}
self.input_to_device = {}
for idx, name in enumerate(self.model_run.captured.expected_names):
self.input_to_dtype[name] = self.model_run.captured.expected_st_vars_dtype_device[idx][2] # 2 is the dtype
self.input_to_device[name] = self.model_run.captured.expected_st_vars_dtype_device[idx][3] # 3 is the device
assert TICI or frames is not None, "TinygradRunner requires frames for non-TICI hardware"
self.frames = frames
self.is_memory_model = None # Use None to indicate that it hasn't been determined yet
def prepare_inputs(self, imgs_cl: dict[str, CLMem], numpy_inputs: dict[str, np.ndarray]) -> dict:
# Initialize image tensors if not already done
for key in imgs_cl:
if TICI and key not in self.inputs:
self.inputs[key] = qcom_tensor_from_opencl_address(imgs_cl[key].mem_address, self.input_shapes[key], dtype=dtypes.uint8)
elif not TICI:
shape = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key])
self.inputs[key] = Tensor(shape, device=self.input_to_device[key], dtype=self.input_to_dtype[key]).realize()
# Update numpy inputs
for key, value in numpy_inputs.items():
if key not in imgs_cl:
self.inputs[key] = Tensor(value, device=self.input_to_device[key], dtype=self.input_to_dtype[key]).realize()
return self.inputs
def run_model(self):
return self.model_run(**self.inputs).numpy().flatten()
class ONNXRunner(ModelRunner):
"""ONNX implementation of model runner for non-TICI hardware."""
def __init__(self, frames: dict[str, DrivingModelFrame]):
super().__init__()
self.runner = make_onnx_cpu_runner(self.model_paths["model"])
self.frames = frames
self.input_to_nptype = {
model_input.name: ORT_TYPES_TO_NP_TYPES[model_input.type]
for model_input in self.runner.get_inputs()
}
def prepare_inputs(self, imgs_cl: dict[str, CLMem], numpy_inputs: dict[str, np.ndarray]) -> dict:
self.inputs = numpy_inputs.copy()
for key in imgs_cl:
self.inputs[key] = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key]).astype(dtype=np.float32)
return self.inputs
def run_model(self):
return self.runner.run(None, self.inputs)[0].flatten()

View File

@@ -0,0 +1,48 @@
# Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
#
# This file is part of sunnypilot and is licensed under the MIT License.
# See the LICENSE.md file in the root directory for more details.
import pickle
import numpy as np
from pathlib import Path
from cereal import custom
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.system.hardware.hw import Paths
CUSTOM_MODEL_PATH = Paths.model_root()
METADATA_PATH = Path(__file__).parent / '../models/supercombo_metadata.pkl'
ModelManager = custom.ModelManagerSP
def get_custom_model_paths():
bundle = get_active_bundle(None)
if bundle:
drive_model = next((model for model in bundle.models if model.type == ModelManager.Type.drive), None)
metadata_model = next(model for model in bundle.models if model.type == ModelManager.Type.metadata)
if drive_model and metadata_model:
return {"model": f"{CUSTOM_MODEL_PATH}/{drive_model.fileName}", "metadata": f"{CUSTOM_MODEL_PATH}/{metadata_model.fileName}"}
return None
def load_custom_metadata():
if not (bundle := get_active_bundle(None)):
return None
metadata_model = next(model for model in bundle.models if model.type == ModelManager.Type.metadata)
metadata_path = f"{CUSTOM_MODEL_PATH}/{metadata_model.fileName}"
with open(metadata_path, 'rb') as f:
return pickle.load(f)
def prepare_inputs(model_metadata) -> dict[str, np.ndarray]:
# img buffers are managed in openCL transform code so we don't pass them as inputs
inputs: dict[str, np.ndarray] = {
key: np.zeros(shape, dtype=np.float32).flatten() # Inputs were defined flattened back then
for key, shape in model_metadata['input_shapes'].items()
if key not in ['input_imgs', 'big_input_imgs']
}
return inputs

View File

@@ -71,6 +71,7 @@ class ModelParser:
model_bundle.status = 0
model_bundle.generation = int(value["generation"])
model_bundle.environment = value["environment"]
model_bundle.is20hz = value.get("is_20hz", False)
return model_bundle

View File

@@ -21,8 +21,25 @@ async def verify_file(file_path: str, expected_hash: str) -> bool:
return sha256_hash.hexdigest().lower() == expected_hash.lower()
def get_active_bundle(params: Params) -> custom.ModelManagerSP.ModelBundle:
"""Gets the active model bundle from cache"""
def get_active_bundle(params: Params | None) -> custom.ModelManagerSP.ModelBundle:
"""
Retrieves and deserializes the active model bundle from the provided parameters.
This function attempts to extract and deserialize a model bundle from the given
`params`. If the parameter `ModelManager_ActiveBundle` is present, it is
deserialized into a `ModelBundle` object. If not, the function returns `None`.
Note: We intentionally don't set a default value for `params` to encourage
the caller to provide it explicitly. If not provided, the function will
instantiate a new `Params` object internally.
:param params: Optional. Can provide None and will instantiate it itself.
:type params: Params | None
:return: A deserialized `ModelBundle` instance if the active bundle
is found in the provided parameters, otherwise `None`.
:rtype: custom.ModelManagerSP.ModelBundle | None
"""
if params is None:
params = Params()
@@ -30,3 +47,26 @@ def get_active_bundle(params: Params) -> custom.ModelManagerSP.ModelBundle:
return messaging.log_from_bytes(active_bundle, custom.ModelManagerSP.ModelBundle)
return None
def is_active_model_20hz(params: Params | None) -> [bool | None]:
"""
Determine if the active model is operating at 20Hz.
This function evaluates the provided parameters to determine the active
bundle and checks its frequency status. If there is no active bundle,
the function will return None. If a bundle is active, the function will
return a boolean value indicating whether the model is operating at
20Hz.
:param params: System or configuration parameters used to fetch the
active bundle. The parameter can be None.
:type params: Params | None
:return: A boolean indicating if the active model is operating at 20Hz,
or None if no active bundle is found.
:rtype: bool | None
"""
if active_bundle := get_active_bundle(params):
return active_bundle.is20hz
return None