Refactoring model handling in modeld.py with ModelRunner abstraction

A significant refactoring of `modeld.py` was performed to enhance the handling of model logic. A new abstraction called `ModelRunner` has been introduced which encapsulates the model-running logic. This refactor simplifies the `modeld.py` script and provides easier management across different hardware configurations. Using this segregation, varying processing methods for models can be handled distinctly ensuring cleaner and more maintainable code. An instance of the appropriate model runner is now created during initialization based on whether a TICI hardware or a different type is used.
This commit is contained in:
DevTekVE
2024-12-29 16:57:02 +01:00
parent 5fac11efbb
commit 5a3e80fc92
3 changed files with 128 additions and 45 deletions
+17 -45
View File
@@ -1,17 +1,10 @@
#!/usr/bin/env python3
import os
from openpilot.system.hardware import TICI
from openpilot.selfdrive.modeld.runners.model_runner import create_model_runner
#
if TICI:
from tinygrad.tensor import Tensor
from tinygrad.dtype import dtypes
from openpilot.selfdrive.modeld.runners.tinygrad_helpers import qcom_tensor_from_opencl_address
os.environ['QCOM'] = '1'
else:
from openpilot.selfdrive.modeld.runners.ort_helpers import make_onnx_cpu_runner
import time
import pickle
import numpy as np
import cereal.messaging as messaging
from cereal import car, log
@@ -33,9 +26,7 @@ from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.modeld.models.commonmodel_pyx import DrivingModelFrame, CLContext
PROCESS_NAME = "selfdrive.modeld.modeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODEL_PATH = Path(__file__).parent / 'models/supercombo.onnx'
MODEL_PKL_PATH = Path(__file__).parent / 'models/supercombo_tinygrad.pkl'
@@ -69,27 +60,18 @@ class ModelState:
'features_buffer': np.zeros((1, ModelConstants.HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32),
}
with open(METADATA_PATH, 'rb') as f:
model_metadata = pickle.load(f)
self.input_shapes = model_metadata['input_shapes']
self.output_slices = model_metadata['output_slices']
net_output_size = model_metadata['output_shapes']['outputs'][1]
self.output = np.zeros(net_output_size, dtype=np.float32)
# Initialize model runner
self.model_runner = create_model_runner(
model_path=MODEL_PATH,
metadata_path=METADATA_PATH,
frames=self.frames,
tinygrad_path=MODEL_PKL_PATH,
is_tici=TICI
)
self.parser = Parser()
if TICI:
self.tensor_inputs = {k: Tensor(v, device='NPY').realize() for k,v in self.numpy_inputs.items()}
with open(MODEL_PKL_PATH, "rb") as f:
self.model_run = pickle.load(f)
else:
self.onnx_cpu_runner = make_onnx_cpu_runner(MODEL_PATH)
def slice_outputs(self, model_outputs: np.ndarray) -> dict[str, np.ndarray]:
parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in self.output_slices.items()}
if SEND_RAW_PRED:
parsed_model_outputs['raw_pred'] = model_outputs.copy()
return parsed_model_outputs
net_output_size = self.model_runner.model_metadata['output_shapes']['outputs'][1]
self.output = np.zeros(net_output_size, dtype=np.float32)
def run(self, buf: VisionBuf, wbuf: VisionBuf, transform: np.ndarray, transform_wide: np.ndarray,
inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
@@ -106,24 +88,15 @@ class ModelState:
imgs_cl = {'input_imgs': self.frames['input_imgs'].prepare(buf, transform.flatten()),
'big_input_imgs': self.frames['big_input_imgs'].prepare(wbuf, transform_wide.flatten())}
if TICI:
# The imgs tensors are backed by opencl memory, only need init once
for key in imgs_cl:
if key not in self.tensor_inputs:
self.tensor_inputs[key] = qcom_tensor_from_opencl_address(imgs_cl[key].mem_address, self.input_shapes[key], dtype=dtypes.uint8)
else:
for key in imgs_cl:
self.numpy_inputs[key] = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key])
# Prepare inputs using the model runner
prepared_inputs = self.model_runner.prepare_inputs(imgs_cl, self.numpy_inputs)
if prepare_only:
return None
if TICI:
self.output = self.model_run(**self.tensor_inputs).numpy().flatten()
else:
self.output = self.onnx_cpu_runner.run(None, self.numpy_inputs)[0].flatten()
outputs = self.parser.parse_outputs(self.slice_outputs(self.output))
# Run model inference
self.output = self.model_runner.run_model(prepared_inputs)
outputs = self.parser.parse_outputs(self.model_runner.slice_outputs(self.output))
self.full_features_20Hz[:-1] = self.full_features_20Hz[1:]
self.full_features_20Hz[-1] = outputs['hidden_state'][0, :]
@@ -190,7 +163,6 @@ def main(demo=False):
meta_main = FrameMeta()
meta_extra = FrameMeta()
if demo:
CP = get_demo_car_params()
else:
+111
View File
@@ -0,0 +1,111 @@
import os
from openpilot.system.hardware import TICI
#
if TICI:
from tinygrad.tensor import Tensor
from tinygrad.dtype import dtypes
from openpilot.selfdrive.modeld.runners.tinygrad_helpers import qcom_tensor_from_opencl_address
os.environ['QCOM'] = '1'
else:
from openpilot.selfdrive.modeld.runners.ort_helpers import make_onnx_cpu_runner
import pickle
import numpy as np
from pathlib import Path
from abc import ABC, abstractmethod
from openpilot.selfdrive.modeld.models.commonmodel_pyx import DrivingModelFrame, CLContext
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
class ModelRunner(ABC):
"""Abstract base class for model runners that defines the interface for running ML models."""
def __init__(self, model_path: Path, metadata_path: Path, frames: dict[str, DrivingModelFrame]):
"""Initialize the model runner with paths to model and metadata files."""
self.model_path = model_path
self.frames = frames
with open(metadata_path, 'rb') as f:
self.model_metadata = pickle.load(f)
self.input_shapes = self.model_metadata['input_shapes']
self.output_slices = self.model_metadata['output_slices']
@abstractmethod
def prepare_inputs(self, imgs_cl: dict[str, any], numpy_inputs: dict[str, np.ndarray]) -> dict[str, any]:
"""Prepare inputs for model inference."""
pass
@abstractmethod
def run_model(self, inputs: dict[str, any]) -> np.ndarray:
"""Run model inference with prepared inputs."""
pass
def slice_outputs(self, model_outputs: np.ndarray) -> dict[str, np.ndarray]:
"""Slice model outputs according to metadata configuration."""
parsed_outputs = {k: model_outputs[np.newaxis, v] for k, v in self.output_slices.items()}
if SEND_RAW_PRED:
parsed_outputs['raw_pred'] = model_outputs.copy()
return parsed_outputs
class TinyGradRunner(ModelRunner):
"""TinyGrad implementation of model runner for TICI hardware."""
def __init__(self, model_path: Path, metadata_path: Path, frames: dict[str, DrivingModelFrame]):
super().__init__(model_path, metadata_path, frames)
# Load TinyGrad model
with open(model_path, "rb") as f:
self.model_run = pickle.load(f)
self.tensor_inputs = {}
def prepare_inputs(self, imgs_cl: dict[str, any], numpy_inputs: dict[str, np.ndarray]) -> dict[str, any]:
# Initialize image tensors if not already done
for key in imgs_cl:
if key not in self.tensor_inputs:
self.tensor_inputs[key] = qcom_tensor_from_opencl_address(
imgs_cl[key].mem_address,
self.input_shapes[key],
dtype=dtypes.uint8
)
# Update numpy inputs
for k, v in numpy_inputs.items():
if k not in self.tensor_inputs:
self.tensor_inputs[k] = Tensor(v, device='NPY').realize()
else:
self.tensor_inputs[k].assign(v)
return self.tensor_inputs
def run_model(self, inputs: dict[str, any]) -> np.ndarray:
return self.model_run(**inputs).flatten()
class ONNXRunner(ModelRunner):
"""ONNX implementation of model runner for non-TICI hardware."""
def __init__(self, model_path: Path, metadata_path: Path, frames: dict[str, DrivingModelFrame]):
super().__init__(model_path, metadata_path, frames)
self.runner = make_onnx_cpu_runner(model_path)
def prepare_inputs(self, imgs_cl: dict[str, any], numpy_inputs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
for key in imgs_cl:
numpy_inputs[key] = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key])
return numpy_inputs
def run_model(self, inputs: dict[str, any]) -> np.ndarray:
return self.runner.run(None, inputs)[0].flatten()
def create_model_runner(
model_path: Path,
metadata_path: Path,
frames: dict[str, DrivingModelFrame],
tinygrad_path: Path | None = None,
is_tici: bool = False
) -> ModelRunner:
"""Factory function to create appropriate model runner based on hardware."""
if is_tici:
return TinyGradRunner(tinygrad_path or model_path, metadata_path, frames)
return ONNXRunner(model_path, metadata_path, frames)