Compare commits

..

10 Commits

Author SHA1 Message Date
Jason Wen
4ba620cdd2 modeld_v2: add CUSTOM→CONST shape migration to pkl compat shim
Old tinygrad used Ops.CUSTOM for scalar shape constants promoted to
vec types. New tinygrad uses Ops.CONST for the same representation.
2026-06-06 03:42:53 -04:00
Jason Wen
b5daec54e2 modeld_v2: full pkl backwards compat shim
Handles three tinygrad pkl format migrations during deserialization:
- SLICE UOps with realized buffers (QCOM compilation artifact)
- PARAM int arg → ParamArg dataclass
- PERMUTE(NOOP, CONST(vec)) → RESHAPE(NOOP, STACK(CONST...)) shape repr

Also makes action_t conditional in input queues for old models.
2026-06-06 03:39:53 -04:00
Jason Wen
dded76d28e Merge remote-tracking branch 'sunnypilot/sunnypilot/master' into deep-model-wee 2026-06-06 03:15:37 -04:00
Jason Wen
41c7f23a74 send it 2026-06-06 03:07:38 -04:00
Jason Wen
69749f400f modeld_v2: add pkl compat shim for SLICE buffer and PARAM migration
Monkey-patches UOpMetaClass.__call__ during pickle.loads() to handle:
- SLICE UOps with realized buffers (both old and new tinygrad pkls)
- PARAM UOps with int arg (old pkl format, pre-ParamArg)

Zero runtime overhead — shim only active during deserialization.
2026-06-06 03:04:57 -04:00
Jason Wen
c7f770d29a modeld_v2: adapt for deep model support
Parse action MDN output in split parser for deep model on_policy.

