Revert "Simpler file chunker (#37276)"

This reverts commit b27fa58444.
This commit is contained in:
Bruce Wayne
2026-02-20 16:43:43 -08:00
parent 09926bf5b5
commit d6af0e6eb5
6 changed files with 72 additions and 61 deletions
+2 -1
View File
@@ -64,7 +64,8 @@ flycheck_*
cppcheck_report.txt
comma*.sh
selfdrive/modeld/models/*.pkl*
selfdrive/modeld/models/*.pkl
selfdrive/modeld/models/*.pkl.*
# openpilot log files
*.bz2
-31
View File
@@ -1,31 +0,0 @@
import math
import os
from pathlib import Path
CHUNK_SIZE = 49 * 1024 * 1024 # 49MB, under GitHub's 50MB limit
def get_chunk_name(name, idx, num_chunks):
    """Build the file name for one chunk of ``name``.

    Args:
        name: Base path/name of the file being chunked.
        idx: Zero-based index of the chunk; the name uses ``idx + 1``.
        num_chunks: Total number of chunks the file is split into.

    Returns:
        ``name`` followed by ``.chunkXXofYY``, where both numbers are
        zero-padded to at least two digits.
    """
    ordinal = idx + 1  # chunk numbering in file names is one-based
    suffix = f".chunk{ordinal:02d}of{num_chunks:02d}"
    return f"{name}{suffix}"
def get_chunk_paths(path, file_size):
    """Return the chunk file names needed to hold ``file_size`` bytes of ``path``.

    Args:
        path: Base path of the file that will be chunked.
        file_size: Size (or upper-bound estimate) in bytes; may be a float.

    Returns:
        A list of chunk names, one per ``CHUNK_SIZE``-sized piece. At least one
        name is always returned, matching ``chunk_file``, which needs a minimum
        of one chunk even for an empty file.
    """
    # Clamp to 1 so a zero-sized file still maps to a single (empty) chunk.
    num_chunks = max(1, math.ceil(file_size / CHUNK_SIZE))
    return [get_chunk_name(path, i, num_chunks) for i in range(num_chunks)]
def chunk_file(path, num_chunks):
    """Split the file at ``path`` into exactly ``num_chunks`` chunk files.

    The whole file is read into memory and written out in ``CHUNK_SIZE``-byte
    slices named by ``get_chunk_name``. If the data needs fewer chunks than
    ``num_chunks``, the trailing chunk files are written empty so the set of
    output files is always the same.

    Args:
        path: Path of the file to split.
        num_chunks: Number of chunk files to write.

    Raises:
        AssertionError: If ``num_chunks`` is too small to hold the file.
    """
    with open(path, 'rb') as f:
        data = f.read()

    actual_num_chunks = max(1, math.ceil(len(data) / CHUNK_SIZE))
    # Raised explicitly rather than via ``assert``: asserts are removed under
    # ``python -O``, which would silently drop the data that does not fit.
    if num_chunks < actual_num_chunks:
        raise AssertionError(
            f"Allowed {num_chunks} chunks but needs at least {actual_num_chunks}, for path {path}"
        )

    for i in range(num_chunks):
        with open(get_chunk_name(path, i, num_chunks), 'wb') as f:
            f.write(data[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE])
def read_file_chunked(path):
    """Read ``path``, reassembling it from chunk files when they exist.

    Chunk counts from 1 to 99 are probed by checking whether the first chunk
    for that count exists. For the first count that matches, all chunks are
    read and concatenated. If no chunk set is found, ``path`` itself is read.

    Raises:
        FileNotFoundError: If neither a chunk set nor ``path`` exists.
    """
    for total in range(1, 100):
        first_chunk = get_chunk_name(path, 0, total)
        if not os.path.isfile(first_chunk):
            continue
        pieces = [Path(get_chunk_name(path, i, total)).read_bytes() for i in range(total)]
        return b''.join(pieces)

    # No chunk set found: fall back to the unchunked file.
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    return Path(path).read_bytes()
+23 -22
View File
@@ -1,18 +1,14 @@
import os
import glob
from openpilot.common.file_chunker import chunk_file, get_chunk_paths
Import('env', 'arch')
chunker_file = File("#common/file_chunker.py")
lenv = env.Clone()
CHUNK_BYTES = int(os.environ.get("TG_CHUNK_BYTES", str(45 * 1024 * 1024)))
tinygrad_root = env.Dir("#").abspath
tinygrad_files = ["#"+x for x in glob.glob(env.Dir("#tinygrad_repo").relpath + "/**", recursive=True, root_dir=tinygrad_root)
if 'pycache' not in x and os.path.isfile(os.path.join(tinygrad_root, x))]
def estimate_pickle_max_size(onnx_size):
    """Return a generous size estimate for a pickle built from an ONNX file.

    The estimate is the ONNX size plus 20%, plus a fixed 10 * 1024 * 1024 bytes.
    """
    fixed_headroom = 10 * 1024 * 1024
    return 1.2 * onnx_size + fixed_headroom  # 20% + 10MB is plenty
# Get model metadata
for model_name in ['driving_vision', 'driving_policy', 'dmonitoring_model']:
fn = File(f"models/{model_name}").abspath
@@ -30,34 +26,39 @@ image_flag = {
'larch64': 'IMAGE=2',
}.get(arch, 'IMAGE=0')
script_files = [File(Dir("#selfdrive/modeld").File("compile_warp.py").abspath)]
compile_warp_cmd = f'{tg_flags} python3 {Dir("#selfdrive/modeld").abspath}/compile_warp.py '
cmd = f'{tg_flags} python3 {Dir("#selfdrive/modeld").abspath}/compile_warp.py '
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
warp_targets = []
for cam in [_ar_ox_fisheye, _os_fisheye]:
w, h = cam.width, cam.height
warp_targets += [File(f"models/warp_{w}x{h}_tinygrad.pkl").abspath, File(f"models/dm_warp_{w}x{h}_tinygrad.pkl").abspath]
def chunk_warps(target, source, env):
    """Build action: write every warp pickle in ``warp_targets`` as a single chunk.

    ``target``, ``source`` and ``env`` are accepted for the action call
    signature but are not used.
    """
    for warp_pkl in warp_targets:
        chunk_file(warp_pkl, 1)
chunk_targets = sum([get_chunk_paths(t, estimate_pickle_max_size(0)) for t in warp_targets], [])
lenv.Command(chunk_targets, tinygrad_files + script_files + [chunker_file],
[compile_warp_cmd, chunk_warps])
lenv.Command(warp_targets, tinygrad_files + script_files, cmd)
def tg_compile(flags, model_name):
pythonpath_string = 'PYTHONPATH="${PYTHONPATH}:' + env.Dir("#tinygrad_repo").abspath + '"'
fn = File(f"models/{model_name}").abspath
pkl = fn + "_tinygrad.pkl"
onnx_path = fn + ".onnx"
chunk_targets = get_chunk_paths(pkl, estimate_pickle_max_size(os.path.getsize(onnx_path)))
def do_chunk(target, source, env):
chunk_file(pkl, len(chunk_targets))
return lenv.Command(
chunk_targets,
[onnx_path] + tinygrad_files + [chunker_file],
[f'{pythonpath_string} {flags} {image_flag} python3 {Dir("#tinygrad_repo").abspath}/examples/openpilot/compile3.py {fn}.onnx {pkl}',
do_chunk]
out = fn + "_tinygrad.pkl"
full = out + ".full"
parts = out + ".parts"
full_node = lenv.Command(
full,
[fn + ".onnx"] + tinygrad_files,
f'{pythonpath_string} {flags} {image_flag} python3 {Dir("#tinygrad_repo").abspath}/examples/openpilot/compile3.py {fn}.onnx {full}'
)
split_script = File(Dir("#selfdrive/modeld").File("external_pickle.py").abspath)
parts_node = lenv.Command(
parts,
[full_node, split_script, Value(str(CHUNK_BYTES))],
[f'python3 {split_script.abspath} {full} {out} {CHUNK_BYTES}', Delete(full)],
)
lenv.NoCache(parts_node)
lenv.AlwaysBuild(parts_node)
return parts_node
# Compile small models
for model_name in ['driving_vision', 'driving_policy', 'dmonitoring_model']:
tg_compile(tg_flags, model_name)
+4 -3
View File
@@ -16,8 +16,8 @@ from openpilot.common.realtime import config_realtime_process
from openpilot.common.transformations.model import dmonitoringmodel_intrinsics
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.common.file_chunker import read_file_chunked
from openpilot.selfdrive.modeld.parse_model_outputs import sigmoid, safe_exp
from openpilot.selfdrive.modeld.external_pickle import load_external_pickle
PROCESS_NAME = "selfdrive.modeld.dmonitoringmodeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
@@ -45,7 +45,7 @@ class ModelState:
self.tensor_inputs = {k: Tensor(v, device='NPY').realize() for k,v in self.numpy_inputs.items()}
self._blob_cache : dict[int, Tensor] = {}
self.image_warp = None
self.model_run = pickle.loads(read_file_chunked(str(MODEL_PKL_PATH)))
self.model_run = load_external_pickle(MODEL_PKL_PATH)
def run(self, buf: VisionBuf, calib: np.ndarray, transform: np.ndarray) -> tuple[np.ndarray, float]:
self.numpy_inputs['calib'][0,:] = calib
@@ -55,7 +55,8 @@ class ModelState:
if self.image_warp is None:
self.frame_buf_params = get_nv12_info(buf.width, buf.height)
warp_path = MODELS_DIR / f'dm_warp_{buf.width}x{buf.height}_tinygrad.pkl'
self.image_warp = pickle.loads(read_file_chunked(str(warp_path)))
with open(warp_path, "rb") as f:
self.image_warp = pickle.load(f)
ptr = buf.data.ctypes.data
# There is a ringbuffer of imgs, just cache tensors pointing to all of them
if ptr not in self._blob_cache:
+38
View File
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
import hashlib
import pickle
import sys
from pathlib import Path
def split_pickle(full_path: Path, out_prefix: Path, chunk_bytes: int) -> None:
    """Split ``full_path`` into numbered chunk files plus a ``.parts`` manifest.

    Existing ``<out_prefix.name>.data-*`` files next to ``out_prefix`` are
    removed first. The data is then written as
    ``<out_prefix.name>.data-NNNN-of-TTTT`` files of at most ``chunk_bytes``
    bytes each, and a manifest ``<out_prefix.name>.parts`` is written with the
    SHA-256 hex digest of the full data on the first line, followed by one
    chunk file name per line.

    Args:
        full_path: File to split.
        out_prefix: Path whose parent is the output directory and whose name
            prefixes every output file.
        chunk_bytes: Maximum size of each chunk in bytes; must be positive.

    Raises:
        ValueError: If ``chunk_bytes`` is not positive.
    """
    # Validate before touching the filesystem: a non-positive size would either
    # divide by zero or silently produce a manifest with no chunks.
    if chunk_bytes <= 0:
        raise ValueError(f"chunk_bytes must be positive, got {chunk_bytes}")

    data = full_path.read_bytes()
    out_dir = out_prefix.parent

    # Drop chunks from a previous run so stale pieces never linger.
    for stale in out_dir.glob(f"{out_prefix.name}.data-*"):
        stale.unlink()

    total = (len(data) + chunk_bytes - 1) // chunk_bytes
    view = memoryview(data)  # slice without copying each chunk
    names = []
    for index, start in enumerate(range(0, len(data), chunk_bytes), start=1):
        name = f"{out_prefix.name}.data-{index:04d}-of-{total:04d}"
        (out_dir / name).write_bytes(view[start:start + chunk_bytes])
        names.append(name)

    # One line per entry, each newline-terminated. With no chunks this yields
    # only the hash line, not a trailing blank line that could be read back
    # as an empty chunk name.
    manifest_lines = [hashlib.sha256(data).hexdigest(), *names]
    manifest = "".join(f"{line}\n" for line in manifest_lines)
    (out_dir / (out_prefix.name + ".parts")).write_text(manifest)
def load_external_pickle(prefix: Path):
    """Reassemble and unpickle an object stored as chunk files plus a manifest.

    Reads ``<prefix.name>.parts`` next to ``prefix``. Its first non-blank line
    is the expected SHA-256 hex digest; each remaining non-blank line names a
    chunk file in the same directory. The chunks are concatenated in manifest
    order, checked against the digest, and unpickled.

    Args:
        prefix: Path prefix of the pickle; a ``str`` is accepted as well.

    Returns:
        The unpickled object.

    Raises:
        RuntimeError: If the manifest has no entries or the digest does not match.
    """
    prefix = Path(prefix)
    parts = prefix.parent / (prefix.name + ".parts")

    # Skip blank lines: an empty entry would resolve to the directory itself
    # and fail with a confusing error when read.
    lines = [line for line in parts.read_text().splitlines() if line.strip()]
    if not lines:
        raise RuntimeError(f"empty manifest loading {prefix}")
    expected_hash, *chunk_names = lines

    data = b"".join((prefix.parent / name).read_bytes() for name in chunk_names)
    if hashlib.sha256(data).hexdigest() != expected_hash:
        raise RuntimeError(f"hash mismatch loading {prefix}")
    # NOTE: pickle executes code on load; only use this on trusted artifacts.
    return pickle.loads(data)
if __name__ == "__main__":
    # Command line: <full pickle path> <output prefix> <chunk size in bytes>
    _full_arg = Path(sys.argv[1])
    _prefix_arg = Path(sys.argv[2])
    _chunk_arg = int(sys.argv[3])
    split_pickle(_full_arg, _prefix_arg, _chunk_arg)
+5 -4
View File
@@ -27,8 +27,8 @@ from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.selfdrive.controls.lib.drive_helpers import get_accel_from_plan, smooth_value, get_curvature_from_plan
from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.common.file_chunker import read_file_chunked
from openpilot.selfdrive.modeld.constants import ModelConstants, Plan
from openpilot.selfdrive.modeld.external_pickle import load_external_pickle
PROCESS_NAME = "selfdrive.modeld.modeld"
@@ -178,8 +178,8 @@ class ModelState:
self.parser = Parser()
self.frame_buf_params : dict[str, tuple[int, int, int, int]] = {}
self.update_imgs = None
self.vision_run = pickle.loads(read_file_chunked(str(VISION_PKL_PATH)))
self.policy_run = pickle.loads(read_file_chunked(str(POLICY_PKL_PATH)))
self.vision_run = load_external_pickle(VISION_PKL_PATH)
self.policy_run = load_external_pickle(POLICY_PKL_PATH)
def slice_outputs(self, model_outputs: np.ndarray, output_slices: dict[str, slice]) -> dict[str, np.ndarray]:
parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in output_slices.items()}
@@ -196,7 +196,8 @@ class ModelState:
w, h = bufs[key].width, bufs[key].height
self.frame_buf_params[key] = get_nv12_info(w, h)
warp_path = MODELS_DIR / f'warp_{w}x{h}_tinygrad.pkl'
self.update_imgs = pickle.loads(read_file_chunked(str(warp_path)))
with open(warp_path, "rb") as f:
self.update_imgs = pickle.load(f)
for key in bufs.keys():
ptr = bufs[key].data.ctypes.data