mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-26 16:32:06 +08:00
process_replay: support for replaying multiple processes simultaneously (#28829)
* first working ProcessContainer and _replay_multi_process implementation * Setup controlsd config_callback * Add vision ipc support * Remove old code * add todo * Update estimated processing times * Use deepcopy when getting configs * Rework the API to support passing multiple names or configs * Add get_custom_params_from_lr * Add more typing * Enable simulation for controlsd * Fix typing issues * Regen refactor * Fix linter issues * Make wideRoadCameraState optional * Move DummySocket to helpers * Improve typing in regen * Minor improvements * Maintain sort order of internal_pub_queue using heapq * Move ProcessContainer elsewhere * Fix internal heap * Change comment * Remove subtest_name from ProcessConfig * Update cereal * Add newline
This commit is contained in:
+1
-1
Submodule cereal updated: cf9c5cbb91...6f7102581f
@@ -2,12 +2,15 @@ import os
|
||||
import shutil
|
||||
import uuid
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
from common.params import Params
|
||||
|
||||
class OpenpilotPrefix(object):
|
||||
def __init__(self, prefix: str = None) -> None:
|
||||
def __init__(self, prefix: str = None, clean_dirs_on_exit: bool = True):
|
||||
self.prefix = prefix if prefix else str(uuid.uuid4())
|
||||
self.msgq_path = os.path.join('/dev/shm', self.prefix)
|
||||
self.clean_dirs_on_exit = clean_dirs_on_exit
|
||||
|
||||
def __enter__(self):
|
||||
os.environ['OPENPILOT_PREFIX'] = self.prefix
|
||||
@@ -17,10 +20,28 @@ class OpenpilotPrefix(object):
|
||||
pass
|
||||
|
||||
def __exit__(self, exc_type, exc_obj, exc_tb):
|
||||
if self.clean_dirs_on_exit:
|
||||
self.clean_dirs()
|
||||
del os.environ['OPENPILOT_PREFIX']
|
||||
return False
|
||||
|
||||
def clean_dirs(self):
|
||||
symlink_path = Params().get_param_path()
|
||||
if os.path.exists(symlink_path):
|
||||
shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
|
||||
os.remove(symlink_path)
|
||||
shutil.rmtree(self.msgq_path, ignore_errors=True)
|
||||
del os.environ['OPENPILOT_PREFIX']
|
||||
return False
|
||||
|
||||
|
||||
class DummySocket:
|
||||
def __init__(self):
|
||||
self.data: List[bytes] = []
|
||||
|
||||
def receive(self, non_blocking: bool = False) -> Optional[bytes]:
|
||||
if non_blocking:
|
||||
return None
|
||||
|
||||
return self.data.pop()
|
||||
|
||||
def send(self, data: bytes):
|
||||
self.data.append(data)
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import time
|
||||
import copy
|
||||
import json
|
||||
import heapq
|
||||
import signal
|
||||
import platform
|
||||
from collections import OrderedDict
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional, Callable, Union, Any
|
||||
from typing import Dict, List, Optional, Callable, Union, Any, Iterable, Tuple
|
||||
from tqdm import tqdm
|
||||
import capnp
|
||||
|
||||
@@ -19,18 +22,17 @@ from common.realtime import DT_CTRL
|
||||
from panda.python import ALTERNATIVE_EXPERIENCE
|
||||
from selfdrive.car.car_helpers import get_car, interfaces
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
|
||||
from selfdrive.test.process_replay.helpers import OpenpilotPrefix, DummySocket
|
||||
from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
|
||||
from selfdrive.test.process_replay.migration import migrate_all
|
||||
from tools.lib.logreader import LogReader
|
||||
|
||||
# Numpy gives different results based on CPU features after version 19
|
||||
NUMPY_TOLERANCE = 1e-7
|
||||
CI = "CI" in os.environ
|
||||
TIMEOUT = 15
|
||||
PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
|
||||
|
||||
|
||||
class ReplayContext:
|
||||
def __init__(self, cfg):
|
||||
self.proc_name = cfg.proc_name
|
||||
@@ -40,6 +42,14 @@ class ReplayContext:
|
||||
assert(len(self.pubs) != 0 or self.main_pub is not None)
|
||||
|
||||
def __enter__(self):
|
||||
self.open()
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_obj, exc_tb):
|
||||
self.close()
|
||||
|
||||
def open(self):
|
||||
messaging.toggle_fake_events(True)
|
||||
messaging.set_fake_prefix(self.proc_name)
|
||||
|
||||
@@ -50,9 +60,7 @@ class ReplayContext:
|
||||
else:
|
||||
self.events = {self.main_pub: messaging.fake_event_handle(self.main_pub, enable=True)}
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_obj, exc_tb):
|
||||
def close(self):
|
||||
del self.events
|
||||
|
||||
messaging.toggle_fake_events(False)
|
||||
@@ -101,8 +109,7 @@ class ProcessConfig:
|
||||
init_callback: Optional[Callable] = None
|
||||
should_recv_callback: Optional[Callable] = None
|
||||
tolerance: Optional[float] = None
|
||||
environ: Dict[str, str] = field(default_factory=dict)
|
||||
subtest_name: str = ""
|
||||
processing_time: float = 0.001
|
||||
field_tolerances: Dict[str, float] = field(default_factory=dict)
|
||||
timeout: int = 30
|
||||
simulation: bool = True
|
||||
@@ -112,18 +119,151 @@ class ProcessConfig:
|
||||
ignore_alive_pubs: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class DummySocket:
|
||||
def __init__(self):
|
||||
self.data = []
|
||||
class ProcessContainer:
|
||||
def __init__(self, cfg: ProcessConfig):
|
||||
self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False)
|
||||
self.cfg = copy.deepcopy(cfg)
|
||||
self.process = managed_processes[cfg.proc_name]
|
||||
self.msg_queue: List[capnp._DynamicStructReader] = []
|
||||
self.cnt = 0
|
||||
self.pm: Optional[messaging.PubMaster] = None
|
||||
self.sockets: Optional[List[messaging.SubSocket]] = None
|
||||
self.rc: Optional[ReplayContext] = None
|
||||
self.vipc_server: Optional[VisionIpcServer] = None
|
||||
|
||||
def receive(self, non_blocking=False):
|
||||
if non_blocking:
|
||||
return None
|
||||
@property
|
||||
def has_empty_queue(self) -> bool:
|
||||
return len(self.msg_queue) == 0
|
||||
|
||||
return self.data.pop()
|
||||
@property
|
||||
def pubs(self) -> List[str]:
|
||||
return self.cfg.pubs
|
||||
|
||||
def send(self, data):
|
||||
self.data.append(data)
|
||||
@property
|
||||
def subs(self) -> List[str]:
|
||||
return self.cfg.subs
|
||||
|
||||
def _setup_env(self, params_config: Dict[str, Any], environ_config: Dict[str, Any]):
|
||||
for k, v in environ_config.items():
|
||||
if len(v) != 0:
|
||||
os.environ[k] = v
|
||||
elif k in os.environ:
|
||||
del os.environ[k]
|
||||
|
||||
os.environ["PROC_NAME"] = self.cfg.proc_name
|
||||
if self.cfg.simulation:
|
||||
os.environ["SIMULATION"] = "1"
|
||||
elif "SIMULATION" in os.environ:
|
||||
del os.environ["SIMULATION"]
|
||||
|
||||
params = Params()
|
||||
for k, v in params_config.items():
|
||||
if isinstance(v, bool):
|
||||
params.put_bool(k, v)
|
||||
else:
|
||||
params.put(k, v)
|
||||
|
||||
def _setup_vision_ipc(self, all_msgs):
|
||||
assert len(self.cfg.vision_pubs) != 0
|
||||
|
||||
device_type = next(msg.initData.deviceType for msg in all_msgs if msg.which() == "initData")
|
||||
|
||||
vipc_server = VisionIpcServer("camerad")
|
||||
streams_metas = available_streams(all_msgs)
|
||||
for meta in streams_metas:
|
||||
if meta.camera_state in self.cfg.vision_pubs:
|
||||
vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type])
|
||||
vipc_server.start_listener()
|
||||
|
||||
self.vipc_server = vipc_server
|
||||
|
||||
def start(
|
||||
self, params_config: Dict[str, Any], environ_config: Dict[str, Any],
|
||||
all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], fingerprint: Optional[str]
|
||||
):
|
||||
with self.prefix:
|
||||
self._setup_env(params_config, environ_config)
|
||||
|
||||
if self.cfg.config_callback is not None:
|
||||
params = Params()
|
||||
self.cfg.config_callback(params, self.cfg, all_msgs)
|
||||
|
||||
self.rc = ReplayContext(self.cfg)
|
||||
self.rc.open()
|
||||
|
||||
self.pm = messaging.PubMaster(self.cfg.pubs)
|
||||
self.sockets = [messaging.sub_sock(s, timeout=100) for s in self.cfg.subs]
|
||||
|
||||
if len(self.cfg.vision_pubs) != 0:
|
||||
self._setup_vision_ipc(all_msgs)
|
||||
assert self.vipc_server is not None
|
||||
|
||||
self.process.prepare()
|
||||
self.process.start()
|
||||
|
||||
if self.cfg.init_callback is not None:
|
||||
self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint)
|
||||
|
||||
# wait for process to startup
|
||||
with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(self.cfg.proc_name)}"):
|
||||
while not all(self.pm.all_readers_updated(s) for s in self.cfg.pubs if s not in self.cfg.ignore_alive_pubs):
|
||||
time.sleep(0)
|
||||
|
||||
def stop(self):
|
||||
with self.prefix:
|
||||
self.process.signal(signal.SIGKILL)
|
||||
self.process.stop()
|
||||
self.rc.close()
|
||||
self.prefix.clean_dirs()
|
||||
|
||||
def run_step(self, msg: capnp._DynamicStructReader, frs: Optional[Dict[str, Any]]) -> List[capnp._DynamicStructReader]:
|
||||
assert self.rc and self.pm and self.sockets and self.process.proc
|
||||
|
||||
output_msgs = []
|
||||
with self.prefix, Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"):
|
||||
end_of_cycle = True
|
||||
if self.cfg.should_recv_callback is not None:
|
||||
end_of_cycle = self.cfg.should_recv_callback(msg, self.cfg, self.cnt)
|
||||
|
||||
self.msg_queue.append(msg)
|
||||
if end_of_cycle:
|
||||
self.rc.wait_for_recv_called()
|
||||
|
||||
# call recv to let sub-sockets reconnect, after we know the process is ready
|
||||
if self.cnt == 0:
|
||||
for s in self.sockets:
|
||||
messaging.recv_one_or_none(s)
|
||||
|
||||
# empty recv on drained pub indicates the end of messages, only do that if there're any
|
||||
trigger_empty_recv = False
|
||||
if self.cfg.main_pub and self.cfg.main_pub_drained:
|
||||
trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False)
|
||||
|
||||
for m in self.msg_queue:
|
||||
self.pm.send(m.which(), m.as_builder())
|
||||
# send frames if needed
|
||||
if self.vipc_server is not None and m.which() in self.cfg.vision_pubs:
|
||||
camera_state = getattr(m, m.which())
|
||||
camera_meta = meta_from_camera_state(m.which())
|
||||
assert frs is not None
|
||||
img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0]
|
||||
self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(),
|
||||
camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof)
|
||||
self.msg_queue = []
|
||||
|
||||
self.rc.unlock_sockets()
|
||||
self.rc.wait_for_next_recv(trigger_empty_recv)
|
||||
|
||||
for socket in self.sockets:
|
||||
ms = messaging.drain_sock(socket)
|
||||
for m in ms:
|
||||
m = m.as_builder()
|
||||
m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9)
|
||||
output_msgs.append(m.as_reader())
|
||||
self.cnt += 1
|
||||
assert self.process.proc.is_alive()
|
||||
|
||||
return output_msgs
|
||||
|
||||
|
||||
def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint):
|
||||
@@ -242,21 +382,38 @@ class FrequencyBasedRcvCallback:
|
||||
if frame % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0
|
||||
]
|
||||
return bool(len(resp_sockets))
|
||||
|
||||
|
||||
def controlsd_config_callback(params, cfg, lr):
|
||||
controlsState = None
|
||||
initialized = False
|
||||
for msg in lr:
|
||||
if msg.which() == "controlsState":
|
||||
controlsState = msg.controlsState
|
||||
if initialized:
|
||||
break
|
||||
elif msg.which() == "carEvents":
|
||||
initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents]
|
||||
|
||||
assert controlsState is not None and initialized, "controlsState never initialized"
|
||||
params.put("ReplayControlsState", controlsState.as_builder().to_bytes())
|
||||
|
||||
|
||||
def laikad_config_pubsub_callback(params, cfg):
|
||||
def laikad_config_pubsub_callback(params, cfg, lr):
|
||||
ublox = params.get_bool("UbloxAvailable")
|
||||
main_key = "ubloxGnss" if ublox else "qcomGnss"
|
||||
sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", })
|
||||
|
||||
return set(cfg.pubs) - sub_keys, main_key, True
|
||||
cfg.pubs = set(cfg.pubs) - sub_keys
|
||||
cfg.main_pub = main_key
|
||||
cfg.main_pub_drained = True
|
||||
|
||||
|
||||
def locationd_config_pubsub_callback(params, cfg):
|
||||
def locationd_config_pubsub_callback(params, cfg, lr):
|
||||
ublox = params.get_bool("UbloxAvailable")
|
||||
sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", })
|
||||
|
||||
return set(cfg.pubs) - sub_keys, None, False
|
||||
cfg.pubs = set(cfg.pubs) - sub_keys
|
||||
|
||||
|
||||
CONFIGS = [
|
||||
@@ -270,9 +427,11 @@ CONFIGS = [
|
||||
],
|
||||
subs=["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
|
||||
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"],
|
||||
config_callback=controlsd_config_callback,
|
||||
init_callback=controlsd_fingerprint_callback,
|
||||
should_recv_callback=controlsd_rcv_callback,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
processing_time=0.004,
|
||||
main_pub="can",
|
||||
),
|
||||
ProcessConfig(
|
||||
@@ -327,6 +486,7 @@ CONFIGS = [
|
||||
init_callback=get_car_params_callback,
|
||||
should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"),
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
processing_time=0.004,
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="ubloxd",
|
||||
@@ -341,8 +501,9 @@ CONFIGS = [
|
||||
ignore=["logMonoTime"],
|
||||
config_callback=laikad_config_pubsub_callback,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
processing_time=0.002,
|
||||
timeout=60*10, # first messages are blocked on internet assistance
|
||||
main_pub="ubloxGnss", # config_callback will switch this to qcom if needed
|
||||
main_pub="ubloxGnss", # config_callback will switch this to qcom if needed
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="torqued",
|
||||
@@ -360,6 +521,7 @@ CONFIGS = [
|
||||
ignore=["logMonoTime", "modelV2.frameDropPerc", "modelV2.modelExecutionTime"],
|
||||
should_recv_callback=ModeldCameraSyncRcvCallback(),
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
processing_time=0.020,
|
||||
main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("roadCameraState").stream),
|
||||
main_pub_drained=False,
|
||||
vision_pubs=["roadCameraState", "wideRoadCameraState"],
|
||||
@@ -372,6 +534,7 @@ CONFIGS = [
|
||||
ignore=["logMonoTime", "driverStateV2.modelExecutionTime", "driverStateV2.dspExecutionTime"],
|
||||
should_recv_callback=dmonitoringmodeld_rcv_callback,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
processing_time=0.020,
|
||||
main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("driverCameraState").stream),
|
||||
main_pub_drained=False,
|
||||
vision_pubs=["driverCameraState"],
|
||||
@@ -380,27 +543,73 @@ CONFIGS = [
|
||||
]
|
||||
|
||||
|
||||
def get_process_config(name):
|
||||
def get_process_config(name: str) -> ProcessConfig:
|
||||
try:
|
||||
return next(c for c in CONFIGS if c.proc_name == name)
|
||||
return copy.deepcopy(next(c for c in CONFIGS if c.proc_name == name))
|
||||
except StopIteration as ex:
|
||||
raise Exception(f"Cannot find process config with name: {name}") from ex
|
||||
|
||||
|
||||
def replay_process_with_name(name, lr, *args, **kwargs):
|
||||
cfg = get_process_config(name)
|
||||
return replay_process(cfg, lr, *args, **kwargs)
|
||||
def get_custom_params_from_lr(lr: Union[LogReader, List[capnp._DynamicStructReader]], initial_state: str = "first") -> Dict[str, Any]:
|
||||
"""
|
||||
Use this to get custom params dict based on provided logs.
|
||||
Useful when replaying following processes: calibrationd, paramsd, torqued
|
||||
The params may be based on first or last message of given type (carParams, liveCalibration, liveParameters, liveTorqueParameters) in the logs.
|
||||
"""
|
||||
|
||||
car_params = [m for m in lr if m.which() == "carParams"]
|
||||
live_calibration = [m for m in lr if m.which() == "liveCalibration"]
|
||||
live_parameters = [m for m in lr if m.which() == "liveParameters"]
|
||||
live_torque_parameters = [m for m in lr if m.which() == "liveTorqueParameters"]
|
||||
|
||||
assert initial_state in ["first", "last"]
|
||||
msg_index = 0 if initial_state == "first" else -1
|
||||
|
||||
assert len(car_params) > 0, "carParams required for initial state of liveParameters and liveTorqueCarParams"
|
||||
CP = car_params[msg_index].carParams
|
||||
|
||||
custom_params = {}
|
||||
if len(live_calibration) > 0:
|
||||
custom_params["CalibrationParams"] = live_calibration[msg_index].as_builder().to_bytes()
|
||||
if len(live_parameters) > 0:
|
||||
lp_dict = live_parameters[msg_index].to_dict()
|
||||
lp_dict["carFingerprint"] = CP.carFingerprint
|
||||
custom_params["LiveParameters"] = json.dumps(lp_dict)
|
||||
if len(live_torque_parameters) > 0:
|
||||
custom_params["LiveTorqueCarParams"] = CP.as_builder().to_bytes()
|
||||
custom_params["LiveTorqueParameters"] = live_torque_parameters[msg_index].as_builder().to_bytes()
|
||||
|
||||
return custom_params
|
||||
|
||||
|
||||
def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False):
|
||||
all_msgs = migrate_all(lr, old_logtime=True, camera_states=len(cfg.vision_pubs) != 0)
|
||||
process_logs = _replay_single_process(cfg, all_msgs, frs, fingerprint, custom_params, disable_progress)
|
||||
def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReader, List[capnp._DynamicStructReader]], *args, **kwargs) -> List[capnp._DynamicStructReader]:
|
||||
if isinstance(name, str):
|
||||
cfgs = [get_process_config(name)]
|
||||
elif isinstance(name, Iterable):
|
||||
cfgs = [get_process_config(n) for n in name]
|
||||
else:
|
||||
raise ValueError("name must be str or collections of strings")
|
||||
|
||||
return replay_process(cfgs, lr, *args, **kwargs)
|
||||
|
||||
|
||||
def replay_process(
|
||||
cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None,
|
||||
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False
|
||||
) -> List[capnp._DynamicStructReader]:
|
||||
if isinstance(cfg, Iterable):
|
||||
cfgs = list(cfg)
|
||||
else:
|
||||
cfgs = [cfg]
|
||||
|
||||
all_msgs = migrate_all(lr, old_logtime=True, camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs))
|
||||
process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, disable_progress)
|
||||
|
||||
if return_all_logs:
|
||||
keys = set(cfg.subs)
|
||||
keys = {m.which() for m in process_logs}
|
||||
modified_logs = [m for m in all_msgs if m.which() not in keys]
|
||||
modified_logs.extend(process_logs)
|
||||
modified_logs.sort(key=lambda m: m.logMonoTime)
|
||||
modified_logs.sort(key=lambda m: int(m.logMonoTime))
|
||||
log_msgs = modified_logs
|
||||
else:
|
||||
log_msgs = process_logs
|
||||
@@ -408,202 +617,131 @@ def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, c
|
||||
return log_msgs
|
||||
|
||||
|
||||
def _replay_single_process(
|
||||
cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]],
|
||||
def _replay_multi_process(
|
||||
cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]],
|
||||
fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool
|
||||
):
|
||||
with OpenpilotPrefix():
|
||||
controlsState = None
|
||||
initialized = False
|
||||
if cfg.proc_name == "controlsd":
|
||||
for msg in lr:
|
||||
if msg.which() == "controlsState":
|
||||
controlsState = msg.controlsState
|
||||
if initialized:
|
||||
break
|
||||
elif msg.which() == "carEvents":
|
||||
initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents]
|
||||
|
||||
assert controlsState is not None and initialized, "controlsState never initialized"
|
||||
|
||||
if fingerprint is not None:
|
||||
setup_env(cfg=cfg, controlsState=controlsState, lr=lr, fingerprint=fingerprint, custom_params=custom_params)
|
||||
else:
|
||||
CP = next((m.carParams for m in lr if m.which() == "carParams"), None)
|
||||
assert CP is not None or "carParams" not in cfg.pubs, "carParams are missing and process needs it"
|
||||
setup_env(cfg=cfg, CP=CP, controlsState=controlsState, lr=lr, custom_params=custom_params)
|
||||
|
||||
if cfg.config_callback is not None:
|
||||
params = Params()
|
||||
cfg.pubs, cfg.main_pub, cfg.main_pub_drained = cfg.config_callback(params, cfg)
|
||||
|
||||
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
|
||||
pub_msgs = [msg for msg in all_msgs if msg.which() in set(cfg.pubs)]
|
||||
|
||||
with ReplayContext(cfg) as rc:
|
||||
pm = messaging.PubMaster(cfg.pubs)
|
||||
sockets = {s: messaging.sub_sock(s, timeout=100) for s in cfg.subs}
|
||||
|
||||
vipc_server = None
|
||||
if len(cfg.vision_pubs) != 0:
|
||||
assert frs is not None, "frs must be provided when replaying process using vision streams"
|
||||
assert all(meta_from_camera_state(st) is not None for st in cfg.vision_pubs),f"undefined vision stream spotted, probably misconfigured process: {cfg.vision_pubs}"
|
||||
assert all(st in frs for st in cfg.vision_pubs), f"frs for this process must contain following vision streams: {cfg.vision_pubs}"
|
||||
vipc_server = setup_vision_ipc(cfg, lr)
|
||||
|
||||
managed_processes[cfg.proc_name].prepare()
|
||||
managed_processes[cfg.proc_name].start()
|
||||
|
||||
if cfg.init_callback is not None:
|
||||
cfg.init_callback(rc, pm, all_msgs, fingerprint)
|
||||
|
||||
log_msgs, msg_queue = [], []
|
||||
try:
|
||||
# Wait for process to startup
|
||||
with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"):
|
||||
while not all(pm.all_readers_updated(s) for s in cfg.pubs if s not in cfg.ignore_alive_pubs):
|
||||
time.sleep(0)
|
||||
|
||||
# Do the replay
|
||||
cnt = 0
|
||||
for msg in tqdm(pub_msgs, disable=disable_progress):
|
||||
with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"):
|
||||
resp_sockets, end_of_cycle = cfg.subs, True
|
||||
if cfg.should_recv_callback is not None:
|
||||
end_of_cycle = cfg.should_recv_callback(msg, cfg, cnt)
|
||||
|
||||
msg_queue.append(msg)
|
||||
if end_of_cycle:
|
||||
rc.wait_for_recv_called()
|
||||
|
||||
# call recv to let sub-sockets reconnect, after we know the process is ready
|
||||
if cnt == 0:
|
||||
for s in sockets.values():
|
||||
messaging.recv_one_or_none(s)
|
||||
|
||||
# empty recv on drained pub indicates the end of messages, only do that if there're any
|
||||
trigger_empty_recv = False
|
||||
if cfg.main_pub and cfg.main_pub_drained:
|
||||
trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.main_pub), False)
|
||||
|
||||
for m in msg_queue:
|
||||
pm.send(m.which(), m.as_builder())
|
||||
# send frames if needed
|
||||
if vipc_server is not None and m.which() in cfg.vision_pubs:
|
||||
camera_state = getattr(m, m.which())
|
||||
camera_meta = meta_from_camera_state(m.which())
|
||||
assert frs is not None
|
||||
img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0]
|
||||
vipc_server.send(camera_meta.stream, img.flatten().tobytes(),
|
||||
camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof)
|
||||
msg_queue = []
|
||||
|
||||
rc.unlock_sockets()
|
||||
rc.wait_for_next_recv(trigger_empty_recv)
|
||||
|
||||
for s in resp_sockets:
|
||||
ms = messaging.drain_sock(sockets[s])
|
||||
for m in ms:
|
||||
m = m.as_builder()
|
||||
m.logMonoTime = msg.logMonoTime
|
||||
log_msgs.append(m.as_reader())
|
||||
cnt += 1
|
||||
proc = managed_processes[cfg.proc_name].proc
|
||||
assert(proc and proc.is_alive())
|
||||
finally:
|
||||
managed_processes[cfg.proc_name].signal(signal.SIGKILL)
|
||||
managed_processes[cfg.proc_name].stop()
|
||||
|
||||
return log_msgs
|
||||
|
||||
|
||||
def setup_vision_ipc(cfg, lr):
|
||||
assert len(cfg.vision_pubs) != 0
|
||||
|
||||
device_type = next(msg.initData.deviceType for msg in lr if msg.which() == "initData")
|
||||
|
||||
vipc_server = VisionIpcServer("camerad")
|
||||
streams_metas = available_streams(lr)
|
||||
for meta in streams_metas:
|
||||
if meta.camera_state in cfg.vision_pubs:
|
||||
vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type])
|
||||
vipc_server.start_listener()
|
||||
|
||||
return vipc_server
|
||||
|
||||
|
||||
def setup_env(cfg=None, CP=None, controlsState=None, lr=None, fingerprint=None, custom_params=None, log_dir=None):
|
||||
if platform.system() != "Darwin":
|
||||
os.environ["PARAMS_ROOT"] = "/dev/shm/params"
|
||||
if log_dir is not None:
|
||||
os.environ["LOG_ROOT"] = log_dir
|
||||
|
||||
params = Params()
|
||||
params.clear_all()
|
||||
params.put_bool("OpenpilotEnabledToggle", True)
|
||||
params.put_bool("Passive", False)
|
||||
params.put_bool("DisengageOnAccelerator", True)
|
||||
params.put_bool("DisableLogging", False)
|
||||
if custom_params is not None:
|
||||
for k, v in custom_params.items():
|
||||
if type(v) == bool:
|
||||
params.put_bool(k, v)
|
||||
else:
|
||||
params.put(k, v)
|
||||
|
||||
os.environ["NO_RADAR_SLEEP"] = "1"
|
||||
os.environ["REPLAY"] = "1"
|
||||
) -> List[capnp._DynamicStructReader]:
|
||||
if fingerprint is not None:
|
||||
os.environ['SKIP_FW_QUERY'] = "1"
|
||||
os.environ['FINGERPRINT'] = fingerprint
|
||||
params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params)
|
||||
env_config = generate_environ_config(fingerprint=fingerprint)
|
||||
else:
|
||||
os.environ["SKIP_FW_QUERY"] = ""
|
||||
os.environ["FINGERPRINT"] = ""
|
||||
CP = next((m.carParams for m in lr if m.which() == "carParams"), None)
|
||||
params_config = generate_params_config(lr=lr, CP=CP, custom_params=custom_params)
|
||||
env_config = generate_environ_config(CP=CP)
|
||||
|
||||
if lr is not None:
|
||||
services = {m.which() for m in lr}
|
||||
params.put_bool("UbloxAvailable", "ubloxGnss" in services)
|
||||
# validate frs and vision pubs
|
||||
for cfg in cfgs:
|
||||
if len(cfg.vision_pubs) == 0:
|
||||
continue
|
||||
|
||||
assert frs is not None, "frs must be provided when replaying process using vision streams"
|
||||
assert all(meta_from_camera_state(st) is not None for st in cfg.vision_pubs),f"undefined vision stream spotted, probably misconfigured process: {cfg.vision_pubs}"
|
||||
assert all(st in frs for st in cfg.vision_pubs), f"frs for this process must contain following vision streams: {cfg.vision_pubs}"
|
||||
|
||||
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
|
||||
log_msgs = []
|
||||
try:
|
||||
containers = []
|
||||
for cfg in cfgs:
|
||||
container = ProcessContainer(cfg)
|
||||
container.start(params_config, env_config, all_msgs, fingerprint)
|
||||
containers.append(container)
|
||||
|
||||
all_pubs = set([pub for container in containers for pub in container.pubs])
|
||||
all_subs = set([sub for container in containers for sub in container.subs])
|
||||
lr_pubs = all_pubs - all_subs
|
||||
pubs_to_containers = {pub: [container for container in containers if pub in container.pubs] for pub in all_pubs}
|
||||
|
||||
pub_msgs = [msg for msg in all_msgs if msg.which() in lr_pubs]
|
||||
# external queue for messages taken from logs; internal queue for messages generated by processes, which will be republished
|
||||
external_pub_queue: List[capnp._DynamicStructReader] = pub_msgs.copy()
|
||||
internal_pub_queue: List[capnp._DynamicStructReader] = []
|
||||
# heap for maintaining the order of messages generated by processes, where each element: (logMonoTime, index in internal_pub_queue)
|
||||
internal_pub_index_heap: List[Tuple[int, int]] = []
|
||||
|
||||
pbar = tqdm(total=len(external_pub_queue), disable=disable_progress)
|
||||
while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)):
|
||||
if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]):
|
||||
msg = external_pub_queue.pop(0)
|
||||
pbar.update(1)
|
||||
else:
|
||||
_, index = heapq.heappop(internal_pub_index_heap)
|
||||
msg = internal_pub_queue[index]
|
||||
|
||||
target_containers = pubs_to_containers[msg.which()]
|
||||
for container in target_containers:
|
||||
output_msgs = container.run_step(msg, frs)
|
||||
for m in output_msgs:
|
||||
if m.which() in all_pubs:
|
||||
internal_pub_queue.append(m)
|
||||
heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1))
|
||||
log_msgs.extend(output_msgs)
|
||||
finally:
|
||||
for container in containers:
|
||||
container.stop()
|
||||
|
||||
return log_msgs
|
||||
|
||||
|
||||
def generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) -> Dict[str, Any]:
|
||||
params_dict = {
|
||||
"OpenpilotEnabledToggle": True,
|
||||
"Passive": False,
|
||||
"DisengageOnAccelerator": True,
|
||||
"DisableLogging": False,
|
||||
}
|
||||
|
||||
if cfg is not None:
|
||||
# Clear all custom processConfig environment variables
|
||||
for config in CONFIGS:
|
||||
for k, _ in config.environ.items():
|
||||
if k in os.environ:
|
||||
del os.environ[k]
|
||||
if custom_params is not None:
|
||||
params_dict.update(custom_params)
|
||||
if lr is not None:
|
||||
has_ublox = any(msg.which() == "ubloxGnss" for msg in lr)
|
||||
params_dict["UbloxAvailable"] = has_ublox
|
||||
is_rhd = next((msg.driverMonitoringState.isRHD for msg in lr if msg.which() == "driverMonitoringState"), False)
|
||||
params_dict["IsRhdDetected"] = is_rhd
|
||||
|
||||
os.environ.update(cfg.environ)
|
||||
os.environ['PROC_NAME'] = cfg.proc_name
|
||||
|
||||
if cfg is not None and cfg.simulation:
|
||||
os.environ["SIMULATION"] = "1"
|
||||
elif "SIMULATION" in os.environ:
|
||||
del os.environ["SIMULATION"]
|
||||
|
||||
# Initialize controlsd with a controlsState packet
|
||||
if controlsState is not None:
|
||||
params.put("ReplayControlsState", controlsState.as_builder().to_bytes())
|
||||
else:
|
||||
params.remove("ReplayControlsState")
|
||||
|
||||
# Regen or python process
|
||||
if CP is not None:
|
||||
if CP.alternativeExperience == ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS:
|
||||
params.put_bool("DisengageOnAccelerator", False)
|
||||
params_dict["DisengageOnAccelerator"] = False
|
||||
|
||||
if fingerprint is None:
|
||||
if CP.fingerprintSource == "fw":
|
||||
params.put("CarParamsCache", CP.as_builder().to_bytes())
|
||||
os.environ['SKIP_FW_QUERY'] = ""
|
||||
os.environ['FINGERPRINT'] = ""
|
||||
else:
|
||||
os.environ['SKIP_FW_QUERY'] = "1"
|
||||
os.environ['FINGERPRINT'] = CP.carFingerprint
|
||||
params_dict["CarParamsCache"] = CP.as_builder().to_bytes()
|
||||
|
||||
if CP.openpilotLongitudinalControl:
|
||||
params.put_bool("ExperimentalLongitudinalEnabled", True)
|
||||
params_dict["ExperimentalLongitudinalEnabled"] = True
|
||||
|
||||
return params_dict
|
||||
|
||||
|
||||
def check_openpilot_enabled(msgs):
|
||||
def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> Dict[str, Any]:
|
||||
environ_dict = {}
|
||||
if platform.system() != "Darwin":
|
||||
environ_dict["PARAMS_ROOT"] = "/dev/shm/params"
|
||||
if log_dir is not None:
|
||||
environ_dict["LOG_ROOT"] = log_dir
|
||||
|
||||
environ_dict["NO_RADAR_SLEEP"] = "1"
|
||||
environ_dict["REPLAY"] = "1"
|
||||
|
||||
# Regen or python process
|
||||
if CP is not None and fingerprint is None:
|
||||
if CP.fingerprintSource == "fw":
|
||||
environ_dict['SKIP_FW_QUERY'] = ""
|
||||
environ_dict['FINGERPRINT'] = ""
|
||||
else:
|
||||
environ_dict['SKIP_FW_QUERY'] = "1"
|
||||
environ_dict['FINGERPRINT'] = CP.carFingerprint
|
||||
elif fingerprint is not None:
|
||||
environ_dict['SKIP_FW_QUERY'] = "1"
|
||||
environ_dict['FINGERPRINT'] = fingerprint
|
||||
else:
|
||||
environ_dict["SKIP_FW_QUERY"] = ""
|
||||
environ_dict["FINGERPRINT"] = ""
|
||||
|
||||
return environ_dict
|
||||
|
||||
|
||||
def check_openpilot_enabled(msgs: Union[LogReader, List[capnp._DynamicStructReader]]) -> bool:
|
||||
cur_enabled_count = 0
|
||||
max_enabled_count = 0
|
||||
for msg in msgs:
|
||||
|
||||
@@ -1,336 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
import bz2
|
||||
import os
|
||||
import time
|
||||
import multiprocessing
|
||||
import argparse
|
||||
from tqdm import tqdm
|
||||
# run DM procs
|
||||
os.environ["USE_WEBCAM"] = "1"
|
||||
import time
|
||||
import capnp
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import car
|
||||
from cereal.services import service_list
|
||||
from cereal.visionipc import VisionIpcServer, VisionStreamType
|
||||
from common.params import Params
|
||||
from common.realtime import Ratekeeper, DT_MDL, DT_DMON, sec_since_boot
|
||||
from common.transformations.camera import eon_f_frame_size, eon_d_frame_size, tici_f_frame_size, tici_d_frame_size, tici_e_frame_size
|
||||
from panda.python import Panda
|
||||
from selfdrive.car.toyota.values import EPS_SCALE
|
||||
from selfdrive.manager.process import ensure_running
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
from selfdrive.test.process_replay.process_replay import CONFIGS, FAKEDATA, setup_env, check_openpilot_enabled
|
||||
from selfdrive.test.process_replay.migration import migrate_all
|
||||
from typing import Union, Iterable, Optional, List, Any, Dict, Tuple
|
||||
|
||||
from selfdrive.test.process_replay.process_replay import CONFIGS, FAKEDATA, replay_process, get_process_config, check_openpilot_enabled, get_custom_params_from_lr
|
||||
from selfdrive.test.update_ci_routes import upload_route
|
||||
from tools.lib.route import Route
|
||||
from tools.lib.framereader import FrameReader
|
||||
from tools.lib.logreader import LogReader
|
||||
|
||||
def replay_panda_states(s, msgs):
|
||||
pm = messaging.PubMaster([s, 'peripheralState'])
|
||||
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
|
||||
smsgs = [m for m in msgs if m.which() in ['pandaStates', 'pandaStateDEPRECATED']]
|
||||
|
||||
# TODO: safety param migration should be handled automatically
|
||||
safety_param_migration = {
|
||||
"TOYOTA PRIUS 2017": EPS_SCALE["TOYOTA PRIUS 2017"] | Panda.FLAG_TOYOTA_STOCK_LONGITUDINAL,
|
||||
"TOYOTA RAV4 2017": EPS_SCALE["TOYOTA RAV4 2017"] | Panda.FLAG_TOYOTA_ALT_BRAKE,
|
||||
"KIA EV6 2022": Panda.FLAG_HYUNDAI_EV_GAS | Panda.FLAG_HYUNDAI_CANFD_HDA2,
|
||||
}
|
||||
|
||||
# Migrate safety param base on carState
|
||||
cp = [m for m in msgs if m.which() == 'carParams'][0].carParams
|
||||
if cp.carFingerprint in safety_param_migration:
|
||||
safety_param = safety_param_migration[cp.carFingerprint]
|
||||
elif len(cp.safetyConfigs):
|
||||
safety_param = cp.safetyConfigs[0].safetyParam
|
||||
if cp.safetyConfigs[0].safetyParamDEPRECATED != 0:
|
||||
safety_param = cp.safetyConfigs[0].safetyParamDEPRECATED
|
||||
else:
|
||||
safety_param = cp.safetyParamDEPRECATED
|
||||
|
||||
while True:
|
||||
for m in smsgs:
|
||||
if m.which() == 'pandaStateDEPRECATED':
|
||||
new_m = messaging.new_message('pandaStates', 1)
|
||||
new_m.pandaStates[0] = m.pandaStateDEPRECATED
|
||||
new_m.pandaStates[0].safetyParam = safety_param
|
||||
pm.send(s, new_m)
|
||||
else:
|
||||
new_m = m.as_builder()
|
||||
new_m.pandaStates[-1].safetyParam = safety_param
|
||||
new_m.logMonoTime = int(sec_since_boot() * 1e9)
|
||||
pm.send(s, new_m)
|
||||
|
||||
new_m = messaging.new_message('peripheralState')
|
||||
pm.send('peripheralState', new_m)
|
||||
|
||||
rk.keep_time()
|
||||
from tools.lib.helpers import save_log
|
||||
|
||||
|
||||
def replay_manager_state(s, msgs):
|
||||
pm = messaging.PubMaster([s, ])
|
||||
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
|
||||
|
||||
while True:
|
||||
new_m = messaging.new_message('managerState')
|
||||
new_m.managerState.processes = [{'name': name, 'running': True} for name in managed_processes]
|
||||
pm.send(s, new_m)
|
||||
rk.keep_time()
|
||||
|
||||
|
||||
def replay_device_state(s, msgs):
|
||||
pm = messaging.PubMaster([s, ])
|
||||
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
|
||||
smsgs = [m for m in msgs if m.which() == s]
|
||||
while True:
|
||||
for m in smsgs:
|
||||
new_m = m.as_builder()
|
||||
new_m.logMonoTime = int(sec_since_boot() * 1e9)
|
||||
new_m.deviceState.freeSpacePercent = 50
|
||||
new_m.deviceState.memoryUsagePercent = 50
|
||||
pm.send(s, new_m)
|
||||
rk.keep_time()
|
||||
|
||||
|
||||
def replay_sensor_event(s, msgs):
|
||||
pm = messaging.PubMaster([s, ])
|
||||
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
|
||||
smsgs = [m for m in msgs if m.which() == s]
|
||||
while True:
|
||||
for m in smsgs:
|
||||
m = m.as_builder()
|
||||
m.logMonoTime = int(sec_since_boot() * 1e9)
|
||||
getattr(m, m.which()).timestamp = m.logMonoTime
|
||||
pm.send(m.which(), m)
|
||||
rk.keep_time()
|
||||
|
||||
|
||||
def replay_service(s, msgs):
|
||||
pm = messaging.PubMaster([s, ])
|
||||
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
|
||||
smsgs = [m for m in msgs if m.which() == s]
|
||||
while True:
|
||||
for m in smsgs:
|
||||
new_m = m.as_builder()
|
||||
new_m.logMonoTime = int(sec_since_boot() * 1e9)
|
||||
pm.send(s, new_m)
|
||||
rk.keep_time()
|
||||
|
||||
|
||||
def replay_cameras(lr, frs, disable_tqdm=False):
|
||||
eon_cameras = [
|
||||
("roadCameraState", DT_MDL, eon_f_frame_size, VisionStreamType.VISION_STREAM_ROAD),
|
||||
("driverCameraState", DT_DMON, eon_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER),
|
||||
]
|
||||
tici_cameras = [
|
||||
("roadCameraState", DT_MDL, tici_f_frame_size, VisionStreamType.VISION_STREAM_ROAD),
|
||||
("wideRoadCameraState", DT_MDL, tici_e_frame_size, VisionStreamType.VISION_STREAM_WIDE_ROAD),
|
||||
("driverCameraState", DT_DMON, tici_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER),
|
||||
]
|
||||
|
||||
def replay_camera(s, stream, dt, vipc_server, frames, size):
|
||||
services = [(s, stream)]
|
||||
pm = messaging.PubMaster([s for s, _ in services])
|
||||
rk = Ratekeeper(1 / dt, print_delay_threshold=None)
|
||||
|
||||
img = b"\x00" * int(size[0] * size[1] * 3 / 2)
|
||||
while True:
|
||||
if frames is not None:
|
||||
img = frames[rk.frame % len(frames)]
|
||||
|
||||
rk.keep_time()
|
||||
|
||||
for s, stream in services:
|
||||
m = messaging.new_message(s)
|
||||
msg = getattr(m, s)
|
||||
msg.frameId = rk.frame
|
||||
msg.timestampSof = m.logMonoTime
|
||||
msg.timestampEof = m.logMonoTime
|
||||
pm.send(s, m)
|
||||
|
||||
vipc_server.send(stream, img, msg.frameId, msg.timestampSof, msg.timestampEof)
|
||||
|
||||
init_data = [m for m in lr if m.which() == 'initData'][0]
|
||||
cameras = tici_cameras if (init_data.initData.deviceType in ['tici', 'tizi']) else eon_cameras
|
||||
|
||||
# init vipc server and cameras
|
||||
p = []
|
||||
vs = VisionIpcServer("camerad")
|
||||
for (s, dt, size, stream) in cameras:
|
||||
fr = frs.get(s, None)
|
||||
|
||||
frames = None
|
||||
if fr is not None:
|
||||
print(f"Decompressing frames {s}")
|
||||
frames = []
|
||||
for i in tqdm(range(fr.frame_count), disable=disable_tqdm):
|
||||
img = fr.get(i, pix_fmt='nv12')[0]
|
||||
frames.append(img.flatten().tobytes())
|
||||
|
||||
vs.create_buffers(stream, 40, False, size[0], size[1])
|
||||
p.append(multiprocessing.Process(target=replay_camera,
|
||||
args=(s, stream, dt, vs, frames, size)))
|
||||
|
||||
vs.start_listener()
|
||||
return vs, p
|
||||
|
||||
|
||||
def regen_segment(lr, frs=None, daemons="all", outdir=FAKEDATA, disable_tqdm=False):
|
||||
def regen_segment(
|
||||
lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None,
|
||||
daemons: Union[str, Iterable[str]] = "all", disable_tqdm: bool = False
|
||||
) -> List[capnp._DynamicStructReader]:
|
||||
if not isinstance(daemons, str) and not hasattr(daemons, "__iter__"):
|
||||
raise ValueError("whitelist_proc must be a string or iterable")
|
||||
|
||||
lr = migrate_all(lr)
|
||||
if frs is None:
|
||||
frs = dict()
|
||||
all_msgs = sorted(lr, key=lambda m: m.logMonoTime)
|
||||
custom_params = get_custom_params_from_lr(all_msgs)
|
||||
|
||||
# Get and setup initial state
|
||||
CP = [m for m in lr if m.which() == 'carParams'][0].carParams
|
||||
controlsState = [m for m in lr if m.which() == 'controlsState'][0].controlsState
|
||||
liveCalibration = [m for m in lr if m.which() == 'liveCalibration'][0]
|
||||
|
||||
setup_env(CP=CP, controlsState=controlsState, log_dir=outdir)
|
||||
|
||||
params = Params()
|
||||
params.put("CalibrationParams", liveCalibration.as_builder().to_bytes())
|
||||
|
||||
vs, cam_procs = replay_cameras(lr, frs, disable_tqdm=disable_tqdm)
|
||||
fake_daemons = {
|
||||
'sensord': [
|
||||
multiprocessing.Process(target=replay_sensor_event, args=('accelerometer', lr)),
|
||||
multiprocessing.Process(target=replay_sensor_event, args=('gyroscope', lr)),
|
||||
multiprocessing.Process(target=replay_sensor_event, args=('magnetometer', lr)),
|
||||
],
|
||||
'pandad': [
|
||||
multiprocessing.Process(target=replay_service, args=('can', lr)),
|
||||
multiprocessing.Process(target=replay_service, args=('ubloxRaw', lr)),
|
||||
multiprocessing.Process(target=replay_panda_states, args=('pandaStates', lr)),
|
||||
],
|
||||
'manager': [
|
||||
multiprocessing.Process(target=replay_manager_state, args=('managerState', lr)),
|
||||
],
|
||||
'thermald': [
|
||||
multiprocessing.Process(target=replay_device_state, args=('deviceState', lr)),
|
||||
],
|
||||
'rawgpsd': [
|
||||
multiprocessing.Process(target=replay_service, args=('qcomGnss', lr)),
|
||||
multiprocessing.Process(target=replay_service, args=('gpsLocation', lr)),
|
||||
],
|
||||
'camerad': [
|
||||
*cam_procs,
|
||||
],
|
||||
}
|
||||
# TODO add configs for modeld, dmonitoringmodeld
|
||||
fakeable_daemons = {}
|
||||
for config in CONFIGS:
|
||||
processes = [
|
||||
multiprocessing.Process(target=replay_service, args=(msg, lr))
|
||||
for msg in config.subs
|
||||
]
|
||||
fakeable_daemons[config.proc_name] = processes
|
||||
|
||||
additional_fake_daemons = {}
|
||||
if daemons != "all":
|
||||
additional_fake_daemons = fakeable_daemons
|
||||
if isinstance(daemons, str):
|
||||
raise ValueError(f"Invalid value for daemons: {daemons}")
|
||||
|
||||
replayed_processes = []
|
||||
for d in daemons:
|
||||
if d in fake_daemons:
|
||||
raise ValueError(f"Running daemon {d} is not supported!")
|
||||
|
||||
if d in fakeable_daemons:
|
||||
del additional_fake_daemons[d]
|
||||
cfg = get_process_config(d)
|
||||
replayed_processes.append(cfg)
|
||||
else:
|
||||
replayed_processes = CONFIGS
|
||||
|
||||
all_fake_daemons = {**fake_daemons, **additional_fake_daemons}
|
||||
print("Replayed processes:", [p.proc_name for p in replayed_processes])
|
||||
print("\n\n", "*"*30, "\n\n", sep="")
|
||||
|
||||
try:
|
||||
# TODO: make first run of onnxruntime CUDA provider fast
|
||||
if "modeld" not in all_fake_daemons:
|
||||
managed_processes["modeld"].start()
|
||||
if "dmonitoringmodeld" not in all_fake_daemons:
|
||||
managed_processes["dmonitoringmodeld"].start()
|
||||
time.sleep(5)
|
||||
output_logs = replay_process(replayed_processes, all_msgs, frs, return_all_logs=True, custom_params=custom_params, disable_progress=disable_tqdm)
|
||||
|
||||
# start procs up
|
||||
ignore = list(all_fake_daemons.keys()) \
|
||||
+ ['ui', 'manage_athenad', 'uploader', 'soundd', 'micd', 'navd']
|
||||
|
||||
print("Faked daemons:", ", ".join(all_fake_daemons.keys()))
|
||||
print("Running daemons:", ", ".join([key for key in managed_processes.keys() if key not in ignore]))
|
||||
|
||||
ensure_running(managed_processes.values(), started=True, params=Params(), CP=car.CarParams(), not_run=ignore)
|
||||
for procs in all_fake_daemons.values():
|
||||
for p in procs:
|
||||
p.start()
|
||||
|
||||
for _ in tqdm(range(60), disable=disable_tqdm):
|
||||
# ensure all procs are running
|
||||
for d, procs in all_fake_daemons.items():
|
||||
for p in procs:
|
||||
if not p.is_alive():
|
||||
raise Exception(f"{d}'s {p.name} died")
|
||||
time.sleep(1)
|
||||
finally:
|
||||
# kill everything
|
||||
for p in managed_processes.values():
|
||||
p.stop()
|
||||
for procs in all_fake_daemons.values():
|
||||
for p in procs:
|
||||
p.terminate()
|
||||
|
||||
del vs
|
||||
|
||||
segment = params.get("CurrentRoute", encoding='utf-8') + "--0"
|
||||
seg_path = os.path.join(outdir, segment)
|
||||
# check to make sure openpilot is engaged in the route
|
||||
if not check_openpilot_enabled(LogReader(os.path.join(seg_path, "rlog"))):
|
||||
raise Exception(f"Route did not engage for long enough: {segment}")
|
||||
|
||||
return seg_path
|
||||
return output_logs
|
||||
|
||||
|
||||
def regen_and_save(route, sidx, daemons="all", upload=False, use_route_meta=False, outdir=FAKEDATA, disable_tqdm=False):
|
||||
def setup_data_readers(route: str, sidx: int, use_route_meta: bool) -> Tuple[LogReader, Dict[str, Any]]:
|
||||
if use_route_meta:
|
||||
r = Route(route)
|
||||
lr = LogReader(r.log_paths()[sidx])
|
||||
fr = FrameReader(r.camera_paths()[sidx])
|
||||
if r.ecamera_paths()[sidx] is not None:
|
||||
wfr = FrameReader(r.ecamera_paths()[sidx])
|
||||
else:
|
||||
wfr = None
|
||||
frs = {}
|
||||
if len(r.camera_paths()) > sidx and r.camera_paths()[sidx] is not None:
|
||||
frs['roadCameraState'] = FrameReader(r.camera_paths()[sidx])
|
||||
if len(r.ecamera_paths()) > sidx and r.ecamera_paths()[sidx] is not None:
|
||||
frs['wideCameraState'] = FrameReader(r.ecamera_paths()[sidx])
|
||||
if len(r.dcamera_paths()) > sidx and r.dcamera_paths()[sidx] is not None:
|
||||
frs['driverCameraState'] = FrameReader(r.dcamera_paths()[sidx])
|
||||
else:
|
||||
lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog.bz2")
|
||||
fr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc")
|
||||
device_type = next(iter(lr)).initData.deviceType
|
||||
if device_type in ['tici', 'tizi']:
|
||||
wfr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/ecamera.hevc")
|
||||
else:
|
||||
wfr = None
|
||||
|
||||
frs = {'roadCameraState': fr}
|
||||
if wfr is not None:
|
||||
frs['wideRoadCameraState'] = wfr
|
||||
rpath = regen_segment(lr, frs, daemons, outdir=outdir, disable_tqdm=disable_tqdm)
|
||||
frs = {
|
||||
'roadCameraState': FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc"),
|
||||
'driverCameraState': FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/dcamera.hevc"),
|
||||
}
|
||||
if next((True for m in lr if m.which() == "wideRoadCameraState"), False):
|
||||
frs['wideRoadCameraState'] = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/ecamera.hevc")
|
||||
|
||||
# compress raw rlog before uploading
|
||||
with open(os.path.join(rpath, "rlog"), "rb") as f:
|
||||
data = bz2.compress(f.read())
|
||||
with open(os.path.join(rpath, "rlog.bz2"), "wb") as f:
|
||||
f.write(data)
|
||||
os.remove(os.path.join(rpath, "rlog"))
|
||||
return lr, frs
|
||||
|
||||
lr = LogReader(os.path.join(rpath, 'rlog.bz2'))
|
||||
controls_state_active = [m.controlsState.active for m in lr if m.which() == 'controlsState']
|
||||
assert any(controls_state_active), "Segment did not engage"
|
||||
|
||||
relr = os.path.relpath(rpath)
|
||||
def regen_and_save(
|
||||
route: str, sidx: int, daemons: Union[str, Iterable[str]] = "all", outdir: str = FAKEDATA,
|
||||
upload: bool = False, use_route_meta: bool = False, disable_tqdm: bool = False
|
||||
) -> str:
|
||||
lr, frs = setup_data_readers(route, sidx, use_route_meta)
|
||||
output_logs = regen_segment(lr, frs, daemons, disable_tqdm=disable_tqdm)
|
||||
|
||||
log_dir = os.path.join(outdir, time.strftime("%Y-%m-%d--%H-%M-%S--0", time.gmtime()))
|
||||
rel_log_dir = os.path.relpath(log_dir)
|
||||
rpath = os.path.join(log_dir, "rlog.bz2")
|
||||
|
||||
os.makedirs(log_dir)
|
||||
save_log(rpath, output_logs, compress=True)
|
||||
|
||||
print("\n\n", "*"*30, "\n\n", sep="")
|
||||
print("New route:", rel_log_dir, "\n")
|
||||
|
||||
if not check_openpilot_enabled(output_logs):
|
||||
raise Exception("Route did not engage for long enough")
|
||||
|
||||
print("\n\n", "*"*30, "\n\n")
|
||||
print("New route:", relr, "\n")
|
||||
if upload:
|
||||
upload_route(relr, exclude_patterns=['*.hevc', ])
|
||||
return relr
|
||||
upload_route(rel_log_dir)
|
||||
|
||||
return rel_log_dir
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -348,4 +107,4 @@ if __name__ == "__main__":
|
||||
parser.add_argument("seg", type=int, help="Segment in source route")
|
||||
args = parser.parse_args()
|
||||
|
||||
regen_and_save(args.route, args.seg, args.whitelist_procs, args.upload, outdir=args.outdir)
|
||||
regen_and_save(args.route, args.seg, daemons=args.whitelist_procs, upload=args.upload, outdir=args.outdir)
|
||||
|
||||
@@ -82,7 +82,7 @@ def run_test_process(data):
|
||||
assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}"
|
||||
upload_file(cur_log_fn, os.path.basename(cur_log_fn))
|
||||
os.remove(cur_log_fn)
|
||||
return (segment, cfg.proc_name, cfg.subtest_name, res)
|
||||
return (segment, cfg.proc_name, res)
|
||||
|
||||
|
||||
def get_log_data(segment):
|
||||
@@ -224,24 +224,24 @@ if __name__ == "__main__":
|
||||
if cfg.proc_name not in tested_procs:
|
||||
continue
|
||||
|
||||
cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}{cfg.subtest_name}_{cur_commit}.bz2")
|
||||
cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2")
|
||||
if args.update_refs: # reference logs will not exist if routes were just regenerated
|
||||
ref_log_path = get_url(*segment.rsplit("--", 1))
|
||||
else:
|
||||
ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}{cfg.subtest_name}_{ref_commit}.bz2")
|
||||
ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
|
||||
ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn)
|
||||
|
||||
dat = None if args.upload_only else log_data[segment]
|
||||
pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat))
|
||||
|
||||
log_paths[segment][cfg.proc_name + cfg.subtest_name]['ref'] = ref_log_path
|
||||
log_paths[segment][cfg.proc_name + cfg.subtest_name]['new'] = cur_log_fn
|
||||
log_paths[segment][cfg.proc_name]['ref'] = ref_log_path
|
||||
log_paths[segment][cfg.proc_name]['new'] = cur_log_fn
|
||||
|
||||
results: Any = defaultdict(dict)
|
||||
p2 = pool.map(run_test_process, pool_args)
|
||||
for (segment, proc, subtest_name, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)):
|
||||
for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)):
|
||||
if not args.upload_only:
|
||||
results[segment][proc + subtest_name] = result
|
||||
results[segment][proc] = result
|
||||
|
||||
diff1, diff2, failed = format_diff(results, log_paths, ref_commit)
|
||||
if not upload:
|
||||
|
||||
Reference in New Issue
Block a user