mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-28 01:52:06 +08:00
basic jotpluggler (#36045)
* jotpluggler! * demo, executable, fontfile * calc max and min, numpy, cloudlog * mypy things * simplified data.py * multiprocessed data ingest * allow verrryyy long search results * stream in multiprocessed segments * bug fixes * simplify and speed up timeseries * small fixes * rewrite layout * resizable layouts * cleanup * downsampling * deque for consistency * use item_visible_handler * only build visible UI * don't delete item handlers, add locks, don't expand large lists * delete item handlers after a frame * small data tree improvements * seperate datatree into its own file * reset when loading new segments * fix plot window resizing and recursive split resizing logic
This commit is contained in:
@@ -124,6 +124,7 @@ dev = [
|
||||
|
||||
tools = [
|
||||
"metadrive-simulator @ https://github.com/commaai/metadrive/releases/download/MetaDrive-minimal-0.4.2.4/metadrive_simulator-0.4.2.4-py3-none-any.whl ; (platform_machine != 'aarch64')",
|
||||
"dearpygui>=2.1.0",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -0,0 +1,311 @@
|
||||
import numpy as np
|
||||
import threading
|
||||
import multiprocessing
|
||||
import bisect
|
||||
from collections import defaultdict
|
||||
import tqdm
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
from openpilot.tools.lib.logreader import _LogFileReader, LogReader
|
||||
|
||||
|
||||
def flatten_dict(d: dict, sep: str = "/", prefix: str = None) -> dict:
|
||||
result = {}
|
||||
stack: list[tuple] = [(d, prefix)]
|
||||
|
||||
while stack:
|
||||
obj, current_prefix = stack.pop()
|
||||
|
||||
if isinstance(obj, dict):
|
||||
for key, val in obj.items():
|
||||
new_prefix = key if current_prefix is None else f"{current_prefix}{sep}{key}"
|
||||
if isinstance(val, (dict, list)):
|
||||
stack.append((val, new_prefix))
|
||||
else:
|
||||
result[new_prefix] = val
|
||||
elif isinstance(obj, list):
|
||||
for i, item in enumerate(obj):
|
||||
new_prefix = f"{current_prefix}{sep}{i}"
|
||||
if isinstance(item, (dict, list)):
|
||||
stack.append((item, new_prefix))
|
||||
else:
|
||||
result[new_prefix] = item
|
||||
else:
|
||||
if current_prefix is not None:
|
||||
result[current_prefix] = obj
|
||||
return result
|
||||
|
||||
|
||||
def extract_field_types(schema, prefix, field_types_dict):
|
||||
stack = [(schema, prefix)]
|
||||
|
||||
while stack:
|
||||
current_schema, current_prefix = stack.pop()
|
||||
|
||||
for field in current_schema.fields_list:
|
||||
field_name = field.proto.name
|
||||
field_path = f"{current_prefix}/{field_name}"
|
||||
field_proto = field.proto
|
||||
field_which = field_proto.which()
|
||||
|
||||
field_type = field_proto.slot.type.which() if field_which == 'slot' else field_which
|
||||
field_types_dict[field_path] = field_type
|
||||
|
||||
if field_which == 'slot':
|
||||
slot_type = field_proto.slot.type
|
||||
type_which = slot_type.which()
|
||||
|
||||
if type_which == 'list':
|
||||
element_type = slot_type.list.elementType.which()
|
||||
list_path = f"{field_path}/*"
|
||||
field_types_dict[list_path] = element_type
|
||||
|
||||
if element_type == 'struct':
|
||||
stack.append((field.schema.elementType, list_path))
|
||||
|
||||
elif type_which == 'struct':
|
||||
stack.append((field.schema, field_path))
|
||||
|
||||
elif field_which == 'group':
|
||||
stack.append((field.schema, field_path))
|
||||
|
||||
|
||||
def _convert_to_optimal_dtype(values_list, capnp_type):
|
||||
if not values_list:
|
||||
return np.array([])
|
||||
|
||||
dtype_mapping = {
|
||||
'bool': np.bool_, 'int8': np.int8, 'int16': np.int16, 'int32': np.int32, 'int64': np.int64,
|
||||
'uint8': np.uint8, 'uint16': np.uint16, 'uint32': np.uint32, 'uint64': np.uint64,
|
||||
'float32': np.float32, 'float64': np.float64, 'text': object, 'data': object,
|
||||
'enum': object, 'anyPointer': object,
|
||||
}
|
||||
|
||||
target_dtype = dtype_mapping.get(capnp_type)
|
||||
return np.array(values_list, dtype=target_dtype) if target_dtype else np.array(values_list)
|
||||
|
||||
|
||||
def _match_field_type(field_path, field_types):
|
||||
if field_path in field_types:
|
||||
return field_types[field_path]
|
||||
|
||||
path_parts = field_path.split('/')
|
||||
template_parts = [p if not p.isdigit() else '*' for p in path_parts]
|
||||
template_path = '/'.join(template_parts)
|
||||
return field_types.get(template_path)
|
||||
|
||||
|
||||
def msgs_to_time_series(msgs):
|
||||
"""Extract scalar fields and return (time_series_data, start_time, end_time)."""
|
||||
collected_data = defaultdict(lambda: {'timestamps': [], 'columns': defaultdict(list), 'sparse_fields': set()})
|
||||
field_types = {}
|
||||
extracted_schemas = set()
|
||||
min_time = max_time = None
|
||||
|
||||
for msg in msgs:
|
||||
typ = msg.which()
|
||||
timestamp = msg.logMonoTime * 1e-9
|
||||
if typ != 'initData':
|
||||
if min_time is None:
|
||||
min_time = timestamp
|
||||
max_time = timestamp
|
||||
|
||||
sub_msg = getattr(msg, typ)
|
||||
if not hasattr(sub_msg, 'to_dict') or typ in ('qcomGnss', 'ubloxGnss'):
|
||||
continue
|
||||
|
||||
if hasattr(sub_msg, 'schema') and typ not in extracted_schemas:
|
||||
extract_field_types(sub_msg.schema, typ, field_types)
|
||||
extracted_schemas.add(typ)
|
||||
|
||||
msg_dict = sub_msg.to_dict(verbose=True)
|
||||
flat_dict = flatten_dict(msg_dict)
|
||||
flat_dict['_valid'] = msg.valid
|
||||
|
||||
type_data = collected_data[typ]
|
||||
columns, sparse_fields = type_data['columns'], type_data['sparse_fields']
|
||||
known_fields = set(columns.keys())
|
||||
missing_fields = known_fields - flat_dict.keys()
|
||||
|
||||
for field, value in flat_dict.items():
|
||||
if field not in known_fields and type_data['timestamps']:
|
||||
sparse_fields.add(field)
|
||||
columns[field].append(value)
|
||||
if value is None:
|
||||
sparse_fields.add(field)
|
||||
|
||||
for field in missing_fields:
|
||||
columns[field].append(None)
|
||||
sparse_fields.add(field)
|
||||
|
||||
type_data['timestamps'].append(timestamp)
|
||||
|
||||
final_result = {}
|
||||
for typ, data in collected_data.items():
|
||||
if not data['timestamps']:
|
||||
continue
|
||||
|
||||
typ_result = {'t': np.array(data['timestamps'], dtype=np.float64)}
|
||||
sparse_fields = data['sparse_fields']
|
||||
|
||||
for field_name, values in data['columns'].items():
|
||||
if len(values) < len(data['timestamps']):
|
||||
values = [None] * (len(data['timestamps']) - len(values)) + values
|
||||
sparse_fields.add(field_name)
|
||||
|
||||
if field_name in sparse_fields:
|
||||
typ_result[field_name] = np.array(values, dtype=object)
|
||||
else:
|
||||
capnp_type = _match_field_type(f"{typ}/{field_name}", field_types)
|
||||
typ_result[field_name] = _convert_to_optimal_dtype(values, capnp_type)
|
||||
|
||||
final_result[typ] = typ_result
|
||||
|
||||
return final_result, min_time or 0.0, max_time or 0.0
|
||||
|
||||
|
||||
def _process_segment(segment_identifier: str):
|
||||
try:
|
||||
lr = _LogFileReader(segment_identifier, sort_by_time=True)
|
||||
return msgs_to_time_series(lr)
|
||||
except Exception as e:
|
||||
cloudlog.warning(f"Warning: Failed to process segment {segment_identifier}: {e}")
|
||||
return {}, 0.0, 0.0
|
||||
|
||||
|
||||
class DataManager:
|
||||
def __init__(self):
|
||||
self._segments = []
|
||||
self._segment_starts = []
|
||||
self._start_time = 0.0
|
||||
self._duration = 0.0
|
||||
self._paths = set()
|
||||
self._observers = []
|
||||
self._loading = False
|
||||
self._lock = threading.RLock()
|
||||
|
||||
def load_route(self, route: str) -> None:
|
||||
if self._loading:
|
||||
return
|
||||
self._reset()
|
||||
threading.Thread(target=self._load_async, args=(route,), daemon=True).start()
|
||||
|
||||
def get_timeseries(self, path: str):
|
||||
with self._lock:
|
||||
msg_type, field = path.split('/', 1)
|
||||
times, values = [], []
|
||||
|
||||
for segment in self._segments:
|
||||
if msg_type in segment and field in segment[msg_type]:
|
||||
times.append(segment[msg_type]['t'])
|
||||
values.append(segment[msg_type][field])
|
||||
|
||||
if not times:
|
||||
return [], []
|
||||
|
||||
combined_times = np.concatenate(times) - self._start_time
|
||||
if len(values) > 1 and any(arr.dtype != values[0].dtype for arr in values):
|
||||
values = [arr.astype(object) for arr in values]
|
||||
|
||||
return combined_times, np.concatenate(values)
|
||||
|
||||
def get_value_at(self, path: str, time: float):
|
||||
with self._lock:
|
||||
MAX_LOOKBACK = 5.0 # seconds
|
||||
absolute_time = self._start_time + time
|
||||
message_type, field = path.split('/', 1)
|
||||
current_index = bisect.bisect_right(self._segment_starts, absolute_time) - 1
|
||||
for index in (current_index, current_index - 1):
|
||||
if not 0 <= index < len(self._segments):
|
||||
continue
|
||||
segment = self._segments[index].get(message_type)
|
||||
if not segment or field not in segment:
|
||||
continue
|
||||
times = segment['t']
|
||||
if len(times) == 0 or (index != current_index and absolute_time - times[-1] > MAX_LOOKBACK):
|
||||
continue
|
||||
position = np.searchsorted(times, absolute_time, 'right') - 1
|
||||
if position >= 0 and absolute_time - times[position] <= MAX_LOOKBACK:
|
||||
return segment[field][position]
|
||||
return None
|
||||
|
||||
def get_all_paths(self):
|
||||
with self._lock:
|
||||
return sorted(self._paths)
|
||||
|
||||
def get_duration(self):
|
||||
with self._lock:
|
||||
return self._duration
|
||||
|
||||
def is_plottable(self, path: str):
|
||||
data = self.get_timeseries(path)
|
||||
if data is None:
|
||||
return False
|
||||
_, values = data
|
||||
return np.issubdtype(values.dtype, np.number) or np.issubdtype(values.dtype, np.bool_)
|
||||
|
||||
def add_observer(self, callback):
|
||||
with self._lock:
|
||||
self._observers.append(callback)
|
||||
|
||||
def remove_observer(self, callback):
|
||||
with self._lock:
|
||||
if callback in self._observers:
|
||||
self._observers.remove(callback)
|
||||
|
||||
def _reset(self):
|
||||
with self._lock:
|
||||
self._loading = True
|
||||
self._segments.clear()
|
||||
self._segment_starts.clear()
|
||||
self._paths.clear()
|
||||
self._start_time = self._duration = 0.0
|
||||
observers = self._observers.copy()
|
||||
|
||||
for callback in observers:
|
||||
callback({'reset': True})
|
||||
|
||||
def _load_async(self, route: str):
|
||||
try:
|
||||
lr = LogReader(route, sort_by_time=True)
|
||||
if not lr.logreader_identifiers:
|
||||
cloudlog.warning(f"Warning: No log segments found for route: {route}")
|
||||
return
|
||||
|
||||
num_processes = max(1, multiprocessing.cpu_count() // 2)
|
||||
with multiprocessing.Pool(processes=num_processes) as pool, tqdm.tqdm(total=len(lr.logreader_identifiers), desc="Processing Segments") as pbar:
|
||||
for segment_result, start_time, end_time in pool.imap(_process_segment, lr.logreader_identifiers):
|
||||
pbar.update(1)
|
||||
if segment_result:
|
||||
self._add_segment(segment_result, start_time, end_time)
|
||||
except Exception:
|
||||
cloudlog.exception(f"Error loading route {route}:")
|
||||
finally:
|
||||
self._finalize_loading()
|
||||
|
||||
def _add_segment(self, segment_data: dict, start_time: float, end_time: float):
|
||||
with self._lock:
|
||||
self._segments.append(segment_data)
|
||||
self._segment_starts.append(start_time)
|
||||
|
||||
if len(self._segments) == 1:
|
||||
self._start_time = start_time
|
||||
self._duration = end_time - self._start_time
|
||||
|
||||
for msg_type, data in segment_data.items():
|
||||
for field in data.keys():
|
||||
if field != 't':
|
||||
self._paths.add(f"{msg_type}/{field}")
|
||||
|
||||
observers = self._observers.copy()
|
||||
|
||||
for callback in observers:
|
||||
callback({'segment_added': True, 'duration': self._duration, 'segment_count': len(self._segments)})
|
||||
|
||||
def _finalize_loading(self):
|
||||
with self._lock:
|
||||
self._loading = False
|
||||
observers = self._observers.copy()
|
||||
duration = self._duration
|
||||
|
||||
for callback in observers:
|
||||
callback({'loading_complete': True, 'duration': duration})
|
||||
@@ -0,0 +1,266 @@
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import numpy as np
|
||||
from collections import deque
|
||||
import dearpygui.dearpygui as dpg
|
||||
|
||||
|
||||
class DataTreeNode:
|
||||
def __init__(self, name: str, full_path: str = "", parent=None):
|
||||
self.name = name
|
||||
self.full_path = full_path
|
||||
self.parent = parent
|
||||
self.children: dict[str, DataTreeNode] = {}
|
||||
self.is_leaf = False
|
||||
self.child_count = 0
|
||||
self.is_plottable: bool | None = None
|
||||
self.ui_created = False
|
||||
self.children_ui_created = False
|
||||
self.ui_tag: str | None = None
|
||||
|
||||
|
||||
class DataTree:
|
||||
MAX_NODES_PER_FRAME = 50
|
||||
|
||||
def __init__(self, data_manager, playback_manager):
|
||||
self.data_manager = data_manager
|
||||
self.playback_manager = playback_manager
|
||||
self.current_search = ""
|
||||
self.data_tree = DataTreeNode(name="root")
|
||||
self._build_queue: deque[tuple[DataTreeNode, str | None, str | int]] = deque()
|
||||
self._all_paths_cache: set[str] = set()
|
||||
self._item_handlers: set[str] = set()
|
||||
self._avg_char_width = None
|
||||
self._queued_search = None
|
||||
self._new_data = False
|
||||
self._ui_lock = threading.RLock()
|
||||
self.data_manager.add_observer(self._on_data_loaded)
|
||||
|
||||
def create_ui(self, parent_tag: str):
|
||||
with dpg.child_window(parent=parent_tag, border=False, width=-1, height=-1):
|
||||
dpg.add_text("Available Data")
|
||||
dpg.add_separator()
|
||||
dpg.add_input_text(tag="search_input", width=-1, hint="Search fields...", callback=self.search_data)
|
||||
dpg.add_separator()
|
||||
with dpg.group(tag="data_tree_container"):
|
||||
pass
|
||||
|
||||
def _on_data_loaded(self, data: dict):
|
||||
with self._ui_lock:
|
||||
if data.get('segment_added'):
|
||||
self._new_data = True
|
||||
elif data.get('reset'):
|
||||
self._all_paths_cache = set()
|
||||
self._new_data = True
|
||||
|
||||
|
||||
def _populate_tree(self):
|
||||
self._clear_ui()
|
||||
self.data_tree = self._add_paths_to_tree(self._all_paths_cache, incremental=False)
|
||||
if self.data_tree:
|
||||
self._request_children_build(self.data_tree)
|
||||
|
||||
def _add_paths_to_tree(self, paths, incremental=False):
|
||||
search_term = self.current_search.strip().lower()
|
||||
filtered_paths = [path for path in paths if self._should_show_path(path, search_term)]
|
||||
target_tree = self.data_tree if incremental else DataTreeNode(name="root")
|
||||
|
||||
if not filtered_paths:
|
||||
return target_tree
|
||||
|
||||
parent_nodes_to_recheck = set()
|
||||
for path in sorted(filtered_paths):
|
||||
parts = path.split('/')
|
||||
current_node = target_tree
|
||||
current_path_prefix = ""
|
||||
|
||||
for i, part in enumerate(parts):
|
||||
current_path_prefix = f"{current_path_prefix}/{part}" if current_path_prefix else part
|
||||
if i < len(parts) - 1:
|
||||
parent_nodes_to_recheck.add(current_node) # for incremental changes from new data
|
||||
if part not in current_node.children:
|
||||
current_node.children[part] = DataTreeNode(name=part, full_path=current_path_prefix, parent=current_node)
|
||||
current_node = current_node.children[part]
|
||||
|
||||
if not current_node.is_leaf:
|
||||
current_node.is_leaf = True
|
||||
|
||||
self._calculate_child_counts(target_tree)
|
||||
if incremental:
|
||||
for p_node in parent_nodes_to_recheck:
|
||||
p_node.children_ui_created = False
|
||||
self._request_children_build(p_node)
|
||||
return target_tree
|
||||
|
||||
def update_frame(self, font):
|
||||
with self._ui_lock:
|
||||
if self._avg_char_width is None and dpg.is_dearpygui_running():
|
||||
self._avg_char_width = self.calculate_avg_char_width(font)
|
||||
|
||||
if self._new_data:
|
||||
current_paths = set(self.data_manager.get_all_paths())
|
||||
new_paths = current_paths - self._all_paths_cache
|
||||
all_paths_empty = not self._all_paths_cache
|
||||
self._all_paths_cache = current_paths
|
||||
if all_paths_empty:
|
||||
self._populate_tree()
|
||||
elif new_paths:
|
||||
self._add_paths_to_tree(new_paths, incremental=True)
|
||||
self._new_data = False
|
||||
return
|
||||
|
||||
if self._queued_search is not None:
|
||||
self.current_search = self._queued_search
|
||||
self._all_paths_cache = set(self.data_manager.get_all_paths())
|
||||
self._populate_tree()
|
||||
self._queued_search = None
|
||||
return
|
||||
|
||||
nodes_processed = 0
|
||||
while self._build_queue and nodes_processed < self.MAX_NODES_PER_FRAME:
|
||||
child_node, parent_tag, before_tag = self._build_queue.popleft()
|
||||
if not child_node.ui_created:
|
||||
if child_node.is_leaf:
|
||||
self._create_leaf_ui(child_node, parent_tag, before_tag)
|
||||
else:
|
||||
self._create_tree_node_ui(child_node, parent_tag, before_tag)
|
||||
nodes_processed += 1
|
||||
|
||||
def search_data(self):
|
||||
self._queued_search = dpg.get_value("search_input")
|
||||
|
||||
def _clear_ui(self):
|
||||
for handler_tag in self._item_handlers:
|
||||
dpg.configure_item(handler_tag, show=False)
|
||||
dpg.set_frame_callback(dpg.get_frame_count() + 1, callback=self._delete_handlers, user_data=list(self._item_handlers))
|
||||
self._item_handlers.clear()
|
||||
|
||||
if dpg.does_item_exist("data_tree_container"):
|
||||
dpg.delete_item("data_tree_container", children_only=True)
|
||||
|
||||
self._build_queue.clear()
|
||||
|
||||
def _delete_handlers(self, sender, app_data, user_data):
|
||||
for handler in user_data:
|
||||
dpg.delete_item(handler)
|
||||
|
||||
def _calculate_child_counts(self, node: DataTreeNode):
|
||||
if node.is_leaf:
|
||||
node.child_count = 0
|
||||
else:
|
||||
node.child_count = len(node.children)
|
||||
for child in node.children.values():
|
||||
self._calculate_child_counts(child)
|
||||
|
||||
def _create_tree_node_ui(self, node: DataTreeNode, parent_tag: str, before: str | int):
|
||||
tag = f"tree_{node.full_path}"
|
||||
node.ui_tag = tag
|
||||
label = f"{node.name} ({node.child_count} fields)"
|
||||
search_term = self.current_search.strip().lower()
|
||||
expand = bool(search_term) and len(search_term) > 1 and any(search_term in path for path in self._get_descendant_paths(node))
|
||||
if expand and node.parent and node.parent.child_count > 100 and node.child_count > 2: # don't fully autoexpand large lists (only affects procLog rn)
|
||||
label += " (+)"
|
||||
expand = False
|
||||
|
||||
with dpg.tree_node(
|
||||
label=label, parent=parent_tag, tag=tag, default_open=expand, open_on_arrow=True, open_on_double_click=True, before=before, delay_search=True
|
||||
):
|
||||
with dpg.item_handler_registry() as handler_tag:
|
||||
dpg.add_item_toggled_open_handler(callback=lambda s, a, u: self._request_children_build(node))
|
||||
dpg.add_item_visible_handler(callback=lambda s, a, u: self._request_children_build(node))
|
||||
dpg.bind_item_handler_registry(tag, handler_tag)
|
||||
self._item_handlers.add(handler_tag)
|
||||
|
||||
node.ui_created = True
|
||||
|
||||
def _create_leaf_ui(self, node: DataTreeNode, parent_tag: str, before: str | int):
|
||||
with dpg.group(parent=parent_tag, tag=f"leaf_{node.full_path}", before=before, delay_search=True) as draggable_group:
|
||||
with dpg.table(header_row=False, policy=dpg.mvTable_SizingStretchProp, delay_search=True):
|
||||
dpg.add_table_column(init_width_or_weight=0.5)
|
||||
dpg.add_table_column(init_width_or_weight=0.5)
|
||||
with dpg.table_row():
|
||||
dpg.add_text(node.name)
|
||||
dpg.add_text("N/A", tag=f"value_{node.full_path}")
|
||||
|
||||
if node.is_plottable is None:
|
||||
node.is_plottable = self.data_manager.is_plottable(node.full_path)
|
||||
if node.is_plottable:
|
||||
with dpg.drag_payload(parent=draggable_group, drag_data=node.full_path, payload_type="TIMESERIES_PAYLOAD"):
|
||||
dpg.add_text(f"Plot: {node.full_path}")
|
||||
|
||||
with dpg.item_handler_registry() as handler_tag:
|
||||
dpg.add_item_visible_handler(callback=self._on_item_visible, user_data=node.full_path)
|
||||
dpg.bind_item_handler_registry(draggable_group, handler_tag)
|
||||
self._item_handlers.add(handler_tag)
|
||||
|
||||
node.ui_created = True
|
||||
node.ui_tag = f"value_{node.full_path}"
|
||||
|
||||
def _on_item_visible(self, sender, app_data, user_data):
|
||||
with self._ui_lock:
|
||||
path = user_data
|
||||
value_tag = f"value_{path}"
|
||||
value_column_width = dpg.get_item_rect_size("sidebar_window")[0] // 2
|
||||
value = self.data_manager.get_value_at(path, self.playback_manager.current_time_s)
|
||||
if value is not None:
|
||||
formatted_value = self.format_and_truncate(value, value_column_width, self._avg_char_width)
|
||||
dpg.set_value(value_tag, formatted_value)
|
||||
else:
|
||||
dpg.set_value(value_tag, "N/A")
|
||||
|
||||
def _request_children_build(self, node: DataTreeNode):
|
||||
with self._ui_lock:
|
||||
if not node.children_ui_created and (node.name == "root" or (node.ui_tag is not None and dpg.get_value(node.ui_tag))): # check root or node expanded
|
||||
parent_tag = "data_tree_container" if node.name == "root" else node.ui_tag
|
||||
sorted_children = sorted(node.children.values(), key=self._natural_sort_key)
|
||||
next_existing: list[int | str] = [0] * len(sorted_children)
|
||||
current_before_tag: int | str = 0
|
||||
|
||||
for i in range(len(sorted_children) - 1, -1, -1): # calculate "before_tag" for correct ordering when incrementally building tree
|
||||
child = sorted_children[i]
|
||||
next_existing[i] = current_before_tag
|
||||
if child.ui_created:
|
||||
candidate_tag = f"leaf_{child.full_path}" if child.is_leaf else f"tree_{child.full_path}"
|
||||
if dpg.does_item_exist(candidate_tag):
|
||||
current_before_tag = candidate_tag
|
||||
|
||||
for i, child_node in enumerate(sorted_children):
|
||||
if not child_node.ui_created:
|
||||
before_tag = next_existing[i]
|
||||
self._build_queue.append((child_node, parent_tag, before_tag))
|
||||
node.children_ui_created = True
|
||||
|
||||
def _should_show_path(self, path: str, search_term: str) -> bool:
|
||||
if 'DEPRECATED' in path and not os.environ.get('SHOW_DEPRECATED'):
|
||||
return False
|
||||
return not search_term or search_term in path.lower()
|
||||
|
||||
def _natural_sort_key(self, node: DataTreeNode):
|
||||
node_type_key = node.is_leaf
|
||||
parts = [int(p) if p.isdigit() else p.lower() for p in re.split(r'(\d+)', node.name) if p]
|
||||
return (node_type_key, parts)
|
||||
|
||||
def _get_descendant_paths(self, node: DataTreeNode):
|
||||
for child_name, child_node in node.children.items():
|
||||
child_name_lower = child_name.lower()
|
||||
if child_node.is_leaf:
|
||||
yield child_name_lower
|
||||
else:
|
||||
for path in self._get_descendant_paths(child_node):
|
||||
yield f"{child_name_lower}/{path}"
|
||||
|
||||
@staticmethod
|
||||
def calculate_avg_char_width(font):
|
||||
sample_text = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
if size := dpg.get_text_size(sample_text, font=font):
|
||||
return size[0] / len(sample_text)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def format_and_truncate(value, available_width: float, avg_char_width: float) -> str:
|
||||
s = f"{value:.5f}" if np.issubdtype(type(value), np.floating) else str(value)
|
||||
max_chars = int(available_width / avg_char_width) - 3
|
||||
if len(s) > max_chars:
|
||||
return s[: max(0, max_chars)] + "..."
|
||||
return s
|
||||
@@ -0,0 +1,262 @@
|
||||
import dearpygui.dearpygui as dpg
|
||||
from openpilot.tools.jotpluggler.data import DataManager
|
||||
from openpilot.tools.jotpluggler.views import TimeSeriesPanel
|
||||
|
||||
GRIP_SIZE = 4
|
||||
MIN_PANE_SIZE = 60
|
||||
|
||||
|
||||
class PlotLayoutManager:
|
||||
def __init__(self, data_manager: DataManager, playback_manager, worker_manager, scale: float = 1.0):
|
||||
self.data_manager = data_manager
|
||||
self.playback_manager = playback_manager
|
||||
self.worker_manager = worker_manager
|
||||
self.scale = scale
|
||||
self.container_tag = "plot_layout_container"
|
||||
self.active_panels: list = []
|
||||
|
||||
self.grip_size = int(GRIP_SIZE * self.scale)
|
||||
self.min_pane_size = int(MIN_PANE_SIZE * self.scale)
|
||||
|
||||
initial_panel = TimeSeriesPanel(data_manager, playback_manager, worker_manager)
|
||||
self.layout: dict = {"type": "panel", "panel": initial_panel}
|
||||
|
||||
def create_ui(self, parent_tag: str):
|
||||
if dpg.does_item_exist(self.container_tag):
|
||||
dpg.delete_item(self.container_tag)
|
||||
|
||||
with dpg.child_window(tag=self.container_tag, parent=parent_tag, border=False, width=-1, height=-1, no_scrollbar=True):
|
||||
container_width, container_height = dpg.get_item_rect_size(self.container_tag)
|
||||
self._create_ui_recursive(self.layout, self.container_tag, [], container_width, container_height)
|
||||
|
||||
def _create_ui_recursive(self, layout: dict, parent_tag: str, path: list[int], width: int, height: int):
|
||||
if layout["type"] == "panel":
|
||||
self._create_panel_ui(layout, parent_tag, path)
|
||||
else:
|
||||
self._create_split_ui(layout, parent_tag, path, width, height)
|
||||
|
||||
def _create_panel_ui(self, layout: dict, parent_tag: str, path: list[int]):
|
||||
panel_tag = self._path_to_tag(path, "panel")
|
||||
panel = layout["panel"]
|
||||
self.active_panels.append(panel)
|
||||
|
||||
with dpg.child_window(tag=panel_tag, parent=parent_tag, border=True, width=-1, height=-1, no_scrollbar=True):
|
||||
with dpg.group(horizontal=True):
|
||||
dpg.add_input_text(default_value=panel.title, width=int(100 * self.scale), callback=lambda s, v: setattr(panel, "title", v))
|
||||
dpg.add_combo(items=["Time Series"], default_value="Time Series", width=int(100 * self.scale))
|
||||
dpg.add_button(label="Clear", callback=lambda: self.clear_panel(panel), width=int(40 * self.scale))
|
||||
dpg.add_button(label="Delete", callback=lambda: self.delete_panel(path), width=int(40 * self.scale))
|
||||
dpg.add_button(label="Split H", callback=lambda: self.split_panel(path, 0), width=int(40 * self.scale))
|
||||
dpg.add_button(label="Split V", callback=lambda: self.split_panel(path, 1), width=int(40 * self.scale))
|
||||
|
||||
dpg.add_separator()
|
||||
|
||||
content_tag = self._path_to_tag(path, "content")
|
||||
with dpg.child_window(tag=content_tag, border=False, height=-1, width=-1, no_scrollbar=True):
|
||||
panel.create_ui(content_tag)
|
||||
|
||||
def _create_split_ui(self, layout: dict, parent_tag: str, path: list[int], width: int, height: int):
|
||||
split_tag = self._path_to_tag(path, "split")
|
||||
orientation, _, pane_sizes = self._get_split_geometry(layout, (width, height))
|
||||
|
||||
with dpg.group(tag=split_tag, parent=parent_tag, horizontal=orientation == 0):
|
||||
for i, child_layout in enumerate(layout["children"]):
|
||||
child_path = path + [i]
|
||||
container_tag = self._path_to_tag(child_path, "container")
|
||||
pane_width, pane_height = [(pane_sizes[i], -1), (-1, pane_sizes[i])][orientation] # fill 2nd dim up to the border
|
||||
with dpg.child_window(tag=container_tag, width=pane_width, height=pane_height, border=False, no_scrollbar=True):
|
||||
child_width, child_height = [(pane_sizes[i], height), (width, pane_sizes[i])][orientation]
|
||||
self._create_ui_recursive(child_layout, container_tag, child_path, child_width, child_height)
|
||||
if i < len(layout["children"]) - 1:
|
||||
self._create_grip(split_tag, path, i, orientation)
|
||||
|
||||
def clear_panel(self, panel):
|
||||
panel.clear()
|
||||
|
||||
def delete_panel(self, panel_path: list[int]):
|
||||
if not panel_path: # Root deletion
|
||||
old_panel = self.layout["panel"]
|
||||
old_panel.destroy_ui()
|
||||
self.active_panels.remove(old_panel)
|
||||
new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager, self.worker_manager)
|
||||
self.layout = {"type": "panel", "panel": new_panel}
|
||||
self._rebuild_ui_at_path([])
|
||||
return
|
||||
|
||||
parent, child_index = self._get_parent_and_index(panel_path)
|
||||
layout_to_delete = parent["children"][child_index]
|
||||
self._cleanup_ui_recursive(layout_to_delete, panel_path)
|
||||
|
||||
parent["children"].pop(child_index)
|
||||
parent["proportions"].pop(child_index)
|
||||
|
||||
if len(parent["children"]) == 1: # remove parent and collapse
|
||||
remaining_child = parent["children"][0]
|
||||
if len(panel_path) == 1: # parent is at root level - promote remaining child to root
|
||||
self.layout = remaining_child
|
||||
self._rebuild_ui_at_path([])
|
||||
else: # replace parent with remaining child in grandparent
|
||||
grandparent_path = panel_path[:-2]
|
||||
parent_index = panel_path[-2]
|
||||
self._replace_layout_at_path(grandparent_path + [parent_index], remaining_child)
|
||||
self._rebuild_ui_at_path(grandparent_path + [parent_index])
|
||||
else: # redistribute proportions
|
||||
equal_prop = 1.0 / len(parent["children"])
|
||||
parent["proportions"] = [equal_prop] * len(parent["children"])
|
||||
self._rebuild_ui_at_path(panel_path[:-1])
|
||||
|
||||
def split_panel(self, panel_path: list[int], orientation: int):
|
||||
current_layout = self._get_layout_at_path(panel_path)
|
||||
existing_panel = current_layout["panel"]
|
||||
new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager, self.worker_manager)
|
||||
parent, child_index = self._get_parent_and_index(panel_path)
|
||||
|
||||
if parent is None: # Root split
|
||||
self.layout = {
|
||||
"type": "split",
|
||||
"orientation": orientation,
|
||||
"children": [{"type": "panel", "panel": existing_panel}, {"type": "panel", "panel": new_panel}],
|
||||
"proportions": [0.5, 0.5],
|
||||
}
|
||||
self._rebuild_ui_at_path([])
|
||||
elif parent["type"] == "split" and parent["orientation"] == orientation: # Same orientation - insert into existing split
|
||||
parent["children"].insert(child_index + 1, {"type": "panel", "panel": new_panel})
|
||||
parent["proportions"] = [1.0 / len(parent["children"])] * len(parent["children"])
|
||||
self._rebuild_ui_at_path(panel_path[:-1])
|
||||
else: # Different orientation - create new split level
|
||||
new_split = {"type": "split", "orientation": orientation, "children": [current_layout, {"type": "panel", "panel": new_panel}], "proportions": [0.5, 0.5]}
|
||||
self._replace_layout_at_path(panel_path, new_split)
|
||||
self._rebuild_ui_at_path(panel_path)
|
||||
|
||||
def _rebuild_ui_at_path(self, path: list[int]):
|
||||
layout = self._get_layout_at_path(path)
|
||||
if path:
|
||||
container_tag = self._path_to_tag(path, "container")
|
||||
else: # Root update
|
||||
container_tag = self.container_tag
|
||||
|
||||
self._cleanup_ui_recursive(layout, path)
|
||||
dpg.delete_item(container_tag, children_only=True)
|
||||
width, height = dpg.get_item_rect_size(container_tag)
|
||||
self._create_ui_recursive(layout, container_tag, path, width, height)
|
||||
|
||||
def _cleanup_ui_recursive(self, layout: dict, path: list[int]):
|
||||
if layout["type"] == "panel":
|
||||
panel = layout["panel"]
|
||||
panel.destroy_ui()
|
||||
if panel in self.active_panels:
|
||||
self.active_panels.remove(panel)
|
||||
else:
|
||||
for i in range(len(layout["children"]) - 1):
|
||||
handler_tag = f"{self._path_to_tag(path, f'grip_{i}')}_handler"
|
||||
if dpg.does_item_exist(handler_tag):
|
||||
dpg.delete_item(handler_tag)
|
||||
|
||||
for i, child in enumerate(layout["children"]):
|
||||
self._cleanup_ui_recursive(child, path + [i])
|
||||
|
||||
def update_all_panels(self):
|
||||
for panel in self.active_panels:
|
||||
panel.update()
|
||||
|
||||
def on_viewport_resize(self):
|
||||
self._resize_splits_recursive(self.layout, [])
|
||||
|
||||
def _resize_splits_recursive(self, layout: dict, path: list[int], width: int | None = None, height: int | None = None):
|
||||
if layout["type"] == "split":
|
||||
split_tag = self._path_to_tag(path, "split")
|
||||
if dpg.does_item_exist(split_tag):
|
||||
available_sizes = (width, height) if width and height else dpg.get_item_rect_size(dpg.get_item_parent(split_tag))
|
||||
orientation, _, pane_sizes = self._get_split_geometry(layout, available_sizes)
|
||||
size_properties = ("width", "height")
|
||||
|
||||
for i, child_layout in enumerate(layout["children"]):
|
||||
child_path = path + [i]
|
||||
container_tag = self._path_to_tag(child_path, "container")
|
||||
if dpg.does_item_exist(container_tag):
|
||||
dpg.configure_item(container_tag, **{size_properties[orientation]: pane_sizes[i]})
|
||||
child_width, child_height = [(pane_sizes[i], available_sizes[1]), (available_sizes[0], pane_sizes[i])][orientation]
|
||||
self._resize_splits_recursive(child_layout, child_path, child_width, child_height)
|
||||
|
||||
def _get_split_geometry(self, layout: dict, available_size: tuple[int, int]) -> tuple[int, int, list[int]]:
|
||||
orientation = layout["orientation"]
|
||||
num_grips = len(layout["children"]) - 1
|
||||
usable_size = max(self.min_pane_size, available_size[orientation] - (num_grips * self.grip_size))
|
||||
pane_sizes = [max(self.min_pane_size, int(usable_size * prop)) for prop in layout["proportions"]]
|
||||
return orientation, usable_size, pane_sizes
|
||||
|
||||
def _get_layout_at_path(self, path: list[int]) -> dict:
|
||||
current = self.layout
|
||||
for index in path:
|
||||
current = current["children"][index]
|
||||
return current
|
||||
|
||||
def _get_parent_and_index(self, path: list[int]) -> tuple:
|
||||
return (None, -1) if not path else (self._get_layout_at_path(path[:-1]), path[-1])
|
||||
|
||||
def _replace_layout_at_path(self, path: list[int], new_layout: dict):
|
||||
if not path:
|
||||
self.layout = new_layout
|
||||
else:
|
||||
parent, index = self._get_parent_and_index(path)
|
||||
parent["children"][index] = new_layout
|
||||
|
||||
def _path_to_tag(self, path: list[int], prefix: str = "") -> str:
|
||||
path_str = "_".join(map(str, path)) if path else "root"
|
||||
return f"{prefix}_{path_str}" if prefix else path_str
|
||||
|
||||
def _create_grip(self, parent_tag: str, path: list[int], grip_index: int, orientation: int):
|
||||
grip_tag = self._path_to_tag(path, f"grip_{grip_index}")
|
||||
width, height = [(self.grip_size, -1), (-1, self.grip_size)][orientation]
|
||||
|
||||
with dpg.child_window(tag=grip_tag, parent=parent_tag, width=width, height=height, no_scrollbar=True, border=False):
|
||||
button_tag = dpg.add_button(label="", width=-1, height=-1)
|
||||
|
||||
with dpg.item_handler_registry(tag=f"{grip_tag}_handler"):
|
||||
user_data = (path, grip_index, orientation)
|
||||
dpg.add_item_active_handler(callback=self._on_grip_drag, user_data=user_data)
|
||||
dpg.add_item_deactivated_handler(callback=self._on_grip_end, user_data=user_data)
|
||||
dpg.bind_item_handler_registry(button_tag, f"{grip_tag}_handler")
|
||||
|
||||
def _on_grip_drag(self, sender, app_data, user_data):
|
||||
path, grip_index, orientation = user_data
|
||||
layout = self._get_layout_at_path(path)
|
||||
|
||||
if "_drag_data" not in layout:
|
||||
layout["_drag_data"] = {"initial_proportions": layout["proportions"][:], "start_mouse": dpg.get_mouse_pos(local=False)[orientation]}
|
||||
return
|
||||
|
||||
drag_data = layout["_drag_data"]
|
||||
split_tag = self._path_to_tag(path, "split")
|
||||
if not dpg.does_item_exist(split_tag):
|
||||
return
|
||||
|
||||
_, usable_size, _ = self._get_split_geometry(layout, dpg.get_item_rect_size(split_tag))
|
||||
current_coord = dpg.get_mouse_pos(local=False)[orientation]
|
||||
delta = current_coord - drag_data["start_mouse"]
|
||||
delta_prop = delta / usable_size
|
||||
|
||||
left_idx = grip_index
|
||||
right_idx = left_idx + 1
|
||||
initial = drag_data["initial_proportions"]
|
||||
min_prop = self.min_pane_size / usable_size
|
||||
|
||||
new_left = max(min_prop, initial[left_idx] + delta_prop)
|
||||
new_right = max(min_prop, initial[right_idx] - delta_prop)
|
||||
|
||||
total_available = initial[left_idx] + initial[right_idx]
|
||||
if new_left + new_right > total_available:
|
||||
if new_left > new_right:
|
||||
new_left = total_available - new_right
|
||||
else:
|
||||
new_right = total_available - new_left
|
||||
|
||||
layout["proportions"] = initial[:]
|
||||
layout["proportions"][left_idx] = new_left
|
||||
layout["proportions"][right_idx] = new_right
|
||||
|
||||
self._resize_splits_recursive(layout, path)
|
||||
|
||||
def _on_grip_end(self, sender, app_data, user_data):
|
||||
path, _, _ = user_data
|
||||
self._get_layout_at_path(path).pop("_drag_data", None)
|
||||
Executable
+247
@@ -0,0 +1,247 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import os
|
||||
import pyautogui
|
||||
import subprocess
|
||||
import dearpygui.dearpygui as dpg
|
||||
import multiprocessing
|
||||
import uuid
|
||||
import signal
|
||||
from openpilot.common.basedir import BASEDIR
|
||||
from openpilot.tools.jotpluggler.data import DataManager
|
||||
from openpilot.tools.jotpluggler.datatree import DataTree
|
||||
from openpilot.tools.jotpluggler.layout import PlotLayoutManager
|
||||
|
||||
DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"
|
||||
|
||||
|
||||
class WorkerManager:
|
||||
def __init__(self, max_workers=None):
|
||||
self.pool = multiprocessing.Pool(max_workers or min(4, multiprocessing.cpu_count()), initializer=WorkerManager.worker_initializer)
|
||||
self.active_tasks = {}
|
||||
|
||||
def submit_task(self, func, args_list, callback=None, task_id=None):
|
||||
task_id = task_id or str(uuid.uuid4())
|
||||
|
||||
if task_id in self.active_tasks:
|
||||
try:
|
||||
self.active_tasks[task_id].terminate()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def handle_success(result):
|
||||
self.active_tasks.pop(task_id, None)
|
||||
if callback:
|
||||
try:
|
||||
callback(result)
|
||||
except Exception as e:
|
||||
print(f"Callback for task {task_id} failed: {e}")
|
||||
|
||||
def handle_error(error):
|
||||
self.active_tasks.pop(task_id, None)
|
||||
print(f"Task {task_id} failed: {error}")
|
||||
|
||||
async_result = self.pool.starmap_async(func, args_list, callback=handle_success, error_callback=handle_error)
|
||||
self.active_tasks[task_id] = async_result
|
||||
return task_id
|
||||
|
||||
@staticmethod
|
||||
def worker_initializer():
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
def shutdown(self):
|
||||
for task in self.active_tasks.values():
|
||||
try:
|
||||
task.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
self.pool.terminate()
|
||||
self.pool.join()
|
||||
|
||||
|
||||
class PlaybackManager:
|
||||
def __init__(self):
|
||||
self.is_playing = False
|
||||
self.current_time_s = 0.0
|
||||
self.duration_s = 0.0
|
||||
|
||||
def set_route_duration(self, duration: float):
|
||||
self.duration_s = duration
|
||||
self.seek(min(self.current_time_s, duration))
|
||||
|
||||
def toggle_play_pause(self):
|
||||
if not self.is_playing and self.current_time_s >= self.duration_s:
|
||||
self.seek(0.0)
|
||||
self.is_playing = not self.is_playing
|
||||
|
||||
def seek(self, time_s: float):
|
||||
self.is_playing = False
|
||||
self.current_time_s = max(0.0, min(time_s, self.duration_s))
|
||||
|
||||
def update_time(self, delta_t: float):
|
||||
if self.is_playing:
|
||||
self.current_time_s = min(self.current_time_s + delta_t, self.duration_s)
|
||||
if self.current_time_s >= self.duration_s:
|
||||
self.is_playing = False
|
||||
return self.current_time_s
|
||||
|
||||
|
||||
class MainController:
|
||||
def __init__(self, scale: float = 1.0):
|
||||
self.scale = scale
|
||||
self.data_manager = DataManager()
|
||||
self.playback_manager = PlaybackManager()
|
||||
self.worker_manager = WorkerManager()
|
||||
self._create_global_themes()
|
||||
self.data_tree = DataTree(self.data_manager, self.playback_manager)
|
||||
self.plot_layout_manager = PlotLayoutManager(self.data_manager, self.playback_manager, self.worker_manager, scale=self.scale)
|
||||
self.data_manager.add_observer(self.on_data_loaded)
|
||||
|
||||
def _create_global_themes(self):
|
||||
with dpg.theme(tag="global_line_theme"):
|
||||
with dpg.theme_component(dpg.mvLineSeries):
|
||||
scaled_thickness = max(1.0, self.scale)
|
||||
dpg.add_theme_style(dpg.mvPlotStyleVar_LineWeight, scaled_thickness, category=dpg.mvThemeCat_Plots)
|
||||
|
||||
with dpg.theme(tag="global_timeline_theme"):
|
||||
with dpg.theme_component(dpg.mvInfLineSeries):
|
||||
scaled_thickness = max(1.0, self.scale)
|
||||
dpg.add_theme_style(dpg.mvPlotStyleVar_LineWeight, scaled_thickness, category=dpg.mvThemeCat_Plots)
|
||||
dpg.add_theme_color(dpg.mvPlotCol_Line, (255, 0, 0, 128), category=dpg.mvThemeCat_Plots)
|
||||
|
||||
|
||||
def on_data_loaded(self, data: dict):
|
||||
duration = data.get('duration', 0.0)
|
||||
self.playback_manager.set_route_duration(duration)
|
||||
|
||||
if data.get('reset'):
|
||||
self.playback_manager.current_time_s = 0.0
|
||||
self.playback_manager.duration_s = 0.0
|
||||
self.playback_manager.is_playing = False
|
||||
dpg.set_value("load_status", "Loading...")
|
||||
dpg.set_value("timeline_slider", 0.0)
|
||||
dpg.configure_item("timeline_slider", max_value=0.0)
|
||||
dpg.configure_item("play_pause_button", label="Play")
|
||||
dpg.configure_item("load_button", enabled=True)
|
||||
elif data.get('loading_complete'):
|
||||
num_paths = len(self.data_manager.get_all_paths())
|
||||
dpg.set_value("load_status", f"Loaded {num_paths} data paths")
|
||||
dpg.configure_item("load_button", enabled=True)
|
||||
elif data.get('segment_added'):
|
||||
segment_count = data.get('segment_count', 0)
|
||||
dpg.set_value("load_status", f"Loading... {segment_count} segments processed")
|
||||
|
||||
dpg.configure_item("timeline_slider", max_value=duration)
|
||||
|
||||
def setup_ui(self):
|
||||
with dpg.window(tag="Primary Window"):
|
||||
with dpg.group(horizontal=True):
|
||||
# Left panel - Data tree
|
||||
with dpg.child_window(label="Sidebar", width=300 * self.scale, tag="sidebar_window", border=True, resizable_x=True):
|
||||
with dpg.group(horizontal=True):
|
||||
dpg.add_input_text(tag="route_input", width=-75 * self.scale, hint="Enter route name...")
|
||||
dpg.add_button(label="Load", callback=self.load_route, tag="load_button", width=-1)
|
||||
dpg.add_text("Ready to load route", tag="load_status")
|
||||
dpg.add_separator()
|
||||
self.data_tree.create_ui("sidebar_window")
|
||||
|
||||
# Right panel - Plots and timeline
|
||||
with dpg.group(tag="right_panel"):
|
||||
with dpg.child_window(label="Plot Window", border=True, height=-(30 + 13 * self.scale), tag="main_plot_area"):
|
||||
self.plot_layout_manager.create_ui("main_plot_area")
|
||||
|
||||
with dpg.child_window(label="Timeline", border=True):
|
||||
with dpg.table(header_row=False, borders_innerH=False, borders_innerV=False, borders_outerH=False, borders_outerV=False):
|
||||
dpg.add_table_column(width_fixed=True, init_width_or_weight=int(50 * self.scale)) # Play button
|
||||
dpg.add_table_column(width_stretch=True) # Timeline slider
|
||||
dpg.add_table_column(width_fixed=True, init_width_or_weight=int(50 * self.scale)) # FPS counter
|
||||
with dpg.table_row():
|
||||
dpg.add_button(label="Play", tag="play_pause_button", callback=self.toggle_play_pause, width=int(50 * self.scale))
|
||||
dpg.add_slider_float(tag="timeline_slider", default_value=0.0, label="", width=-1, callback=self.timeline_drag)
|
||||
dpg.add_text("", tag="fps_counter")
|
||||
with dpg.item_handler_registry(tag="plot_resize_handler"):
|
||||
dpg.add_item_resize_handler(callback=self.on_plot_resize)
|
||||
dpg.bind_item_handler_registry("right_panel", "plot_resize_handler")
|
||||
|
||||
dpg.set_primary_window("Primary Window", True)
|
||||
|
||||
def on_plot_resize(self, sender, app_data, user_data):
|
||||
self.plot_layout_manager.on_viewport_resize()
|
||||
|
||||
def load_route(self):
|
||||
route_name = dpg.get_value("route_input").strip()
|
||||
if route_name:
|
||||
dpg.set_value("load_status", "Loading route...")
|
||||
dpg.configure_item("load_button", enabled=False)
|
||||
self.data_manager.load_route(route_name)
|
||||
|
||||
def toggle_play_pause(self, sender):
|
||||
self.playback_manager.toggle_play_pause()
|
||||
label = "Pause" if self.playback_manager.is_playing else "Play"
|
||||
dpg.configure_item(sender, label=label)
|
||||
|
||||
def timeline_drag(self, sender, app_data):
|
||||
self.playback_manager.seek(app_data)
|
||||
dpg.configure_item("play_pause_button", label="Play")
|
||||
|
||||
def update_frame(self, font):
|
||||
self.data_tree.update_frame(font)
|
||||
|
||||
new_time = self.playback_manager.update_time(dpg.get_delta_time())
|
||||
if not dpg.is_item_active("timeline_slider"):
|
||||
dpg.set_value("timeline_slider", new_time)
|
||||
|
||||
self.plot_layout_manager.update_all_panels()
|
||||
|
||||
dpg.set_value("fps_counter", f"{dpg.get_frame_rate():.1f} FPS")
|
||||
|
||||
def shutdown(self):
|
||||
self.worker_manager.shutdown()
|
||||
|
||||
|
||||
def main(route_to_load=None):
|
||||
dpg.create_context()
|
||||
|
||||
# TODO: find better way of calculating display scaling
|
||||
try:
|
||||
w, h = next(tuple(map(int, l.split()[0].split('x'))) for l in subprocess.check_output(['xrandr']).decode().split('\n') if '*' in l) # actual resolution
|
||||
scale = pyautogui.size()[0] / w # scaled resolution
|
||||
except Exception:
|
||||
scale = 1
|
||||
|
||||
with dpg.font_registry():
|
||||
default_font = dpg.add_font(os.path.join(BASEDIR, "selfdrive/assets/fonts/Inter-Regular.ttf"), int(13 * scale))
|
||||
dpg.bind_font(default_font)
|
||||
|
||||
viewport_width, viewport_height = int(1200 * scale), int(800 * scale)
|
||||
mouse_x, mouse_y = pyautogui.position() # TODO: find better way of creating the window where the user is (default dpg behavior annoying on multiple displays)
|
||||
dpg.create_viewport(
|
||||
title='JotPluggler', width=viewport_width, height=viewport_height, x_pos=mouse_x - viewport_width // 2, y_pos=mouse_y - viewport_height // 2
|
||||
)
|
||||
dpg.setup_dearpygui()
|
||||
|
||||
controller = MainController(scale=scale)
|
||||
controller.setup_ui()
|
||||
|
||||
if route_to_load:
|
||||
dpg.set_value("route_input", route_to_load)
|
||||
controller.load_route()
|
||||
|
||||
dpg.show_viewport()
|
||||
|
||||
# Main loop
|
||||
try:
|
||||
while dpg.is_dearpygui_running():
|
||||
controller.update_frame(default_font)
|
||||
dpg.render_dearpygui_frame()
|
||||
finally:
|
||||
controller.shutdown()
|
||||
dpg.destroy_context()
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="A tool for visualizing openpilot logs.")
|
||||
parser.add_argument("--demo", action="store_true", help="Use the demo route instead of providing one")
|
||||
parser.add_argument("route", nargs='?', default=None, help="Optional route name to load on startup.")
|
||||
args = parser.parse_args()
|
||||
route = DEMO_ROUTE if args.demo else args.route
|
||||
main(route_to_load=route)
|
||||
@@ -0,0 +1,195 @@
|
||||
import uuid
|
||||
import threading
|
||||
import numpy as np
|
||||
from collections import deque
|
||||
import dearpygui.dearpygui as dpg
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ViewPanel(ABC):
|
||||
"""Abstract base class for all view panels that can be displayed in a plot container"""
|
||||
|
||||
def __init__(self, panel_id: str = None):
|
||||
self.panel_id = panel_id or str(uuid.uuid4())
|
||||
self.title = "Untitled Panel"
|
||||
|
||||
@abstractmethod
|
||||
def clear(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_ui(self, parent_tag: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def destroy_ui(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_panel_type(self) -> str:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update(self):
|
||||
pass
|
||||
|
||||
|
||||
class TimeSeriesPanel(ViewPanel):
|
||||
def __init__(self, data_manager, playback_manager, worker_manager, panel_id: str | None = None):
|
||||
super().__init__(panel_id)
|
||||
self.data_manager = data_manager
|
||||
self.playback_manager = playback_manager
|
||||
self.worker_manager = worker_manager
|
||||
self.title = "Time Series Plot"
|
||||
self.plot_tag = f"plot_{self.panel_id}"
|
||||
self.x_axis_tag = f"{self.plot_tag}_x_axis"
|
||||
self.y_axis_tag = f"{self.plot_tag}_y_axis"
|
||||
self.timeline_indicator_tag = f"{self.plot_tag}_timeline"
|
||||
self._ui_created = False
|
||||
self._series_data: dict[str, tuple[list, list]] = {}
|
||||
self._last_plot_duration = 0
|
||||
self._update_lock = threading.RLock()
|
||||
self.results_deque: deque[tuple[str, list, list]] = deque()
|
||||
self._new_data = False
|
||||
|
||||
def create_ui(self, parent_tag: str):
|
||||
self.data_manager.add_observer(self.on_data_loaded)
|
||||
with dpg.plot(height=-1, width=-1, tag=self.plot_tag, parent=parent_tag, drop_callback=self._on_series_drop, payload_type="TIMESERIES_PAYLOAD"):
|
||||
dpg.add_plot_legend()
|
||||
dpg.add_plot_axis(dpg.mvXAxis, no_label=True, tag=self.x_axis_tag)
|
||||
dpg.add_plot_axis(dpg.mvYAxis, no_label=True, tag=self.y_axis_tag)
|
||||
timeline_series_tag = dpg.add_inf_line_series(x=[0], label="Timeline", parent=self.y_axis_tag, tag=self.timeline_indicator_tag)
|
||||
dpg.bind_item_theme(timeline_series_tag, "global_timeline_theme")
|
||||
|
||||
for series_path in list(self._series_data.keys()):
|
||||
self.add_series(series_path)
|
||||
self._ui_created = True
|
||||
|
||||
def update(self):
|
||||
with self._update_lock:
|
||||
if not self._ui_created:
|
||||
return
|
||||
|
||||
if self._new_data: # handle new data in main thread
|
||||
self._new_data = False
|
||||
for series_path in list(self._series_data.keys()):
|
||||
self.add_series(series_path, update=True)
|
||||
|
||||
while self.results_deque: # handle downsampled results in main thread
|
||||
results = self.results_deque.popleft()
|
||||
for series_path, downsampled_time, downsampled_values in results:
|
||||
series_tag = f"series_{self.panel_id}_{series_path}"
|
||||
if dpg.does_item_exist(series_tag):
|
||||
dpg.set_value(series_tag, [downsampled_time, downsampled_values])
|
||||
|
||||
# update timeline
|
||||
current_time_s = self.playback_manager.current_time_s
|
||||
dpg.set_value(self.timeline_indicator_tag, [[current_time_s], [0]])
|
||||
|
||||
# update timeseries legend label
|
||||
for series_path, (time_array, value_array) in self._series_data.items():
|
||||
position = np.searchsorted(time_array, current_time_s, side='right') - 1
|
||||
if position >= 0 and (current_time_s - time_array[position]) <= 1.0:
|
||||
value = value_array[position]
|
||||
formatted_value = f"{value:.5f}" if np.issubdtype(type(value), np.floating) else str(value)
|
||||
series_tag = f"series_{self.panel_id}_{series_path}"
|
||||
if dpg.does_item_exist(series_tag):
|
||||
dpg.configure_item(series_tag, label=f"{series_path}: {formatted_value}")
|
||||
|
||||
# downsample if plot zoom changed significantly
|
||||
plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0]
|
||||
if plot_duration > self._last_plot_duration * 2 or plot_duration < self._last_plot_duration * 0.5:
|
||||
self._downsample_all_series(plot_duration)
|
||||
|
||||
def _downsample_all_series(self, plot_duration):
|
||||
plot_width = dpg.get_item_rect_size(self.plot_tag)[0]
|
||||
if plot_width <= 0 or plot_duration <= 0:
|
||||
return
|
||||
|
||||
self._last_plot_duration = plot_duration
|
||||
target_points_per_second = plot_width / plot_duration
|
||||
work_items = []
|
||||
for series_path, (time_array, value_array) in self._series_data.items():
|
||||
if len(time_array) == 0:
|
||||
continue
|
||||
series_duration = time_array[-1] - time_array[0] if len(time_array) > 1 else 1
|
||||
points_per_second = len(time_array) / series_duration
|
||||
if points_per_second > target_points_per_second * 2:
|
||||
target_points = max(int(target_points_per_second * series_duration), plot_width)
|
||||
work_items.append((series_path, time_array, value_array, target_points))
|
||||
elif dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"):
|
||||
dpg.set_value(f"series_{self.panel_id}_{series_path}", [time_array, value_array])
|
||||
|
||||
if work_items:
|
||||
self.worker_manager.submit_task(
|
||||
TimeSeriesPanel._downsample_worker, work_items, callback=lambda results: self.results_deque.append(results), task_id=f"downsample_{self.panel_id}"
|
||||
)
|
||||
|
||||
def add_series(self, series_path: str, update: bool = False):
|
||||
with self._update_lock:
|
||||
if update or series_path not in self._series_data:
|
||||
self._series_data[series_path] = self.data_manager.get_timeseries(series_path)
|
||||
|
||||
time_array, value_array = self._series_data[series_path]
|
||||
series_tag = f"series_{self.panel_id}_{series_path}"
|
||||
if dpg.does_item_exist(series_tag):
|
||||
dpg.set_value(series_tag, [time_array, value_array])
|
||||
else:
|
||||
line_series_tag = dpg.add_line_series(x=time_array, y=value_array, label=series_path, parent=self.y_axis_tag, tag=series_tag)
|
||||
dpg.bind_item_theme(line_series_tag, "global_line_theme")
|
||||
dpg.fit_axis_data(self.x_axis_tag)
|
||||
dpg.fit_axis_data(self.y_axis_tag)
|
||||
plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0]
|
||||
self._downsample_all_series(plot_duration)
|
||||
|
||||
def destroy_ui(self):
|
||||
with self._update_lock:
|
||||
self.data_manager.remove_observer(self.on_data_loaded)
|
||||
if dpg.does_item_exist(self.plot_tag):
|
||||
dpg.delete_item(self.plot_tag)
|
||||
self._ui_created = False
|
||||
|
||||
def get_panel_type(self) -> str:
|
||||
return "timeseries"
|
||||
|
||||
def clear(self):
|
||||
with self._update_lock:
|
||||
for series_path in list(self._series_data.keys()):
|
||||
self.remove_series(series_path)
|
||||
|
||||
def remove_series(self, series_path: str):
|
||||
with self._update_lock:
|
||||
if series_path in self._series_data:
|
||||
if dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"):
|
||||
dpg.delete_item(f"series_{self.panel_id}_{series_path}")
|
||||
del self._series_data[series_path]
|
||||
|
||||
def on_data_loaded(self, data: dict):
|
||||
self._new_data = True
|
||||
|
||||
def _on_series_drop(self, sender, app_data, user_data):
|
||||
self.add_series(app_data)
|
||||
|
||||
@staticmethod
|
||||
def _downsample_worker(series_path, time_array, value_array, target_points):
|
||||
if len(time_array) <= target_points:
|
||||
return series_path, time_array, value_array
|
||||
|
||||
step = len(time_array) / target_points
|
||||
indices = []
|
||||
|
||||
for i in range(target_points):
|
||||
start_idx = int(i * step)
|
||||
end_idx = int((i + 1) * step)
|
||||
if start_idx == end_idx:
|
||||
indices.append(start_idx)
|
||||
else:
|
||||
bucket_values = value_array[start_idx:end_idx]
|
||||
min_idx = start_idx + np.argmin(bucket_values)
|
||||
max_idx = start_idx + np.argmax(bucket_values)
|
||||
if min_idx != max_idx:
|
||||
indices.extend([min(min_idx, max_idx), max(min_idx, max_idx)])
|
||||
else:
|
||||
indices.append(min_idx)
|
||||
indices = sorted(set(indices))
|
||||
return series_path, time_array[indices], value_array[indices]
|
||||
@@ -451,6 +451,21 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/d2/fc/c0a3f4c4eaa5a22fbef91713474666e13d0ea2a69c84532579490a9f2cc8/dbus_next-0.2.3-py3-none-any.whl", hash = "sha256:58948f9aff9db08316734c0be2a120f6dc502124d9642f55e90ac82ffb16a18b", size = 57885, upload-time = "2021-07-25T22:11:25.466Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dearpygui"
|
||||
version = "2.1.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/92/fe/66293fc40254a29f060efd3398f2b1001ed79263ae1837db9ec42caa8f1d/dearpygui-2.1.0-cp311-cp311-macosx_10_6_x86_64.whl", hash = "sha256:03e5dc0b3dd2f7965e50bbe41f3316a814408064b582586de994d93afedb125c", size = 2100924, upload-time = "2025-07-07T14:20:00.602Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/c4/4d/9fa1c3156ba7bbf4dc89e2e322998752fccfdc3575923a98dd6a4da48911/dearpygui-2.1.0-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:b5b37710c3fa135c48e2347f39ecd1f415146e86db5d404707a0bf72d16bd304", size = 1874441, upload-time = "2025-07-07T14:20:09.165Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/5a/3c/af5673b50699e1734296a0b5bcef39bb6989175b001ad1f9b0e7888ad90d/dearpygui-2.1.0-cp311-cp311-manylinux1_x86_64.whl", hash = "sha256:b0cfd7ac7eaa090fc22d6aa60fc4b527fc631cee10c348e4d8df92bb39af03d2", size = 2636574, upload-time = "2025-07-07T14:20:14.951Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/7f/db/ed4db0bb3d88e7a8c405472641419086bef9632c4b8b0489dc0c43519c0d/dearpygui-2.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:a9af54f96d3ef30c5db9d12cdf3266f005507396fb0da2e12e6b22b662161070", size = 1810266, upload-time = "2025-07-07T14:19:51.565Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/55/9d/20a55786cc9d9266395544463d5db3be3528f7d5244bc52ba760de5dcc2d/dearpygui-2.1.0-cp312-cp312-macosx_10_6_x86_64.whl", hash = "sha256:1270ceb9cdb8ecc047c42477ccaa075b7864b314a5d09191f9280a24c8aa90a0", size = 2101499, upload-time = "2025-07-07T14:20:01.701Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a7/b2/39d820796b7ac4d0ebf93306c1f031bf3516b159408286f1fb495c6babeb/dearpygui-2.1.0-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:ce9969eb62057b9d4c88a8baaed13b5fbe4058caa9faf5b19fec89da75aece3d", size = 1874385, upload-time = "2025-07-07T14:20:11.226Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/fc/26/c29998ffeb5eb8d638f307851e51a81c8bd4aeaf89ad660fc67ea4d1ac1a/dearpygui-2.1.0-cp312-cp312-manylinux1_x86_64.whl", hash = "sha256:a3ca8cf788db63ef7e2e8d6f277631b607d548b37606f080ca1b42b1f0a9b183", size = 2635863, upload-time = "2025-07-07T14:20:17.186Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/28/9c/3ab33927f1d8c839c5b7033a33d44fc9f0aeb00c264fc9772cb7555a03c4/dearpygui-2.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:43f0e4db9402f44fc3683a1f5c703564819de18cc15a042de7f1ed1c8cb5d148", size = 1810460, upload-time = "2025-07-07T14:19:53.13Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dictdiffer"
|
||||
version = "0.9.0"
|
||||
@@ -1246,6 +1261,7 @@ dependencies = [
|
||||
{ name = "cffi" },
|
||||
{ name = "crcmod" },
|
||||
{ name = "cython" },
|
||||
{ name = "dearpygui" },
|
||||
{ name = "future-fstrings" },
|
||||
{ name = "inputs" },
|
||||
{ name = "json-rpc" },
|
||||
@@ -1337,6 +1353,7 @@ requires-dist = [
|
||||
{ name = "crcmod" },
|
||||
{ name = "cython" },
|
||||
{ name = "dbus-next", marker = "extra == 'dev'" },
|
||||
{ name = "dearpygui", specifier = ">=2.1.0" },
|
||||
{ name = "dictdiffer", marker = "extra == 'dev'" },
|
||||
{ name = "future-fstrings" },
|
||||
{ name = "hypothesis", marker = "extra == 'testing'", specifier = "==6.47.*" },
|
||||
|
||||
Reference in New Issue
Block a user