Compare commits

..

8 Commits

Author SHA1 Message Date
royjr
9a2b0522f0 Merge branch 'master' into better-exp 2026-04-19 21:30:07 -04:00
royjr
d2b255aebc Merge branch 'master' into better-exp 2026-03-02 02:42:26 -05:00
royjr
0df6bac25e Merge branch 'master' into better-exp 2026-02-13 22:23:19 -05:00
royjr
01147241a3 Merge branch 'master' into better-exp 2026-02-05 00:51:27 -05:00
royjr
81cd484c5a Merge branch 'master' into better-exp 2025-12-29 09:57:36 -05:00
royjr
9aa7c39e55 Merge branch 'master' into better-exp 2025-12-21 17:07:47 -05:00
royjr
b93303094e Merge branch 'master' into better-exp 2025-12-14 12:19:11 -05:00
royjr
acfe52317b init 2025-12-02 21:56:48 -05:00
22 changed files with 456 additions and 581 deletions

View File

@@ -172,8 +172,8 @@ jobs:
output_file="${{ env.MODELS_DIR }}/${base_name}_tinygrad.pkl"
echo "Compiling: $onnx_file -> $output_file"
DEV=QCOM FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1 IMAGE=2 python3 "${{ env.TINYGRAD_PATH }}/examples/openpilot/compile3.py" "$onnx_file" "$output_file"
DEV=QCOM FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1 python3 "${{ env.MODELS_DIR }}/../get_model_metadata.py" "$onnx_file" || true
QCOM=1 python3 "${{ env.TINYGRAD_PATH }}/examples/openpilot/compile3.py" "$onnx_file" "$output_file"
DEV=QCOM FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 python3 "${{ env.MODELS_DIR }}/../get_model_metadata.py" "$onnx_file" || true
done
- name: Validate Model Outputs

1
.gitignore vendored
View File

@@ -44,7 +44,6 @@ bin/
config.json
compile_commands.json
compare_runtime*.html
selfdrive/modeld/models/tg_compiled_flags.json
# build artifacts
docs_site/

View File

@@ -342,6 +342,7 @@ struct OnroadEventSP @0xda96579883444c35 {
speedLimitChanged @21;
speedLimitPending @22;
e2eChime @23;
chillModeSwitched @24;
}
}

View File

@@ -22,19 +22,15 @@ from openpilot.sunnypilot.selfdrive.controls.lib.latcontrol_torque_ext import La
# Additionally, there is friction in the steering wheel that needs
# to be overcome to move it at all, this is compensated for too.
KP = 0.8
KI = 0.15
INTERP_SPEEDS = [1, 1.5, 2.0, 3.0, 5, 7.5, 10, 15, 30]
KP_INTERP = [150.0, 100.0, 30.0, 6.0, 2.0, 1.5, 1.2, 0.8, 0.5]
KI_INTERP = [0.06, 0.06, 0.06, 0.08, 0.08, 0.10, 0.12, 0.14, 0.16]
JERK_INTERP = [0.25, 0.25, 0.25, 0.23, 0.20, 0.18, 0.16, 0.15, 0.15]
MEAS_FILTER_TAU_INTERP = [0.07, 0.07, 0.07, 0.07, 0.07, 0.06, 0.06, 0.05, 0.01]
INTEGRATOR_DECAY_INTERP = [0.990, 0.990, 0.990, 0.992, 0.993, 0.995, 0.996, 0.998, 0.999]
INTEGRATOR_DECAY_FRAMES = 20
STRAIGHT_DAMP_THRESHOLD_INTERP = [0.03, 0.03, 0.03, 0.03, 0.03, 0.05, 0.05, 0.2, 0.2]
STRAIGHT_DAMP_MIN_INTERP = [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5]
STRAIGHT_DAMP_TAU = 0.5
KP_INTERP = [250, 120, 65, 30, 11.5, 5.5, 3.5, 2.0, KP]
LP_FILTER_CUTOFF_HZ = 1.2
JERK_LOOKAHEAD_SECONDS = 0.30
JERK_LOOKAHEAD_SECONDS = 0.19
JERK_GAIN = 0.3
LAT_ACCEL_REQUEST_BUFFER_SECONDS = 1.0
VERSION = 1
@@ -44,16 +40,13 @@ class LatControlTorque(LatControl):
self.torque_params = CP.lateralTuning.torque.as_builder()
self.torque_from_lateral_accel = CI.torque_from_lateral_accel()
self.lateral_accel_from_torque = CI.lateral_accel_from_torque()
self.pid = PIDController([INTERP_SPEEDS, KP_INTERP], [INTERP_SPEEDS, KI_INTERP], rate=1/self.dt)
self.pid = PIDController([INTERP_SPEEDS, KP_INTERP], KI, rate=1/self.dt)
self.update_limits()
self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg
self.lat_accel_request_buffer_len = int(LAT_ACCEL_REQUEST_BUFFER_SECONDS / self.dt)
self.lat_accel_request_buffer = deque([0.] * self.lat_accel_request_buffer_len , maxlen=self.lat_accel_request_buffer_len)
self.lookahead_frames = int(JERK_LOOKAHEAD_SECONDS / self.dt)
self.jerk_filter = FirstOrderFilter(0.0, 1 / (2 * np.pi * LP_FILTER_CUTOFF_HZ), self.dt)
self.measurement_filter = FirstOrderFilter(0.0, 0.04, self.dt)
self.integrator_decay_counter = 0
self.straight_damp_filter = FirstOrderFilter(1.0, STRAIGHT_DAMP_TAU, self.dt)
self.extension = LatControlTorqueExt(self, CP, CP_SP, CI)
@@ -75,9 +68,7 @@ class LatControlTorque(LatControl):
pid_log = log.ControlsState.LateralTorqueState.new_message()
pid_log.version = VERSION
measured_curvature = -VM.calc_curvature(math.radians(CS.steeringAngleDeg - params.angleOffsetDeg), CS.vEgo, params.roll)
meas_tau = float(np.interp(CS.vEgo, INTERP_SPEEDS, MEAS_FILTER_TAU_INTERP))
self.measurement_filter.update_alpha(meas_tau)
measurement = self.measurement_filter.update(measured_curvature * CS.vEgo ** 2)
measurement = measured_curvature * CS.vEgo ** 2
future_desired_lateral_accel = desired_curvature * CS.vEgo ** 2
self.lat_accel_request_buffer.append(future_desired_lateral_accel)
@@ -88,12 +79,7 @@ class LatControlTorque(LatControl):
delay_frames = int(np.clip(lat_delay / self.dt + 1, 1, self.lat_accel_request_buffer_len))
expected_lateral_accel = self.lat_accel_request_buffer[-delay_frames]
setpoint = expected_lateral_accel
raw_error = setpoint - measurement
damp_min = float(np.interp(CS.vEgo, INTERP_SPEEDS, STRAIGHT_DAMP_MIN_INTERP))
damp_threshold = float(np.interp(CS.vEgo, INTERP_SPEEDS, STRAIGHT_DAMP_THRESHOLD_INTERP))
damp_target = max(damp_min, min(1.0, abs(setpoint) / damp_threshold))
damp_factor = self.straight_damp_filter.update(damp_target)
error = raw_error * damp_factor
error = setpoint - measurement
lookahead_idx = int(np.clip(-delay_frames + self.lookahead_frames, -self.lat_accel_request_buffer_len+1, -2))
raw_lateral_jerk = (self.lat_accel_request_buffer[lookahead_idx+1] - self.lat_accel_request_buffer[lookahead_idx-1]) / (2 * self.dt)
@@ -102,8 +88,7 @@ class LatControlTorque(LatControl):
ff = gravity_adjusted_future_lateral_accel
# latAccelOffset corrects roll compensation bias from device roll misalignment relative to car roll
ff -= self.torque_params.latAccelOffset
jerk_gain = float(np.interp(CS.vEgo, INTERP_SPEEDS, JERK_INTERP))
ff += get_friction(error + jerk_gain * desired_lateral_jerk, lateral_accel_deadzone, FRICTION_THRESHOLD, self.torque_params)
ff += get_friction(error + JERK_GAIN * desired_lateral_jerk, lateral_accel_deadzone, FRICTION_THRESHOLD, self.torque_params)
if not active:
output_torque = 0.0
@@ -114,15 +99,6 @@ class LatControlTorque(LatControl):
freeze_integrator = steer_limited_by_safety or CS.steeringPressed or CS.vEgo < 5
output_lataccel = self.pid.update(pid_log.error, speed=CS.vEgo, feedforward=ff, freeze_integrator=freeze_integrator)
error_opposes_integrator = (self.pid.i > 0 and pid_log.error < 0) or (self.pid.i < 0 and pid_log.error > 0)
if error_opposes_integrator:
self.integrator_decay_counter = min(self.integrator_decay_counter + 1, INTEGRATOR_DECAY_FRAMES + 10)
else:
self.integrator_decay_counter = 0
if self.integrator_decay_counter >= INTEGRATOR_DECAY_FRAMES:
self.pid.i *= float(np.interp(CS.vEgo, INTERP_SPEEDS, INTEGRATOR_DECAY_INTERP))
output_torque = self.torque_from_lateral_accel(output_lataccel, self.torque_params)
# Lateral acceleration torque controller extension updates

View File

