mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-26 16:32:06 +08:00
replay/ui.py: get image from vipc (#22433)
* get image from vipc * cleanup import * if not None * lgtm * bump cereal&do reconnect * bump cereal Co-authored-by: Willem Melching <willem.melching@gmail.com>
This commit is contained in:
+1
-1
Submodule cereal updated: 2881d47cea...d5a85fe5d7
+10
-9
@@ -13,13 +13,14 @@ import cereal.messaging as messaging
|
||||
from common.numpy_fast import clip
|
||||
from common.basedir import BASEDIR
|
||||
from selfdrive.config import UIParams as UP
|
||||
from tools.replay.lib.ui_helpers import (_BB_TO_FULL_FRAME, _FULL_FRAME_SIZE,
|
||||
from tools.replay.lib.ui_helpers import (_BB_TO_FULL_FRAME,
|
||||
_INTRINSICS, BLACK, GREEN,
|
||||
YELLOW, Calibration,
|
||||
get_blank_lid_overlay, init_plots,
|
||||
maybe_update_radar_points, plot_lead,
|
||||
plot_model,
|
||||
pygame_modules_have_loaded)
|
||||
from cereal.visionipc.visionipc_pyx import VisionIpcClient, VisionStreamType # pylint: disable=no-name-in-module, import-error
|
||||
|
||||
os.environ['BASEDIR'] = BASEDIR
|
||||
|
||||
@@ -55,7 +56,6 @@ def ui_thread(addr, frame_address):
|
||||
camera_surface = pygame.surface.Surface((640, 480), 0, 24).convert()
|
||||
top_down_surface = pygame.surface.Surface((UP.lidar_x, UP.lidar_y), 0, 8)
|
||||
|
||||
frame = messaging.sub_sock('roadCameraState', addr=addr, conflate=True)
|
||||
sm = messaging.SubMaster(['carState', 'longitudinalPlan', 'carControl', 'radarState', 'liveCalibration', 'controlsState',
|
||||
'liveTracks', 'modelV2', 'liveParameters', 'lateralPlan', 'roadCameraState'], addr=addr)
|
||||
|
||||
@@ -101,6 +101,7 @@ def ui_thread(addr, frame_address):
|
||||
|
||||
draw_plots = init_plots(plot_arr, name_to_arr_idx, plot_xlims, plot_ylims, plot_names, plot_colors, plot_styles, bigplots=True)
|
||||
|
||||
vipc_client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_RGB_BACK, True)
|
||||
while 1:
|
||||
list(pygame.event.get())
|
||||
|
||||
@@ -109,19 +110,19 @@ def ui_thread(addr, frame_address):
|
||||
top_down = top_down_surface, lid_overlay
|
||||
|
||||
# ***** frame *****
|
||||
fpkt = messaging.recv_one(frame)
|
||||
rgb_img_raw = fpkt.roadCameraState.image
|
||||
if not vipc_client.is_connected():
|
||||
vipc_client.connect(True)
|
||||
|
||||
num_px = len(rgb_img_raw) // 3
|
||||
if rgb_img_raw and num_px in _FULL_FRAME_SIZE.keys():
|
||||
FULL_FRAME_SIZE = _FULL_FRAME_SIZE[num_px]
|
||||
rgb_img_raw = vipc_client.recv()
|
||||
|
||||
imgff_shape = (FULL_FRAME_SIZE[1], FULL_FRAME_SIZE[0], 3)
|
||||
if rgb_img_raw is not None and rgb_img_raw.any():
|
||||
num_px = len(rgb_img_raw) // 3
|
||||
imgff_shape = (vipc_client.height, vipc_client.width, 3)
|
||||
|
||||
if imgff is None or imgff.shape != imgff_shape:
|
||||
imgff = np.zeros(imgff_shape, dtype=np.uint8)
|
||||
|
||||
imgff = np.frombuffer(rgb_img_raw, dtype=np.uint8).reshape((FULL_FRAME_SIZE[1], FULL_FRAME_SIZE[0], 3))
|
||||
imgff = np.frombuffer(rgb_img_raw, dtype=np.uint8).reshape((vipc_client.height, vipc_client.width, 3))
|
||||
imgff = imgff[:, :, ::-1] # Convert BGR to RGB
|
||||
zoom_matrix = _BB_TO_FULL_FRAME[num_px]
|
||||
cv2.warpAffine(imgff, zoom_matrix[:2], (img.shape[1], img.shape[0]), dst=img, flags=cv2.WARP_INVERSE_MAP)
|
||||
|
||||
Reference in New Issue
Block a user