Only suppress off_policy plan when on_policy has its own plan output.
Deep models have on_policy producing action (no plan), so off_policy
plan must be kept.
2026-06-05 18:36:27 -04:00
Harald Schäfer
e405157bdb Op model16 deep (#38073)
* modeld: RL driving model with 3-file split

Split the driving model into vision + off_policy + on_policy ONNX
files and wire up the RL policy:

- 3-file model split (vision / off_policy / on_policy), replacing the
  combined big_driving_policy/vision models
- compiler updates for the split models
- actually consume the policy action in modeld
- add desire state to the driving model
- model iterations (smoothness, off/on-policy weight updates)

* modeld: update driving model

* 1e72cf5a-785f-45ea-888f-28cdb14785de/100

* tinygrad hack

* fix parsing

* looser timing

* big

* Remove unnecessary modeld rebase changes

* Tighten modeld split cleanup

---------

Co-authored-by: Comma Device <device@comma.ai>
Co-authored-by: Armandpl <adpl33@gmail.com>
2026-06-05 18:34:10 -04:00
Harald Schäfer
bdb6c15753 Refactor compile_modeld model setup (#38128) 2026-06-05 18:33:56 -04:00
Jason Wen
798d1836b2 tinygrad: bump to synced master + IMAGE hack for deep models
Bumps tinygrad submodule to sunnypilot/tinygrad master (synced with
upstream 556defa0f) plus comma's IMAGE hack (cherry-picked from
fd992d668) needed for deep model compilation with IMAGE=1.
2026-06-05 18:33:51 -04:00
Jason Wen
6dffdcca0b modeld_v2 prereqs deep models 2026-06-05 17:37:37 -04:00
38 changed files with 1755 additions and 546 deletions

View File

@@ -12,11 +12,11 @@ on:
required: false
type: string
recompiled_dir:
description: 'Existing recompiled directory number (e.g. 1 for recompiled1)'
description: 'Existing recompiled directory number (e.g. 3 for recompiled3)'
required: true
type: string
json_version:
description: 'driving_models version number to update (e.g. 18 for driving_models_v18.json)'
description: 'driving_models version number to update (e.g. 5 for driving_models_v5.json)'
required: true
type: string
artifact_suffix:
@@ -63,11 +63,12 @@ on:
default: 'None'
options:
- None
- Master Models
- Release Models
- 2025 World Models
- 2026 World Models
- Simple Plan Models
- Space Lab Models
- TR Models
- DTR Models
- Custom Merge Models
- FOF series models
- Other
custom_model_folder:
description: 'Custom model folder name (if "Other" selected)'

View File

@@ -30,11 +30,6 @@ on:
required: false
type: string
default: ''
target_hardware:
description: 'Hardware target to compile for (qcom or usbgpu)'
required: false
type: string
default: 'qcom'
workflow_dispatch:
inputs:
upstream_branch:
@@ -51,14 +46,6 @@ on:
required: false
type: boolean
default: true
target_hardware:
description: 'Hardware target to compile for'
required: true
type: choice
options:
- qcom
- usbgpu
default: 'qcom'
run-name: Build model [${{ inputs.custom_name || inputs.upstream_branch }}] from ref [${{ inputs.upstream_branch }}]
@@ -182,17 +169,7 @@ jobs:
COMPILE_MODELD="${{ github.workspace }}/sunnypilot/modeld_v2/compile_modeld.py"
MODEL_SIZE=$(python3 -c "from openpilot.common.transformations.model import MEDMODEL_INPUT_SIZE as s; print(f'{s[0]}x{s[1]}')")
CAMERA_RES=$(python3 -c "from openpilot.common.transformations.camera import _ar_ox_fisheye as a, _os_fisheye as o; print(f'{a.width}x{a.height} {o.width}x{o.height}')")
if [ "${{ inputs.target_hardware }}" == "usbgpu" ]; then
echo "USBGPU build"
export USBGPU=1
TG_FLAGS="DEV=AMD USBGPU=1 IMAGE=1 FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1"
OUTPUT_PKL="${{ env.MODELS_DIR }}/big_driving_tinygrad.pkl"
else
echo "QCOM build"
TG_FLAGS="DEV=QCOM IMAGE=1 FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1"
OUTPUT_PKL="${{ env.MODELS_DIR }}/driving_tinygrad.pkl"
fi
TG_FLAGS="DEV=QCOM IMAGE=1 FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1"
# Generate metadata for all ONNX files
find "${{ env.MODELS_DIR }}" -maxdepth 1 -name '*.onnx' | while IFS= read -r onnx_file; do
@@ -226,13 +203,13 @@ jobs:
fi
if [ -n "$MODEL_TYPE" ]; then
echo "Detected: $MODEL_TYPE -> $OUTPUT_PKL"
echo "Detected: $MODEL_TYPE -> driving_tinygrad.pkl"
env ${TG_FLAGS} python3 "$COMPILE_MODELD" \
--model-type $MODEL_TYPE \
--model-size $MODEL_SIZE \
--camera-resolutions $CAMERA_RES \
$ONNX_ARGS \
--output "$OUTPUT_PKL"
--output "${{ env.MODELS_DIR }}/driving_tinygrad.pkl"
fi
- name: Validate Model Outputs

View File

@@ -137,16 +137,10 @@ struct ModelManagerSP @0xaedffd8f31e7b55d {
eta @2 :UInt32;
}
struct Chunk {
fileName @0 :Text;
sha256 @1 :Text;
}
struct Artifact {
fileName @0 :Text;
downloadUri @1 :DownloadUri;
downloadProgress @2 :DownloadProgress;
chunks @3 :List(Chunk);
}
struct Model {

View File

@@ -53,7 +53,7 @@ def validate_model_outputs(metadata_paths: list[Path]) -> None:
print(f"Optional output keys detected: {detected_optional}")
def create_short_name(full_name: str) -> str:
def create_short_name(full_name):
# Remove parentheses and extract alphanumeric words
clean_name = re.sub(r'\([^)]*\)', '', full_name)
words = [re.sub(r'[^a-zA-Z0-9]', '', word) for word in clean_name.split() if re.sub(r'[^a-zA-Z0-9]', '', word)]
@@ -121,7 +121,7 @@ def _rename_pkl_with_chunks(old_pkl: Path, new_pkl: Path) -> Path:
return old_pkl.rename(new_pkl)
def generate_metadata(model_path: Path, output_dir: Path, short_name: str, driving_pkl: Path) -> dict | None:
def generate_metadata(model_path: Path, output_dir: Path, short_name: str, driving_pkl: Path):
base = model_path.stem
metadata_file = output_dir / f"{base}_metadata.pkl"
@@ -134,7 +134,7 @@ def generate_metadata(model_path: Path, output_dir: Path, short_name: str, drivi
if not metadata_file.exists():
print(f"Warning: Missing metadata for {base} ({metadata_file}), skipping", file=sys.stderr)
return None
return
tinygrad_hash = hashlib.sha256(_read_pkl_bytes(driving_pkl)).hexdigest()
@@ -143,33 +143,15 @@ def generate_metadata(model_path: Path, output_dir: Path, short_name: str, drivi
model_type = "offPolicy" if "off_policy" in base else "onPolicy" if "on_policy" in base else base.split("_")[-1]
chunks_config = []
manifest_file = Path(f"{driving_pkl}.chunkmanifest")
if manifest_file.exists():
num_chunks = int(manifest_file.read_text().strip())
for i in range(num_chunks):
chunk_path = Path(f"{driving_pkl}.chunk{i + 1:02d}of{num_chunks:02d}")
if chunk_path.exists():
chunk_hash = hashlib.sha256(chunk_path.read_bytes()).hexdigest()
chunks_config.append({
"file_name": chunk_path.name,
"sha256": chunk_hash
})
artifact_data = {
"file_name": driving_pkl.name,
"download_uri": {
"url": "https://gitlab.com/sunnypilot/public/docs.sunnypilot.ai/-/raw/main/",
"sha256": tinygrad_hash
}
}
if chunks_config:
artifact_data["chunks"] = chunks_config
return {
"type": model_type,
"artifact": artifact_data,
"artifact": {
"file_name": driving_pkl.name,
"download_uri": {
"url": "https://gitlab.com/sunnypilot/public/docs.sunnypilot.ai/-/raw/main/",
"sha256": tinygrad_hash
}
},
"metadata": {
"file_name": metadata_file.name,
"download_uri": {
@@ -180,8 +162,8 @@ def generate_metadata(model_path: Path, output_dir: Path, short_name: str, drivi
}
def create_metadata_json(models: list, output_dir: Path, custom_name=None, short_name=None, is_20hz=False, upstream_branch="unknown") -> None:
bundle_json = {
def create_metadata_json(models: list, output_dir: Path, custom_name=None, short_name=None, is_20hz=False, upstream_branch="unknown"):
metadata_json = {
"short_name": short_name,
"display_name": custom_name or upstream_branch,
"is_20hz": is_20hz,
@@ -197,10 +179,6 @@ def create_metadata_json(models: list, output_dir: Path, custom_name=None, short
}
# Write metadata to output_dir
metadata_json = {
"bundles": [bundle_json]
}
with open(output_dir / "metadata.json", "w") as f:
json.dump(metadata_json, f, indent=2)

View File

@@ -87,13 +87,14 @@ frame_skip = ModelConstants.MODEL_RUN_FREQ // ModelConstants.MODEL_CONTEXT_FREQ
for usbgpu in [False, True] if USBGPU else [False]:
target_pkl_path = File(modeld_pkl_path(usbgpu)).abspath
file_prefix, cmd_flags = ('big_', usbgpu_tg_flags) if usbgpu else ('', tg_flags)
driving_onnx_deps = [p for m in [f'{file_prefix}driving_vision', f'{file_prefix}driving_on_policy']
driving_onnx_deps = [p for m in [f'{file_prefix}driving_vision', f'{file_prefix}driving_on_policy', f'{file_prefix}driving_off_policy']
for p in get_existing_chunks(File(f"models/{m}.onnx").abspath)]
camera_res_args = ' '.join(f'{cw}x{ch}' for cw, ch in CAMERA_CONFIGS)
cmd = (f'{cmd_flags} {mac_brew_string} python3 {modeld_dir}/compile_modeld.py '
f'--model-size {model_w}x{model_h} '
f'--camera-resolutions {camera_res_args} '
f'--vision-onnx {File(f"models/{file_prefix}driving_vision.onnx").abspath} '
f'--off-policy-onnx {File(f"models/{file_prefix}driving_off_policy.onnx").abspath} '
f'--on-policy-onnx {File(f"models/{file_prefix}driving_on_policy.onnx").abspath} '
f'--output {target_pkl_path} --frame-skip {frame_skip}')
onnx_sizes_sum = sum(os.path.getsize(f) for f in driving_onnx_deps)

View File

@@ -5,7 +5,7 @@ import os
import pickle
import time
from functools import partial
from collections import namedtuple, defaultdict
from collections import namedtuple
import numpy as np
@@ -113,31 +113,43 @@ def make_frame_prepare(nv12: NV12Frame, model_w, model_h):
return frame_prepare_tinygrad
def make_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, device):
def make_warp_input_queues(vision_input_shapes, frame_skip, device):
img = vision_input_shapes['img'] # (1, 12, 128, 256)
n_frames = img[1] // 6
img_buf_shape = (frame_skip * (n_frames - 1) + 1, 6, img[2], img[3])
npy = {
'tfm': np.zeros((3, 3), dtype=np.float32),
'big_tfm': np.zeros((3, 3), dtype=np.float32),
}
input_queues = {
'img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
'big_img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
**{k: Tensor(v, device='NPY').realize() for k, v in npy.items()},
}
return input_queues, npy
def make_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, device):
input_queues, npy = make_warp_input_queues(vision_input_shapes, frame_skip, device)
fb = policy_input_shapes['features_buffer'] # (1, 25, 512)
dp = policy_input_shapes['desire_pulse'] # (1, 25, 8)
tc = policy_input_shapes['traffic_convention'] # (1, 2)
#TODO action_t is hardcoded to match tc for future compatibility
at = tc
npy = {
policy_npy = {
'desire': np.zeros(dp[2], dtype=np.float32),
'traffic_convention': np.zeros(tc, dtype=np.float32),
'tfm': np.zeros((3, 3), dtype=np.float32),
'big_tfm': np.zeros((3, 3), dtype=np.float32),
'action_t': np.zeros(at, dtype=np.float32),
}
input_queues = {
'img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
'big_img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
npy.update(policy_npy)
input_queues.update({
'feat_q': Tensor(np.zeros((frame_skip * (fb[1] - 1) + 1, fb[0], fb[2]), dtype=np.float32), device=device).contiguous().realize(),
'desire_q': Tensor(np.zeros((frame_skip * dp[1], dp[0], dp[2]), dtype=np.float32), device=device).contiguous().realize(),
**{k: Tensor(v, device='NPY').realize() for k, v in npy.items()},
}
**{k: Tensor(v, device='NPY').realize() for k, v in policy_npy.items()},
})
return input_queues, npy
@@ -171,9 +183,10 @@ def make_warp(nv12, model_w, model_h, frame_skip):
return warp_enqueue
def make_run_policy(vision_runner, on_policy_runner, vision_features_slice, frame_skip):
def make_run_policy(model_runners, model_metadata, frame_skip):
sample_desire_fn = partial(sample_desire, frame_skip=frame_skip)
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)
vision_features_slice = model_metadata['vision']['output_slices']['hidden_state']
def run_policy(img, big_img, feat_q, desire_q, desire, traffic_convention, action_t):
desire = desire.to(Device.DEFAULT)
@@ -181,7 +194,7 @@ def make_run_policy(vision_runner, on_policy_runner, vision_features_slice, fram
action_t = action_t.to(Device.DEFAULT)
Tensor.realize(desire, traffic_convention, action_t)
desire_buf = shift_and_sample(desire_q, desire.reshape(1, 1, -1), sample_desire_fn)
vision_out = next(iter(vision_runner({'img': img, 'big_img': big_img}).values())).cast('float32')
vision_out = next(iter(model_runners['vision']({'img': img, 'big_img': big_img}).values())).cast('float32')
new_feat = vision_out[:, vision_features_slice].reshape(1, -1).unsqueeze(0)
feat_buf = shift_and_sample(feat_q, new_feat, sample_skip_fn)
@@ -192,20 +205,16 @@ def make_run_policy(vision_runner, on_policy_runner, vision_features_slice, fram
'traffic_convention': traffic_convention,
'action_t': action_t,
}
on_policy_out = next(iter(on_policy_runner(inputs).values())).cast('float32')
#off_policy_out = next(iter(off_policy_runner(inputs).values())).cast('float32')
return vision_out, on_policy_out
on_policy_out = next(iter(model_runners['on_policy'](inputs).values())).cast('float32')
off_policy_out = next(iter(model_runners['off_policy'](inputs).values())).cast('float32')
return vision_out, on_policy_out, off_policy_out
return run_policy
def compile_jit(jit, make_random_inputs, input_keys, frame_skip, vision_metadata, policy_metadata):
vision_input_shapes = vision_metadata['input_shapes']
policy_input_shapes = policy_metadata['input_shapes']
def compile_jit(jit, make_random_inputs, input_keys, make_queues):
SEED = 42
def random_inputs_run(fn, seed, test_val=None, test_buffers=None, expect_match=True):
input_queues, npy = make_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, Device.DEFAULT)
input_queues, npy = make_queues(Device.DEFAULT)
np.random.seed(seed)
Tensor.manual_seed(seed)
@@ -269,30 +278,38 @@ if __name__ == "__main__":
p.add_argument('--camera-resolutions', type=_parse_size, nargs='+', required=True,
help='camera resolutions WxH (one or more)')
p.add_argument('--vision-onnx', required=True)
p.add_argument('--off-policy-onnx', required=True)
p.add_argument('--on-policy-onnx', required=True)
p.add_argument('--output', required=True)
p.add_argument('--frame-skip', type=int, required=True)
args = p.parse_args()
out = defaultdict(dict)
vision_path, on_policy_path = read_file_chunked_to_shm(args.vision_onnx), read_file_chunked_to_shm(args.on_policy_onnx)
model_paths = {
'vision': read_file_chunked_to_shm(args.vision_onnx),
'off_policy': read_file_chunked_to_shm(args.off_policy_onnx),
'on_policy': read_file_chunked_to_shm(args.on_policy_onnx),
}
model_w, model_h = args.model_size
vision_runner = OnnxRunner(vision_path)
on_policy_runner = OnnxRunner(on_policy_path)
vision_metadata, on_policy_metadata = make_metadata_dict(vision_path), make_metadata_dict(on_policy_path)
model_runners = {name: OnnxRunner(path) for name, path in model_paths.items()}
out = {'metadata': {name: make_metadata_dict(path) for name, path in model_paths.items()}}
run_policy_jit = TinyJit(make_run_policy(vision_runner, on_policy_runner, vision_metadata['output_slices']['hidden_state'], args.frame_skip), prune=True)
out['metadata']['vision'], out['metadata']['on_policy'] = vision_metadata, on_policy_metadata
assert out['metadata']['off_policy']['input_shapes'] == out['metadata']['on_policy']['input_shapes']
make_random_model_inputs = partial(make_random_images, keys=['img', 'big_img'], shape=vision_metadata['input_shapes']['img'])
out['run_policy'] = compile_jit(run_policy_jit, make_random_model_inputs, POLICY_INPUTS, args.frame_skip, vision_metadata, on_policy_metadata)
run_policy_jit = TinyJit(make_run_policy(model_runners, out['metadata'], args.frame_skip), prune=True)
make_policy_queues = partial(make_input_queues, out['metadata']['vision']['input_shapes'],
out['metadata']['on_policy']['input_shapes'], args.frame_skip)
make_random_model_inputs = partial(make_random_images, keys=['img', 'big_img'], shape=out['metadata']['vision']['input_shapes']['img'])
out['run_policy'] = compile_jit(run_policy_jit, make_random_model_inputs, POLICY_INPUTS,
make_policy_queues)
for cam_w, cam_h in args.camera_resolutions:
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
make_random_warp_inputs = partial(make_random_images, keys=['frame', 'big_frame'], shape=nv12.size, device=WARP_DEV)
warp_enqueue = TinyJit(make_warp(nv12, model_w, model_h, args.frame_skip), prune=True)
out[(cam_w,cam_h)] = compile_jit(warp_enqueue, make_random_warp_inputs, WARP_INPUTS, args.frame_skip, vision_metadata, on_policy_metadata)
make_warp_queues = partial(make_warp_input_queues, out['metadata']['vision']['input_shapes'], args.frame_skip)
out[(cam_w,cam_h)] = compile_jit(warp_enqueue, make_random_warp_inputs, WARP_INPUTS, make_warp_queues)
with open(args.output, "wb") as f:
pickle.dump(out, f)

View File

@@ -89,6 +89,9 @@ class ModelState(ModelStateBase):
self.vision_input_names = list(self.vision_input_shapes.keys())
self.vision_output_slices = vision_metadata['output_slices']
off_policy_metadata = jits['metadata']['off_policy']
self.off_policy_output_slices = off_policy_metadata['output_slices']
policy_metadata = jits['metadata']['on_policy']
self.policy_input_shapes = policy_metadata['input_shapes']
self.policy_output_slices = policy_metadata['output_slices']
@@ -133,18 +136,20 @@ class ModelState(ModelStateBase):
if prepare_only:
return None
vision_output, on_policy_output = self.run_policy(
**{k: self.input_queues[k] for k in POLICY_INPUTS}, img=img, big_img=big_img
vision_output, on_policy_output, off_policy_output = self.run_policy(
**{k: self.input_queues[k] for k in POLICY_INPUTS if k in self.input_queues}, img=img, big_img=big_img
)
vision_output = vision_output.numpy().flatten()
off_policy_output = off_policy_output.numpy().flatten()
on_policy_output = on_policy_output.numpy().flatten()
vision_outputs_dict = self.parser.parse_vision_outputs(self.slice_outputs(vision_output, self.vision_output_slices))
off_policy_outputs_dict = self.parser.parse_off_policy_outputs(self.slice_outputs(off_policy_output, self.off_policy_output_slices))
policy_outputs_dict = self.parser.parse_policy_outputs(self.slice_outputs(on_policy_output, self.policy_output_slices))
combined_outputs_dict = {**vision_outputs_dict, **policy_outputs_dict}
combined_outputs_dict = {**vision_outputs_dict, **off_policy_outputs_dict, **policy_outputs_dict}
if SEND_RAW_PRED:
combined_outputs_dict['raw_pred'] = np.concatenate([vision_output.copy(), on_policy_output.copy()])
combined_outputs_dict['raw_pred'] = np.concatenate([vision_output.copy(), on_policy_output.copy(), off_policy_output.copy()])
return combined_outputs_dict

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:8a26866121d1d3a1152bfce024ed7584b8569507d120d4bc8917320093dcd31a
size 41191256

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:565e53c38dcd64c50dd3fe4d5ee1530213aeefd66c3f6b67ea6a72a32612a6bf
size 14061419
oid sha256:94b07ef7a0f65d5c41ac696b4ae7bdc59e2d4c5f504460e2b0d720620892c2e8
size 33679037

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:1f0cab5033fe9e3bc5e174a2e790fa277f7d9fc44c65822d734064d2f899a9a0
size 296203378
oid sha256:eda005282417ffa825092ece5c16b5584142044cdbcf15b6d0246136ac6db601
size 120584466

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:6173be8a69b1d9633a09969c80b2a8bd990bfe7d3e76e192a0e537f6fd72222b
size 41192485

View File

@@ -96,11 +96,17 @@ class Parser:
self.parse_mdn('pose', outs, in_N=0, out_N=0, out_shape=(ModelConstants.POSE_WIDTH,))
self.parse_mdn('wide_from_device_euler', outs, in_N=0, out_N=0, out_shape=(ModelConstants.WIDE_FROM_DEVICE_WIDTH,))
self.parse_mdn('road_transform', outs, in_N=0, out_N=0, out_shape=(ModelConstants.POSE_WIDTH,))
self.parse_categorical_crossentropy('desire_pred', outs, out_shape=(ModelConstants.DESIRE_PRED_LEN,ModelConstants.DESIRE_PRED_WIDTH))
self.parse_binary_crossentropy('meta', outs)
self.parse_mdn('lane_lines', outs, in_N=0, out_N=0, out_shape=(ModelConstants.NUM_LANE_LINES,ModelConstants.IDX_N,ModelConstants.LANE_LINES_WIDTH))
self.parse_mdn('road_edges', outs, in_N=0, out_N=0, out_shape=(ModelConstants.NUM_ROAD_EDGES,ModelConstants.IDX_N,ModelConstants.LANE_LINES_WIDTH))
self.parse_binary_crossentropy('lane_lines_prob', outs)
self.parse_categorical_crossentropy('desire_pred', outs, out_shape=(ModelConstants.DESIRE_PRED_LEN,ModelConstants.DESIRE_PRED_WIDTH))
self.parse_binary_crossentropy('meta', outs)
return outs
def parse_off_policy_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
plan_mhp = self.is_mhp(outs, 'plan', ModelConstants.IDX_N * ModelConstants.PLAN_WIDTH)
plan_in_N, plan_out_N = (ModelConstants.PLAN_MHP_N, ModelConstants.PLAN_MHP_SELECTION) if plan_mhp else (0, 0)
self.parse_mdn('plan', outs, in_N=plan_in_N, out_N=plan_out_N, out_shape=(ModelConstants.IDX_N, ModelConstants.PLAN_WIDTH))
self.parse_binary_crossentropy('lead_prob', outs)
lead_mhp = self.is_mhp(outs, 'lead', ModelConstants.LEAD_MHP_SELECTION * ModelConstants.LEAD_TRAJ_LEN * ModelConstants.LEAD_WIDTH)
lead_in_N, lead_out_N = (ModelConstants.LEAD_MHP_N, ModelConstants.LEAD_MHP_SELECTION) if lead_mhp else (0, 0)
@@ -110,11 +116,11 @@ class Parser:
return outs
def parse_policy_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
self.parse_mdn('plan', outs, in_N=0, out_N=0, out_shape=(ModelConstants.IDX_N, ModelConstants.PLAN_WIDTH))
self.parse_categorical_crossentropy('desire_state', outs, out_shape=(ModelConstants.DESIRE_PRED_WIDTH,))
self.parse_mdn('action', outs, in_N=0, out_N=0, out_shape=(ModelConstants.ACTION_WIDTH,))
return outs
def parse_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
outs = self.parse_vision_outputs(outs)
outs = self.parse_off_policy_outputs(outs)
outs = self.parse_policy_outputs(outs)
return outs

View File

@@ -34,7 +34,7 @@ GITHUB = GithubUtils(API_TOKEN, DATA_TOKEN)
EXEC_TIMINGS = [
# model, instant max, average max
("modelV2", 0.05, 0.028),
("modelV2", 0.05, 0.032),
("driverStateV2", 0.05, 0.018),
]

View File

@@ -10,291 +10,473 @@ import argparse
import os
import pickle
import time
from collections import defaultdict
from functools import partial
from collections import defaultdict
import numpy as np
os.environ['GMMU'] = '0'
def _patch_tinygrad_fetch_fw():
import hashlib
import pathlib
import zstandard
from tinygrad import helpers
_orig_fetch_fw = helpers.fetch_fw
def fetch_fw(path, name, sha256):
p = pathlib.Path(f"/lib/firmware/{path}/{name}.zst")
if p.is_file():
blob = zstandard.ZstdDecompressor().stream_reader(p.read_bytes()).read()
if hashlib.sha256(blob).hexdigest() == sha256:
return blob
return _orig_fetch_fw(path, name, sha256)
helpers.fetch_fw = fetch_fw
_patch_tinygrad_fetch_fw()
from openpilot.selfdrive.modeld.compile_modeld import NV12Frame, make_frame_prepare, sample_desire, sample_skip, shift_and_sample
from tinygrad import dtypes
from tinygrad.tensor import Tensor
from tinygrad.device import Device
from tinygrad.engine.jit import TinyJit
from tinygrad.tensor import Tensor
from openpilot.selfdrive.modeld.compile_modeld import (
NV12Frame, make_frame_prepare,
shift_and_sample, sample_skip, sample_desire,
)
MODEL_TYPES = ('vision_policy', 'supercombo', 'vision_multi_policy')
def _detect_desire_key(shapes: dict) -> str | None:
return next((key for key in shapes if key.startswith('desire')), None)
def _detect_desire_key(policy_input_shapes):
for k in policy_input_shapes:
if k.startswith('desire'):
return k
return None
def _detect_vision_keys(shapes: dict) -> tuple[str | None, str | None]:
img_keys = sorted(key for key in shapes if 'img' in key)
return (
next((key for key in img_keys if 'big' not in key), None),
next((key for key in img_keys if 'big' in key), None)
)
def _detect_vision_keys(vision_input_shapes):
img_keys = sorted([k for k in vision_input_shapes if 'img' in k])
road_key = next((k for k in img_keys if 'big' not in k), None)
wide_key = next((k for k in img_keys if 'big' in k), None)
if road_key is None or wide_key is None:
raise ValueError(f"Cannot determine road/wide image keys from {list(vision_input_shapes.keys())}")
return road_key, wide_key
def derive_frame_skip(vision_input_shapes: dict, policy_input_shapes: dict) -> int:
features_buffer = policy_input_shapes.get('features_buffer')
return 1 if not features_buffer or features_buffer[1] >= 99 else 4
def make_split_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, device):
road_key, _ = _detect_vision_keys(vision_input_shapes)
img = vision_input_shapes[road_key]
n_frames = img[1] // 6
img_buf_shape = (frame_skip * (n_frames - 1) + 1, 6, img[2], img[3])
fb = policy_input_shapes['features_buffer']
desire_key = _detect_desire_key(policy_input_shapes)
dp = policy_input_shapes[desire_key]
tc = policy_input_shapes.get('traffic_convention', (1, 2))
def generate_queues_and_npy(input_shapes: dict, frame_skip: int, device: str = Device.DEFAULT) -> tuple[dict, dict]:
road_key, _ = _detect_vision_keys(input_shapes)
if not road_key:
raise ValueError("Vision road key missing from input shapes.")
img_shape = input_shapes[road_key]
n_frames = img_shape[1] // 6
img_buf_shape = (frame_skip * (n_frames - 1) + 1, 6, img_shape[2], img_shape[3])
desire_key = _detect_desire_key(input_shapes)
if not desire_key:
raise ValueError("Desire key missing from input shapes.")
desire_shape = input_shapes[desire_key]
features_buffer = input_shapes.get('features_buffer')
npy_arrays = {
'desire': np.zeros(desire_shape[2], dtype=np.float32),
npy = {
'desire': np.zeros(dp[2], dtype=np.float32),
'traffic_convention': np.zeros(tc, dtype=np.float32),
'tfm': np.zeros((3, 3), dtype=np.float32),
'big_tfm': np.zeros((3, 3), dtype=np.float32)
'big_tfm': np.zeros((3, 3), dtype=np.float32),
}
if 'action_t' in policy_input_shapes:
npy['action_t'] = np.zeros(tc, dtype=np.float32)
for key, shape in input_shapes.items():
if key not in npy_arrays and 'img' not in key and key not in ('features_buffer', desire_key):
npy_arrays[key] = np.zeros(shape, dtype=np.float32)
handled = {'features_buffer', desire_key, 'traffic_convention', 'action_t'}
for key, shape in policy_input_shapes.items():
if key in handled:
continue
npy[key] = np.zeros(shape, dtype=np.float32)
queues = {
input_queues = {
'img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
'big_img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
'desire_q': Tensor(np.zeros((frame_skip * desire_shape[1], desire_shape[0], desire_shape[2]),
dtype=np.float32), device=device).contiguous().realize()
'feat_q': Tensor(np.zeros((frame_skip * (fb[1] - 1) + 1, fb[0], fb[2]), dtype=np.float32), device=device).contiguous().realize(),
'desire_q': Tensor(np.zeros((frame_skip * dp[1], dp[0], dp[2]), dtype=np.float32), device=device).contiguous().realize(),
**{k: Tensor(v, device='NPY').realize() for k, v in npy.items()},
}
if features_buffer:
queues['feat_q'] = Tensor(np.zeros((frame_skip * (features_buffer[1] - 1) + 1, features_buffer[0], features_buffer[2]),
dtype=np.float32), device=device).contiguous().realize()
queues.update({key: Tensor(value, device='NPY').realize() for key, value in npy_arrays.items()})
return queues, npy_arrays
return input_queues, npy
def make_split_input_queues(vision_input_shapes: dict, policy_input_shapes: dict, frame_skip: int, device: str = Device.DEFAULT) -> tuple[dict, dict]:
return generate_queues_and_npy({**vision_input_shapes, **policy_input_shapes}, frame_skip, device)
def make_supercombo_input_queues(input_shapes: dict, frame_skip: int, device: str = Device.DEFAULT) -> tuple[dict, dict]:
return generate_queues_and_npy(input_shapes, frame_skip, device)
def create_jit_runner(vision_runner, policy_runners: list, nv12: NV12Frame, model_size: tuple[int, int],
features_slice: slice, frame_skip: int, input_shapes: dict, prepare_only: bool):
frame_prepare = make_frame_prepare(nv12, *model_size)
def make_run_split_policy(vision_runner, policy_runner, nv12: NV12Frame, model_w, model_h,
vision_features_slice, frame_skip, desire_key, extra_policy_keys,
vision_road_key, vision_wide_key, prepare_only=False):
frame_prepare = make_frame_prepare(nv12, model_w, model_h)
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)
sample_desire_fn = partial(sample_desire, frame_skip=frame_skip)
desire_key = _detect_desire_key(input_shapes)
road_key, wide_key = _detect_vision_keys(input_shapes)
def run_policy(img_q, big_img_q, feat_q, desire_q, desire, traffic_convention, tfm, big_tfm, frame, big_frame, **extra):
npy_tensors = [tfm.to(Device.DEFAULT), big_tfm.to(Device.DEFAULT),
desire.to(Device.DEFAULT), traffic_convention.to(Device.DEFAULT)]
extra_device = {k: extra[k].to(Device.DEFAULT) for k in extra_policy_keys}
Tensor.realize(*npy_tensors, *extra_device.values())
tfm, big_tfm, desire, traffic_convention = npy_tensors
if not desire_key or not road_key or not wide_key:
raise ValueError("Missing required vision or desire keys in input shapes.")
extra_keys = [key for key in input_shapes if key not in (desire_key, 'features_buffer', 'traffic_convention') and 'img' not in key]
def runner(img_q, big_img_q, feat_q, frame, big_frame, tfm, big_tfm, **kwargs):
desire_q = kwargs['desire_q']
desire = kwargs['desire']
traffic_convention = kwargs.get('traffic_convention')
npys = [tfm.to(Device.DEFAULT), big_tfm.to(Device.DEFAULT), desire.to(Device.DEFAULT)]
if traffic_convention is not None:
npys.append(traffic_convention.to(Device.DEFAULT))
extra_tensors = {key: kwargs[key].to(Device.DEFAULT) for key in extra_keys if key in kwargs}
Tensor.realize(*npys, *extra_tensors.values())
tfm_dev, big_tfm_dev, desire_dev = npys[:3]
traffic_conv_dev = npys[3] if traffic_convention is not None else None
img = shift_and_sample(img_q, frame_prepare(frame, tfm_dev).unsqueeze(0), sample_skip_fn)
big_img = shift_and_sample(big_img_q, frame_prepare(big_frame, big_tfm_dev).unsqueeze(0), sample_skip_fn)
img = shift_and_sample(img_q, frame_prepare(frame, tfm).unsqueeze(0), sample_skip_fn)
big_img = shift_and_sample(big_img_q, frame_prepare(big_frame, big_tfm).unsqueeze(0), sample_skip_fn)
if prepare_only:
return img, big_img
desire_buf = shift_and_sample(desire_q, desire_dev.reshape(1, 1, -1), sample_desire_fn)
inputs = {desire_key: desire_buf, **extra_tensors}
vision_out = next(iter(vision_runner({vision_road_key: img, vision_wide_key: big_img}).values())).cast('float32')
if traffic_conv_dev is not None:
inputs['traffic_convention'] = traffic_conv_dev
new_feat = vision_out[:, vision_features_slice].reshape(1, -1).unsqueeze(0)
feat_buf = shift_and_sample(feat_q, new_feat, sample_skip_fn)
desire_buf = shift_and_sample(desire_q, desire.reshape(1, 1, -1), sample_desire_fn)
if vision_runner:
vision_out = next(iter(vision_runner({road_key: img, wide_key: big_img}).values()))
vision_out_cast = vision_out.cast('float32')
new_feat = vision_out_cast[:, features_slice].reshape(1, -1).unsqueeze(0)
inputs['features_buffer'] = shift_and_sample(feat_q, new_feat, sample_skip_fn).realize()
policy_outs = [next(iter(pol_runner(inputs).values())).cast('float32') for pol_runner in policy_runners]
return (vision_out_cast, *policy_outs) if len(policy_outs) > 1 else (vision_out_cast, policy_outs[0])
inputs.update({road_key: img, wide_key: big_img, 'features_buffer': sample_skip_fn(feat_q)})
policy_out = next(iter(policy_runners[0](inputs).values())).cast('float32')
new_feat = policy_out[:, features_slice].reshape(1, -1).unsqueeze(0)
shift_and_sample(feat_q, new_feat, sample_skip_fn).realize()
return policy_out
inputs = {'features_buffer': feat_buf, desire_key: desire_buf, 'traffic_convention': traffic_convention, **extra_device}
policy_out = next(iter(policy_runner(inputs).values())).cast('float32')
return runner
return vision_out, policy_out
return run_policy
def compile_and_warmup(nv12: NV12Frame, model_size: tuple[int, int], prepare_only: bool, frame_skip: int, vision_runner, policy_runners: list, metadata: dict):
print(f"Compiling combined JIT for {nv12.width}x{nv12.height} (prepare_only={prepare_only})...")
def compile_split_policy(nv12: NV12Frame, model_w, model_h, prepare_only, frame_skip,
vision_runner, policy_runner, vision_metadata, policy_metadata):
print(f"Compiling combined policy JIT for {nv12.width}x{nv12.height} (prepare_only={prepare_only})...")
all_shapes = {key: value for meta in metadata.values() for key, value in meta['input_shapes'].items()}
vision_features_slice = vision_metadata['output_slices']['hidden_state']
vision_input_shapes = vision_metadata['input_shapes']
policy_input_shapes = policy_metadata['input_shapes']
desire_key = _detect_desire_key(policy_input_shapes)
extra_policy_keys = [k for k in policy_input_shapes if k not in ('features_buffer', desire_key, 'traffic_convention')]
vision_road_key, vision_wide_key = _detect_vision_keys(vision_input_shapes)
feat_meta = metadata.get('vision') or metadata.get('model') or metadata.get('policy')
if not feat_meta:
raise ValueError("Could not find vision, model, or policy metadata.")
_run = make_run_split_policy(vision_runner, policy_runner, nv12, model_w, model_h,
vision_features_slice, frame_skip, desire_key, extra_policy_keys,
vision_road_key, vision_wide_key, prepare_only)
run_policy_jit = TinyJit(_run, prune=True)
features_slice = feat_meta['output_slices']['hidden_state']
WARP_DEV = 'CPU' if "USBGPU" in os.environ else Device.DEFAULT
SEED = 42
run_func = create_jit_runner(vision_runner, policy_runners, nv12, model_size, features_slice, frame_skip, all_shapes, prepare_only)
run_jit = TinyJit(run_func, prune=True)
queues, npy_arrays = generate_queues_and_npy(all_shapes, frame_skip, Device.DEFAULT)
def random_inputs_run_fn(fn, seed, test_val=None, test_buffers=None, expect_match=True):
input_queues, npy = make_split_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, Device.DEFAULT)
np.random.seed(seed)
Tensor.manual_seed(seed)
testing = test_val is not None or test_buffers is not None
n_runs = 1 if testing else 3
for i in range(n_runs):
frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
big_frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
for v in npy.values():
v[:] = np.random.randn(*v.shape).astype(v.dtype)
Device.default.synchronize()
st = time.perf_counter()
outs = fn(**input_queues, frame=frame, big_frame=big_frame)
mt = time.perf_counter()
Device.default.synchronize()
et = time.perf_counter()
print(f" [{i+1}/{n_runs}] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")
if i == 0:
val = [np.copy(v.numpy()) for v in outs]
buffers = [np.copy(v.numpy().copy()) for v in input_queues.values()]
if test_val is not None:
match = all(np.array_equal(a, b) for a, b in zip(val, test_val, strict=True))
assert match == expect_match, f"outputs {'differ from' if expect_match else 'match'} baseline (seed={seed})"
if test_buffers is not None:
match = all(np.array_equal(a, b) for a, b in zip(buffers, test_buffers, strict=True))
assert match == expect_match, f"buffers {'differ from' if expect_match else 'match'} baseline (seed={seed})"
return fn, val, buffers
print('capture + replay')
run_policy_jit, test_val, test_buffers = random_inputs_run_fn(run_policy_jit, SEED)
print('pickle round trip')
run_policy_jit = pickle.loads(pickle.dumps(run_policy_jit))
random_inputs_run_fn(run_policy_jit, SEED, test_val, test_buffers, expect_match=True)
random_inputs_run_fn(run_policy_jit, SEED+1, test_val, test_buffers, expect_match=False)
return run_policy_jit
def derive_frame_skip(vision_input_shapes, policy_input_shapes):
fb = policy_input_shapes.get('features_buffer')
if fb is None:
return 1
fb_history = fb[1]
if fb_history >= 99:
return 1
return 4
def make_supercombo_input_queues(input_shapes, frame_skip, device):
img_shape = input_shapes.get('img', input_shapes.get('input_imgs'))
if img_shape is None:
raise ValueError("No img input found in model shapes")
n_frames = img_shape[1] // 6
img_buf_shape = (frame_skip * (n_frames - 1) + 1, 6, img_shape[2], img_shape[3])
npy_keys = {}
queue_keys = {}
for key, shape in input_shapes.items():
if 'img' in key:
continue
if len(shape) == 3 and shape[1] > 1:
if key.startswith('desire'):
npy_keys[key] = np.zeros(shape[2], dtype=np.float32)
queue_keys[f'{key}_q'] = Tensor(
np.zeros((frame_skip * shape[1], shape[0], shape[2]), dtype=np.float32),
device=device).contiguous().realize()
elif key == 'features_buffer':
queue_keys['feat_q'] = Tensor(
np.zeros((frame_skip * (shape[1] - 1) + 1, shape[0], shape[2]), dtype=np.float32),
device=device).contiguous().realize()
else:
npy_keys[key] = np.zeros(shape, dtype=np.float32)
elif len(shape) == 2:
npy_keys[key] = np.zeros(shape, dtype=np.float32)
if 'traffic_convention' not in npy_keys:
tc_shape = input_shapes.get('traffic_convention', (1, 2))
npy_keys['traffic_convention'] = np.zeros(tc_shape, dtype=np.float32)
npy_keys['tfm'] = np.zeros((3, 3), dtype=np.float32)
npy_keys['big_tfm'] = np.zeros((3, 3), dtype=np.float32)
input_queues = {
'img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
'big_img_q': Tensor(np.zeros(img_buf_shape, dtype=np.uint8), device=device).contiguous().realize(),
**queue_keys,
**{k: Tensor(v, device='NPY').realize() for k, v in npy_keys.items()},
}
return input_queues, npy_keys
def make_run_supercombo(model_runner, nv12: NV12Frame, model_w, model_h,
features_slice, frame_skip, input_shapes, prepare_only=False):
frame_prepare = make_frame_prepare(nv12, model_w, model_h)
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)
sample_desire_fn = partial(sample_desire, frame_skip=frame_skip)
desire_key = _detect_desire_key(input_shapes)
if desire_key is None:
raise ValueError(f"No desire* key found in input_shapes: {list(input_shapes.keys())}")
road_img_key, wide_img_key = _detect_vision_keys(input_shapes)
extra_policy_keys = [k for k in input_shapes
if k not in (desire_key, 'features_buffer', 'traffic_convention')
and 'img' not in k]
def run_supercombo(img_q, big_img_q, feat_q, desire_q,
frame, big_frame, **kwargs):
desire = kwargs.get(desire_key)
traffic_convention = kwargs.get('traffic_convention')
tfm = kwargs['tfm']
big_tfm = kwargs['big_tfm']
tfm = tfm.to(Device.DEFAULT)
big_tfm = big_tfm.to(Device.DEFAULT)
desire = desire.to(Device.DEFAULT)
traffic_convention = traffic_convention.to(Device.DEFAULT)
Tensor.realize(tfm, big_tfm, desire, traffic_convention)
img = shift_and_sample(img_q, frame_prepare(frame, tfm).unsqueeze(0), sample_skip_fn)
big_img = shift_and_sample(big_img_q, frame_prepare(big_frame, big_tfm).unsqueeze(0), sample_skip_fn)
if prepare_only:
return img, big_img
desire_buf = shift_and_sample(desire_q, desire.reshape(1, 1, -1), sample_desire_fn)
feat_buf = sample_skip_fn(feat_q)
inputs = {road_img_key: img, wide_img_key: big_img,
desire_key: desire_buf, 'features_buffer': feat_buf,
'traffic_convention': traffic_convention}
for k in extra_policy_keys:
if k in kwargs:
inputs[k] = kwargs[k].to(Device.DEFAULT)
model_out = next(iter(model_runner(inputs).values())).cast('float32')
new_feat = model_out[:, features_slice].reshape(1, -1).unsqueeze(0)
shift_and_sample(feat_q, new_feat, sample_skip_fn)
return model_out
return run_supercombo
def make_run_vision_multi_policy(vision_runner, policy_runners, nv12: NV12Frame, model_w, model_h,
vision_features_slice, frame_skip, desire_key, extra_policy_keys,
vision_road_key, vision_wide_key, prepare_only=False):
frame_prepare = make_frame_prepare(nv12, model_w, model_h)
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)
sample_desire_fn = partial(sample_desire, frame_skip=frame_skip)
def run_multi_policy(img_q, big_img_q, feat_q, desire_q, desire,
traffic_convention, tfm, big_tfm, frame, big_frame, **extra):
npy_tensors = [tfm.to(Device.DEFAULT), big_tfm.to(Device.DEFAULT),
desire.to(Device.DEFAULT), traffic_convention.to(Device.DEFAULT)]
extra_device = {k: extra[k].to(Device.DEFAULT) for k in extra_policy_keys}
Tensor.realize(*npy_tensors, *extra_device.values())
tfm, big_tfm, desire, traffic_convention = npy_tensors
img = shift_and_sample(img_q, frame_prepare(frame, tfm).unsqueeze(0), sample_skip_fn)
big_img = shift_and_sample(big_img_q, frame_prepare(big_frame, big_tfm).unsqueeze(0), sample_skip_fn)
if prepare_only:
return img, big_img
vision_out = next(iter(vision_runner({vision_road_key: img, vision_wide_key: big_img}).values())).cast('float32')
new_feat = vision_out[:, vision_features_slice].reshape(1, -1).unsqueeze(0)
feat_buf = shift_and_sample(feat_q, new_feat, sample_skip_fn)
desire_buf = shift_and_sample(desire_q, desire.reshape(1, 1, -1), sample_desire_fn)
inputs = {'features_buffer': feat_buf, desire_key: desire_buf, 'traffic_convention': traffic_convention, **extra_device}
policy_outputs = []
for runner in policy_runners:
policy_out = next(iter(runner(inputs).values())).cast('float32')
policy_outputs.append(policy_out)
return (vision_out, *policy_outputs)
return run_multi_policy
def _warmup_and_serialize(run_jit, input_queues, npy, nv12):
for i in range(3):
np.random.seed(42 + i)
frame = Tensor.randint(nv12.size, low=0, high=256, dtype=dtypes.uint8, device=WARP_DEV).realize()
big_frame = Tensor.randint(nv12.size, low=0, high=256, dtype=dtypes.uint8, device=WARP_DEV).realize()
for arr in npy_arrays.values():
arr[:] = np.random.randn(*arr.shape).astype(arr.dtype)
frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
big_frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
for v in npy.values():
v[:] = np.random.randn(*v.shape).astype(v.dtype)
Device.default.synchronize()
start_time = time.perf_counter()
run_jit(**queues, frame=frame, big_frame=big_frame)
mid_time = time.perf_counter()
st = time.perf_counter()
run_jit(**input_queues, frame=frame, big_frame=big_frame)
mt = time.perf_counter()
Device.default.synchronize()
print(f" [{i + 1}/3] enqueue {(mid_time - start_time) * 1e3:6.2f} ms -- total {(time.perf_counter() - start_time) * 1e3:6.2f} ms")
return pickle.loads(pickle.dumps(run_jit)) if not prepare_only else run_jit
et = time.perf_counter()
print(f" [{i + 1}/3] enqueue {(mt - st) * 1e3:6.2f} ms -- total {(et - st) * 1e3:6.2f} ms")
return pickle.loads(pickle.dumps(run_jit))
def _parse_size(size_str: str) -> tuple[int, int]:
width, height = size_str.lower().split('x')
return int(width), int(height)
def compile_supercombo(nv12: NV12Frame, model_w, model_h, prepare_only, frame_skip,
model_runner, metadata):
print(f"Compiling combined supercombo JIT for {nv12.width}x{nv12.height} (prepare_only={prepare_only})...")
features_slice = metadata['output_slices']['hidden_state']
input_shapes = metadata['input_shapes']
_run = make_run_supercombo(model_runner, nv12, model_w, model_h,
features_slice, frame_skip, input_shapes, prepare_only)
run_jit = TinyJit(_run, prune=True)
input_queues, npy = make_supercombo_input_queues(input_shapes, frame_skip, Device.DEFAULT)
run_jit = _warmup_and_serialize(run_jit, input_queues, npy, nv12)
return run_jit
def read_file_chunked_to_shm(path):
if not path:
return None
import atexit
from openpilot.common.file_chunker import read_file_chunked
from openpilot.system.hardware.hw import Paths
shm_path = os.path.join(Paths.shm_path(), os.path.basename(path))
atexit.register(lambda: os.path.exists(shm_path) and os.remove(shm_path))
with open(shm_path, 'wb') as f:
f.write(read_file_chunked(path))
return shm_path
def compile_multi_policy(nv12: NV12Frame, model_w, model_h, prepare_only, frame_skip,
vision_runner, policy_runners, vision_metadata, policy_metadata):
print(f"Compiling combined multi-policy JIT for {nv12.width}x{nv12.height} (prepare_only={prepare_only})...")
vision_features_slice = vision_metadata['output_slices']['hidden_state']
vision_input_shapes = vision_metadata['input_shapes']
policy_input_shapes = policy_metadata['input_shapes']
desire_key = _detect_desire_key(policy_input_shapes)
extra_policy_keys = [k for k in policy_input_shapes if k not in ('features_buffer', desire_key, 'traffic_convention')]
vision_road_key, vision_wide_key = _detect_vision_keys(vision_input_shapes)
_run = make_run_vision_multi_policy(vision_runner, policy_runners, nv12, model_w, model_h,
vision_features_slice, frame_skip, desire_key, extra_policy_keys,
vision_road_key, vision_wide_key, prepare_only)
run_jit = TinyJit(_run, prune=True)
input_queues, npy = make_split_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, Device.DEFAULT)
run_jit = _warmup_and_serialize(run_jit, input_queues, npy, nv12)
return run_jit
def _compile_for_resolutions(camera_resolutions: list, model_size: tuple[int, int], frame_skip: int,
vision_runner, policy_runners: list, metadata: dict) -> dict:
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
return {
(cam_w, cam_h): {
name: compile_and_warmup(NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h)), model_size, prepare_only,
frame_skip, vision_runner, policy_runners, metadata)
for name, prepare_only in [('warp_enqueue', True), ('run_policy', False)]
}
for cam_w, cam_h in camera_resolutions
}
def _load_policy_runners(args: argparse.Namespace) -> tuple[list, list]:
runners, keys = [], []
for name, onnx_arg in [('policy', args.policy_onnx), ('off_policy', args.off_policy_onnx), ('on_policy', args.on_policy_onnx)]:
if onnx_arg:
runners.append(OnnxRunner(onnx_arg))
keys.append(name)
return runners, keys
def _parse_size(s):
w, h = s.lower().split('x')
return int(w), int(h)
if __name__ == "__main__":
from openpilot.selfdrive.modeld.get_model_metadata import make_metadata_dict
from tinygrad.nn.onnx import OnnxRunner
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.selfdrive.modeld.get_model_metadata import make_metadata_dict
parser = argparse.ArgumentParser(description="Compile combined JIT pkl for sunnypilot modeld_v2")
parser.add_argument('--model-type', choices=MODEL_TYPES, required=True)
parser.add_argument('--model-size', type=_parse_size, required=True, help='model input WxH')
parser.add_argument('--camera-resolutions', type=_parse_size, nargs='+', required=True)
parser.add_argument('--frame-skip', type=int, default=None, help='frame skip value (auto-derived if not provided)')
parser.add_argument('--output', required=True)
p = argparse.ArgumentParser(description="Compile combined JIT pkl for sunnypilot modeld_v2")
p.add_argument('--model-type', choices=MODEL_TYPES, required=True)
p.add_argument('--model-size', type=_parse_size, required=True, help='model input WxH')
p.add_argument('--camera-resolutions', type=_parse_size, nargs='+', required=True)
p.add_argument('--frame-skip', type=int, default=None, help='frame skip value (auto-derived if not provided)')
p.add_argument('--output', required=True)
parser.add_argument('--vision-onnx', help='vision ONNX (for split models)')
parser.add_argument('--policy-onnx', help='policy ONNX (for vision_policy)')
parser.add_argument('--off-policy-onnx', help='off-policy ONNX (for vision_multi_policy)')
parser.add_argument('--on-policy-onnx', help='on-policy ONNX (for vision_multi_policy)')
parser.add_argument('--supercombo-onnx', help='supercombo ONNX (for supercombo)')
p.add_argument('--vision-onnx', help='vision ONNX (for split models)')
p.add_argument('--policy-onnx', help='policy ONNX (for vision_policy)')
p.add_argument('--off-policy-onnx', help='off-policy ONNX (for vision_multi_policy)')
p.add_argument('--on-policy-onnx', help='on-policy ONNX (for vision_multi_policy)')
p.add_argument('--supercombo-onnx', help='supercombo ONNX (for supercombo)')
args = parser.parse_args()
output_data = defaultdict(dict)
args.vision_onnx = read_file_chunked_to_shm(args.vision_onnx)
args.policy_onnx = read_file_chunked_to_shm(args.policy_onnx)
args.off_policy_onnx = read_file_chunked_to_shm(args.off_policy_onnx)
args.on_policy_onnx = read_file_chunked_to_shm(args.on_policy_onnx)
args.supercombo_onnx = read_file_chunked_to_shm(args.supercombo_onnx)
vision_runner = OnnxRunner(args.vision_onnx) if args.vision_onnx else None
args = p.parse_args()
out = defaultdict(dict)
if args.model_type == 'vision_policy':
assert vision_runner and args.policy_onnx
policy_runners = [OnnxRunner(args.policy_onnx)]
output_data['metadata'] = {'vision': make_metadata_dict(args.vision_onnx), 'policy': make_metadata_dict(args.policy_onnx)}
assert args.vision_onnx and args.policy_onnx
vision_runner = OnnxRunner(args.vision_onnx)
policy_runner = OnnxRunner(args.policy_onnx)
out['metadata']['vision'] = make_metadata_dict(args.vision_onnx)
out['metadata']['policy'] = make_metadata_dict(args.policy_onnx)
frame_skip = args.frame_skip if args.frame_skip is not None else derive_frame_skip(out['metadata']['vision']['input_shapes'],
out['metadata']['policy']['input_shapes'])
for cam_w, cam_h in args.camera_resolutions:
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
model_w, model_h = args.model_size
out[(cam_w, cam_h)] = {
name: compile_split_policy(nv12, model_w, model_h, prepare_only, frame_skip,
vision_runner, policy_runner,
out['metadata']['vision'], out['metadata']['policy'])
for name, prepare_only in [('warp_enqueue', True), ('run_policy', False)]
}
elif args.model_type == 'supercombo':
assert args.supercombo_onnx
policy_runners = [OnnxRunner(args.supercombo_onnx)]
output_data['metadata'] = {'model': make_metadata_dict(args.supercombo_onnx)}
model_runner = OnnxRunner(args.supercombo_onnx)
out['metadata']['model'] = make_metadata_dict(args.supercombo_onnx)
frame_skip = args.frame_skip if args.frame_skip is not None else derive_frame_skip({}, out['metadata']['model']['input_shapes'])
for cam_w, cam_h in args.camera_resolutions:
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
model_w, model_h = args.model_size
out[(cam_w, cam_h)] = {
name: compile_supercombo(nv12, model_w, model_h, prepare_only, frame_skip,
model_runner, out['metadata']['model'])
for name, prepare_only in [('warp_enqueue', True), ('run_policy', False)]
}
elif args.model_type == 'vision_multi_policy':
assert vision_runner
policy_runners, policy_names = _load_policy_runners(args)
output_data['metadata'] = {'vision': make_metadata_dict(args.vision_onnx)}
for name in policy_names:
runner_arg = getattr(args, f"{name}_onnx")
output_data['metadata'][name] = make_metadata_dict(runner_arg)
assert args.vision_onnx
vision_runner = OnnxRunner(args.vision_onnx)
out['metadata']['vision'] = make_metadata_dict(args.vision_onnx)
policy_keys = [key for key in output_data['metadata'].keys() if key != 'vision']
first_policy_meta = output_data['metadata'][policy_keys[0]] if policy_keys else {}
vision_meta = output_data['metadata'].get('vision', {})
policy_runners = []
policy_onnxes = []
if args.policy_onnx:
policy_onnxes.append(('policy', args.policy_onnx))
if args.off_policy_onnx:
policy_onnxes.append(('off_policy', args.off_policy_onnx))
if args.on_policy_onnx:
policy_onnxes.append(('on_policy', args.on_policy_onnx))
derived_frame_skip = args.frame_skip or derive_frame_skip(vision_meta.get('input_shapes', {}), first_policy_meta.get('input_shapes', {}))
output_data.update(_compile_for_resolutions(args.camera_resolutions, args.model_size, derived_frame_skip,
vision_runner, policy_runners, output_data['metadata']))
for name, onnx_path in policy_onnxes:
runner = OnnxRunner(onnx_path)
policy_runners.append(runner)
out['metadata'][name] = make_metadata_dict(onnx_path)
with open(args.output, "wb") as file:
pickle.dump(output_data, file)
first_policy_key = policy_onnxes[0][0]
frame_skip = args.frame_skip if args.frame_skip is not None else derive_frame_skip(out['metadata']['vision']['input_shapes'],
out['metadata'][first_policy_key]['input_shapes'])
for cam_w, cam_h in args.camera_resolutions:
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
model_w, model_h = args.model_size
out[(cam_w, cam_h)] = {
name: compile_multi_policy(nv12, model_w, model_h, prepare_only, frame_skip,
vision_runner, policy_runners,
out['metadata']['vision'], out['metadata'][first_policy_key])
for name, prepare_only in [('warp_enqueue', True), ('run_policy', False)]
}
with open(args.output, "wb") as f:
pickle.dump(out, f)
pkl_size = os.path.getsize(args.output)
print(f"Saved combined JIT to {args.output} ({pkl_size / 1e6:.2f} MB)")
from openpilot.common.file_chunker import chunk_file, get_chunk_targets
chunk_targets = get_chunk_targets(args.output, pkl_size)
chunk_file(args.output, chunk_targets)
print(f"Chunked into {len(chunk_targets) - 1} file(s)")
num_chunks = len(chunk_targets) - 1
print(f"Chunked into {num_chunks} file(s)")

View File

@@ -35,6 +35,7 @@ class ModelConstants:
LANE_LINES_WIDTH = 2
ROAD_EDGES_WIDTH = 2
PLAN_WIDTH = 15
ACTION_WIDTH = 2
DESIRE_PRED_WIDTH = 8
LAT_PLANNER_SOLUTION_WIDTH = 4
DESIRED_CURV_WIDTH = 1

View File

@@ -7,7 +7,6 @@ See the LICENSE.md file in the root directory for more details.
"""
import os
os.environ['GMMU'] = '0'
from openpilot.system.hardware import TICI
os.environ['DEV'] = 'QCOM' if TICI else 'CPU'
USBGPU = "USBGPU" in os.environ
@@ -45,6 +44,34 @@ from openpilot.sunnypilot.models.helpers import get_active_bundle
PROCESS_NAME = "selfdrive.modeld.modeld_tinygrad"
def _load_pkl_compat(data: bytes):
from tinygrad.uop.ops import UOpMetaClass, UOp, Ops, buffers, ParamArg
from tinygrad.dtype import dtypes
_orig_call = UOpMetaClass.__call__
def _compat_call(cls, op, dtype=dtypes.void, src=(), arg=None, tag=None, metadata=None, _buffer=None):
if _buffer is not None and op is Ops.SLICE:
created = _orig_call(cls, op, dtype, src, arg, tag, metadata)
buffers[created] = _buffer
return created
if op is Ops.PARAM and isinstance(arg, int):
arg = ParamArg(arg)
if op is Ops.PERMUTE and len(src) >= 2 and src[0].op is Ops.NOOP:
op = Ops.RESHAPE
shape_uop = src[1]
if shape_uop.op is Ops.CONST and hasattr(shape_uop.dtype, 'count') and shape_uop.dtype.count > 1 and isinstance(shape_uop.arg, tuple):
src = (src[0], UOp(Ops.STACK, shape_uop.dtype, tuple(UOp(Ops.CONST, dtypes.weakint, (), arg=v) for v in shape_uop.arg)))
if op is Ops.CUSTOM and hasattr(dtype, 'count') and dtype.count > 1 and not isinstance(arg, str):
op = Ops.CONST
return _orig_call(cls, op, dtype, src, arg, tag, metadata, _buffer if op is Ops.BUFFER else None)
UOpMetaClass.__call__ = _compat_call
try:
return pickle.loads(data)
finally:
UOpMetaClass.__call__ = _orig_call
def _pkl_exists(path):
from openpilot.common.file_chunker import get_manifest_path
return os.path.exists(path) or os.path.exists(get_manifest_path(path))
@@ -63,6 +90,11 @@ def _find_driving_pkl(bundle):
if _pkl_exists(pkl_path):
return pkl_path
fallback = os.path.join(model_root, 'driving_tinygrad.pkl')
if _pkl_exists(fallback):
return fallback
return None
class FrameMeta:
frame_id: int = 0
@@ -107,11 +139,9 @@ class ModelState(ModelStateBase):
from openpilot.common.file_chunker import read_file_chunked
cloudlog.warning(f"loading combined pkl: {pkl_path}")
jits = pickle.loads(read_file_chunked(pkl_path))
jits = _load_pkl_compat(read_file_chunked(pkl_path))
self.DEV = Device.DEFAULT
self.WARP_DEV = 'CPU' if USBGPU else self.DEV
self.QUEUE_DEV = self.DEV
metadata = jits['metadata']
if 'model' in metadata:
@@ -123,11 +153,11 @@ class ModelState(ModelStateBase):
self._vision_input_names = [k for k in model_metadata['input_shapes'] if 'img' in k]
from openpilot.sunnypilot.modeld_v2.compile_modeld import make_supercombo_input_queues
frame_skip = derive_frame_skip({}, model_metadata['input_shapes'])
self.input_queues, self.numpy_inputs = make_supercombo_input_queues(model_metadata['input_shapes'], frame_skip, device=self.QUEUE_DEV)
self.input_queues, self.npy = make_supercombo_input_queues(model_metadata['input_shapes'], frame_skip, device=self.DEV)
else:
vision_metadata = metadata['vision']
policy_keys = [k for k in metadata if k != 'vision']
if policy_keys == ['policy']:
if len(policy_keys) == 1 and policy_keys[0] in ('policy', 'on_policy'):
self._combined_model_type = 'split'
else:
self._combined_model_type = 'multi_policy'
@@ -136,16 +166,14 @@ class ModelState(ModelStateBase):
self._policy_slices_list = [metadata[k]['output_slices'] for k in policy_keys]
self.policy_output_slices = self._policy_slices_list[0]
self._has_on_policy = any('on' in k.lower() for k in policy_keys)
on_policy_key = next((k for k in policy_keys if 'on' in k.lower()), None)
self._on_policy_has_plan = on_policy_key is not None and 'plan' in metadata[on_policy_key]['output_slices']
first_policy_metadata = metadata[policy_keys[0]]
vision_input_shapes = vision_metadata['input_shapes']
policy_input_shapes = first_policy_metadata['input_shapes']
self._vision_input_names = [k for k in vision_input_shapes if 'img' in k]
frame_skip = derive_frame_skip(vision_input_shapes, policy_input_shapes)
self.input_queues, self.numpy_inputs = make_split_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, device=self.QUEUE_DEV)
self._desire_key = next(key for key in self.numpy_inputs if key.startswith('desire'))
self._road_key = next(key for key in self._vision_input_names if 'big' not in key)
self._wide_key = next(key for key in self._vision_input_names if 'big' in key)
self.input_queues, self.npy = make_split_input_queues(vision_input_shapes, policy_input_shapes, frame_skip, device=self.DEV)
from openpilot.sunnypilot.modeld_v2.parse_model_outputs_split import Parser as SplitParser
from openpilot.sunnypilot.modeld_v2.parse_model_outputs import Parser as CombinedParser
@@ -167,11 +195,12 @@ class ModelState(ModelStateBase):
self._run_policy = jits[(cam_w, cam_h)]['run_policy']
self._warp_enqueue = jits[(cam_w, cam_h)]['warp_enqueue']
yuv_size = self.frame_buf_params[self._road_key][3]
road_name = next(k for k in self._vision_input_names if 'big' not in k)
yuv_size = self.frame_buf_params[road_name][3]
self._warp_enqueue(
**self.input_queues,
frame=Tensor(np.zeros(yuv_size, dtype=np.uint8), device=self.WARP_DEV).contiguous().realize(),
big_frame=Tensor(np.zeros(yuv_size, dtype=np.uint8), device=self.WARP_DEV).contiguous().realize())
frame=Tensor(np.zeros(yuv_size, dtype=np.uint8), device=self.DEV).contiguous().realize(),
big_frame=Tensor(np.zeros(yuv_size, dtype=np.uint8), device=self.DEV).contiguous().realize())
@property
@@ -184,7 +213,7 @@ class ModelState(ModelStateBase):
@property
def desire_key(self) -> str:
return self._desire_key
return next(k for k in self.npy if k.startswith('desire'))
def run(self, bufs: dict[str, VisionBuf], transforms: dict[str, np.ndarray],
inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
@@ -195,21 +224,21 @@ class ModelState(ModelStateBase):
yuv_size = self.frame_buf_params[key][3]
cache_key = (key, ptr)
if cache_key not in self._blob_cache:
self._blob_cache[cache_key] = Tensor.from_blob(ptr, (yuv_size,), dtype='uint8', device=self.WARP_DEV)
self._blob_cache[cache_key] = Tensor.from_blob(ptr, (yuv_size,), dtype='uint8', device=self.DEV)
self.full_frames[key] = self._blob_cache[cache_key]
desire_key = self.desire_key
inputs[desire_key][0] = 0
self.numpy_inputs[desire_key][:] = np.where(inputs[desire_key] - self.prev_desire > .99, inputs[desire_key], 0)
self.npy[desire_key][:] = np.where(inputs[desire_key] - self.prev_desire > .99, inputs[desire_key], 0)
self.prev_desire[:] = inputs[desire_key]
for key in ('traffic_convention', 'lateral_control_params', 'action_t'):
if key in self.numpy_inputs and key in inputs:
self.numpy_inputs[key][:] = inputs[key]
if key in self.npy and key in inputs:
self.npy[key][:] = inputs[key]
road_key = self._road_key
wide_key = self._wide_key
self.numpy_inputs['tfm'][:, :] = transforms[road_key].reshape(3, 3)
self.numpy_inputs['big_tfm'][:, :] = transforms[wide_key].reshape(3, 3)
road_key = next(n for n in bufs if 'big' not in n)
wide_key = next(n for n in bufs if 'big' in n)
self.npy['tfm'][:, :] = transforms[road_key].reshape(3, 3)
self.npy['big_tfm'][:, :] = transforms[wide_key].reshape(3, 3)
if prepare_only:
self._warp_enqueue(**self.input_queues, frame=self.full_frames[road_key], big_frame=self.full_frames[wide_key])
@@ -230,15 +259,15 @@ class ModelState(ModelStateBase):
policy_output = raw_outputs[i + 1].numpy().flatten()
policy_sliced = {k: policy_output[np.newaxis, v] for k, v in policy_slices.items()}
parsed = self.parser.parse_policy_outputs(policy_sliced)
if 'off' in self._policy_keys[i] and self._has_on_policy:
if 'off' in self._policy_keys[i] and self._on_policy_has_plan:
parsed.pop('plan', None)
outputs.update(parsed)
if 'planplus' in outputs and 'plan' in outputs:
outputs['plan'] = outputs['plan'] + outputs['planplus']
if 'desired_curvature' in outputs and 'prev_desired_curv' in self.numpy_inputs:
buf = self.numpy_inputs['prev_desired_curv']
if 'desired_curvature' in outputs and 'prev_desired_curv' in self.npy:
buf = self.npy['prev_desired_curv']
buf[0, :-1] = buf[0, 1:]
buf[0, -1, :] = outputs['desired_curvature'][0, :] if not self.mlsim else 0
@@ -246,20 +275,27 @@ class ModelState(ModelStateBase):
def get_action_from_model(self, model_output: dict[str, np.ndarray], prev_action: log.ModelDataV2.Action,
lat_action_t: float, long_action_t: float, v_ego: float) -> log.ModelDataV2.Action:
if 'action' not in model_output:
plan = model_output['plan'][0]
desired_accel, should_stop = get_accel_from_plan(plan[:, Plan.VELOCITY][:, 0], plan[:, Plan.ACCELERATION][:, 0], self.constants.T_IDXS,
action_t=long_action_t)
desired_accel = smooth_value(desired_accel, prev_action.desiredAcceleration, self.LONG_SMOOTH_SECONDS)
curvature_plan = (plan + (self.PLANPLUS_CONTROL - 1.0) * model_output['planplus'][0]
if 'planplus' in model_output and self.PLANPLUS_CONTROL != 1.0 else plan)
desired_curvature = get_curvature_from_output(model_output, curvature_plan, v_ego, lat_action_t, self.mlsim)
else:
if 'action' in model_output:
desired_accel = model_output['action'][0, 1]
desired_curvature = model_output['action'][0, 0] / (max(1.0, v_ego))**2
should_stop = (v_ego < 0.3 and desired_accel < 0.1)
desired_accel = smooth_value(desired_accel, prev_action.desiredAcceleration, self.LONG_SMOOTH_SECONDS)
if self.generation is not None and self.generation >= 10:
if v_ego > self.MIN_LAT_CONTROL_SPEED:
desired_curvature = smooth_value(desired_curvature, prev_action.desiredCurvature, self.LAT_SMOOTH_SECONDS)
else:
desired_curvature = prev_action.desiredCurvature
return log.ModelDataV2.Action(desiredCurvature=float(desired_curvature),
desiredAcceleration=float(desired_accel),
shouldStop=bool(should_stop))
plan = model_output['plan'][0]
desired_accel, should_stop = get_accel_from_plan(plan[:, Plan.VELOCITY][:, 0], plan[:, Plan.ACCELERATION][:, 0], self.constants.T_IDXS,
action_t=long_action_t)
desired_accel = smooth_value(desired_accel, prev_action.desiredAcceleration, self.LONG_SMOOTH_SECONDS)
curvature_plan = plan + (self.PLANPLUS_CONTROL - 1.0) * model_output['planplus'][0] if 'planplus' in model_output and self.PLANPLUS_CONTROL != 1.0 else plan
desired_curvature = get_curvature_from_output(model_output, curvature_plan, v_ego, lat_action_t, self.mlsim)
if self.generation is not None and self.generation >= 10: # smooth curvature for post FOF models
if v_ego > self.MIN_LAT_CONTROL_SPEED:
desired_curvature = smooth_value(desired_curvature, prev_action.desiredCurvature, self.LAT_SMOOTH_SECONDS)
@@ -412,23 +448,19 @@ def main(demo=False):
bufs = {name: buf_extra if 'big' in name else buf_main for name in model.vision_input_names}
transforms = {name: model_transform_extra if 'big' in name else model_transform_main for name in model.vision_input_names}
frame_delay = DT_MDL # compensate for time passed since the frame was captured: current_time - timestamp_eof is 50ms on average
action_delay = DT_MDL / 2 # middle of the interval between model output (current state) and next frame (expected state)
frame_delay = DT_MDL
action_delay = DT_MDL / 2
lat_action_t = lat_delay + frame_delay + action_delay
long_action_t = long_delay + frame_delay + action_delay
inputs:dict[str, np.ndarray] = {
inputs: dict[str, np.ndarray] = {
model.desire_key: vec_desire,
'traffic_convention': traffic_convention,
'action_t': np.array([lat_action_t, long_action_t], dtype=np.float32),
}
if 'lateral_control_params' in model.numpy_inputs:
if 'lateral_control_params' in model.npy:
inputs['lateral_control_params'] = np.array([v_ego, lat_delay], dtype=np.float32)
if 'action_t' in model.numpy_inputs:
inputs['action_t'] = np.array([lat_action_t, long_action_t], dtype=np.float32)
mt1 = time.perf_counter()
model_output = model.run(bufs, transforms, inputs, prepare_only)
mt2 = time.perf_counter()
@@ -440,7 +472,7 @@ def main(demo=False):
posenet_send = messaging.new_message('cameraOdometry')
mdv2sp_send = messaging.new_message('modelDataV2SP')
action = model.get_action_from_model(model_output, prev_action, lat_action_t, long_action_t, v_ego)
action = model.get_action_from_model(model_output, prev_action, lat_delay + DT_MDL, long_delay + DT_MDL, v_ego)
prev_action = action
fill_model_msg(drivingdata_send, modelv2_send, model_output, action,
publish_state, meta_main.frame_id, meta_extra.frame_id, frame_id,

View File

@@ -0,0 +1,62 @@
## Neural networks in openpilot
To view the architecture of the ONNX networks, you can use [netron](https://netron.app/)
## Supercombo
### Supercombo input format (Full size: 799906 x float32)
* **image stream**
* Two consecutive images (256 * 512 * 3 in RGB) recorded at 20 Hz : 393216 = 2 * 6 * 128 * 256
* Each 256 * 512 image is represented in YUV420 with 6 channels : 6 * 128 * 256
* Channels 0,1,2,3 represent the full-res Y channel and are represented in numpy as Y[::2, ::2], Y[::2, 1::2], Y[1::2, ::2], and Y[1::2, 1::2]
* Channel 4 represents the half-res U channel
* Channel 5 represents the half-res V channel
* **wide image stream**
* Two consecutive images (256 * 512 * 3 in RGB) recorded at 20 Hz : 393216 = 2 * 6 * 128 * 256
* Each 256 * 512 image is represented in YUV420 with 6 channels : 6 * 128 * 256
* Channels 0,1,2,3 represent the full-res Y channel and are represented in numpy as Y[::2, ::2], Y[::2, 1::2], Y[1::2, ::2], and Y[1::2, 1::2]
* Channel 4 represents the half-res U channel
* Channel 5 represents the half-res V channel
* **desire**
* one-hot encoded buffer to command model to execute certain actions, bit needs to be sent for the past 5 seconds (at 20FPS) : 100 * 8
* **traffic convention**
* one-hot encoded vector to tell model whether traffic is right-hand or left-hand traffic : 2
* **feature buffer**
* A buffer of intermediate features that gets appended to the current feature to form a 5 seconds temporal context (at 20FPS) : 99 * 512
### Supercombo output format (Full size: XXX x float32)
Read [here](https://github.com/commaai/openpilot/blob/90af436a121164a51da9fa48d093c29f738adf6a/selfdrive/modeld/models/driving.h#L236) for more.
## Driver Monitoring Model
* .onnx model can be run with onnx runtimes
* .dlc file is a pre-quantized model and only runs on qualcomm DSPs
### input format
* single image W = 1440 H = 960 luminance channel (Y) from the planar YUV420 format:
* full input size is 1440 * 960 = 1382400
* normalized ranging from 0.0 to 1.0 in float32 (onnx runner) or ranging from 0 to 255 in uint8 (snpe runner)
* camera calibration angles (roll, pitch, yaw) from liveCalibration: 3 x float32 inputs
### output format
* 84 x float32 outputs = 2 + 41 * 2 ([parsing example](https://github.com/commaai/openpilot/blob/22ce4e17ba0d3bfcf37f8255a4dd1dc683fe0c38/selfdrive/modeld/models/dmonitoring.cc#L33))
* for each person in the front seats (2 * 41)
* face pose: 12 = 6 + 6
* face orientation [pitch, yaw, roll] in camera frame: 3
* face position [dx, dy] relative to image center: 2
* normalized face size: 1
* standard deviations for above outputs: 6
* face visible probability: 1
* eyes: 20 = (8 + 1) + (8 + 1) + 1 + 1
* eye position and size, and their standard deviations: 8
* eye visible probability: 1
* eye closed probability: 1
* wearing sunglasses probability: 1
* face occluded probability: 1
* touching wheel probability: 1
* paying attention probability: 1
* (deprecated) distracted probabilities: 2
* using phone probability: 1
* distracted probability: 1
* common outputs 2
* poor camera vision probability: 1
* left hand drive probability: 1

View File

View File

@@ -134,8 +134,6 @@ class Parser:
out_shape=(SplitModelConstants.NUM_ROAD_EDGES,SplitModelConstants.IDX_N,SplitModelConstants.LANE_LINES_WIDTH))
if 'sim_pose' in outs:
self.parse_mdn('sim_pose', outs, in_N=0, out_N=0, out_shape=(SplitModelConstants.POSE_WIDTH,))
if 'action' in outs:
self.parse_mdn('action', outs, in_N=0, out_N=0, out_shape=(SplitModelConstants.ACTION_WIDTH,))
def parse_vision_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
self.parse_mdn('pose', outs, in_N=0, out_N=0, out_shape=(SplitModelConstants.POSE_WIDTH,))
@@ -148,6 +146,8 @@ class Parser:
def parse_policy_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
self.parse_dynamic_outputs(outs)
self.split_outputs(outs)
if 'action' in outs:
self.parse_mdn('action', outs, in_N=0, out_N=0, out_shape=(SplitModelConstants.ACTION_WIDTH,))
return outs
def parse_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:

View File

@@ -0,0 +1,101 @@
// clang++ -O2 repro.cc && ./a.out
#include <sched.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
static inline double millis_since_boot() {
struct timespec t;
clock_gettime(CLOCK_BOOTTIME, &t);
return t.tv_sec * 1000.0 + t.tv_nsec * 1e-6;
}
#define MODEL_WIDTH 320
#define MODEL_HEIGHT 640
// null function still breaks it
#define input_lambda(x) x
// this is copied from models/dmonitoring.cc, and is the code that triggers the issue
void inner(uint8_t *resized_buf, float *net_input_buf) {
int resized_width = MODEL_WIDTH;
int resized_height = MODEL_HEIGHT;
// one shot conversion, O(n) anyway
// yuvframe2tensor, normalize
for (int r = 0; r < MODEL_HEIGHT/2; r++) {
for (int c = 0; c < MODEL_WIDTH/2; c++) {
// Y_ul
net_input_buf[(c*MODEL_HEIGHT/2) + r] = input_lambda(resized_buf[(2*r*resized_width) + (2*c)]);
// Y_ur
net_input_buf[(c*MODEL_HEIGHT/2) + r + (2*(MODEL_WIDTH/2)*(MODEL_HEIGHT/2))] = input_lambda(resized_buf[(2*r*resized_width) + (2*c+1)]);
// Y_dl
net_input_buf[(c*MODEL_HEIGHT/2) + r + ((MODEL_WIDTH/2)*(MODEL_HEIGHT/2))] = input_lambda(resized_buf[(2*r*resized_width+1) + (2*c)]);
// Y_dr
net_input_buf[(c*MODEL_HEIGHT/2) + r + (3*(MODEL_WIDTH/2)*(MODEL_HEIGHT/2))] = input_lambda(resized_buf[(2*r*resized_width+1) + (2*c+1)]);
// U
net_input_buf[(c*MODEL_HEIGHT/2) + r + (4*(MODEL_WIDTH/2)*(MODEL_HEIGHT/2))] = input_lambda(resized_buf[(resized_width*resized_height) + (r*resized_width/2) + c]);
// V
net_input_buf[(c*MODEL_HEIGHT/2) + r + (5*(MODEL_WIDTH/2)*(MODEL_HEIGHT/2))] = input_lambda(resized_buf[(resized_width*resized_height) + ((resized_width/2)*(resized_height/2)) + (r*resized_width/2) + c]);
}
}
}
float trial() {
int resized_width = MODEL_WIDTH;
int resized_height = MODEL_HEIGHT;
int yuv_buf_len = (MODEL_WIDTH/2) * (MODEL_HEIGHT/2) * 6; // Y|u|v -> y|y|y|y|u|v
// allocate the buffers
uint8_t *resized_buf = (uint8_t*)malloc(resized_width*resized_height*3/2);
float *net_input_buf = (float*)malloc(yuv_buf_len*sizeof(float));
printf("allocate -- %p 0x%x -- %p 0x%lx\n", resized_buf, resized_width*resized_height*3/2, net_input_buf, yuv_buf_len*sizeof(float));
// test for bad buffers
static int CNT = 20;
float avg = 0.0;
for (int i = 0; i < CNT; i++) {
double s4 = millis_since_boot();
inner(resized_buf, net_input_buf);
double s5 = millis_since_boot();
avg += s5-s4;
}
avg /= CNT;
// once it's bad, it's reliably bad
if (avg > 10) {
printf("HIT %f\n", avg);
printf("BAD\n");
for (int i = 0; i < 200; i++) {
double s4 = millis_since_boot();
inner(resized_buf, net_input_buf);
double s5 = millis_since_boot();
printf("%.2f ", s5-s4);
}
printf("\n");
exit(0);
}
// don't free so we get a different buffer each time
//free(resized_buf);
//free(net_input_buf);
return avg;
}
int main() {
while (true) {
float ret = trial();
printf("got %f\n", ret);
}
}

View File

@@ -46,6 +46,16 @@ class TestFindDrivingPkl:
assert result is not None
assert 'driving_fof_tinygrad.pkl' in result
def test_finds_fallback_driving_tinygrad(self, tmp_path, monkeypatch):
(tmp_path / 'driving_tinygrad.pkl').write_bytes(b'fake')
from openpilot.system.hardware import hw
monkeypatch.setattr(hw.Paths, 'model_root', staticmethod(lambda: str(tmp_path)))
bundle = DummyBundle(models=[DummyModel('vision', 'nonexistent.pkl')])
result = _find_driving_pkl(bundle)
assert result is not None
assert 'driving_tinygrad.pkl' in result
# Init — assertion guard
@@ -70,12 +80,11 @@ class TestStockEquivalence:
stock_queues, stock_npy = make_input_queues(SPLIT_VISION_INPUT_SHAPES, SPLIT_POLICY_INPUT_SHAPES, frame_skip,
device='NPY')
# TODO-SP: remove action_t skip once SP adds prerequisite for deep models (action_t input queue)
skip_keys = {'action_t'}
assert set(state.input_queues.keys()) == set(stock_queues.keys()) - skip_keys, \
optional_keys = {'action_t'} if 'action_t' not in SPLIT_POLICY_INPUT_SHAPES else set()
assert set(state.input_queues.keys()) == set(stock_queues.keys()) - optional_keys, \
f"Queue keys differ: v2={set(state.input_queues.keys())}, stock={set(stock_queues.keys())}"
assert set(state.numpy_inputs.keys()) == set(stock_npy.keys()) - skip_keys, \
f"Npy keys differ: v2={set(state.numpy_inputs.keys())}, stock={set(stock_npy.keys())}"
assert set(state.npy.keys()) == set(stock_npy.keys()) - optional_keys, \
f"Npy keys differ: v2={set(state.npy.keys())}, stock={set(stock_npy.keys())}"
def test_split_queue_keys_work_with_desire_key(self, model_state_factory):
from openpilot.sunnypilot.modeld_v2.compile_modeld import derive_frame_skip, make_split_input_queues
@@ -178,16 +187,16 @@ class TestInputQueueCreation:
def test_npy_contains_transforms(self, archetype_name, model_state_factory):
arch = ARCHETYPES[archetype_name]
state = model_state_factory(arch)
assert 'tfm' in state.numpy_inputs, f"{arch.name}: 'tfm' missing from npy"
assert 'big_tfm' in state.numpy_inputs, f"{arch.name}: 'big_tfm' missing from npy"
assert state.numpy_inputs['tfm'].shape == (3, 3)
assert state.numpy_inputs['big_tfm'].shape == (3, 3)
assert 'tfm' in state.npy, f"{arch.name}: 'tfm' missing from npy"
assert 'big_tfm' in state.npy, f"{arch.name}: 'big_tfm' missing from npy"
assert state.npy['tfm'].shape == (3, 3)
assert state.npy['big_tfm'].shape == (3, 3)
@pytest.mark.parametrize("archetype_name", ARCHETYPE_NAMES)
def test_npy_contains_desire(self, archetype_name, model_state_factory):
arch = ARCHETYPES[archetype_name]
state = model_state_factory(arch)
assert arch.expected_desire_key in state.numpy_inputs, \
assert arch.expected_desire_key in state.npy, \
f"{arch.name}: '{arch.expected_desire_key}' missing from npy"

View File

@@ -68,3 +68,18 @@ def test_recovery_power_scaling():
# For the below, yes, I know this isn't the same slicing as fillmodlmsg. This is to show that the values are only scaled on curv
expected_curv_plan_vel = plan[0, :, Plan.VELOCITY][:, 0] + control * planplus[0, :, Plan.VELOCITY][:, 0]
np.testing.assert_allclose(recorded_curv_plans[0][:, Plan.VELOCITY][:, 0], expected_curv_plan_vel, rtol=1e-5, atol=1e-6)
def test_action_direct_output():
state = MockStruct(
LONG_SMOOTH_SECONDS=0.3,
LAT_SMOOTH_SECONDS=0.1,
MIN_LAT_CONTROL_SPEED=0.3,
generation=12,
)
prev_action = log.ModelDataV2.Action()
model_output = {'action': np.array([[0.01, -0.5]])}
result = ModelState.get_action_from_model(state, model_output, prev_action, 0.1, 0.1, 10.0)
assert result.desiredAcceleration != 0.0
assert result.desiredCurvature != 0.0
assert isinstance(result.shouldStop, bool)

View File

@@ -0,0 +1,103 @@
import os
os.environ['DEV'] = 'CPU'
import pytest
import numpy as np
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.sunnypilot.modeld_v2.warp import CAMERA_CONFIGS
from openpilot.sunnypilot.modeld_v2.warp import Warp, MODEL_W, MODEL_H
VISION_NAME_PAIRS = [ # needed to account for supercombos input_imgs
('img', 'big_img'),
('input_imgs', 'big_input_imgs'),
]
class MockVisionBuf:
def __init__(self, w, h):
self.width = w
self.height = h
_, _, _, yuv_size = get_nv12_info(w, h)
self.data = np.zeros(yuv_size, dtype=np.uint8)
@pytest.mark.parametrize("buffer_length", [2, 5])
def test_warp_initialization(buffer_length):
warp = Warp(buffer_length)
assert warp.buffer_length == buffer_length
assert warp.img_buffer_shape == (buffer_length * 6, MODEL_H // 2, MODEL_W // 2)
@pytest.mark.parametrize("buffer_length", [2, 5])
@pytest.mark.parametrize("cam_w, cam_h", CAMERA_CONFIGS)
@pytest.mark.parametrize("road, wide", VISION_NAME_PAIRS)
def test_warp_process(buffer_length, cam_w, cam_h, road, wide):
warp = Warp(buffer_length)
mock_buf = MockVisionBuf(cam_w, cam_h)
transform = np.eye(3, dtype=np.float32).flatten()
bufs = {road: mock_buf, wide: mock_buf}
transforms = {road: transform, wide: transform}
out = warp.process(bufs, transforms)
assert isinstance(out, dict)
assert road in out and wide in out
assert out[road].shape == (1, 12, MODEL_H // 2, MODEL_W // 2)
assert out[wide].shape == (1, 12, MODEL_H // 2, MODEL_W // 2)
key = (cam_w, cam_h)
assert key in warp.jit_cache
out2 = warp.process(bufs, transforms)
assert out2[road].shape == out[road].shape
@pytest.mark.parametrize("road, wide", VISION_NAME_PAIRS)
def test_warp_buffer_shift(road, wide):
warp = Warp(2)
cam_w, cam_h = CAMERA_CONFIGS[1]
transform = np.eye(3, dtype=np.float32).flatten()
buf1 = MockVisionBuf(cam_w, cam_h)
buf1.data[0] = 255
bufs1 = {road: buf1, wide: buf1}
transforms = {road: transform, wide: transform}
out1 = warp.process(bufs1, transforms)
road1 = out1[road].numpy().copy()
buf2 = MockVisionBuf(cam_w, cam_h)
buf2.data[0] = 128
bufs2 = {road: buf2, wide: buf2}
out2 = warp.process(bufs2, transforms)
assert not np.array_equal(road1, out2[road].numpy())
@pytest.mark.parametrize("buffer_length", [2, 5])
@pytest.mark.parametrize("road, wide", VISION_NAME_PAIRS)
def test_warp_buffer_accumulation(buffer_length, road, wide):
warp = Warp(buffer_length)
cam_w, cam_h = CAMERA_CONFIGS[0]
transform = np.eye(3, dtype=np.float32).flatten()
transforms = {road: transform, wide: transform}
outputs = []
for i in range(buffer_length + 1):
buf = MockVisionBuf(cam_w, cam_h)
buf.data[:] = i * 10
out = warp.process({road: buf, wide: buf}, transforms)
outputs.append(out[road].numpy().copy())
assert warp.full_buffers['img'].shape == (buffer_length * 6, MODEL_H // 2, MODEL_W // 2)
for i in range(1, len(outputs)):
assert not np.array_equal(outputs[i - 1], outputs[i])
def test_warp_different_cameras_same_instance():
warp = Warp(2)
transform = np.eye(3, dtype=np.float32).flatten()
buf1 = MockVisionBuf(*CAMERA_CONFIGS[0])
warp.process({'img': buf1, 'big_img': buf1}, {'img': transform, 'big_img': transform})
assert len(warp.jit_cache) == 1
buf2 = MockVisionBuf(*CAMERA_CONFIGS[1])
warp.process({'img': buf2, 'big_img': buf2}, {'img': transform, 'big_img': transform})
assert len(warp.jit_cache) == 2

View File

@@ -0,0 +1,2 @@
#!/usr/bin/env bash
clang++ -I /home/batman/one/external/tensorflow/include/ -L /home/batman/one/external/tensorflow/lib -Wl,-rpath=/home/batman/one/external/tensorflow/lib main.cc -ltensorflow

View File

@@ -0,0 +1,69 @@
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include "tensorflow/c/c_api.h"
void* read_file(const char* path, size_t* out_len) {
FILE* f = fopen(path, "r");
if (!f) {
return NULL;
}
fseek(f, 0, SEEK_END);
long f_len = ftell(f);
rewind(f);
char* buf = (char*)calloc(f_len, 1);
assert(buf);
size_t num_read = fread(buf, f_len, 1, f);
fclose(f);
if (num_read != 1) {
free(buf);
return NULL;
}
if (out_len) {
*out_len = f_len;
}
return buf;
}
static void DeallocateBuffer(void* data, size_t) {
free(data);
}
int main(int argc, char* argv[]) {
TF_Buffer* buf;
TF_Graph* graph;
TF_Status* status;
char *path = argv[1];
// load model
{
size_t model_size;
char tmp[1024];
snprintf(tmp, sizeof(tmp), "%s.pb", path);
printf("loading model %s\n", tmp);
uint8_t *model_data = (uint8_t *)read_file(tmp, &model_size);
buf = TF_NewBuffer();
buf->data = model_data;
buf->length = model_size;
buf->data_deallocator = DeallocateBuffer;
printf("loaded model of size %d\n", model_size);
}
// import graph
status = TF_NewStatus();
graph = TF_NewGraph();
TF_ImportGraphDefOptions *opts = TF_NewImportGraphDefOptions();
TF_GraphImportGraphDef(graph, buf, opts, status);
TF_DeleteImportGraphDefOptions(opts);
TF_DeleteBuffer(buf);
if (TF_GetCode(status) != TF_OK) {
printf("FAIL: %s\n", TF_Message(status));
} else {
printf("SUCCESS\n");
}
}

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python3
import sys
import tensorflow as tf
with open(sys.argv[1], "rb") as f:
graph_def = tf.compat.v1.GraphDef()
graph_def.ParseFromString(f.read())
#tf.io.write_graph(graph_def, '', sys.argv[1]+".try")

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
import os
import time
import numpy as np
import cereal.messaging as messaging
from openpilot.system.manager.process_config import managed_processes
N = int(os.getenv("N", "5"))
TIME = int(os.getenv("TIME", "30"))
if __name__ == "__main__":
sock = messaging.sub_sock('modelV2', conflate=False, timeout=1000)
execution_times = []
for _ in range(N):
os.environ['LOGPRINT'] = 'debug'
managed_processes['modeld'].start()
time.sleep(5)
t = []
start = time.monotonic()
while time.monotonic() - start < TIME:
msgs = messaging.drain_sock(sock, wait_for_one=True)
for m in msgs:
t.append(m.modelV2.modelExecutionTime)
execution_times.append(np.array(t[10:]) * 1000)
managed_processes['modeld'].stop()
print("\n\n")
print(f"ran modeld {N} times for {TIME}s each")
for _, t in enumerate(execution_times):
print(f"\tavg: {sum(t)/len(t):0.2f}ms, min: {min(t):0.2f}ms, max: {max(t):0.2f}ms")
print("\n\n")

View File

@@ -0,0 +1,171 @@
import pickle
import time
import numpy as np
from pathlib import Path
from tinygrad.tensor import Tensor
from tinygrad.engine.jit import TinyJit
from tinygrad.device import Device
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.common.transformations.model import MEDMODEL_INPUT_SIZE
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
from openpilot.selfdrive.modeld.compile_modeld import NV12Frame, make_frame_prepare as _make_frame_prepare
CAMERA_CONFIGS = [
(_ar_ox_fisheye.width, _ar_ox_fisheye.height),
(_os_fisheye.width, _os_fisheye.height),
]
def make_frame_prepare(cam_w, cam_h, model_w, model_h):
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
return _make_frame_prepare(nv12, model_w, model_h)
def warp_pkl_path(w, h):
from openpilot.selfdrive.modeld.helpers import MODELS_DIR
return MODELS_DIR / f'warp_{w}x{h}_tinygrad.pkl'
def make_update_img_input(frame_prepare, model_w, model_h):
def update_img_input_tinygrad(tensor, frame, M_inv):
M_inv = M_inv.to(Device.DEFAULT)
new_img = frame_prepare(frame, M_inv)
tensor.assign(tensor[6:].cat(new_img, dim=0).contiguous())
return Tensor.cat(tensor[:6], tensor[-6:], dim=0).contiguous().reshape(1, 12, model_h//2, model_w//2)
return update_img_input_tinygrad
def make_update_both_imgs(frame_prepare, model_w, model_h):
update_img = make_update_img_input(frame_prepare, model_w, model_h)
def update_both_imgs_tinygrad(calib_img_buffer, new_img, M_inv,
calib_big_img_buffer, new_big_img, M_inv_big):
calib_img_pair = update_img(calib_img_buffer, new_img, M_inv)
calib_big_img_pair = update_img(calib_big_img_buffer, new_big_img, M_inv_big)
return calib_img_pair, calib_big_img_pair
return update_both_imgs_tinygrad
MODELS_DIR = Path(__file__).parent / 'models'
MODEL_W, MODEL_H = MEDMODEL_INPUT_SIZE
UPSTREAM_BUFFER_LENGTH = 5
def v2_warp_pkl_path(cam_w, cam_h, buffer_length):
return MODELS_DIR / f'warp_{cam_w}x{cam_h}_b{buffer_length}_tinygrad.pkl'
def compile_v2_warp(cam_w, cam_h, buffer_length):
_, _, _, yuv_size = get_nv12_info(cam_w, cam_h)
img_buffer_shape = (buffer_length * 6, MODEL_H // 2, MODEL_W // 2)
print(f"Compiling v2 warp for {cam_w}x{cam_h} buffer_length={buffer_length}...")
frame_prepare = make_frame_prepare(cam_w, cam_h, MODEL_W, MODEL_H)
update_both_imgs = make_update_both_imgs(frame_prepare, MODEL_W, MODEL_H)
update_img_jit = TinyJit(update_both_imgs, prune=True)
full_buffer = Tensor.zeros(img_buffer_shape, dtype='uint8').contiguous().realize()
big_full_buffer = Tensor.zeros(img_buffer_shape, dtype='uint8').contiguous().realize()
new_frame_np = np.random.randint(0, 256, yuv_size, dtype=np.uint8)
new_big_frame_np = np.random.randint(0, 256, yuv_size, dtype=np.uint8)
for i in range(10):
img_inputs = [full_buffer,
Tensor.from_blob(new_frame_np.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
big_img_inputs = [big_full_buffer,
Tensor.from_blob(new_big_frame_np.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
inputs = img_inputs + big_img_inputs
Device.default.synchronize()
st = time.perf_counter()
_ = update_img_jit(*inputs)
mt = time.perf_counter()
Device.default.synchronize()
et = time.perf_counter()
print(f" [{i+1}/10] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")
pkl_path = v2_warp_pkl_path(cam_w, cam_h, buffer_length)
with open(pkl_path, "wb") as f:
pickle.dump(update_img_jit, f)
print(f" Saved to {pkl_path}")
jit = pickle.load(open(pkl_path, "rb"))
verify_frame = np.random.randint(0, 256, yuv_size, dtype=np.uint8)
verify_big_frame = np.random.randint(0, 256, yuv_size, dtype=np.uint8)
fresh_inputs = [
Tensor.zeros(img_buffer_shape, dtype='uint8').contiguous().realize(),
Tensor.from_blob(verify_frame.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY'),
Tensor.zeros(img_buffer_shape, dtype='uint8').contiguous().realize(),
Tensor.from_blob(verify_big_frame.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY'),
]
jit(*fresh_inputs)
class Warp:
def __init__(self, buffer_length=2):
self.buffer_length = buffer_length
self.img_buffer_shape = (buffer_length * 6, MODEL_H // 2, MODEL_W // 2)
self.jit_cache = {}
self.full_buffers = {k: Tensor.zeros(self.img_buffer_shape, dtype='uint8').contiguous().realize() for k in ['img', 'big_img']}
self._blob_cache: dict[int, Tensor] = {}
self._nv12_cache: dict[tuple[int, int], int] = {}
self.transforms_np = {k: np.zeros((3, 3), dtype=np.float32) for k in ['img', 'big_img']}
self.transforms = {k: Tensor(v, device='NPY').realize() for k, v in self.transforms_np.items()}
def process(self, bufs, transforms):
if not bufs:
return {}
road = next(n for n in bufs if 'big' not in n)
wide = next(n for n in bufs if 'big' in n)
cam_w, cam_h = bufs[road].width, bufs[road].height
key = (cam_w, cam_h)
if key not in self.jit_cache:
v2_pkl = v2_warp_pkl_path(cam_w, cam_h, self.buffer_length)
if v2_pkl.exists():
with open(v2_pkl, 'rb') as f:
self.jit_cache[key] = pickle.load(f)
elif self.buffer_length == UPSTREAM_BUFFER_LENGTH:
upstream_pkl = warp_pkl_path(cam_w, cam_h)
if upstream_pkl.exists():
with open(upstream_pkl, 'rb') as f:
self.jit_cache[key] = pickle.load(f)
if key not in self.jit_cache:
frame_prepare = make_frame_prepare(cam_w, cam_h, MODEL_W, MODEL_H)
update_both_imgs = make_update_both_imgs(frame_prepare, MODEL_W, MODEL_H)
self.jit_cache[key] = TinyJit(update_both_imgs, prune=True)
if key not in self._nv12_cache:
self._nv12_cache[key] = get_nv12_info(cam_w, cam_h)[3]
yuv_size = self._nv12_cache[key]
road_ptr = np.frombuffer(bufs[road].data, dtype=np.uint8).ctypes.data
wide_ptr = np.frombuffer(bufs[wide].data, dtype=np.uint8).ctypes.data
if road_ptr not in self._blob_cache:
self._blob_cache[road_ptr] = Tensor.from_blob(road_ptr, (yuv_size,), dtype='uint8')
if wide_ptr not in self._blob_cache:
self._blob_cache[wide_ptr] = Tensor.from_blob(wide_ptr, (yuv_size,), dtype='uint8')
road_blob = self._blob_cache[road_ptr]
wide_blob = self._blob_cache[wide_ptr] if wide_ptr != road_ptr else Tensor.from_blob(wide_ptr, (yuv_size,), dtype='uint8')
np.copyto(self.transforms_np['img'], transforms[road].reshape(3, 3))
np.copyto(self.transforms_np['big_img'], transforms[wide].reshape(3, 3))
Device.default.synchronize()
res = self.jit_cache[key](
self.full_buffers['img'], road_blob, self.transforms['img'],
self.full_buffers['big_img'], wide_blob, self.transforms['big_img'],
)
out_road = res[0].realize()
out_wide = res[1].realize()
return {road: out_road, wide: out_wide}
if __name__ == "__main__":
for cam_w, cam_h in CAMERA_CONFIGS:
for bl in [2, 5]:
compile_v2_warp(cam_w, cam_h, bl)

View File

@@ -26,22 +26,11 @@ class ModelParser:
download_uri.sha256 = download_uri_data.get("sha256")
return download_uri
@staticmethod
def _parse_chunk(chunk_data) -> custom.ModelManagerSP.Chunk:
chunk = custom.ModelManagerSP.Chunk()
chunk.fileName = chunk_data.get("file_name")
chunk.sha256 = chunk_data.get("sha256")
return chunk
@staticmethod
def _parse_artifact(artifact_data) -> custom.ModelManagerSP.Artifact:
artifact = custom.ModelManagerSP.Artifact()
artifact.fileName = artifact_data.get("file_name")
artifact.downloadUri = ModelParser._parse_download_uri(artifact_data.get("download_uri", {}))
if "chunks" in artifact_data:
artifact.chunks = [ModelParser._parse_chunk(chunk_data) for chunk_data in artifact_data["chunks"]]
return artifact
@staticmethod
@@ -127,7 +116,7 @@ class ModelCache:
class ModelFetcher:
"""Handles fetching and caching of model data from remote source"""
MODEL_URL = "https://raw.githubusercontent.com/sunnypilot/sunnypilot-models/refs/heads/gh-pages/docs/driving_models_v18.json"
MODEL_URL = "https://raw.githubusercontent.com/sunnypilot/sunnypilot-models/refs/heads/gh-pages/docs/driving_models_v17.json"
def __init__(self, params: Params):
self.params = params
@@ -195,6 +184,4 @@ if __name__ == "__main__":
# Print artifact details
print(f"Artifact: {model.artifact.fileName}, Download URI: {model.artifact.downloadUri.uri}")
# Print metadata details
if model.artifact.chunks:
print(f"Contains {len(model.artifact.chunks)} chunks.")
print(f"Metadata: {model.metadata.fileName}, Download URI: {model.metadata.downloadUri.uri}")

View File

@@ -6,138 +6,80 @@ See the LICENSE.md file in the root directory for more details.
"""
import hashlib
import os
import pickle
from pathlib import Path
import numpy as np
from cereal import custom
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.sunnypilot.models.constants import Meta, MetaSimPose, MetaTombRaider
from cereal import custom
from openpilot.sunnypilot.models.constants import Meta, MetaTombRaider, MetaSimPose
from openpilot.system.hardware.hw import Paths
from pathlib import Path
# see the README.md for more details on the model selector versioning
CURRENT_SELECTOR_VERSION = 15
REQUIRED_MIN_SELECTOR_VERSION = 14
# SET ME TO THE EXACT JSON VERSION WE SET IN SUNNYPILOT_MODELS REPO
REQUIRED_JSON_VERSION = 15
CUSTOM_MODEL_PATH = Paths.model_root()
METADATA_PATH = Path(__file__).parent / '../models/supercombo_metadata.pkl'
ModelManager = custom.ModelManagerSP
_LAST_VALIDATED_RAW = None
def _compute_hash(file_path: str) -> str | None:
from openpilot.common.file_chunker import read_file_chunked
try:
return hashlib.sha256(read_file_chunked(file_path)).hexdigest().lower()
except FileNotFoundError:
return None
async def verify_file(file_path: str, expected_hash: str) -> bool:
file_hash = _compute_hash(file_path)
return file_hash == expected_hash.lower() if file_hash else False
def _verify_file(file_path: str, expected_hash: str) -> bool:
file_hash = _compute_hash(file_path)
return file_hash == expected_hash.lower() if file_hash else False
from openpilot.common.file_chunker import read_file_chunked
try:
data = read_file_chunked(file_path)
except FileNotFoundError:
return False
return hashlib.sha256(data).hexdigest().lower() == expected_hash.lower()
def is_bundle_version_compatible(bundle: dict) -> bool:
"""
The bundle parsed from the json specifies a `minimum_selector_version`, which defines the minimum selector version
Checks whether the model bundle is compatible with the current selector version constraints.
The bundle specifies a `minimum_selector_version`, which defines the minimum selector version
required to load the model. This function ensures that:
the bundle MUST match the `REQUIRED_JSON_VERSION` set here in helpers.
1. The model is not too old: the bundle must require at least `REQUIRED_MIN_SELECTOR_VERSION`.
2. The model is not too new: it must support the current selector version (`CURRENT_SELECTOR_VERSION`).
This allows the selector to enforce both a minimum and maximum range of supported models,
even if a model would otherwise be compatible.
:param bundle: Dictionary containing `minimum_selector_version`, as defined by the model bundle.
:type bundle: Dict
:return: True if the selector version is within the accepted range for the bundle; otherwise False.
:rtype: Bool
"""
return bundle.get("minimumSelectorVersion", 0) == REQUIRED_JSON_VERSION
return bool(REQUIRED_MIN_SELECTOR_VERSION <= bundle.get("minimumSelectorVersion", 0) <= CURRENT_SELECTOR_VERSION)
def _bundle_artifacts(bundle: custom.ModelManagerSP.ModelBundle) -> list[tuple[str, str]]:
artifacts = []
for model in getattr(bundle, 'models', []) or []:
for artifact in (getattr(model, 'artifact', None), getattr(model, 'metadata', None)):
if artifact and getattr(artifact, 'fileName', None) and getattr(artifact, 'downloadUri', None):
sha256 = getattr(artifact.downloadUri, 'sha256', None)
if sha256:
artifacts.append((artifact.fileName, sha256))
return artifacts
def get_active_bundle(params: Params = None) -> custom.ModelManagerSP.ModelBundle:
"""Gets the active model bundle from cache"""
if params is None:
params = Params()
def _bundle_is_valid_locally(bundle: custom.ModelManagerSP.ModelBundle) -> bool:
model_root = Paths.model_root()
return all(_verify_file(os.path.join(model_root, file_name), expected_hash)
for file_name, expected_hash in _bundle_artifacts(bundle))
def _bundle_needs_reset(active_bundle: custom.ModelManagerSP.ModelBundle, available_bundles: list[custom.ModelManagerSP.ModelBundle] | None) -> bool:
if active_bundle is None:
return False
if available_bundles is not None:
matching_bundle = None
for bundle in available_bundles:
if getattr(active_bundle, 'ref', None) and getattr(bundle, 'ref', None):
if active_bundle.ref == bundle.ref:
matching_bundle = bundle
break
elif getattr(active_bundle, 'internalName', None) == getattr(bundle, 'internalName', None):
matching_bundle = bundle
break
if matching_bundle is None:
return True
if active_bundle.minimumSelectorVersion != matching_bundle.minimumSelectorVersion:
return True
active_runner = getattr(active_bundle, 'runner', None)
matching_runner = getattr(matching_bundle, 'runner', None)
if active_runner is not None and matching_runner is not None:
if getattr(active_runner, 'raw', active_runner) != getattr(matching_runner, 'raw', matching_runner):
return True
if set(_bundle_artifacts(active_bundle)) != set(_bundle_artifacts(matching_bundle)):
return True
return not _bundle_is_valid_locally(active_bundle)
def validate_active_bundle(params: Params, available_bundles: list[custom.ModelManagerSP.ModelBundle] | None = None) -> None:
global _LAST_VALIDATED_RAW
raw_bundle = params.get("ModelManager_ActiveBundle")
if not raw_bundle:
return
if raw_bundle == _LAST_VALIDATED_RAW:
return
active_bundle = get_active_bundle(params, raw_bundle_dict=raw_bundle)
if active_bundle is None or _bundle_needs_reset(active_bundle, available_bundles):
cloudlog.warning("Active model bundle invalid; resetting to default")
params.remove("ModelManager_ActiveBundle")
params.put("ModelRunnerTypeCache", int(custom.ModelManagerSP.Runner.stock), block=True)
_LAST_VALIDATED_RAW = None
else:
_LAST_VALIDATED_RAW = raw_bundle
def get_active_bundle(params: Params | None = None, raw_bundle_dict: dict | bytes | None = None) -> "custom.ModelManagerSP.ModelBundle | None":
params = params or Params()
try:
active_bundle_dict = raw_bundle_dict if raw_bundle_dict is not None else (params.get("ModelManager_ActiveBundle") or {})
if active_bundle_dict and is_bundle_version_compatible(active_bundle_dict):
return custom.ModelManagerSP.ModelBundle(**active_bundle_dict)
if (active_bundle := params.get("ModelManager_ActiveBundle") or {}) and is_bundle_version_compatible(active_bundle):
return custom.ModelManagerSP.ModelBundle(**active_bundle)
except Exception:
pass
return None
def get_active_model_runner(params: Params | None = None, force_check: bool = False) -> int:
params = params or Params()
def get_active_model_runner(params: Params = None, force_check=False) -> int:
if params is None:
params = Params()
cached_runner_type = params.get("ModelRunnerTypeCache")
if cached_runner_type is not None and not force_check:
return cached_runner_type
runner_type = custom.ModelManagerSP.Runner.stock
if active_bundle := get_active_bundle(params):
runner_type = active_bundle.runner.raw
@@ -146,40 +88,66 @@ def get_active_model_runner(params: Params | None = None, force_check: bool = Fa
return runner_type
def _get_model():
if bundle := get_active_bundle():
drive_model = next(model for model in bundle.models if model.type == ModelManager.Model.Type.supercombo)
return drive_model
return None
def load_metadata():
model = _get_model()
metadata_path = f"{CUSTOM_MODEL_PATH}/{model.metadata.fileName}" if model else METADATA_PATH
metadata_path = METADATA_PATH
if model := _get_model():
metadata_path = f"{CUSTOM_MODEL_PATH}/{model.metadata.fileName}"
with open(metadata_path, 'rb') as f:
return pickle.load(f)
def prepare_inputs(model_metadata: dict) -> dict[str, np.ndarray]:
return {
key: np.zeros(shape, dtype=np.float32).flatten()
for key, shape in model_metadata['input_shapes'].items()
if 'img' not in key
def prepare_inputs(model_metadata) -> dict[str, np.ndarray]:
# img buffers are managed in openCL transform code so we don't pass them as inputs
inputs = {
k: np.zeros(v, dtype=np.float32).flatten()
for k, v in model_metadata['input_shapes'].items()
if 'img' not in k
}
return inputs
def load_meta_constants(model_metadata: dict):
""" Loads the appropriate meta model class based on key shapes"""
if 'sim_pose' in model_metadata['input_shapes']:
return MetaSimPose
meta_slice = model_metadata['output_slices']['meta']
if (meta_slice.start, meta_slice.stop, meta_slice.step) == (5868, 5921, None):
return MetaTombRaider
def load_meta_constants(model_metadata):
"""
Determines and loads the appropriate meta model class based on the metadata provided. The function checks
specific keys and conditions within the provided metadata dictionary to identify the corresponding meta
model class to return.
return Meta
:param model_metadata: Dictionary containing metadata about the model. It includes
details such as input shapes, output slices, and other configurations for identifying
metadata-dependent meta model classes.
:type model_metadata: dict
:return: The appropriate meta model class (Meta, MetaSimPose, or MetaTombRaider)
based on the conditions and metadata provided.
:rtype: type
"""
meta = Meta # Default Meta
if 'sim_pose' in model_metadata['input_shapes'].keys():
# Meta for models with sim_pose input
meta = MetaSimPose
else:
# Meta for Tomb Raider, it does not include sim_pose input but has the same meta slice as previous models
meta_slice = model_metadata['output_slices']['meta']
meta_tf_slice = slice(5868, 5921, None)
if (
meta_slice.start == meta_tf_slice.start and
meta_slice.stop == meta_tf_slice.stop and
meta_slice.step == meta_tf_slice.step
):
meta = MetaTombRaider
return meta
# The following method(s) are modeld helper methods

View File

@@ -17,7 +17,7 @@ from openpilot.system.hardware.hw import Paths
from cereal import messaging, custom
from openpilot.sunnypilot.models.fetcher import ModelFetcher
from openpilot.sunnypilot.models.helpers import get_active_bundle, validate_active_bundle, verify_file
from openpilot.sunnypilot.models.helpers import verify_file, get_active_bundle
class ModelManagerSP:
@@ -89,16 +89,20 @@ class ModelManagerSP:
del self._download_start_times[model.fileName]
async def _download_chunked(self, base_url: str, base_path: str, artifact) -> None:
from openpilot.common.file_chunker import get_chunk_name, get_manifest_path
num_chunks = len(artifact.chunks)
if num_chunks == 0:
raise ValueError("No chunks defined in artifact")
from openpilot.common.file_chunker import get_manifest_path, get_chunk_name
manifest_url = get_manifest_path(base_url)
manifest_path = get_manifest_path(base_path)
async with aiohttp.ClientSession() as session:
async with session.get(manifest_url) as resp:
if resp.status == 404:
raise FileNotFoundError
resp.raise_for_status()
num_chunks = int((await resp.read()).strip())
self._download_start_times[artifact.fileName] = time.monotonic()
for i, _ in enumerate(artifact.chunks):
for i in range(num_chunks):
chunk_url = get_chunk_name(base_url, i, num_chunks)
chunk_path = get_chunk_name(base_path, i, num_chunks)
chunk_downloaded = 0
@@ -113,7 +117,7 @@ class ModelManagerSP:
if self.params.get("ModelManager_DownloadIndex") is None:
raise Exception("Download cancelled")
intra = chunk_downloaded / max(chunk_size, 1)
progress = min(99.0, ((i + intra) / num_chunks) * 100)
progress = min(99, (i + intra) / num_chunks * 100)
artifact.downloadProgress.status = custom.ModelManagerSP.DownloadStatus.downloading
artifact.downloadProgress.progress = progress
artifact.downloadProgress.eta = self._calculate_eta(artifact.fileName, progress)
@@ -144,9 +148,9 @@ class ModelManagerSP:
self._report_status()
return
if len(artifact.chunks) > 0:
try:
await self._download_chunked(url, full_path, artifact)
else:
except (FileNotFoundError, aiohttp.ClientResponseError):
await self._download_file(url, full_path, artifact)
if not await verify_file(full_path, expected_hash):
@@ -166,16 +170,18 @@ class ModelManagerSP:
artifact.downloadProgress.status = custom.ModelManagerSP.DownloadStatus.failed
artifact.downloadProgress.eta = 0
self._sync_artifact_progress(artifact)
if self.selected_bundle:
self.selected_bundle.status = custom.ModelManagerSP.DownloadStatus.failed
self.selected_bundle.status = custom.ModelManagerSP.DownloadStatus.failed
self._report_status()
self._download_start_times.pop(artifact.fileName, None)
raise
async def _process_model(self, model, destination_path: str) -> None:
"""Processes a single model download including verification"""
await self._process_artifact(model.metadata, destination_path)
await self._process_artifact(model.artifact, destination_path)
model_artifact = model.artifact
metadata_artifact = model.metadata
await self._process_artifact(metadata_artifact, destination_path)
await self._process_artifact(model_artifact, destination_path)
def _report_status(self) -> None:
"""Reports current status through messaging system"""
@@ -216,8 +222,7 @@ class ModelManagerSP:
self.selected_bundle = None
except Exception:
if self.selected_bundle:
self.selected_bundle.status = custom.ModelManagerSP.DownloadStatus.failed
self.selected_bundle.status = custom.ModelManagerSP.DownloadStatus.failed
raise
finally:
@@ -234,7 +239,6 @@ class ModelManagerSP:
while True:
try:
self.available_models = self.model_fetcher.get_available_bundles()
validate_active_bundle(self.params, self.available_models)
self.active_bundle = get_active_bundle(self.params)
if (index_to_download := self.params.get("ModelManager_DownloadIndex")) is not None:
@@ -248,8 +252,8 @@ class ModelManagerSP:
self.selected_bundle = None
if self.params.get("ModelManager_ClearCache"):
self.clear_model_cache()
self.params.remove("ModelManager_ClearCache")
self.clear_model_cache()
self.params.remove("ModelManager_ClearCache")
self._report_status()
rk.keep_time()

View File

@@ -0,0 +1,28 @@
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.sunnypilot.models.runners.model_runner import ModelRunner
from openpilot.sunnypilot.models.runners.tinygrad.tinygrad_runner import TinygradRunner, TinygradSplitRunner
from openpilot.sunnypilot.models.runners.constants import ModelType
def get_model_runner() -> ModelRunner:
"""
Factory function to create and return the appropriate ModelRunner instance.
Selects TinygradRunner, choosing TinygradSplitRunner if separate vision/policy
models are detected in the active bundle.
:return: An instance of a ModelRunner subclass (ONNXRunner, TinygradRunner, or TinygradSplitRunner).
"""
bundle = get_active_bundle()
if bundle and bundle.models:
model_types = {m.type.raw for m in bundle.models}
# Check if the bundle uses separate vision and policy models (legacy or new split format)
split_types = {ModelType.vision, ModelType.policy, ModelType.offPolicy, ModelType.onPolicy}
if model_types & split_types:
return TinygradSplitRunner()
# Otherwise, assume a single model (likely supercombo)
if bundle.models:
return TinygradRunner(bundle.models[0].type.raw)
# Default fallback to TinygradRunner with the supercombo type if bundle info is missing/incomplete
return TinygradRunner(ModelType.supercombo)

View File

@@ -0,0 +1,174 @@
from abc import abstractmethod, ABC
import numpy as np
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.sunnypilot.models.runners.constants import NumpyDict, ShapeDict, Model, SliceDict, SEND_RAW_PRED
from openpilot.system.hardware.hw import Paths
import pickle
CUSTOM_MODEL_PATH = Paths.model_root()
class ModelData:
"""
Stores metadata and configuration for a specific machine learning model.
This class loads model metadata (like input shapes and output slices)
from a pickle file associated with a model instance.
:param model: The machine learning model object containing metadata.
"""
def __init__(self, model: Model):
self.model = model
self.metadata = model.metadata
self.input_shapes: ShapeDict = {}
self.output_slices: SliceDict = {}
if self.metadata:
self._load_metadata()
def _load_metadata(self) -> None:
"""Loads input shapes and output slices from the model's metadata pickle file."""
metadata_path = f"{CUSTOM_MODEL_PATH}/{self.metadata.fileName}"
with open(metadata_path, 'rb') as f:
model_metadata = pickle.load(f)
self.input_shapes = model_metadata.get('input_shapes', {})
self.output_slices = model_metadata.get('output_slices', {})
class ModularRunner(ABC):
"""
Represents a modular runner for handling and slicing model outputs.
This abstract base class is designed to provide an interface for modular
parsing and processing of model outputs. Classes inheriting from it must
implement the specified abstract methods, defining how model outputs
should be handled and stored. The primary goal is to enable structured
parsing of outputs through a dictionary-based method mapping.
:ivar parser_method_dict: Mapping dictionary containing parser methods
for handling specific types of outputs.
:type parser_method_dict: dict
"""
@property
@abstractmethod
def parser_method_dict(self) -> dict:
pass
@parser_method_dict.setter
@abstractmethod
def parser_method_dict(self, value: dict) -> None:
pass
@abstractmethod
def _slice_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
pass
class ModelRunner(ModularRunner):
"""
Abstract base class for managing and executing machine learning models.
Provides a common interface for loading models, preparing inputs, running
inference, and slicing/parsing outputs based on model metadata. Derived
classes implement the specifics of input preparation and model execution
for different frameworks (e.g., Tinygrad, ONNX).
"""
def __init__(self):
"""Initializes the model runner, loading the active model bundle."""
self.is_20hz: bool | None = None
self.is_20hz_3d: bool | None = None
self.models: dict[int, ModelData] = {}
self._model_data: ModelData | None = None # Active model data for current operation
self._parser_method_dict: dict = {}
self.inputs: dict = {}
self._parser = None
self._load_models()
self._constants = None
@property
def constants(self):
return self._constants
@property
def parser_method_dict(self) -> dict:
"""Returns the dictionary mapping model types to their respective parsing methods."""
return self._parser_method_dict
@parser_method_dict.setter
def parser_method_dict(self, value: dict) -> None:
"""Sets the dictionary mapping model types to their respective parsing methods."""
self._parser_method_dict = value
def _load_models(self) -> None:
"""Loads the active model bundle configuration and sets up ModelData."""
bundle = get_active_bundle()
if not bundle:
raise ValueError("No active model bundle found, why are we being executed?")
self.models = {model.type.raw: ModelData(model) for model in bundle.models}
self.is_20hz = bundle.is20hz
self.is_20hz_3d = False
@property
def input_shapes(self) -> ShapeDict:
"""Returns the input shapes for the currently active model."""
if self._model_data:
return self._model_data.input_shapes
raise ValueError("Model data is not available. Ensure the model is loaded correctly.")
@property
def output_slices(self) -> SliceDict:
"""Returns the output slices for the currently active model."""
if self._model_data:
return self._model_data.output_slices
raise ValueError("Model data is not available. Ensure the model is loaded correctly.")
@property
def vision_input_names(self) -> list[str]:
"""Returns the list of vision input names from the input shapes."""
if self._model_data:
return list(self._model_data.input_shapes.keys())
raise ValueError("Model data is not available. Ensure the model is loaded correctly.")
@abstractmethod
def prepare_inputs(self, numpy_inputs: NumpyDict) -> dict:
"""
Abstract method to prepare inputs for model inference.
:param numpy_inputs: Dictionary of numpy arrays for non-image inputs.
:return: Dictionary of prepared inputs ready for the model.
"""
raise NotImplementedError
@abstractmethod
def _run_model(self) -> NumpyDict:
"""
Abstract method to execute model inference with prepared inputs.
:return: Dictionary containing the model's raw output arrays.
"""
raise NotImplementedError
def _slice_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""
Slices the raw model output array based on the output_slices metadata.
:param model_outputs: The raw numpy array output from the model.
:return: A dictionary where keys are output names and values are sliced numpy arrays.
"""
if not self._model_data:
raise ValueError("Model data is not available. Ensure the model is loaded correctly.")
sliced_outputs = {k: model_outputs[np.newaxis, v] for k, v in self._model_data.output_slices.items()}
if SEND_RAW_PRED:
sliced_outputs['raw_pred'] = model_outputs.copy() # Optionally include the full raw output
return sliced_outputs
def run_model(self) -> NumpyDict:
"""
Executes the model inference pipeline: runs the model and parses outputs.
:return: Dictionary containing the final parsed model outputs.
"""
return self._run_model() # Parsing is handled within specific runner implementations

View File

@@ -0,0 +1,91 @@
import os
from abc import ABC
import numpy as np
from openpilot.sunnypilot.modeld_v2.parse_model_outputs import Parser as CombinedParser
from openpilot.sunnypilot.modeld_v2.parse_model_outputs_split import Parser as SplitParser
from openpilot.sunnypilot.models.runners.constants import ModelType, NumpyDict
from openpilot.sunnypilot.models.runners.model_runner import ModularRunner
from openpilot.system.hardware.hw import Paths
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
CUSTOM_MODEL_PATH = Paths.model_root()
class OffPolicyTinygrad(ModularRunner, ABC):
"""
A TinygradRunner specialized for off-policy models.
Uses a SplitParser to handle outputs specific to the off-policy part of a split model setup.
"""
def __init__(self):
self._off_policy_parser = SplitParser()
self.parser_method_dict[ModelType.offPolicy] = self._parse_off_policy_outputs
def _parse_off_policy_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""Parses off-policy model outputs using SplitParser."""
result: NumpyDict = self._off_policy_parser.parse_policy_outputs(self._slice_outputs(model_outputs))
return result
class OnPolicyTinygrad(ModularRunner, ABC):
"""
A TinygradRunner specialized for on-policy models.
Uses a SplitParser to handle outputs specific to the on-policy part of a split model setup.
"""
def __init__(self):
self._on_policy_parser = SplitParser()
self.parser_method_dict[ModelType.onPolicy] = self._parse_on_policy_outputs
def _parse_on_policy_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""Parses on-policy model outputs using SplitParser."""
result: NumpyDict = self._on_policy_parser.parse_policy_outputs(self._slice_outputs(model_outputs))
return result
class PolicyTinygrad(ModularRunner, ABC):
"""
A TinygradRunner specialized for policy-only models.
Uses a SplitParser to handle outputs specific to the policy part of a split model setup.
"""
def __init__(self):
self._policy_parser = SplitParser()
self.parser_method_dict[ModelType.policy] = self._parse_policy_outputs
def _parse_policy_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""Parses policy model outputs using SplitParser."""
result: NumpyDict = self._policy_parser.parse_policy_outputs(self._slice_outputs(model_outputs))
return result
class VisionTinygrad(ModularRunner, ABC):
"""
A TinygradRunner specialized for vision-only models.
Uses a SplitParser to handle outputs specific to the vision part of a split model setup.
"""
def __init__(self):
self._vision_parser = SplitParser()
self.parser_method_dict[ModelType.vision] = self._parse_vision_outputs
def _parse_vision_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""Parses vision model outputs using SplitParser."""
result: NumpyDict = self._vision_parser.parse_vision_outputs(self._slice_outputs(model_outputs))
return result
class SupercomboTinygrad(ModularRunner, ABC):
"""
A TinygradRunner specialized for vision-only models.
Uses a SplitParser to handle outputs specific to the vision part of a split model setup.
"""
def __init__(self):
self._supercombo_parser = CombinedParser()
self.parser_method_dict[ModelType.supercombo] = self._parse_supercombo_outputs
def _parse_supercombo_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""Parses vision model outputs using SplitParser."""
result: NumpyDict = self._supercombo_parser.parse_outputs(self._slice_outputs(model_outputs))
return result

View File

@@ -0,0 +1,179 @@
import pickle
import numpy as np
from openpilot.sunnypilot.models.runners.constants import NumpyDict, ModelType, ShapeDict, CUSTOM_MODEL_PATH, SliceDict
from openpilot.sunnypilot.models.runners.model_runner import ModelRunner
from openpilot.sunnypilot.models.runners.tinygrad.model_types import PolicyTinygrad, VisionTinygrad, SupercomboTinygrad, OffPolicyTinygrad, OnPolicyTinygrad
from openpilot.sunnypilot.models.split_model_constants import SplitModelConstants
from openpilot.sunnypilot.modeld_v2.constants import ModelConstants
from tinygrad.tensor import Tensor
class TinygradRunner(ModelRunner, SupercomboTinygrad, PolicyTinygrad, VisionTinygrad, OffPolicyTinygrad, OnPolicyTinygrad):
"""
A ModelRunner implementation for executing Tinygrad models.
Handles loading Tinygrad model artifacts (.pkl), preparing inputs as Tinygrad
Tensors (potentially using QCOM extensions on TICI), running inference,
and parsing the outputs.
:param model_type: The type of model (e.g., supercombo) to load and run.
"""
def __init__(self, model_type: int = ModelType.supercombo):
ModelRunner.__init__(self)
SupercomboTinygrad.__init__(self)
PolicyTinygrad.__init__(self)
VisionTinygrad.__init__(self)
OffPolicyTinygrad.__init__(self)
OnPolicyTinygrad.__init__(self)
self._constants = ModelConstants
self._model_data = self.models.get(model_type)
if not self._model_data or not self._model_data.model:
raise ValueError(f"Model data for type {model_type} not available.")
artifact_filename = self._model_data.model.artifact.fileName
assert artifact_filename.endswith('_tinygrad.pkl'), \
f"Invalid model file {artifact_filename} for TinygradRunner"
model_pkl_path = f"{CUSTOM_MODEL_PATH}/{artifact_filename}"
with open(model_pkl_path, "rb") as f:
try:
# Load the compiled Tinygrad model runner function
self.model_run = pickle.load(f)
except FileNotFoundError as e:
# Provide a helpful error message if the model was built for a different platform
assert "/dev/kgsl-3d0" not in str(e), "Model was built on C3 or C3X, but is being loaded on PC"
raise
# Map input names to their required dtype and device from the loaded model
self.input_to_dtype = {}
self.input_to_device = {}
for idx, name in enumerate(self.model_run.captured.expected_names):
info = self.model_run.captured.expected_input_info[idx]
self.input_to_dtype[name] = info[2] # dtype
self.input_to_device[name] = info[3] # device
self._policy_cached = False
@property
def vision_input_names(self) -> list[str]:
"""Returns the list of vision input names from the input shapes."""
return [name for name in self.input_shapes.keys() if 'img' in name]
def prepare_policy_inputs(self, numpy_inputs: NumpyDict):
if not self._policy_cached:
for key, value in numpy_inputs.items():
self.inputs[key] = Tensor(value, device='NPY').realize()
self._policy_cached = True
def prepare_inputs(self, numpy_inputs: NumpyDict) -> dict:
"""Prepares all vision and policy inputs for the model."""
self.prepare_policy_inputs(numpy_inputs)
for key in self.vision_input_names:
if key in self.inputs:
self.inputs[key] = self.inputs[key].cast(self.input_to_dtype[key])
return self.inputs
def _run_model(self) -> NumpyDict:
"""Runs the Tinygrad model inference and parses the outputs."""
outputs = self.model_run(**self.inputs).contiguous().realize().uop.base.buffer.numpy().flatten()
return self._parse_outputs(outputs)
def _parse_outputs(self, model_outputs: np.ndarray) -> NumpyDict:
"""Parses the raw model outputs using the standard Parser."""
if self._model_data is None:
raise ValueError("Model data is not available. Ensure the model is loaded correctly.")
result: NumpyDict = self.parser_method_dict[self._model_data.model.type.raw](model_outputs)
return result
class TinygradSplitRunner(ModelRunner):
"""
A ModelRunner that coordinates separate TinygradVisionRunner and TinygradPolicyRunner instances.
Manages the execution of split vision and policy models, combining their inputs and outputs.
"""
def __init__(self):
super().__init__()
self.is_20hz_3d = True
self.vision_runner = TinygradRunner(ModelType.vision)
self.policy_runner = TinygradRunner(ModelType.policy) if self.models.get(ModelType.policy) else None
self.off_policy_runner = TinygradRunner(ModelType.offPolicy) if self.models.get(ModelType.offPolicy) else None
self.on_policy_runner = TinygradRunner(ModelType.onPolicy) if self.models.get(ModelType.onPolicy) else None
self._constants = SplitModelConstants
def _run_model(self) -> NumpyDict:
"""Runs both vision and policy models and merges their parsed outputs."""
vision_output = self.vision_runner.run_model()
outputs = {**vision_output}
if self.policy_runner:
policy_output = self.policy_runner.run_model()
outputs.update(policy_output)
if self.off_policy_runner:
off_policy_output = self.off_policy_runner.run_model()
if self.on_policy_runner:
off_policy_output.pop('plan', None)
outputs.update(off_policy_output)
if self.on_policy_runner:
on_policy_output = self.on_policy_runner.run_model()
outputs.update(on_policy_output)
if 'planplus' in outputs and 'plan' in outputs:
outputs['plan'] = outputs['plan'] + outputs['planplus']
return outputs
@property
def vision_input_names(self) -> list[str]:
"""Returns the list of vision input names from the vision runner."""
return list(self.vision_runner.vision_input_names)
@property
def input_shapes(self) -> ShapeDict:
"""Returns the combined input shapes from both vision and policy models."""
shapes = {**self.vision_runner.input_shapes}
if self.policy_runner:
shapes.update(self.policy_runner.input_shapes)
if self.off_policy_runner:
shapes.update(self.off_policy_runner.input_shapes)
if self.on_policy_runner:
shapes.update(self.on_policy_runner.input_shapes)
return shapes
@property
def output_slices(self) -> SliceDict:
"""Returns the combined output slices from both vision and policy models."""
slices = {**self.vision_runner.output_slices}
if self.policy_runner:
slices.update(self.policy_runner.output_slices)
if self.off_policy_runner:
slices.update(self.off_policy_runner.output_slices)
if self.on_policy_runner:
slices.update(self.on_policy_runner.output_slices)
return slices
def prepare_inputs(self, numpy_inputs: NumpyDict) -> dict:
"""Prepares inputs for both vision and policy models."""
if self.policy_runner:
self.policy_runner.prepare_policy_inputs(numpy_inputs)
for key in self.vision_input_names:
if key in self.inputs:
self.vision_runner.inputs[key] = self.inputs[key].cast(self.vision_runner.input_to_dtype[key])
inputs = {**self.vision_runner.inputs}
if self.policy_runner:
inputs.update(self.policy_runner.inputs)
if self.off_policy_runner:
self.off_policy_runner.prepare_policy_inputs(numpy_inputs)
inputs.update(self.off_policy_runner.inputs)
if self.on_policy_runner:
self.on_policy_runner.prepare_policy_inputs(numpy_inputs)
inputs.update(self.on_policy_runner.inputs)
return inputs

View File

@@ -32,7 +32,7 @@ class Proc:
PROCS = [
Proc(['camerad'], 1.65, atol=0.4, msgs=['roadCameraState', 'wideRoadCameraState', 'driverCameraState']),
Proc(['modeld'], 1.5, atol=0.2, msgs=['modelV2']),
Proc(['modeld'], 1.8, atol=0.2, msgs=['modelV2']),
Proc(['dmonitoringmodeld'], 0.65, atol=0.35, msgs=['driverStateV2']),
Proc(['encoderd'], 0.23, msgs=[]),
]