mirror of
https://github.com/commaai/msgq.git
synced 2026-06-08 05:54:44 +08:00
Restructure package (#622)
* Move around * rename messaging to ipc * More renames * refactor visionipc ipc * more movement * compiles * works well * update workflow * Update * test fake * fix names * Fix test * exclude library * exclude from lint too * Rm dir * rm this wayu * Try again * mv logger * delete old * HAX * Move logger down * add warning abck
This commit is contained in:
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -40,8 +40,8 @@ jobs:
|
||||
run: |
|
||||
$RUN "export ${{ matrix.backend }}=1 && \
|
||||
scons ${{ matrix.flags }} -j$(nproc) && \
|
||||
messaging/test_runner && \
|
||||
visionipc/test_runner"
|
||||
msgq/test_runner && \
|
||||
msgq/visionipc/test_runner"
|
||||
- name: python tests
|
||||
run: $RUN_NAMED "${{ matrix.backend }}=1 coverage run -m unittest discover ."
|
||||
- name: Upload coverage
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
files: ^msgq/
|
||||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v4.6.0
|
||||
@@ -21,7 +22,7 @@ repos:
|
||||
entry: cppcheck
|
||||
language: system
|
||||
types: [c++]
|
||||
exclude: '^(messaging/msgq_tests.cc|messaging/test_runner.cc)'
|
||||
exclude: '^(msgq/msgq_tests.cc|msgq/test_runner.cc)'
|
||||
args:
|
||||
- --error-exitcode=1
|
||||
- --inline-suppr
|
||||
|
||||
@@ -37,11 +37,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
|
||||
RUN pip3 install --break-system-packages --no-cache-dir pyyaml Cython scons pycapnp pre-commit ruff parameterized coverage numpy
|
||||
|
||||
WORKDIR /project/
|
||||
WORKDIR /project/msgq/
|
||||
RUN cd /tmp/ && \
|
||||
git clone -b v2.x --depth 1 https://github.com/catchorg/Catch2.git && \
|
||||
cd Catch2 && \
|
||||
mv single_include/catch2/ /project/ && \
|
||||
mv single_include/* /project/msgq/ && \
|
||||
cd .. \
|
||||
rm -rf Catch2
|
||||
|
||||
@@ -50,5 +50,5 @@ WORKDIR /project/msgq
|
||||
ENV PYTHONPATH=/project
|
||||
|
||||
COPY . .
|
||||
RUN rm -rf .git && \
|
||||
RUN ls && rm -rf .git && \
|
||||
scons -c && scons -j$(nproc)
|
||||
|
||||
40
SConscript
40
SConscript
@@ -1,27 +1,24 @@
|
||||
Import('env', 'envCython', 'arch', 'common')
|
||||
|
||||
|
||||
visionipc_dir = Dir('visionipc')
|
||||
visionipc_dir = Dir('msgq/visionipc')
|
||||
gen_dir = Dir('gen')
|
||||
|
||||
|
||||
# Build messaging
|
||||
|
||||
|
||||
messaging_objects = env.SharedObject([
|
||||
'messaging/messaging.cc',
|
||||
'messaging/event.cc',
|
||||
'messaging/impl_zmq.cc',
|
||||
'messaging/impl_msgq.cc',
|
||||
'messaging/impl_fake.cc',
|
||||
'messaging/msgq.cc',
|
||||
# Build msgq
|
||||
msgq_objects = env.SharedObject([
|
||||
'msgq/ipc.cc',
|
||||
'msgq/event.cc',
|
||||
'msgq/impl_zmq.cc',
|
||||
'msgq/impl_msgq.cc',
|
||||
'msgq/impl_fake.cc',
|
||||
'msgq/msgq.cc',
|
||||
])
|
||||
messaging = env.Library('messaging', messaging_objects)
|
||||
messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common])
|
||||
|
||||
msgq = env.Library('msgq', msgq_objects)
|
||||
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])
|
||||
|
||||
# Build Vision IPC
|
||||
vipc_files = ['ipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
|
||||
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
|
||||
vipc_sources = [f'{visionipc_dir.abspath}/{f}' for f in vipc_files]
|
||||
|
||||
if arch == "larch64":
|
||||
@@ -29,12 +26,15 @@ if arch == "larch64":
|
||||
else:
|
||||
vipc_sources += [f'{visionipc_dir.abspath}/visionbuf_cl.cc']
|
||||
|
||||
print(f'Building Vision IPC with {vipc_sources}')
|
||||
vipc_objects = env.SharedObject(vipc_sources)
|
||||
print(f'Building Vision IPC with {vipc_objects}')
|
||||
|
||||
visionipc = env.Library('visionipc', vipc_objects)
|
||||
|
||||
|
||||
vipc_frameworks = []
|
||||
vipc_libs = envCython["LIBS"] + [visionipc, messaging, common, "zmq"]
|
||||
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"]
|
||||
if arch == "Darwin":
|
||||
vipc_frameworks.append('OpenCL')
|
||||
else:
|
||||
@@ -43,9 +43,9 @@ envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.
|
||||
LIBS=vipc_libs, FRAMEWORKS=vipc_frameworks)
|
||||
|
||||
if GetOption('extras'):
|
||||
env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging, common])
|
||||
env.Program('visionipc/test_runner',
|
||||
['visionipc/test_runner.cc', 'visionipc/visionipc_tests.cc'],
|
||||
env.Program('msgq/test_runner', ['msgq/test_runner.cc', 'msgq/msgq_tests.cc'], LIBS=[msgq, common])
|
||||
env.Program(f'{visionipc_dir.abspath}/test_runner',
|
||||
[f'{visionipc_dir.abspath}/test_runner.cc', f'{visionipc_dir.abspath}/visionipc_tests.cc'],
|
||||
LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks)
|
||||
|
||||
Export('visionipc', 'messaging', 'messaging_python')
|
||||
Export('visionipc', 'msgq', 'msgq_python')
|
||||
|
||||
@@ -11,7 +11,8 @@ if platform.system() == "Darwin":
|
||||
common = ''
|
||||
|
||||
cpppath = [
|
||||
f"#/../",
|
||||
f"#/",
|
||||
'#msgq/',
|
||||
'/usr/lib/include',
|
||||
'/opt/homebrew/include',
|
||||
sysconfig.get_paths()['include'],
|
||||
|
||||
1
messaging/.gitignore
vendored
1
messaging/.gitignore
vendored
@@ -1 +0,0 @@
|
||||
messaging_pyx.cpp
|
||||
1
msgq/.gitignore
vendored
Normal file
1
msgq/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
ipc_pyx.cpp
|
||||
@@ -1,12 +1,12 @@
|
||||
# must be built with scons
|
||||
from msgq.messaging.messaging_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
|
||||
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
|
||||
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
|
||||
from msgq.messaging.messaging_pyx import MultiplePublishersError, MessagingError
|
||||
from msgq.ipc_pyx import MultiplePublishersError, IpcError
|
||||
|
||||
from typing import Optional, List
|
||||
|
||||
assert MultiplePublishersError
|
||||
assert MessagingError
|
||||
assert IpcError
|
||||
assert toggle_fake_events
|
||||
assert set_fake_prefix
|
||||
assert get_fake_prefix
|
||||
@@ -13,7 +13,7 @@
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "msgq/messaging/event.h"
|
||||
#include "msgq/event.h"
|
||||
|
||||
#ifndef __APPLE__
|
||||
#include <sys/eventfd.h>
|
||||
@@ -1,4 +1,4 @@
|
||||
#include "msgq/messaging/impl_fake.h"
|
||||
#include "msgq/impl_fake.h"
|
||||
|
||||
void FakePoller::registerSocket(SubSocket *socket) {
|
||||
this->sockets.push_back(socket);
|
||||
@@ -11,8 +11,8 @@
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/messaging/event.h"
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/event.h"
|
||||
|
||||
template<typename TSubSocket>
|
||||
class FakeSubSocket: public TSubSocket {
|
||||
@@ -5,7 +5,7 @@
|
||||
#include <csignal>
|
||||
#include <cerrno>
|
||||
|
||||
#include "msgq/messaging/impl_msgq.h"
|
||||
#include "msgq/impl_msgq.h"
|
||||
|
||||
|
||||
volatile sig_atomic_t msgq_do_exit = 0;
|
||||
@@ -3,8 +3,8 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/messaging/msgq.h"
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/msgq.h"
|
||||
|
||||
#define MAX_POLLERS 128
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#include <cerrno>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "msgq/messaging/impl_zmq.h"
|
||||
#include "msgq/impl_zmq.h"
|
||||
|
||||
//FIXME: This is a hack to get the port number from the socket name, might have collisions
|
||||
static int get_port(std::string endpoint) {
|
||||
@@ -4,7 +4,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/ipc.h"
|
||||
|
||||
#define MAX_POLLERS 128
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
#include <cassert>
|
||||
#include <iostream>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/messaging/impl_zmq.h"
|
||||
#include "msgq/messaging/impl_msgq.h"
|
||||
#include "msgq/messaging/impl_fake.h"
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/impl_zmq.h"
|
||||
#include "msgq/impl_msgq.h"
|
||||
#include "msgq/impl_fake.h"
|
||||
|
||||
#ifdef __APPLE__
|
||||
const bool MUST_USE_ZMQ = true;
|
||||
@@ -6,7 +6,7 @@ from libcpp.vector cimport vector
|
||||
from libcpp cimport bool
|
||||
|
||||
|
||||
cdef extern from "msgq/messaging/impl_fake.h":
|
||||
cdef extern from "msgq/impl_fake.h":
|
||||
cdef cppclass Event:
|
||||
@staticmethod
|
||||
int wait_for_one(vector[Event], int) except +
|
||||
@@ -34,7 +34,7 @@ cdef extern from "msgq/messaging/impl_fake.h":
|
||||
Event recv_ready()
|
||||
|
||||
|
||||
cdef extern from "msgq/messaging/messaging.h":
|
||||
cdef extern from "msgq/ipc.h":
|
||||
cdef cppclass Context:
|
||||
@staticmethod
|
||||
Context * create()
|
||||
@@ -10,22 +10,22 @@ from libc.string cimport strerror
|
||||
from cython.operator import dereference
|
||||
|
||||
|
||||
from .messaging cimport Context as cppContext
|
||||
from .messaging cimport SubSocket as cppSubSocket
|
||||
from .messaging cimport PubSocket as cppPubSocket
|
||||
from .messaging cimport Poller as cppPoller
|
||||
from .messaging cimport Message as cppMessage
|
||||
from .messaging cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle
|
||||
from .ipc cimport Context as cppContext
|
||||
from .ipc cimport SubSocket as cppSubSocket
|
||||
from .ipc cimport PubSocket as cppPubSocket
|
||||
from .ipc cimport Poller as cppPoller
|
||||
from .ipc cimport Message as cppMessage
|
||||
from .ipc cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle
|
||||
|
||||
|
||||
class MessagingError(Exception):
|
||||
class IpcError(Exception):
|
||||
def __init__(self, endpoint=None):
|
||||
suffix = f"with {endpoint.decode('utf-8')}" if endpoint else ""
|
||||
message = f"Messaging failure {suffix}: {strerror(errno.errno).decode('utf-8')}"
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class MultiplePublishersError(MessagingError):
|
||||
class MultiplePublishersError(IpcError):
|
||||
pass
|
||||
|
||||
|
||||
@@ -170,7 +170,7 @@ cdef class SubSocket:
|
||||
self.is_owner = True
|
||||
|
||||
if self.socket == NULL:
|
||||
raise MessagingError
|
||||
raise IpcError
|
||||
|
||||
def __dealloc__(self):
|
||||
if self.is_owner:
|
||||
@@ -190,7 +190,7 @@ cdef class SubSocket:
|
||||
if errno.errno == errno.EADDRINUSE:
|
||||
raise MultiplePublishersError(endpoint)
|
||||
else:
|
||||
raise MessagingError(endpoint)
|
||||
raise IpcError(endpoint)
|
||||
|
||||
def setTimeout(self, int timeout):
|
||||
self.socket.setTimeout(timeout)
|
||||
@@ -219,7 +219,7 @@ cdef class PubSocket:
|
||||
def __cinit__(self):
|
||||
self.socket = cppPubSocket.create()
|
||||
if self.socket == NULL:
|
||||
raise MessagingError
|
||||
raise IpcError
|
||||
|
||||
def __dealloc__(self):
|
||||
del self.socket
|
||||
@@ -231,7 +231,7 @@ cdef class PubSocket:
|
||||
if errno.errno == errno.EADDRINUSE:
|
||||
raise MultiplePublishersError(endpoint)
|
||||
else:
|
||||
raise MessagingError(endpoint)
|
||||
raise IpcError(endpoint)
|
||||
|
||||
def send(self, bytes data):
|
||||
length = len(data)
|
||||
@@ -241,7 +241,7 @@ cdef class PubSocket:
|
||||
if errno.errno == errno.EADDRINUSE:
|
||||
raise MultiplePublishersError
|
||||
else:
|
||||
raise MessagingError
|
||||
raise IpcError
|
||||
|
||||
def all_readers_updated(self):
|
||||
return self.socket.all_readers_updated()
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "msgq/messaging/msgq.h"
|
||||
#include "msgq/msgq.h"
|
||||
|
||||
void sigusr2_handler(int signal) {
|
||||
assert(signal == SIGUSR2);
|
||||
@@ -1,5 +1,5 @@
|
||||
#include "catch2/catch.hpp"
|
||||
#include "msgq/messaging/msgq.h"
|
||||
#include "msgq/msgq.h"
|
||||
|
||||
TEST_CASE("ALIGN")
|
||||
{
|
||||
@@ -2,11 +2,10 @@ import os
|
||||
import unittest
|
||||
import multiprocessing
|
||||
import platform
|
||||
import msgq
|
||||
from parameterized import parameterized_class
|
||||
from typing import Optional
|
||||
|
||||
import msgq.messaging as messaging
|
||||
|
||||
WAIT_TIMEOUT = 5
|
||||
|
||||
|
||||
@@ -14,7 +13,7 @@ WAIT_TIMEOUT = 5
|
||||
class TestEvents(unittest.TestCase):
|
||||
|
||||
def test_mutation(self):
|
||||
handle = messaging.fake_event_handle("carState")
|
||||
handle = msgq.fake_event_handle("carState")
|
||||
event = handle.recv_called_event
|
||||
|
||||
self.assertFalse(event.peek())
|
||||
@@ -26,7 +25,7 @@ class TestEvents(unittest.TestCase):
|
||||
del event
|
||||
|
||||
def test_wait(self):
|
||||
handle = messaging.fake_event_handle("carState")
|
||||
handle = msgq.fake_event_handle("carState")
|
||||
event = handle.recv_called_event
|
||||
|
||||
event.set()
|
||||
@@ -37,7 +36,7 @@ class TestEvents(unittest.TestCase):
|
||||
self.fail("event.wait() timed out")
|
||||
|
||||
def test_wait_multiprocess(self):
|
||||
handle = messaging.fake_event_handle("carState")
|
||||
handle = msgq.fake_event_handle("carState")
|
||||
event = handle.recv_called_event
|
||||
|
||||
def set_event_run():
|
||||
@@ -54,7 +53,7 @@ class TestEvents(unittest.TestCase):
|
||||
p.kill()
|
||||
|
||||
def test_wait_zero_timeout(self):
|
||||
handle = messaging.fake_event_handle("carState")
|
||||
handle = msgq.fake_event_handle("carState")
|
||||
event = handle.recv_called_event
|
||||
|
||||
try:
|
||||
@@ -71,18 +70,18 @@ class TestFakeSockets(unittest.TestCase):
|
||||
prefix: Optional[str] = None
|
||||
|
||||
def setUp(self):
|
||||
messaging.toggle_fake_events(True)
|
||||
msgq.toggle_fake_events(True)
|
||||
if self.prefix is not None:
|
||||
messaging.set_fake_prefix(self.prefix)
|
||||
msgq.set_fake_prefix(self.prefix)
|
||||
else:
|
||||
messaging.delete_fake_prefix()
|
||||
msgq.delete_fake_prefix()
|
||||
|
||||
def tearDown(self):
|
||||
messaging.toggle_fake_events(False)
|
||||
messaging.delete_fake_prefix()
|
||||
msgq.toggle_fake_events(False)
|
||||
msgq.delete_fake_prefix()
|
||||
|
||||
def test_event_handle_init(self):
|
||||
handle = messaging.fake_event_handle("controlsState", override=True)
|
||||
handle = msgq.fake_event_handle("controlsState", override=True)
|
||||
|
||||
self.assertFalse(handle.enabled)
|
||||
self.assertGreaterEqual(handle.recv_called_event.fd, 0)
|
||||
@@ -90,9 +89,9 @@ class TestFakeSockets(unittest.TestCase):
|
||||
|
||||
def test_non_managed_socket_state(self):
|
||||
# non managed socket should have zero state
|
||||
_ = messaging.pub_sock("ubloxGnss")
|
||||
_ = msgq.pub_sock("ubloxGnss")
|
||||
|
||||
handle = messaging.fake_event_handle("ubloxGnss", override=False)
|
||||
handle = msgq.fake_event_handle("ubloxGnss", override=False)
|
||||
|
||||
self.assertFalse(handle.enabled)
|
||||
self.assertEqual(handle.recv_called_event.fd, 0)
|
||||
@@ -100,26 +99,26 @@ class TestFakeSockets(unittest.TestCase):
|
||||
|
||||
def test_managed_socket_state(self):
|
||||
# managed socket should not change anything about the state
|
||||
handle = messaging.fake_event_handle("ubloxGnss")
|
||||
handle = msgq.fake_event_handle("ubloxGnss")
|
||||
handle.enabled = True
|
||||
|
||||
expected_enabled = handle.enabled
|
||||
expected_recv_called_fd = handle.recv_called_event.fd
|
||||
expected_recv_ready_fd = handle.recv_ready_event.fd
|
||||
|
||||
_ = messaging.pub_sock("ubloxGnss")
|
||||
_ = msgq.pub_sock("ubloxGnss")
|
||||
|
||||
self.assertEqual(handle.enabled, expected_enabled)
|
||||
self.assertEqual(handle.recv_called_event.fd, expected_recv_called_fd)
|
||||
self.assertEqual(handle.recv_ready_event.fd, expected_recv_ready_fd)
|
||||
|
||||
def test_sockets_enable_disable(self):
|
||||
carState_handle = messaging.fake_event_handle("ubloxGnss", enable=True)
|
||||
carState_handle = msgq.fake_event_handle("ubloxGnss", enable=True)
|
||||
recv_called = carState_handle.recv_called_event
|
||||
recv_ready = carState_handle.recv_ready_event
|
||||
|
||||
pub_sock = messaging.pub_sock("ubloxGnss")
|
||||
sub_sock = messaging.sub_sock("ubloxGnss")
|
||||
pub_sock = msgq.pub_sock("ubloxGnss")
|
||||
sub_sock = msgq.sub_sock("ubloxGnss")
|
||||
|
||||
try:
|
||||
carState_handle.enabled = True
|
||||
@@ -139,8 +138,8 @@ class TestFakeSockets(unittest.TestCase):
|
||||
|
||||
def test_synced_pub_sub(self):
|
||||
def daemon_repub_process_run():
|
||||
pub_sock = messaging.pub_sock("ubloxGnss")
|
||||
sub_sock = messaging.sub_sock("carState")
|
||||
pub_sock = msgq.pub_sock("ubloxGnss")
|
||||
sub_sock = msgq.sub_sock("carState")
|
||||
|
||||
frame = -1
|
||||
while True:
|
||||
@@ -153,15 +152,15 @@ class TestFakeSockets(unittest.TestCase):
|
||||
bts = frame.to_bytes(8, 'little')
|
||||
pub_sock.send(bts)
|
||||
|
||||
carState_handle = messaging.fake_event_handle("carState", enable=True)
|
||||
carState_handle = msgq.fake_event_handle("carState", enable=True)
|
||||
recv_called = carState_handle.recv_called_event
|
||||
recv_ready = carState_handle.recv_ready_event
|
||||
|
||||
p = multiprocessing.Process(target=daemon_repub_process_run)
|
||||
p.start()
|
||||
|
||||
pub_sock = messaging.pub_sock("carState")
|
||||
sub_sock = messaging.sub_sock("ubloxGnss")
|
||||
pub_sock = msgq.pub_sock("carState")
|
||||
sub_sock = msgq.sub_sock("ubloxGnss")
|
||||
|
||||
try:
|
||||
for i in range(10):
|
||||
@@ -5,8 +5,7 @@ import threading
|
||||
import time
|
||||
import string
|
||||
import unittest
|
||||
|
||||
import msgq.messaging as messaging
|
||||
import msgq
|
||||
|
||||
|
||||
def random_sock():
|
||||
@@ -39,8 +38,8 @@ class TestPubSubSockets(unittest.TestCase):
|
||||
|
||||
def test_pub_sub(self):
|
||||
sock = random_sock()
|
||||
pub_sock = messaging.pub_sock(sock)
|
||||
sub_sock = messaging.sub_sock(sock, conflate=False, timeout=None)
|
||||
pub_sock = msgq.pub_sock(sock)
|
||||
sub_sock = msgq.sub_sock(sock, conflate=False, timeout=None)
|
||||
zmq_sleep(3)
|
||||
|
||||
for _ in range(1000):
|
||||
@@ -51,11 +50,11 @@ class TestPubSubSockets(unittest.TestCase):
|
||||
|
||||
def test_conflate(self):
|
||||
sock = random_sock()
|
||||
pub_sock = messaging.pub_sock(sock)
|
||||
pub_sock = msgq.pub_sock(sock)
|
||||
for conflate in [True, False]:
|
||||
for _ in range(10):
|
||||
num_msgs = random.randint(3, 10)
|
||||
sub_sock = messaging.sub_sock(sock, conflate=conflate, timeout=None)
|
||||
sub_sock = msgq.sub_sock(sock, conflate=conflate, timeout=None)
|
||||
zmq_sleep()
|
||||
|
||||
sent_msgs = []
|
||||
@@ -64,7 +63,7 @@ class TestPubSubSockets(unittest.TestCase):
|
||||
pub_sock.send(msg)
|
||||
sent_msgs.append(msg)
|
||||
time.sleep(0.1)
|
||||
recvd_msgs = messaging.drain_sock_raw(sub_sock)
|
||||
recvd_msgs = msgq.drain_sock_raw(sub_sock)
|
||||
if conflate:
|
||||
self.assertEqual(len(recvd_msgs), 1)
|
||||
else:
|
||||
@@ -75,7 +74,7 @@ class TestPubSubSockets(unittest.TestCase):
|
||||
sock = random_sock()
|
||||
for _ in range(10):
|
||||
timeout = random.randrange(200)
|
||||
sub_sock = messaging.sub_sock(sock, timeout=timeout)
|
||||
sub_sock = msgq.sub_sock(sock, timeout=timeout)
|
||||
zmq_sleep()
|
||||
|
||||
start_time = time.monotonic()
|
||||
@@ -1,16 +1,16 @@
|
||||
import unittest
|
||||
import time
|
||||
import msgq.messaging as messaging
|
||||
import msgq
|
||||
import concurrent.futures
|
||||
|
||||
SERVICE_NAME = 'myService'
|
||||
|
||||
def poller():
|
||||
context = messaging.Context()
|
||||
context = msgq.Context()
|
||||
|
||||
p = messaging.Poller()
|
||||
p = msgq.Poller()
|
||||
|
||||
sub = messaging.SubSocket()
|
||||
sub = msgq.SubSocket()
|
||||
sub.connect(context, SERVICE_NAME)
|
||||
p.registerSocket(sub)
|
||||
|
||||
@@ -22,9 +22,9 @@ def poller():
|
||||
|
||||
class TestPoller(unittest.TestCase):
|
||||
def test_poll_once(self):
|
||||
context = messaging.Context()
|
||||
context = msgq.Context()
|
||||
|
||||
pub = messaging.PubSocket()
|
||||
pub = msgq.PubSocket()
|
||||
pub.connect(context, SERVICE_NAME)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as e:
|
||||
@@ -44,18 +44,18 @@ class TestPoller(unittest.TestCase):
|
||||
self.assertEqual(result, [b"a"])
|
||||
|
||||
def test_poll_and_create_many_subscribers(self):
|
||||
context = messaging.Context()
|
||||
context = msgq.Context()
|
||||
|
||||
pub = messaging.PubSocket()
|
||||
pub = msgq.PubSocket()
|
||||
pub.connect(context, SERVICE_NAME)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as e:
|
||||
poll = e.submit(poller)
|
||||
|
||||
time.sleep(0.1) # Slow joiner syndrome
|
||||
c = messaging.Context()
|
||||
c = msgq.Context()
|
||||
for _ in range(10):
|
||||
messaging.SubSocket().connect(c, SERVICE_NAME)
|
||||
msgq.SubSocket().connect(c, SERVICE_NAME)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
@@ -71,13 +71,13 @@ class TestPoller(unittest.TestCase):
|
||||
self.assertEqual(result, [b"a"])
|
||||
|
||||
def test_multiple_publishers_exception(self):
|
||||
context = messaging.Context()
|
||||
context = msgq.Context()
|
||||
|
||||
with self.assertRaises(messaging.MultiplePublishersError):
|
||||
pub1 = messaging.PubSocket()
|
||||
with self.assertRaises(msgq.MultiplePublishersError):
|
||||
pub1 = msgq.PubSocket()
|
||||
pub1.connect(context, SERVICE_NAME)
|
||||
|
||||
pub2 = messaging.PubSocket()
|
||||
pub2 = msgq.PubSocket()
|
||||
pub2.connect(context, SERVICE_NAME)
|
||||
|
||||
pub1.send(b"a")
|
||||
@@ -87,12 +87,12 @@ class TestPoller(unittest.TestCase):
|
||||
context.term()
|
||||
|
||||
def test_multiple_messages(self):
|
||||
context = messaging.Context()
|
||||
context = msgq.Context()
|
||||
|
||||
pub = messaging.PubSocket()
|
||||
pub = msgq.PubSocket()
|
||||
pub.connect(context, SERVICE_NAME)
|
||||
|
||||
sub = messaging.SubSocket()
|
||||
sub = msgq.SubSocket()
|
||||
sub.connect(context, SERVICE_NAME)
|
||||
|
||||
time.sleep(0.1) # Slow joiner
|
||||
@@ -119,12 +119,12 @@ class TestPoller(unittest.TestCase):
|
||||
context.term()
|
||||
|
||||
def test_conflate(self):
|
||||
context = messaging.Context()
|
||||
context = msgq.Context()
|
||||
|
||||
pub = messaging.PubSocket()
|
||||
pub = msgq.PubSocket()
|
||||
pub.connect(context, SERVICE_NAME)
|
||||
|
||||
sub = messaging.SubSocket()
|
||||
sub = msgq.SubSocket()
|
||||
sub.connect(context, SERVICE_NAME, conflate=True)
|
||||
|
||||
time.sleep(0.1) # Slow joiner
|
||||
@@ -15,7 +15,7 @@
|
||||
#define getsocket() socket(AF_UNIX, SOCK_SEQPACKET, 0)
|
||||
#endif
|
||||
|
||||
#include "msgq/visionipc/ipc.h"
|
||||
#include "msgq/visionipc/visionipc.h"
|
||||
|
||||
int ipc_connect(const char* socket_path) {
|
||||
int err;
|
||||
@@ -3,6 +3,12 @@
|
||||
#include <cstdint>
|
||||
#include <cstddef>
|
||||
|
||||
|
||||
int ipc_connect(const char* socket_path);
|
||||
int ipc_bind(const char* socket_path);
|
||||
int ipc_sendrecv_with_fds(bool send, int fd, void *buf, size_t buf_size, int* fds, int num_fds,
|
||||
int *out_num_fds);
|
||||
|
||||
constexpr int VISIONIPC_MAX_FDS = 128;
|
||||
|
||||
struct VisionIpcBufExtra {
|
||||
@@ -4,11 +4,11 @@
|
||||
#include <thread>
|
||||
|
||||
#include <unistd.h>
|
||||
#include "msgq/visionipc/ipc.h"
|
||||
#include "msgq/visionipc/visionipc.h"
|
||||
#include "msgq/visionipc/visionipc_client.h"
|
||||
#include "msgq/visionipc/visionipc_server.h"
|
||||
#include "msgq/logger/logger.h"
|
||||
#include "msgq/logger/logger.h"
|
||||
#include "logger/logger.h"
|
||||
#include "logger/logger.h"
|
||||
|
||||
static int connect_to_vipc_server(const std::string &name, bool blocking) {
|
||||
const std::string ipc_path = get_ipc_path(name);
|
||||
@@ -3,7 +3,7 @@
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/visionipc/visionbuf.h"
|
||||
|
||||
|
||||
@@ -8,10 +8,10 @@
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/visionipc/ipc.h"
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/visionipc/visionipc.h"
|
||||
#include "msgq/visionipc/visionipc_server.h"
|
||||
#include "msgq/logger/logger.h"
|
||||
#include "logger/logger.h"
|
||||
|
||||
std::string get_endpoint_name(std::string name, VisionStreamType type){
|
||||
if (messaging_use_zmq()){
|
||||
@@ -5,7 +5,7 @@
|
||||
#include <atomic>
|
||||
#include <map>
|
||||
|
||||
#include "msgq/messaging/messaging.h"
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/visionipc/visionbuf.h"
|
||||
|
||||
std::string get_endpoint_name(std::string name, VisionStreamType type);
|
||||
@@ -1,7 +0,0 @@
|
||||
#pragma once
|
||||
#include <cstddef>
|
||||
|
||||
int ipc_connect(const char* socket_path);
|
||||
int ipc_bind(const char* socket_path);
|
||||
int ipc_sendrecv_with_fds(bool send, int fd, void *buf, size_t buf_size, int* fds, int num_fds,
|
||||
int *out_num_fds);
|
||||
Reference in New Issue
Block a user