@@ -32,12 +32,12 @@ COST_E_DIM = 5
COST_DIM = COST_E_DIM + 1
CONSTR_DIM = 4
X_EGO_OBSTACLE_COST = 2.
X_EGO_OBSTACLE_COST = 3.
X_EGO_COST = 0.
V_EGO_COST = 0.
A_EGO_COST = 0.
J_EGO_COST = 5.
A_CHANGE_COST = 350.
A_CHANGE_COST = 200.
DANGER_ZONE_COST = 100.
CRASH_DISTANCE = .25
LEAD_DANGER_FACTOR = 0.75
@@ -53,7 +53,7 @@ T_IDXS_LST = [index_function(idx, max_val=MAX_T, max_idx=N) for idx in range(N+1
T_IDXS = np.array(T_IDXS_LST)
FCW_IDXS = T_IDXS < 5.0
T_DIFFS = np.diff(T_IDXS, prepend=[0.])
COMFORT_BRAKE = 2.
COMFORT_BRAKE = 2.5
STOP_DISTANCE = 6.0
CRUISE_MIN_ACCEL = -1.2
CRUISE_MAX_ACCEL = 1.6

View File

@@ -1,22 +1,6 @@
import glob
import json
import os
from itertools import product
from SCons.Script import Value
import glob
from openpilot.common.file_chunker import chunk_file, get_chunk_paths
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
from openpilot.common.transformations.model import MEDMODEL_INPUT_SIZE, DM_INPUT_SIZE
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.modeld.helpers import CompileConfig
from tinygrad import Device
CAMERA_CONFIGS = [
(_ar_ox_fisheye.width, _ar_ox_fisheye.height), # tici: 1928x1208
(_os_fisheye.width, _os_fisheye.height), # mici: 1344x760
]
MODELD_CONFIGS = [CompileConfig(cam_w, cam_h, prepare_only, 'driving_')
for (cam_w, cam_h), prepare_only in product(CAMERA_CONFIGS, [True, False])]
DM_WARP_CONFIGS = [CompileConfig(cam_w, cam_h, True, 'dm_') for cam_w, cam_h in CAMERA_CONFIGS]
Import('env', 'arch')
chunker_file = File("#common/file_chunker.py")
@@ -29,72 +13,31 @@ tinygrad_files = ["#"+x for x in glob.glob(env.Dir("#tinygrad_repo").relpath + "
def estimate_pickle_max_size(onnx_size):
return 1.2 * onnx_size + 10 * 1024 * 1024 # 20% + 10MB is plenty
# get fastest TG config
available = set(Device.get_available_devices())
if 'CUDA' in available:
tg_backend = 'CUDA'
tg_flags = f'DEV={tg_backend}'
elif 'QCOM' in available:
tg_backend = 'QCOM'
tg_flags = f'DEV={tg_backend} FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1'
else:
tg_backend = 'CPU' if arch == 'Darwin' else 'CPU:LLVM'
# THREADS=0 is need to prevent bug: https://github.com/tinygrad/tinygrad/issues/14689
tg_flags = f'DEV={tg_backend} THREADS=0'
def write_tg_compiled_flags(target, source, env):
with open(str(target[0]), "w") as f:
json.dump({"DEV": tg_backend}, f)
f.write("\n")
compiled_flags_node = lenv.Command(
File("models/tg_compiled_flags.json").abspath,
tinygrad_files + [Value(tg_backend)],
write_tg_compiled_flags,
)
# tinygrad calls brew which needs a $HOME in the env
mac_brew_string = f'HOME={os.path.expanduser("~")}' if arch == 'Darwin' else ''
# compile warp
# THREADS=0 is need to prevent bug: https://github.com/tinygrad/tinygrad/issues/14689
tg_flags = {
'larch64': 'DEV=QCOM FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0',
'Darwin': f'DEV=CPU THREADS=0 HOME={os.path.expanduser("~")}', # tinygrad calls brew which needs a $HOME in the env
}.get(arch, 'DEV=CPU CPU_LLVM=1 THREADS=0')
# Get model metadata
for model_name in ['driving_vision', 'driving_policy', 'dmonitoring_model']:
fn = File(f"models/{model_name}").abspath
script_files = [File(Dir("#selfdrive/modeld").File("get_model_metadata.py").abspath)]
cmd = f'{tg_flags} {mac_brew_string} python3 {Dir("#selfdrive/modeld").abspath}/get_model_metadata.py {fn}.onnx'
lenv.Command(fn + "_metadata.pkl", [fn + ".onnx"] + tinygrad_files + script_files + [compiled_flags_node], cmd)
cmd = f'{tg_flags} python3 {Dir("#selfdrive/modeld").abspath}/get_model_metadata.py {fn}.onnx'
lenv.Command(fn + "_metadata.pkl", [fn + ".onnx"] + tinygrad_files + script_files, cmd)
image_flag = {
'larch64': 'IMAGE=2',
}.get(arch, 'IMAGE=0')
modeld_dir = Dir("#selfdrive/modeld").abspath
compile_modeld_script = [File(f"{modeld_dir}/compile_modeld.py")]
compile_dm_warp_script = [File(f"{modeld_dir}/compile_dm_warp.py")]
driving_onnx_deps = [File(f"models/{m}.onnx").abspath for m in ['driving_vision', 'driving_policy']]
driving_metadata_deps = [File(f"models/{m}_metadata.pkl").abspath for m in ['driving_vision', 'driving_policy']]
model_w, model_h = MEDMODEL_INPUT_SIZE
frame_skip = ModelConstants.MODEL_RUN_FREQ // ModelConstants.MODEL_CONTEXT_FREQ
for cfg in MODELD_CONFIGS:
cmd = (f'{tg_flags} {mac_brew_string} {image_flag} python3 {modeld_dir}/compile_modeld.py '
f'--model-size {model_w}x{model_h} '
f'--nv12 {",".join(str(x) for x in cfg.nv12)} '
f'--vision-onnx {File("models/driving_vision.onnx").abspath} '
f'--policy-onnx {File("models/driving_policy.onnx").abspath} '
f'--output {cfg.pkl_path} --frame-skip {frame_skip}'
+ (' --prepare-only' if cfg.prepare_only else ''))
node = lenv.Command(cfg.pkl_path, tinygrad_files + compile_modeld_script + driving_onnx_deps + driving_metadata_deps + [chunker_file, compiled_flags_node], cmd)
onnx_sizes_sum = sum(os.path.getsize(f) for f in driving_onnx_deps)
chunk_targets = get_chunk_paths(cfg.pkl_path, estimate_pickle_max_size(onnx_sizes_sum))
def do_chunk(target, source, env, pkl=cfg.pkl_path, chunks=chunk_targets):
chunk_file(pkl, chunks)
lenv.Command(chunk_targets, node, do_chunk)
dm_w, dm_h = DM_INPUT_SIZE
for cfg in DM_WARP_CONFIGS:
cmd = (f'{tg_flags} {mac_brew_string} {image_flag} python3 {modeld_dir}/compile_dm_warp.py '
f'--nv12 {",".join(str(x) for x in cfg.nv12)} --warp-to {dm_w}x{dm_h} '
f'--output {cfg.pkl_path}')
lenv.Command(cfg.pkl_path, tinygrad_files + compile_dm_warp_script + compile_modeld_script + [compiled_flags_node], cmd)
script_files = [File(Dir("#selfdrive/modeld").File("compile_warp.py").abspath)]
compile_warp_cmd = f'{tg_flags} python3 {Dir("#selfdrive/modeld").abspath}/compile_warp.py '
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
warp_targets = []
for cam in [_ar_ox_fisheye, _os_fisheye]:
w, h = cam.width, cam.height
warp_targets += [File(f"models/warp_{w}x{h}_tinygrad.pkl").abspath, File(f"models/dm_warp_{w}x{h}_tinygrad.pkl").abspath]
lenv.Command(warp_targets, tinygrad_files + script_files, compile_warp_cmd)
def tg_compile(flags, model_name):
pythonpath_string = 'PYTHONPATH="${PYTHONPATH}:' + env.Dir("#tinygrad_repo").abspath + '"'
@@ -104,7 +47,7 @@ def tg_compile(flags, model_name):
chunk_targets = get_chunk_paths(pkl, estimate_pickle_max_size(os.path.getsize(onnx_path)))
compile_node = lenv.Command(
pkl,
[onnx_path] + tinygrad_files + [chunker_file, compiled_flags_node],
[onnx_path] + tinygrad_files + [chunker_file],
f'{pythonpath_string} {flags} {image_flag} python3 {Dir("#tinygrad_repo").abspath}/examples/openpilot/compile3.py {fn}.onnx {pkl}',
)
def do_chunk(target, source, env):
@@ -115,4 +58,7 @@ def tg_compile(flags, model_name):
do_chunk,
)
tg_compile(tg_flags, 'dmonitoring_model')
# Compile small models
for model_name in ['driving_vision', 'driving_policy', 'dmonitoring_model']:
tg_compile(tg_flags, model_name)

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env python3
import argparse
import pickle
import time
from tinygrad.tensor import Tensor
from tinygrad.device import Device
from tinygrad.engine.jit import TinyJit
from openpilot.selfdrive.modeld.compile_modeld import NV12Frame, warp_perspective_tinygrad, _parse_size, _parse_nv12
def make_warp_dm(nv12: NV12Frame, dm_w, dm_h):
cam_w, cam_h, stride, _, _, _ = nv12
stride_pad = stride - cam_w
def warp_dm(input_frame, M_inv):
M_inv = M_inv.to(Device.DEFAULT).realize()
return warp_perspective_tinygrad(input_frame[:cam_h*stride], M_inv,
(dm_w, dm_h), (cam_h, cam_w), stride_pad).reshape(-1, dm_h * dm_w)
return warp_dm
def compile_dm_warp(nv12: NV12Frame, dm_w, dm_h, pkl_path):
print(f"Compiling DM warp for {nv12.width}x{nv12.height} -> {dm_w}x{dm_h}...")
warp_dm_jit = TinyJit(make_warp_dm(nv12, dm_w, dm_h), prune=True)
for i in range(10):
frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
M_inv = Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')
Device.default.synchronize()
st = time.perf_counter()
warp_dm_jit(frame, M_inv).realize()
mt = time.perf_counter()
Device.default.synchronize()
et = time.perf_counter()
print(f" [{i+1}/10] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")
with open(pkl_path, "wb") as f:
pickle.dump(warp_dm_jit, f)
print(f" Saved to {pkl_path}")
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument('--nv12', type=_parse_nv12, required=True,
help=f'NV12 frame layout: {",".join(NV12Frame._fields)}')
p.add_argument('--warp-to', type=_parse_size, required=True, help='DM input WxH')
p.add_argument('--output', required=True)
args = p.parse_args()
dm_w, dm_h = args.warp_to
compile_dm_warp(args.nv12, dm_w, dm_h, args.output)

View File

@@ -1,253 +0,0 @@
#!/usr/bin/env python3
import argparse
import pickle
import time
from functools import partial
from collections import namedtuple
import numpy as np
from tinygrad.tensor import Tensor
from tinygrad.helpers import Context
from tinygrad.device import Device
from tinygrad.engine.jit import TinyJit
from tinygrad.nn.onnx import OnnxRunner
# https://github.com/tinygrad/tinygrad/issues/15682
from tinygrad.uop.ops import UOp, Ops
_orig = UOp.__reduce__
UOp.__reduce__ = lambda self: (UOp.unique, ()) if self.op is Ops.UNIQUE else _orig(self)
NV12Frame = namedtuple("NV12Frame", ['width', 'height', 'stride', 'y_height', 'uv_height', 'size'])
UV_SCALE_MATRIX = np.array([[0.5, 0, 0], [0, 0.5, 0], [0, 0, 1]], dtype=np.float32)
UV_SCALE_MATRIX_INV = np.linalg.inv(UV_SCALE_MATRIX)
def warp_perspective_tinygrad(src_flat, M_inv, dst_shape, src_shape, stride_pad):
w_dst, h_dst = dst_shape
h_src, w_src = src_shape
x = Tensor.arange(w_dst).reshape(1, w_dst).expand(h_dst, w_dst).reshape(-1)
y = Tensor.arange(h_dst).reshape(h_dst, 1).expand(h_dst, w_dst).reshape(-1)
# inline 3x3 matmul as elementwise to avoid reduce op (enables fusion with gather)
src_x = M_inv[0, 0] * x + M_inv[0, 1] * y + M_inv[0, 2]
src_y = M_inv[1, 0] * x + M_inv[1, 1] * y + M_inv[1, 2]
src_w = M_inv[2, 0] * x + M_inv[2, 1] * y + M_inv[2, 2]
src_x = src_x / src_w
src_y = src_y / src_w
x_nn_clipped = Tensor.round(src_x).clip(0, w_src - 1).cast('int')
y_nn_clipped = Tensor.round(src_y).clip(0, h_src - 1).cast('int')
idx = y_nn_clipped * (w_src + stride_pad) + x_nn_clipped
return src_flat[idx]
def frames_to_tensor(frames):
H = (frames.shape[0] * 2) // 3
W = frames.shape[1]
in_img1 = Tensor.cat(frames[0:H:2, 0::2],
frames[1:H:2, 0::2],
frames[0:H:2, 1::2],
frames[1:H:2, 1::2],
frames[H:H+H//4].reshape((H//2, W//2)),
frames[H+H//4:H+H//2].reshape((H//2, W//2)), dim=0).reshape((6, H//2, W//2))
return in_img1
def make_frame_prepare(nv12: NV12Frame, model_w, model_h):
cam_w, cam_h, stride, y_height, uv_height, _ = nv12
uv_offset = stride * y_height
stride_pad = stride - cam_w
def frame_prepare_tinygrad(input_frame, M_inv):
# UV_SCALE @ M_inv @ UV_SCALE_INV simplifies to elementwise scaling
M_inv_uv = M_inv * Tensor([[1.0, 1.0, 0.5], [1.0, 1.0, 0.5], [2.0, 2.0, 1.0]])
# deinterleave NV12 UV plane (UVUV... -> separate U, V)
uv = input_frame[uv_offset:uv_offset + uv_height * stride].reshape(uv_height, stride)
with Context(SPLIT_REDUCEOP=0):
y = warp_perspective_tinygrad(input_frame[:cam_h*stride],
M_inv, (model_w, model_h),
(cam_h, cam_w), stride_pad).realize()
u = warp_perspective_tinygrad(uv[:cam_h//2, :cam_w:2].flatten(),
M_inv_uv, (model_w//2, model_h//2),
(cam_h//2, cam_w//2), 0).realize()
v = warp_perspective_tinygrad(uv[:cam_h//2, 1:cam_w:2].flatten(),
M_inv_uv, (model_w//2, model_h//2),
(cam_h//2, cam_w//2), 0).realize()
yuv = y.cat(u).cat(v).reshape((model_h * 3 // 2, model_w))
tensor = frames_to_tensor(yuv)
return tensor
return frame_prepare_tinygrad
def make_input_queues(vision_input_shapes, policy_input_shapes, frame_skip):
img = vision_input_shapes['img'] # (1, 12, 128, 256)
n_frames = img[1] // 6
img_buf_shape = (frame_skip * (n_frames - 1) + 1, 6, img[2], img[3])
fb = policy_input_shapes['features_buffer'] # (1, 25, 512)
dp = policy_input_shapes['desire_pulse'] # (1, 25, 8)
tc = policy_input_shapes['traffic_convention'] # (1, 2)
npy = {
'desire': np.zeros(dp[2], dtype=np.float32),
'traffic_convention': np.zeros(tc, dtype=np.float32),
'tfm': np.zeros((3, 3), dtype=np.float32),
'big_tfm': np.zeros((3, 3), dtype=np.float32),
}
input_queues = {
'img_q': Tensor.zeros(img_buf_shape, dtype='uint8').contiguous().realize(),
'big_img_q': Tensor.zeros(img_buf_shape, dtype='uint8').contiguous().realize(),
'feat_q': Tensor.zeros(frame_skip * (fb[1] - 1) + 1, fb[0], fb[2]).contiguous().realize(),
'desire_q': Tensor.zeros(frame_skip * dp[1], dp[0], dp[2]).contiguous().realize(),
**{k: Tensor(v, device='NPY').realize() for k, v in npy.items()},
}
return input_queues, npy
def shift_and_sample(buf, new_val, sample_fn):
buf.assign(buf[1:].cat(new_val, dim=0).contiguous())
return sample_fn(buf)
def sample_skip(buf, frame_skip):
return buf[::frame_skip].contiguous().flatten(0, 1).unsqueeze(0)
def sample_desire(buf, frame_skip):
return buf.reshape(-1, frame_skip, *buf.shape[1:]).max(1).flatten(0, 1).unsqueeze(0)
def make_run_policy(vision_runner, policy_runner, nv12: NV12Frame, model_w, model_h,
vision_features_slice, frame_skip, prepare_only=False):
frame_prepare = make_frame_prepare(nv12, model_w, model_h)
sample_skip_fn = partial(sample_skip, frame_skip=frame_skip)
sample_desire_fn = partial(sample_desire, frame_skip=frame_skip)
def run_policy(img_q, big_img_q, feat_q, desire_q, desire, traffic_convention, tfm, big_tfm, frame, big_frame):
tfm = tfm.to(Device.DEFAULT)
big_tfm = big_tfm.to(Device.DEFAULT)
desire = desire.to(Device.DEFAULT)
traffic_convention = traffic_convention.to(Device.DEFAULT)
Tensor.realize(tfm, big_tfm, desire, traffic_convention)
img = shift_and_sample(img_q, frame_prepare(frame, tfm).unsqueeze(0), sample_skip_fn)
big_img = shift_and_sample(big_img_q, frame_prepare(big_frame, big_tfm).unsqueeze(0), sample_skip_fn)
if prepare_only:
return img, big_img
vision_out = next(iter(vision_runner({'img': img, 'big_img': big_img}).values())).cast('float32')
new_feat = vision_out[:, vision_features_slice].reshape(1, -1).unsqueeze(0)
feat_buf = shift_and_sample(feat_q, new_feat, sample_skip_fn)
desire_buf = shift_and_sample(desire_q, desire.reshape(1, 1, -1), sample_desire_fn)
inputs = {'features_buffer': feat_buf, 'desire_pulse': desire_buf, 'traffic_convention': traffic_convention}
policy_out = next(iter(policy_runner(inputs).values())).cast('float32')
return vision_out, policy_out
return run_policy
def compile_modeld(nv12: NV12Frame, model_w, model_h, prepare_only, frame_skip,
vision_onnx, policy_onnx, pkl_path):
from get_model_metadata import metadata_path_for
print(f"Compiling combined policy JIT for {nv12.width}x{nv12.height} (prepare_only={prepare_only})...")
vision_runner = OnnxRunner(vision_onnx)
policy_runner = OnnxRunner(policy_onnx)
with open(metadata_path_for(vision_onnx), 'rb') as f:
vision_metadata = pickle.load(f)
vision_features_slice = vision_metadata['output_slices']['hidden_state']
vision_input_shapes = vision_metadata['input_shapes']
with open(metadata_path_for(policy_onnx), 'rb') as f:
policy_input_shapes = pickle.load(f)['input_shapes']
_run = make_run_policy(vision_runner, policy_runner, nv12, model_w, model_h,
vision_features_slice, frame_skip, prepare_only)
run_policy_jit = TinyJit(_run, prune=True)
N_RUNS = 3
SEED = 42
def random_inputs_run_fn(fn, seed, test_val=None, test_buffers=None, expect_match=True):
input_queues, npy = make_input_queues(vision_input_shapes, policy_input_shapes, frame_skip)
np.random.seed(seed)
Tensor.manual_seed(seed)
for i in range(N_RUNS):
frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
big_frame = Tensor.randint(nv12.size, low=0, high=256, dtype='uint8').realize()
for v in npy.values():
v[:] = np.random.randn(*v.shape).astype(v.dtype)
Device.default.synchronize()
st = time.perf_counter()
outs = fn(**input_queues, frame=frame, big_frame=big_frame)
mt = time.perf_counter()
for o in outs:
# .realize() not needed once jitted, but needed for unjitted fn
o.realize()
Device.default.synchronize()
et = time.perf_counter()
print(f" [{i+1}/{N_RUNS}] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")
val = [np.copy(v.numpy()) for v in outs]
buffers = [np.copy(v.numpy().copy()) for v in input_queues.values()]
if test_val is not None:
match = all(np.array_equal(a, b) for a, b in zip(val, test_val, strict=True))
assert match == expect_match, f"outputs {'differ from' if expect_match else 'match'} baseline (seed={seed})"
if test_buffers is not None:
match = all(np.array_equal(a, b) for a, b in zip(buffers, test_buffers, strict=True))
assert match == expect_match, f"buffers {'differ from' if expect_match else 'match'} baseline (seed={seed})"
return fn, val, buffers
print('run unjitted')
_, test_val, test_buffers = random_inputs_run_fn(_run, seed=SEED)
print('capture + replay')
run_policy_jit, _, _ = random_inputs_run_fn(run_policy_jit, SEED, test_val, test_buffers)
print('pickle round trip')
with open(pkl_path, "wb") as f:
pickle.dump(run_policy_jit, f)
print(f" Saved to {pkl_path}")
with open(pkl_path, "rb") as f:
run_policy_jit = pickle.load(f)
random_inputs_run_fn(run_policy_jit, SEED, test_val, test_buffers, expect_match=True)
random_inputs_run_fn(run_policy_jit, SEED+1, test_val, test_buffers, expect_match=False)
def _parse_size(s):
w, h = s.lower().split('x')
return int(w), int(h)
def _parse_nv12(s):
parts = s.split(',')
assert len(parts) == len(NV12Frame._fields), \
f"--nv12 expects {','.join(NV12Frame._fields)} (got {s!r})"
return NV12Frame(*(int(x) for x in parts))
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument('--model-size', type=_parse_size, required=True, help='model input WxH')
p.add_argument('--nv12', type=_parse_nv12, required=True,
help=f'NV12 frame layout: {",".join(NV12Frame._fields)}')
p.add_argument('--vision-onnx', required=True)
p.add_argument('--policy-onnx', required=True)
p.add_argument('--output', required=True)
p.add_argument('--prepare-only', action='store_true')
p.add_argument('--frame-skip', type=int, required=True)
args = p.parse_args()
model_w, model_h = args.model_size
compile_modeld(args.nv12, model_w, model_h, args.prepare_only, args.frame_skip,
args.vision_onnx, args.policy_onnx, args.output)

209
selfdrive/modeld/compile_warp.py Executable file
View File

@@ -0,0 +1,209 @@
#!/usr/bin/env python3
import time
import pickle
import numpy as np
from pathlib import Path
from tinygrad.tensor import Tensor
from tinygrad.helpers import Context
from tinygrad.device import Device
from tinygrad.engine.jit import TinyJit
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.common.transformations.model import MEDMODEL_INPUT_SIZE, DM_INPUT_SIZE
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
MODELS_DIR = Path(__file__).parent / 'models'
CAMERA_CONFIGS = [
(_ar_ox_fisheye.width, _ar_ox_fisheye.height), # tici: 1928x1208
(_os_fisheye.width, _os_fisheye.height), # mici: 1344x760
]
UV_SCALE_MATRIX = np.array([[0.5, 0, 0], [0, 0.5, 0], [0, 0, 1]], dtype=np.float32)
UV_SCALE_MATRIX_INV = np.linalg.inv(UV_SCALE_MATRIX)
IMG_BUFFER_SHAPE = (30, MEDMODEL_INPUT_SIZE[1] // 2, MEDMODEL_INPUT_SIZE[0] // 2)
def warp_pkl_path(w, h):
return MODELS_DIR / f'warp_{w}x{h}_tinygrad.pkl'
def dm_warp_pkl_path(w, h):
return MODELS_DIR / f'dm_warp_{w}x{h}_tinygrad.pkl'
def warp_perspective_tinygrad(src_flat, M_inv, dst_shape, src_shape, stride_pad):
w_dst, h_dst = dst_shape
h_src, w_src = src_shape
x = Tensor.arange(w_dst).reshape(1, w_dst).expand(h_dst, w_dst).reshape(-1)
y = Tensor.arange(h_dst).reshape(h_dst, 1).expand(h_dst, w_dst).reshape(-1)
# inline 3x3 matmul as elementwise to avoid reduce op (enables fusion with gather)
src_x = M_inv[0, 0] * x + M_inv[0, 1] * y + M_inv[0, 2]
src_y = M_inv[1, 0] * x + M_inv[1, 1] * y + M_inv[1, 2]
src_w = M_inv[2, 0] * x + M_inv[2, 1] * y + M_inv[2, 2]
src_x = src_x / src_w
src_y = src_y / src_w
x_nn_clipped = Tensor.round(src_x).clip(0, w_src - 1).cast('int')
y_nn_clipped = Tensor.round(src_y).clip(0, h_src - 1).cast('int')
idx = y_nn_clipped * (w_src + stride_pad) + x_nn_clipped
return src_flat[idx]
def frames_to_tensor(frames, model_w, model_h):
H = (frames.shape[0] * 2) // 3
W = frames.shape[1]
in_img1 = Tensor.cat(frames[0:H:2, 0::2],
frames[1:H:2, 0::2],
frames[0:H:2, 1::2],
frames[1:H:2, 1::2],
frames[H:H+H//4].reshape((H//2, W//2)),
frames[H+H//4:H+H//2].reshape((H//2, W//2)), dim=0).reshape((6, H//2, W//2))
return in_img1
def make_frame_prepare(cam_w, cam_h, model_w, model_h):
stride, y_height, uv_height, _ = get_nv12_info(cam_w, cam_h)
uv_offset = stride * y_height
stride_pad = stride - cam_w
def frame_prepare_tinygrad(input_frame, M_inv):
# UV_SCALE @ M_inv @ UV_SCALE_INV simplifies to elementwise scaling
M_inv_uv = M_inv * Tensor([[1.0, 1.0, 0.5], [1.0, 1.0, 0.5], [2.0, 2.0, 1.0]])
# deinterleave NV12 UV plane (UVUV... -> separate U, V)
uv = input_frame[uv_offset:uv_offset + uv_height * stride].reshape(uv_height, stride)
with Context(SPLIT_REDUCEOP=0):
y = warp_perspective_tinygrad(input_frame[:cam_h*stride],
M_inv, (model_w, model_h),
(cam_h, cam_w), stride_pad).realize()
u = warp_perspective_tinygrad(uv[:cam_h//2, :cam_w:2].flatten(),
M_inv_uv, (model_w//2, model_h//2),
(cam_h//2, cam_w//2), 0).realize()
v = warp_perspective_tinygrad(uv[:cam_h//2, 1:cam_w:2].flatten(),
M_inv_uv, (model_w//2, model_h//2),
(cam_h//2, cam_w//2), 0).realize()
yuv = y.cat(u).cat(v).reshape((model_h * 3 // 2, model_w))
tensor = frames_to_tensor(yuv, model_w, model_h)
return tensor
return frame_prepare_tinygrad
def make_update_img_input(frame_prepare, model_w, model_h):
def update_img_input_tinygrad(tensor, frame, M_inv):
M_inv = M_inv.to(Device.DEFAULT)
new_img = frame_prepare(frame, M_inv)
full_buffer = tensor[6:].cat(new_img, dim=0).contiguous()
return full_buffer, Tensor.cat(full_buffer[:6], full_buffer[-6:], dim=0).contiguous().reshape(1, 12, model_h//2, model_w//2)
return update_img_input_tinygrad
def make_update_both_imgs(frame_prepare, model_w, model_h):
update_img = make_update_img_input(frame_prepare, model_w, model_h)
def update_both_imgs_tinygrad(calib_img_buffer, new_img, M_inv,
calib_big_img_buffer, new_big_img, M_inv_big):
calib_img_buffer, calib_img_pair = update_img(calib_img_buffer, new_img, M_inv)
calib_big_img_buffer, calib_big_img_pair = update_img(calib_big_img_buffer, new_big_img, M_inv_big)
return calib_img_buffer, calib_img_pair, calib_big_img_buffer, calib_big_img_pair
return update_both_imgs_tinygrad
def make_warp_dm(cam_w, cam_h, dm_w, dm_h):
stride, y_height, _, _ = get_nv12_info(cam_w, cam_h)
stride_pad = stride - cam_w
def warp_dm(input_frame, M_inv):
M_inv = M_inv.to(Device.DEFAULT)
result = warp_perspective_tinygrad(input_frame[:cam_h*stride], M_inv, (dm_w, dm_h), (cam_h, cam_w), stride_pad).reshape(-1, dm_h * dm_w)
return result
return warp_dm
def compile_modeld_warp(cam_w, cam_h):
model_w, model_h = MEDMODEL_INPUT_SIZE
_, _, _, yuv_size = get_nv12_info(cam_w, cam_h)
print(f"Compiling modeld warp for {cam_w}x{cam_h}...")
frame_prepare = make_frame_prepare(cam_w, cam_h, model_w, model_h)
update_both_imgs = make_update_both_imgs(frame_prepare, model_w, model_h)
update_img_jit = TinyJit(update_both_imgs, prune=True)
full_buffer = Tensor.zeros(IMG_BUFFER_SHAPE, dtype='uint8').contiguous().realize()
big_full_buffer = Tensor.zeros(IMG_BUFFER_SHAPE, dtype='uint8').contiguous().realize()
full_buffer_np = np.zeros(IMG_BUFFER_SHAPE, dtype=np.uint8)
big_full_buffer_np = np.zeros(IMG_BUFFER_SHAPE, dtype=np.uint8)
for i in range(10):
new_frame_np = (32 * np.random.randn(yuv_size).astype(np.float32) + 128).clip(0, 255).astype(np.uint8)
img_inputs = [full_buffer,
Tensor.from_blob(new_frame_np.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
new_big_frame_np = (32 * np.random.randn(yuv_size).astype(np.float32) + 128).clip(0, 255).astype(np.uint8)
big_img_inputs = [big_full_buffer,
Tensor.from_blob(new_big_frame_np.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
inputs = img_inputs + big_img_inputs
Device.default.synchronize()
inputs_np = [x.numpy() for x in inputs]
inputs_np[0] = full_buffer_np
inputs_np[3] = big_full_buffer_np
st = time.perf_counter()
out = update_img_jit(*inputs)
full_buffer = out[0].contiguous().realize().clone()
big_full_buffer = out[2].contiguous().realize().clone()
mt = time.perf_counter()
Device.default.synchronize()
et = time.perf_counter()
print(f" [{i+1}/10] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")
pkl_path = warp_pkl_path(cam_w, cam_h)
with open(pkl_path, "wb") as f:
pickle.dump(update_img_jit, f)
print(f" Saved to {pkl_path}")
jit = pickle.load(open(pkl_path, "rb"))
jit(*inputs)
def compile_dm_warp(cam_w, cam_h):
dm_w, dm_h = DM_INPUT_SIZE
_, _, _, yuv_size = get_nv12_info(cam_w, cam_h)
print(f"Compiling DM warp for {cam_w}x{cam_h}...")
warp_dm = make_warp_dm(cam_w, cam_h, dm_w, dm_h)
warp_dm_jit = TinyJit(warp_dm, prune=True)
for i in range(10):
inputs = [Tensor.from_blob((32 * Tensor.randn(yuv_size,) + 128).cast(dtype='uint8').realize().numpy().ctypes.data, (yuv_size,), dtype='uint8'),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
Device.default.synchronize()
st = time.perf_counter()
warp_dm_jit(*inputs)
mt = time.perf_counter()
Device.default.synchronize()
et = time.perf_counter()
print(f" [{i+1}/10] enqueue {(mt-st)*1e3:6.2f} ms -- total {(et-st)*1e3:6.2f} ms")
pkl_path = dm_warp_pkl_path(cam_w, cam_h)
with open(pkl_path, "wb") as f:
pickle.dump(warp_dm_jit, f)
print(f" Saved to {pkl_path}")
def run_and_save_pickle():
for cam_w, cam_h in CAMERA_CONFIGS:
compile_modeld_warp(cam_w, cam_h)
compile_dm_warp(cam_w, cam_h)
if __name__ == "__main__":
run_and_save_pickle()

View File

@@ -1,16 +1,12 @@
#!/usr/bin/env python3
import os
from openpilot.selfdrive.modeld.tinygrad_helpers import MODELS_DIR, set_tinygrad_backend_from_compiled_flags
set_tinygrad_backend_from_compiled_flags()
# FIXME-SP: remove once we bump tg
from openpilot.system.hardware import TICI
os.environ['DEV'] = 'QCOM' if TICI else 'CPU'
from tinygrad.tensor import Tensor
import time
import pickle
import numpy as np
from pathlib import Path
from cereal import messaging
from cereal.messaging import PubMaster, SubMaster
@@ -25,8 +21,9 @@ from openpilot.selfdrive.modeld.parse_model_outputs import sigmoid, safe_exp
PROCESS_NAME = "selfdrive.modeld.dmonitoringmodeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODEL_PKL_PATH = MODELS_DIR / 'dmonitoring_model_tinygrad.pkl'
METADATA_PATH = MODELS_DIR / 'dmonitoring_model_metadata.pkl'
MODEL_PKL_PATH = Path(__file__).parent / 'models/dmonitoring_model_tinygrad.pkl'
METADATA_PATH = Path(__file__).parent / 'models/dmonitoring_model_metadata.pkl'
MODELS_DIR = Path(__file__).parent / 'models'
class ModelState:
inputs: dict[str, np.ndarray]

View File

@@ -7,10 +7,6 @@ from typing import Any
from tinygrad.nn.onnx import OnnxPBParser
def metadata_path_for(onnx_path) -> pathlib.Path:
p = pathlib.Path(onnx_path)
return p.parent / (p.stem + '_metadata.pkl')
class MetadataOnnxPBParser(OnnxPBParser):
def _parse_ModelProto(self) -> dict:
@@ -52,7 +48,7 @@ if __name__ == "__main__":
'output_shapes': dict(get_name_and_shape(x) for x in model["graph"]["output"]),
}
metadata_path = metadata_path_for(model_path)
metadata_path = model_path.parent / (model_path.stem + '_metadata.pkl')
with open(metadata_path, 'wb') as f:
pickle.dump(metadata, f)

View File

@@ -1,31 +0,0 @@
import json
import os
from dataclasses import dataclass
from pathlib import Path
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
MODELS_DIR = Path(__file__).resolve().parent / 'models'
COMPILED_FLAGS_PATH = MODELS_DIR / 'tg_compiled_flags.json'
def set_tinygrad_backend_from_compiled_flags() -> None:
if os.path.isfile(COMPILED_FLAGS_PATH):
with open(COMPILED_FLAGS_PATH) as f:
os.environ['DEV'] = str(json.load(f)['DEV'])
@dataclass
class CompileConfig:
cam_w: int
cam_h: int
prepare_only: bool
prefix: str
@property
def pkl_path(self):
return str(MODELS_DIR / f'{self.prefix}{"warp_" if self.prepare_only else ""}{self.cam_w}x{self.cam_h}_tinygrad.pkl')
@property
def nv12(self):
return (self.cam_w, self.cam_h, *get_nv12_info(self.cam_w, self.cam_h))

View File

@@ -1,8 +1,7 @@
#!/usr/bin/env python3
import os
from openpilot.selfdrive.modeld.helpers import MODELS_DIR, CompileConfig, set_tinygrad_backend_from_compiled_flags
set_tinygrad_backend_from_compiled_flags()
from openpilot.system.hardware import TICI
os.environ['DEV'] = 'QCOM' if TICI else 'CPU'
USBGPU = "USBGPU" in os.environ
if USBGPU:
os.environ['DEV'] = 'AMD'
@@ -13,6 +12,7 @@ import pickle
import numpy as np
import cereal.messaging as messaging
from cereal import car, log
from pathlib import Path
from cereal.messaging import PubMaster, SubMaster
from msgq.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
from opendbc.car.car_helpers import get_demo_car_params
@@ -26,7 +26,6 @@ from openpilot.common.transformations.model import get_warp_matrix
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.selfdrive.controls.lib.drive_helpers import get_accel_from_plan, smooth_value, get_curvature_from_plan
from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.compile_modeld import make_input_queues
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.common.file_chunker import read_file_chunked
from openpilot.selfdrive.modeld.constants import ModelConstants, Plan
@@ -38,13 +37,18 @@ from openpilot.sunnypilot.modeld_v2.modeld_base import ModelStateBase
PROCESS_NAME = "selfdrive.modeld.modeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODELS_DIR = Path(__file__).parent / 'models'
VISION_PKL_PATH = MODELS_DIR / 'driving_vision_tinygrad.pkl'
VISION_METADATA_PATH = MODELS_DIR / 'driving_vision_metadata.pkl'
POLICY_PKL_PATH = MODELS_DIR / 'driving_policy_tinygrad.pkl'
POLICY_METADATA_PATH = MODELS_DIR / 'driving_policy_metadata.pkl'
LAT_SMOOTH_SECONDS = 0.0
LONG_SMOOTH_SECONDS = 0.3
MIN_LAT_CONTROL_SPEED = 0.3
IMG_QUEUE_SHAPE = (6*(ModelConstants.MODEL_RUN_FREQ//ModelConstants.MODEL_CONTEXT_FREQ + 1), 128, 256)
assert IMG_QUEUE_SHAPE[0] == 30
def get_action_from_model(model_output: dict[str, np.ndarray], prev_action: log.ModelDataV2.Action,
@@ -79,39 +83,108 @@ class FrameMeta:
if vipc is not None:
self.frame_id, self.timestamp_sof, self.timestamp_eof = vipc.frame_id, vipc.timestamp_sof, vipc.timestamp_eof
class InputQueues:
def __init__ (self, model_fps, env_fps, n_frames_input):
assert env_fps % model_fps == 0
assert env_fps >= model_fps
self.model_fps = model_fps
self.env_fps = env_fps
self.n_frames_input = n_frames_input
self.dtypes = {}
self.shapes = {}
self.q = {}
def update_dtypes_and_shapes(self, input_dtypes, input_shapes) -> None:
self.dtypes.update(input_dtypes)
if self.env_fps == self.model_fps:
self.shapes.update(input_shapes)
else:
for k in input_shapes:
shape = list(input_shapes[k])
if 'img' in k:
n_channels = shape[1] // self.n_frames_input
shape[1] = (self.env_fps // self.model_fps + (self.n_frames_input - 1)) * n_channels
else:
shape[1] = (self.env_fps // self.model_fps) * shape[1]
self.shapes[k] = tuple(shape)
def reset(self) -> None:
self.q = {k: np.zeros(self.shapes[k], dtype=self.dtypes[k]) for k in self.dtypes.keys()}
def enqueue(self, inputs:dict[str, np.ndarray]) -> None:
for k in inputs.keys():
if inputs[k].dtype != self.dtypes[k]:
raise ValueError(f'supplied input <{k}({inputs[k].dtype})> has wrong dtype, expected {self.dtypes[k]}')
input_shape = list(self.shapes[k])
input_shape[1] = -1
single_input = inputs[k].reshape(tuple(input_shape))
sz = single_input.shape[1]
self.q[k][:,:-sz] = self.q[k][:,sz:]
self.q[k][:,-sz:] = single_input
def get(self, *names) -> dict[str, np.ndarray]:
if self.env_fps == self.model_fps:
return {k: self.q[k] for k in names}
else:
out = {}
for k in names:
shape = self.shapes[k]
if 'img' in k:
n_channels = shape[1] // (self.env_fps // self.model_fps + (self.n_frames_input - 1))
out[k] = np.concatenate([self.q[k][:, s:s+n_channels] for s in np.linspace(0, shape[1] - n_channels, self.n_frames_input, dtype=int)], axis=1)
elif 'pulse' in k:
# any pulse within interval counts
out[k] = self.q[k].reshape((shape[0], shape[1] * self.model_fps // self.env_fps, self.env_fps // self.model_fps, -1)).max(axis=2)
else:
idxs = np.arange(-1, -shape[1], -self.env_fps // self.model_fps)[::-1]
out[k] = self.q[k][:, idxs]
return out
class ModelState(ModelStateBase):
inputs: dict[str, np.ndarray]
output: np.ndarray
prev_desire: np.ndarray # for tracking the rising edge of the pulse
def __init__(self, cam_w: int, cam_h: int):
def __init__(self):
ModelStateBase.__init__(self)
self.LAT_SMOOTH_SECONDS = LAT_SMOOTH_SECONDS
with open(VISION_METADATA_PATH, 'rb') as f:
vision_metadata = pickle.load(f)
self.vision_input_shapes = vision_metadata['input_shapes']
self.vision_input_names = list(self.vision_input_shapes.keys())
self.vision_output_slices = vision_metadata['output_slices']
vision_output_size = vision_metadata['output_shapes']['outputs'][1]
with open(POLICY_METADATA_PATH, 'rb') as f:
policy_metadata = pickle.load(f)
self.policy_input_shapes = policy_metadata['input_shapes']
self.policy_output_slices = policy_metadata['output_slices']
policy_output_size = policy_metadata['output_shapes']['outputs'][1]
self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
self.frame_skip = ModelConstants.MODEL_RUN_FREQ // ModelConstants.MODEL_CONTEXT_FREQ
self.input_queues, self.npy = make_input_queues(self.vision_input_shapes, self.policy_input_shapes, self.frame_skip)
# policy inputs
self.numpy_inputs = {k: np.zeros(self.policy_input_shapes[k], dtype=np.float32) for k in self.policy_input_shapes}
self.full_input_queues = InputQueues(ModelConstants.MODEL_CONTEXT_FREQ, ModelConstants.MODEL_RUN_FREQ, ModelConstants.N_FRAMES)
for k in ['desire_pulse', 'features_buffer']:
self.full_input_queues.update_dtypes_and_shapes({k: self.numpy_inputs[k].dtype}, {k: self.numpy_inputs[k].shape})
self.full_input_queues.reset()
self.img_queues = {'img': Tensor.zeros(IMG_QUEUE_SHAPE, dtype='uint8').contiguous().realize(),
'big_img': Tensor.zeros(IMG_QUEUE_SHAPE, dtype='uint8').contiguous().realize()}
self.full_frames : dict[str, Tensor] = {}
self._blob_cache : dict[int, Tensor] = {}
self.transforms_np = {k: np.zeros((3,3), dtype=np.float32) for k in self.img_queues}
self.transforms = {k: Tensor(v, device='NPY').realize() for k, v in self.transforms_np.items()}
self.vision_output = np.zeros(vision_output_size, dtype=np.float32)
self.policy_inputs = {k: Tensor(v, device='NPY').realize() for k,v in self.numpy_inputs.items()}
self.policy_output = np.zeros(policy_output_size, dtype=np.float32)
self.parser = Parser()
self.frame_buf_params = {k: get_nv12_info(cam_w, cam_h) for k in ('img', 'big_img')}
self.run_policy = pickle.loads(read_file_chunked(CompileConfig(cam_w, cam_h, prefix='driving_', prepare_only=False).pkl_path))
self.warp_enqueue = pickle.loads(read_file_chunked(CompileConfig(cam_w, cam_h, prefix='driving_', prepare_only=True).pkl_path))
self.warp_enqueue(
**self.input_queues,
frame=Tensor.zeros(self.frame_buf_params['img'][3], dtype='uint8').contiguous().realize(),
big_frame=Tensor.zeros(self.frame_buf_params['big_img'][3], dtype='uint8').contiguous().realize())
self.frame_buf_params : dict[str, tuple[int, int, int, int]] = {}
self.update_imgs = None
self.vision_run = pickle.loads(read_file_chunked(str(VISION_PKL_PATH)))
self.policy_run = pickle.loads(read_file_chunked(str(POLICY_PKL_PATH)))
def slice_outputs(self, model_outputs: np.ndarray, output_slices: dict[str, slice]) -> dict[str, np.ndarray]:
parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in output_slices.items()}
@@ -119,6 +192,18 @@ class ModelState(ModelStateBase):
def run(self, bufs: dict[str, VisionBuf], transforms: dict[str, np.ndarray],
inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
# Model decides when action is completed, so desire input is just a pulse triggered on rising edge
inputs['desire_pulse'][0] = 0
new_desire = np.where(inputs['desire_pulse'] - self.prev_desire > .99, inputs['desire_pulse'], 0)
self.prev_desire[:] = inputs['desire_pulse']
if self.update_imgs is None:
for key in bufs.keys():
w, h = bufs[key].width, bufs[key].height
self.frame_buf_params[key] = get_nv12_info(w, h)
warp_path = MODELS_DIR / f'warp_{w}x{h}_tinygrad.pkl'
with open(warp_path, "rb") as f:
self.update_imgs = pickle.load(f)
for key in bufs.keys():
ptr = bufs[key].data.ctypes.data
yuv_size = self.frame_buf_params[key][3]
@@ -127,31 +212,31 @@ class ModelState(ModelStateBase):
if cache_key not in self._blob_cache:
self._blob_cache[cache_key] = Tensor.from_blob(ptr, (yuv_size,), dtype='uint8')
self.full_frames[key] = self._blob_cache[cache_key]
for key in bufs.keys():
self.transforms_np[key][:,:] = transforms[key][:,:]
# Model decides when action is completed, so desire input is just a pulse triggered on rising edge
inputs['desire_pulse'][0] = 0
self.npy['desire'][:] = np.where(inputs['desire_pulse'] - self.prev_desire > .99, inputs['desire_pulse'], 0)
self.prev_desire[:] = inputs['desire_pulse']
self.npy['traffic_convention'][:] = inputs['traffic_convention']
self.npy['tfm'][:,:] = transforms['img'][:,:]
self.npy['big_tfm'][:,:] = transforms['big_img'][:,:]
out = self.update_imgs(self.img_queues['img'], self.full_frames['img'], self.transforms['img'],
self.img_queues['big_img'], self.full_frames['big_img'], self.transforms['big_img'])
self.img_queues['img'], self.img_queues['big_img'] = out[0].realize(), out[2].realize()
vision_inputs = {'img': out[1], 'big_img': out[3]}
if prepare_only:
self.warp_enqueue(**self.input_queues, frame=self.full_frames['img'], big_frame=self.full_frames['big_img'])
return None
vision_output, policy_output = self.run_policy(
**self.input_queues, frame=self.full_frames['img'], big_frame=self.full_frames['big_img']
)
self.vision_output = self.vision_run(**vision_inputs).contiguous().realize().uop.base.buffer.numpy().flatten()
vision_outputs_dict = self.parser.parse_vision_outputs(self.slice_outputs(self.vision_output, self.vision_output_slices))
vision_output = vision_output.numpy().flatten()
policy_output = policy_output.numpy().flatten()
vision_outputs_dict = self.parser.parse_vision_outputs(self.slice_outputs(vision_output, self.vision_output_slices))
policy_outputs_dict = self.parser.parse_policy_outputs(self.slice_outputs(policy_output, self.policy_output_slices))
self.full_input_queues.enqueue({'features_buffer': vision_outputs_dict['hidden_state'], 'desire_pulse': new_desire})
for k in ['desire_pulse', 'features_buffer']:
self.numpy_inputs[k][:] = self.full_input_queues.get(k)[k]
self.numpy_inputs['traffic_convention'][:] = inputs['traffic_convention']
self.policy_output = self.policy_run(**self.policy_inputs).contiguous().realize().uop.base.buffer.numpy().flatten()
policy_outputs_dict = self.parser.parse_policy_outputs(self.slice_outputs(self.policy_output, self.policy_output_slices))
combined_outputs_dict = {**vision_outputs_dict, **policy_outputs_dict}
if SEND_RAW_PRED:
combined_outputs_dict['raw_pred'] = np.concatenate([vision_output.copy(), policy_output.copy()])
combined_outputs_dict['raw_pred'] = np.concatenate([self.vision_output.copy(), self.policy_output.copy()])
return combined_outputs_dict
@@ -163,6 +248,11 @@ def main(demo=False):
# also need to move the aux USB interrupts for good timings
config_realtime_process(7, 54)
st = time.monotonic()
cloudlog.warning("loading model")
model = ModelState()
cloudlog.warning(f"models loaded in {time.monotonic() - st:.1f}s, modeld starting")
# visionipc clients
while True:
available_streams = VisionIpcClient.available_streams("camerad", block=False)
@@ -186,11 +276,6 @@ def main(demo=False):
if use_extra_client:
cloudlog.warning(f"connected extra cam with buffer size: {vipc_client_extra.buffer_len} ({vipc_client_extra.width} x {vipc_client_extra.height})")
st = time.monotonic()
cloudlog.warning("loading model")
model = ModelState(vipc_client_main.width, vipc_client_main.height)
cloudlog.warning(f"models loaded in {time.monotonic() - st:.1f}s, modeld starting")
# messaging
pm = PubMaster(["modelV2", "drivingModelData", "cameraOdometry", "modelDataV2SP"])
sm = SubMaster(["deviceState", "carState", "roadCameraState", "liveCalibration", "driverMonitoringState", "carControl", "liveDelay"])

View File

@@ -1,6 +1,5 @@
import os
import glob
from tinygrad import Device
Import('env', 'arch')
lenv = env.Clone()
@@ -22,19 +21,10 @@ if PC:
if outputs:
lenv.Command(outputs, inputs, cmd)
available = set(Device.get_available_devices())
if 'CUDA' in available:
tg_backend = 'CUDA'
tg_flags = f'DEV={tg_backend}'
elif 'QCOM' in available:
tg_backend = 'QCOM'
tg_flags = f'DEV={tg_backend} FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0 OPENPILOT_HACKS=1'
else:
tg_backend = 'CPU' if arch == 'Darwin' else 'CPU:LLVM'
# THREADS=0 is need to prevent bug: https://github.com/tinygrad/tinygrad/issues/14689
tg_flags = f'DEV={tg_backend} THREADS=0'
mac_brew_string = f'HOME={os.path.expanduser("~")}' if arch == 'Darwin' else ''
tg_flags = {
'larch64': 'DEV=QCOM FLOAT16=1 NOLOCALS=1 JIT_BATCH_SIZE=0',
'Darwin': f'DEV=CPU THREADS=0 HOME={os.path.expanduser("~")}',
}.get(arch, 'DEV=CPU CPU_LLVM=1 THREADS=0')
image_flag = {
'larch64': 'IMAGE=2',
@@ -48,7 +38,7 @@ def tg_compile(flags, model_name):
return lenv.Command(
out,
[fn + ".onnx"] + tinygrad_files,
f'{pythonpath_string} {tg_flags} {mac_brew_string} {image_flag} python3 {Dir("#tinygrad_repo").abspath}/examples/openpilot/compile3.py {fn}.onnx {out}'
f'{pythonpath_string} {flags} {image_flag} python3 {Dir("#tinygrad_repo").abspath}/examples/openpilot/compile3.py {fn}.onnx {out}'
)
# Compile models
@@ -56,9 +46,9 @@ for model_name in ['supercombo', 'driving_vision', 'driving_off_policy', 'drivin
if File(f"models/{model_name}.onnx").exists():
tg_compile(tg_flags, model_name)
script_files = [File("warp.py"), File(Dir("#selfdrive/modeld").File("compile_modeld.py").abspath)]
script_files = [File("warp.py"), File(Dir("#selfdrive/modeld").File("compile_warp.py").abspath)]
pythonpath_string = 'PYTHONPATH="${PYTHONPATH}:' + env.Dir("#tinygrad_repo").abspath + ':' + env.Dir("#").abspath + '"'
compile_warp_cmd = f'{pythonpath_string} {tg_flags} {mac_brew_string} {image_flag} python3 -m sunnypilot.modeld_v2.warp'
compile_warp_cmd = f'{pythonpath_string} {tg_flags} python3 -m sunnypilot.modeld_v2.warp'
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
warp_targets = []

View File

@@ -2,11 +2,8 @@ import os
os.environ['DEV'] = 'CPU'
import pytest
import numpy as np
from openpilot.sunnypilot.modeld_v2.warp import CAMERA_CONFIGS
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.sunnypilot.modeld_v2.warp import Warp
from openpilot.common.transformations.model import MEDMODEL_INPUT_SIZE
MODEL_W, MODEL_H = MEDMODEL_INPUT_SIZE
from openpilot.selfdrive.modeld.compile_warp import get_nv12_info, CAMERA_CONFIGS
from openpilot.sunnypilot.modeld_v2.warp import Warp, MODEL_W, MODEL_H
VISION_NAME_PAIRS = [ # needed to account for supercombos input_imgs
('img', 'big_img'),

View File

@@ -6,79 +6,56 @@ from tinygrad.tensor import Tensor
from tinygrad.engine.jit import TinyJit
from tinygrad.device import Device
# https://github.com/tinygrad/tinygrad/issues/15682
from tinygrad.uop.ops import UOp, Ops
_orig = UOp.__reduce__
UOp.__reduce__ = lambda self: (UOp.unique, ()) if self.op is Ops.UNIQUE else _orig(self)
from openpilot.system.camerad.cameras.nv12_info import get_nv12_info
from openpilot.selfdrive.modeld.compile_modeld import (
NV12Frame, make_frame_prepare,
from openpilot.selfdrive.modeld.compile_warp import (
CAMERA_CONFIGS, MEDMODEL_INPUT_SIZE, make_frame_prepare, make_update_both_imgs,
warp_pkl_path,
)
from openpilot.common.transformations.camera import _ar_ox_fisheye, _os_fisheye
CAMERA_CONFIGS = [
(_ar_ox_fisheye.width, _ar_ox_fisheye.height), # tici: 1928x1208
(_os_fisheye.width, _os_fisheye.height), # mici: 1344x760
]
from openpilot.common.transformations.model import MEDMODEL_INPUT_SIZE
MODELS_DIR = Path(__file__).parent / 'models'
MODEL_W, MODEL_H = MEDMODEL_INPUT_SIZE
UPSTREAM_BUFFER_LENGTH = 5
def warp_pkl_path(cam_w, cam_h):
return MODELS_DIR / f'warp_{cam_w}x{cam_h}_tinygrad.pkl'
def make_update_img_input(frame_prepare, model_w, model_h):
def update_img_input_tinygrad(tensor, frame, M_inv):
M_inv = M_inv.to(Device.DEFAULT)
new_img = frame_prepare(frame, M_inv)
tensor.assign(tensor[6:].cat(new_img, dim=0).contiguous())
return Tensor.cat(tensor[:6], tensor[-6:], dim=0).contiguous().reshape(1, 12, model_h//2, model_w//2)
return update_img_input_tinygrad
def make_update_both_imgs(frame_prepare, model_w, model_h):
update_img = make_update_img_input(frame_prepare, model_w, model_h)
def update_both_imgs_tinygrad(calib_img_buffer, new_img, M_inv,
calib_big_img_buffer, new_big_img, M_inv_big):
calib_img_pair = update_img(calib_img_buffer, new_img, M_inv)
calib_big_img_pair = update_img(calib_big_img_buffer, new_big_img, M_inv_big)
return calib_img_pair, calib_big_img_pair
return update_both_imgs_tinygrad
def v2_warp_pkl_path(cam_w, cam_h, buffer_length):
return MODELS_DIR / f'warp_{cam_w}x{cam_h}_b{buffer_length}_tinygrad.pkl'
def compile_v2_warp(cam_w, cam_h, buffer_length, model_w=MEDMODEL_INPUT_SIZE[0], model_h=MEDMODEL_INPUT_SIZE[1]):
def compile_v2_warp(cam_w, cam_h, buffer_length):
_, _, _, yuv_size = get_nv12_info(cam_w, cam_h)
img_buffer_shape = (buffer_length * 6, model_h // 2, model_w // 2)
img_buffer_shape = (buffer_length * 6, MODEL_H // 2, MODEL_W // 2)
print(f"Compiling v2 warp for {cam_w}x{cam_h} buffer_length={buffer_length}...")
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
frame_prepare = make_frame_prepare(nv12, model_w, model_h)
update_both_imgs = make_update_both_imgs(frame_prepare, model_w, model_h)
frame_prepare = make_frame_prepare(cam_w, cam_h, MODEL_W, MODEL_H)
update_both_imgs = make_update_both_imgs(frame_prepare, MODEL_W, MODEL_H)
update_img_jit = TinyJit(update_both_imgs, prune=True)
full_buffer = Tensor.zeros(img_buffer_shape, dtype='uint8').contiguous().realize()
big_full_buffer = Tensor.zeros(img_buffer_shape, dtype='uint8').contiguous().realize()
new_frame_np = np.random.randint(0, 256, yuv_size, dtype=np.uint8)
new_big_frame_np = np.random.randint(0, 256, yuv_size, dtype=np.uint8)
full_buffer_np = np.zeros(img_buffer_shape, dtype=np.uint8)
big_full_buffer_np = np.zeros(img_buffer_shape, dtype=np.uint8)
for i in range(10):
new_frame_np = (32 * np.random.randn(yuv_size).astype(np.float32) + 128).clip(0, 255).astype(np.uint8)
img_inputs = [full_buffer,
Tensor.from_blob(new_frame_np.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
new_big_frame_np = (32 * np.random.randn(yuv_size).astype(np.float32) + 128).clip(0, 255).astype(np.uint8)
big_img_inputs = [big_full_buffer,
Tensor.from_blob(new_big_frame_np.ctypes.data, (yuv_size,), dtype='uint8').realize(),
Tensor(Tensor.randn(3, 3).mul(8).realize().numpy(), device='NPY')]
inputs = img_inputs + big_img_inputs
Device.default.synchronize()
inputs_np = [x.numpy() for x in inputs]
inputs_np[0] = full_buffer_np
inputs_np[3] = big_full_buffer_np
st = time.perf_counter()
_ = update_img_jit(*inputs)
out = update_img_jit(*inputs)
full_buffer = out[0].contiguous().realize().clone()
big_full_buffer = out[2].contiguous().realize().clone()
mt = time.perf_counter()
Device.default.synchronize()
et = time.perf_counter()
@@ -94,11 +71,9 @@ def compile_v2_warp(cam_w, cam_h, buffer_length, model_w=MEDMODEL_INPUT_SIZE[0],
class Warp:
def __init__(self, buffer_length=2, model_w=MEDMODEL_INPUT_SIZE[0], model_h=MEDMODEL_INPUT_SIZE[1]):
def __init__(self, buffer_length=2):
self.buffer_length = buffer_length
self.model_w = model_w
self.model_h = model_h
self.img_buffer_shape = (buffer_length * 6, model_h // 2, model_w // 2)
self.img_buffer_shape = (buffer_length * 6, MODEL_H // 2, MODEL_W // 2)
self.jit_cache = {}
self.full_buffers = {k: Tensor.zeros(self.img_buffer_shape, dtype='uint8').contiguous().realize() for k in ['img', 'big_img']}
@@ -126,9 +101,8 @@ class Warp:
with open(upstream_pkl, 'rb') as f:
self.jit_cache[key] = pickle.load(f)
if key not in self.jit_cache:
nv12 = NV12Frame(cam_w, cam_h, *get_nv12_info(cam_w, cam_h))
frame_prepare = make_frame_prepare(nv12, self.model_w, self.model_h)
update_both_imgs = make_update_both_imgs(frame_prepare, self.model_w, self.model_h)
frame_prepare = make_frame_prepare(cam_w, cam_h, MODEL_W, MODEL_H)
update_both_imgs = make_update_both_imgs(frame_prepare, MODEL_W, MODEL_H)
self.jit_cache[key] = TinyJit(update_both_imgs, prune=True)
if key not in self._nv12_cache:
@@ -151,8 +125,8 @@ class Warp:
self.full_buffers['img'], road_blob, self.transforms['img'],
self.full_buffers['big_img'], wide_blob, self.transforms['big_img'],
)
out_road = res[0].realize()
out_wide = res[1].realize()
self.full_buffers['img'], out_road = res[0].realize(), res[1].realize()
self.full_buffers['big_img'], out_wide = res[2].realize(), res[3].realize()
return {road: out_road, wide: out_wide}

View File

@@ -116,7 +116,7 @@ class ModelCache:
class ModelFetcher:
"""Handles fetching and caching of model data from remote source"""
MODEL_URL = "https://raw.githubusercontent.com/sunnypilot/sunnypilot-models/refs/heads/gh-pages/docs/driving_models_v17.json"
MODEL_URL = "https://raw.githubusercontent.com/sunnypilot/sunnypilot-models/refs/heads/gh-pages/docs/driving_models_v16.json"
def __init__(self, params: Params):
self.params = params

View File

@@ -46,5 +46,8 @@ class CruiseHelper:
if self.button_frame_counts[ButtonType.gapAdjustCruise] >= DISTANCE_LONG_PRESS and not self.experimental_mode_switched:
self._experimental_mode = not experimental_mode
self.params.put_bool_nonblocking("ExperimentalMode", self._experimental_mode)
events.add(EventNameSP.experimentalModeSwitched)
if self._experimental_mode:
events.add(EventNameSP.experimentalModeSwitched)
else:
events.add(EventNameSP.chillModeSwitched)
self.experimental_mode_switched = True

View File

@@ -181,7 +181,11 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
},
EventNameSP.experimentalModeSwitched: {
ET.WARNING: NormalPermanentAlert("Experimental Mode Switched", duration=1.5)
ET.WARNING: NormalPermanentAlert("Experimental Mode", duration=1.5)
},
EventNameSP.chillModeSwitched: {
ET.WARNING: NormalPermanentAlert("Chill Mode", duration=1.5)
},
EventNameSP.wrongCarModeAlertOnly: {

View File

@@ -30,6 +30,10 @@ class SunnylinkApi(BaseApi):
return super().api_get(endpoint, method, timeout, access_token, session, json, **kwargs)
def resume_queued(self, timeout=10, **kwargs):
sunnylinkId, commaId = self._resolve_dongle_ids()
return self.api_get(f"ws/{sunnylinkId}/resume_queued", "POST", timeout, access_token=self.get_token(), **kwargs)
def get_token(self, payload_extra=None, expiry_hours=1):
# Add your additional data here
additional_data = {}

View File

@@ -64,6 +64,7 @@ def handle_long_poll(ws: WebSocket, exit_event: threading.Event | None) -> None:
threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'),
threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'),
threading.Thread(target=ws_ping, args=(ws, end_event), name='ws_ping'),
threading.Thread(target=ws_queue, args=(end_event,), name='ws_queue'),
threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
threading.Thread(target=sunny_log_handler, args=(end_event, comma_prime_cellular_end_event), name='log_handler'),
threading.Thread(target=stat_handler, args=(end_event, Paths.stats_sp_root(), True), name='stat_handler'),
@@ -146,6 +147,37 @@ def ws_ping(ws: WebSocket, end_event: threading.Event) -> None:
cloudlog.debug("sunnylinkd.ws_ping.end_event is set, exiting ws_ping thread")
def ws_queue(end_event: threading.Event) -> None:
sunnylink_dongle_id = params.get("SunnylinkDongleId")
sunnylink_api = SunnylinkApi(sunnylink_dongle_id)
resume_requested = False
tries = 0
while not end_event.is_set() and not resume_requested:
try:
if not resume_requested:
cloudlog.debug("sunnylinkd.ws_queue.resume_queued")
sunnylink_api.resume_queued(timeout=29)
resume_requested = True
tries = 0
except Exception as e:
if isinstance(e, (ConnectionError, TimeoutError)):
cloudlog.warning(f"sunnylinkd.ws_queue.resume_queued.{type(e).__name__}")
else:
cloudlog.exception("sunnylinkd.ws_queue.resume_queued.exception")
resume_requested = False
tries += 1
time.sleep(backoff(tries))
if end_event.is_set():
cloudlog.debug("end_event is set, exiting ws_queue thread")
elif resume_requested:
cloudlog.debug(f"Resume requested to server after {tries} tries")
else:
cloudlog.error(f"Reached end of ws_queue while end_event is not set and resume_requested is {resume_requested}")
def sunny_log_handler(end_event: threading.Event, comma_prime_cellular_end_event: threading.Event) -> None:
while not end_event.wait(0.1):
if not comma_prime_cellular_end_event.is_set():