From 2c191c1a72ae8119b93b49e1bc12d4a99b751855 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 7 Feb 2026 15:18:55 -0800 Subject: [PATCH] Remove ZMQ support (#674) * remove zmq * less abstraction --- README.md | 2 +- SConscript | 5 +- msgq/__init__.py | 4 +- msgq/conftest.py | 12 +- msgq/impl_msgq.cc | 39 ++---- msgq/impl_msgq.h | 36 +---- msgq/impl_zmq.cc | 179 ------------------------- msgq/impl_zmq.h | 68 ---------- msgq/ipc.cc | 49 +------ msgq/ipc.h | 36 ++--- msgq/ipc.pxd | 2 - msgq/ipc_pyx.pyx | 5 - msgq/tests/test_fake.py | 3 - msgq/tests/test_messaging.py | 13 -- msgq/tests/test_poller.py | 2 +- msgq/visionipc/tests/test_visionipc.py | 7 - msgq/visionipc/visionipc_server.cc | 9 +- msgq/visionipc/visionipc_tests.cc | 15 --- setup.sh | 5 +- 19 files changed, 50 insertions(+), 441 deletions(-) delete mode 100644 msgq/impl_zmq.cc delete mode 100644 msgq/impl_zmq.h diff --git a/README.md b/README.md index dc10530..23d3b9e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # MSGQ: A lock free single producer multi consumer message queue ## What is this library? -MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. MSGQ is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. While MSGQ is the core of this library, this library also allows replacing the MSGQ backend with ZMQ or a spoofed implementation that can be used for deterministic testing. This library also contains visionipc, an IPC system specifically for large contiguous buffers (like images/video). +MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. This library also provides a spoofed implementation that can be used for deterministic testing, and visionipc, an IPC system specifically for large contiguous buffers (like images/video). ## Storage The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains: diff --git a/SConscript b/SConscript index db1663f..2926e79 100644 --- a/SConscript +++ b/SConscript @@ -9,13 +9,12 @@ gen_dir = Dir('gen') msgq_objects = env.SharedObject([ 'msgq/ipc.cc', 'msgq/event.cc', - 'msgq/impl_zmq.cc', 'msgq/impl_msgq.cc', 'msgq/impl_fake.cc', 'msgq/msgq.cc', ]) msgq = env.Library('msgq', msgq_objects) -msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common]) +msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, common]) # Build Vision IPC vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc'] @@ -29,7 +28,7 @@ vipc_objects = env.SharedObject(vipc_sources) visionipc = env.Library('visionipc', vipc_objects) -vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"] +vipc_libs = envCython["LIBS"] + [visionipc, msgq, common] envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.abspath}/visionipc_pyx.pyx', LIBS=vipc_libs) diff --git a/msgq/__init__.py b/msgq/__init__.py index b47b3db..574e100 100644 --- a/msgq/__init__.py +++ b/msgq/__init__.py @@ -1,7 +1,6 @@ # must be built with scons from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ - set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \ - context_is_zmq + set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event from msgq.ipc_pyx import MultiplePublishersError, IpcError from typing import Optional, List, Union @@ -13,7 +12,6 @@ assert set_fake_prefix assert get_fake_prefix assert delete_fake_prefix assert wait_for_one_event -assert context_is_zmq NO_TRAVERSAL_LIMIT = 2**64-1 diff --git a/msgq/conftest.py b/msgq/conftest.py index bfd7792..14a7706 100644 --- a/msgq/conftest.py +++ b/msgq/conftest.py @@ -1,14 +1,6 @@ -import os import pytest import msgq -@pytest.fixture(params=[False, True], ids=["msgq", "zmq"], autouse=True) -def zmq_mode(request): - if request.param: - os.environ["ZMQ"] = "1" - else: - os.environ.pop("ZMQ", None) +@pytest.fixture(autouse=True) +def msgq_context(): msgq.context = msgq.Context() - assert msgq.context_is_zmq() == request.param - yield request.param - os.environ.pop("ZMQ", None) diff --git a/msgq/impl_msgq.cc b/msgq/impl_msgq.cc index ab99514..8243cee 100644 --- a/msgq/impl_msgq.cc +++ b/msgq/impl_msgq.cc @@ -8,36 +8,30 @@ #include "msgq/impl_msgq.h" -MSGQContext::MSGQContext() { -} - -MSGQContext::~MSGQContext() { -} - -void MSGQMessage::init(size_t sz) { +void Message::init(size_t sz) { size = sz; data = new char[size]; } -void MSGQMessage::init(char * d, size_t sz) { +void Message::init(char * d, size_t sz) { size = sz; data = new char[size]; memcpy(data, d, size); } -void MSGQMessage::takeOwnership(char * d, size_t sz) { +void Message::takeOwnership(char * d, size_t sz) { size = sz; data = d; } -void MSGQMessage::close() { +void Message::close() { if (size > 0){ delete[] data; } size = 0; } -MSGQMessage::~MSGQMessage() { +Message::~Message() { this->close(); } @@ -67,7 +61,7 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a Message * MSGQSubSocket::receive(bool non_blocking){ msgq_msg_t msg; - MSGQMessage *r = NULL; + Message *r = NULL; int rc = msgq_msg_recv(&msg, q); @@ -92,11 +86,11 @@ Message * MSGQSubSocket::receive(bool non_blocking){ } if (rc > 0){ - r = new MSGQMessage; + r = new Message; r->takeOwnership(msg.data, msg.size); } - return (Message*)r; + return r; } void MSGQSubSocket::setTimeout(int t){ @@ -110,14 +104,9 @@ MSGQSubSocket::~MSGQSubSocket(){ } } -int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){ +int PubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){ assert(context); - // TODO - //if (check_endpoint && !service_exists(std::string(endpoint))){ - // std::cout << "Warning, " << std::string(endpoint) << " is not in service list." << std::endl; - //} - q = new msgq_queue_t; size_t size = segment_size > 0 ? segment_size : DEFAULT_SEGMENT_SIZE; int r = msgq_new_queue(q, endpoint.c_str(), size); @@ -130,7 +119,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en return 0; } -int MSGQPubSocket::sendMessage(Message *message){ +int PubSocket::sendMessage(Message *message){ msgq_msg_t msg; msg.data = message->getData(); msg.size = message->getSize(); @@ -138,7 +127,7 @@ int MSGQPubSocket::sendMessage(Message *message){ return msgq_msg_send(&msg, q); } -int MSGQPubSocket::send(char *data, size_t size){ +int PubSocket::send(char *data, size_t size){ msgq_msg_t msg; msg.data = data; msg.size = size; @@ -146,11 +135,11 @@ int MSGQPubSocket::send(char *data, size_t size){ return msgq_msg_send(&msg, q); } -bool MSGQPubSocket::all_readers_updated() { +bool PubSocket::all_readers_updated() { return msgq_all_readers_updated(q); } -MSGQPubSocket::~MSGQPubSocket(){ +PubSocket::~PubSocket(){ if (q != NULL){ msgq_close_queue(q); delete q; @@ -160,7 +149,7 @@ MSGQPubSocket::~MSGQPubSocket(){ void MSGQPoller::registerSocket(SubSocket * socket){ assert(num_polls + 1 < MAX_POLLERS); - polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket(); + polls[num_polls].q = static_cast(socket)->getQueue(); sockets.push_back(socket); num_polls++; diff --git a/msgq/impl_msgq.h b/msgq/impl_msgq.h index c792e21..bd9a9aa 100644 --- a/msgq/impl_msgq.h +++ b/msgq/impl_msgq.h @@ -8,29 +8,6 @@ #define MAX_POLLERS 128 -class MSGQContext : public Context { -private: - void * context = NULL; -public: - MSGQContext(); - void * getRawContext() {return context;} - ~MSGQContext(); -}; - -class MSGQMessage : public Message { -private: - char * data; - size_t size; -public: - void init(size_t size); - void init(char *data, size_t size); - void takeOwnership(char *data, size_t size); - size_t getSize(){return size;} - char * getData(){return data;} - void close(); - ~MSGQMessage(); -}; - class MSGQSubSocket : public SubSocket { private: msgq_queue_t * q = NULL; @@ -38,22 +15,11 @@ private: public: int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0); void setTimeout(int timeout); - void * getRawSocket() {return (void*)q;} + msgq_queue_t * getQueue() {return q;} Message *receive(bool non_blocking=false); ~MSGQSubSocket(); }; -class MSGQPubSocket : public PubSocket { -private: - msgq_queue_t * q = NULL; -public: - int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0); - int sendMessage(Message *message); - int send(char *data, size_t size); - bool all_readers_updated(); - ~MSGQPubSocket(); -}; - class MSGQPoller : public Poller { private: std::vector sockets; diff --git a/msgq/impl_zmq.cc b/msgq/impl_zmq.cc deleted file mode 100644 index e04b4f8..0000000 --- a/msgq/impl_zmq.cc +++ /dev/null @@ -1,179 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include "msgq/impl_zmq.h" - -static size_t fnv1a_hash(const std::string &str) { - const size_t fnv_prime = 0x100000001b3; - size_t hash_value = 0xcbf29ce484222325; - for (char c : str) { - hash_value ^= (unsigned char)c; - hash_value *= fnv_prime; - } - return hash_value; -} - -//FIXME: This is a hack to get the port number from the socket name, might have collisions -static int get_port(std::string endpoint) { - size_t hash_value = fnv1a_hash(endpoint); - int start_port = 8023; - int max_port = 65535; - int port = start_port + (hash_value % (max_port - start_port)); - return port; -} - -ZMQContext::ZMQContext() { - context = zmq_ctx_new(); -} - -ZMQContext::~ZMQContext() { - zmq_ctx_term(context); -} - -void ZMQMessage::init(size_t sz) { - size = sz; - data = new char[size]; -} - -void ZMQMessage::init(char * d, size_t sz) { - size = sz; - data = new char[size]; - memcpy(data, d, size); -} - -void ZMQMessage::close() { - if (size > 0){ - delete[] data; - } - size = 0; -} - -ZMQMessage::~ZMQMessage() { - this->close(); -} - - -int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate, bool check_endpoint, size_t segment_size){ - sock = zmq_socket(context->getRawContext(), ZMQ_SUB); - if (sock == NULL){ - return -1; - } - - zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0); - - if (conflate){ - int arg = 1; - zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int)); - } - - int reconnect_ivl = 500; - zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl)); - - - full_endpoint = "tcp://" + address + ":"; - if (check_endpoint){ - full_endpoint += std::to_string(get_port(endpoint)); - } else { - full_endpoint += endpoint; - } - - return zmq_connect(sock, full_endpoint.c_str()); -} - - -Message * ZMQSubSocket::receive(bool non_blocking){ - zmq_msg_t msg; - assert(zmq_msg_init(&msg) == 0); - - int flags = non_blocking ? ZMQ_DONTWAIT : 0; - int rc = zmq_msg_recv(&msg, sock, flags); - Message *r = NULL; - - if (rc >= 0){ - // Make a copy to ensure the data is aligned - r = new ZMQMessage; - r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); - } - - zmq_msg_close(&msg); - return r; -} - -void ZMQSubSocket::setTimeout(int timeout){ - zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int)); -} - -ZMQSubSocket::~ZMQSubSocket(){ - zmq_close(sock); -} - -int ZMQPubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){ - sock = zmq_socket(context->getRawContext(), ZMQ_PUB); - if (sock == NULL){ - return -1; - } - - full_endpoint = "tcp://*:"; - if (check_endpoint){ - full_endpoint += std::to_string(get_port(endpoint)); - } else { - full_endpoint += endpoint; - } - - // ZMQ pub sockets cannot be shared between processes, so we need to ensure pid stays the same - pid = getpid(); - - return zmq_bind(sock, full_endpoint.c_str()); -} - -int ZMQPubSocket::sendMessage(Message *message) { - assert(pid == getpid()); - return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT); -} - -int ZMQPubSocket::send(char *data, size_t size) { - assert(pid == getpid()); - return zmq_send(sock, data, size, ZMQ_DONTWAIT); -} - -bool ZMQPubSocket::all_readers_updated() { - assert(false); // TODO not implemented - return false; -} - -ZMQPubSocket::~ZMQPubSocket(){ - zmq_close(sock); -} - - -void ZMQPoller::registerSocket(SubSocket * socket){ - assert(num_polls + 1 < MAX_POLLERS); - polls[num_polls].socket = socket->getRawSocket(); - polls[num_polls].events = ZMQ_POLLIN; - - sockets.push_back(socket); - num_polls++; -} - -std::vector ZMQPoller::poll(int timeout){ - std::vector r; - - int rc = zmq_poll(polls, num_polls, timeout); - if (rc < 0){ - return r; - } - - for (size_t i = 0; i < num_polls; i++){ - if (polls[i].revents){ - r.push_back(sockets[i]); - } - } - - return r; -} diff --git a/msgq/impl_zmq.h b/msgq/impl_zmq.h deleted file mode 100644 index 2f84eea..0000000 --- a/msgq/impl_zmq.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "msgq/ipc.h" - -#define MAX_POLLERS 128 - -class ZMQContext : public Context { -private: - void * context = NULL; -public: - ZMQContext(); - void * getRawContext() {return context;} - ~ZMQContext(); -}; - -class ZMQMessage : public Message { -private: - char * data; - size_t size; -public: - void init(size_t size); - void init(char *data, size_t size); - size_t getSize(){return size;} - char * getData(){return data;} - void close(); - ~ZMQMessage(); -}; - -class ZMQSubSocket : public SubSocket { -private: - void * sock; - std::string full_endpoint; -public: - int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0); - void setTimeout(int timeout); - void * getRawSocket() {return sock;} - Message *receive(bool non_blocking=false); - ~ZMQSubSocket(); -}; - -class ZMQPubSocket : public PubSocket { -private: - void * sock; - std::string full_endpoint; - int pid = -1; -public: - int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0); - int sendMessage(Message *message); - int send(char *data, size_t size); - bool all_readers_updated(); - ~ZMQPubSocket(); -}; - -class ZMQPoller : public Poller { -private: - std::vector sockets; - zmq_pollitem_t polls[MAX_POLLERS]; - size_t num_polls = 0; - -public: - void registerSocket(SubSocket *socket); - std::vector poll(int timeout); - ~ZMQPoller(){} -}; diff --git a/msgq/ipc.cc b/msgq/ipc.cc index b5cc20e..ec4a64e 100644 --- a/msgq/ipc.cc +++ b/msgq/ipc.cc @@ -3,46 +3,23 @@ #include #include "msgq/ipc.h" -#include "msgq/impl_zmq.h" #include "msgq/impl_msgq.h" #include "msgq/impl_fake.h" -bool messaging_use_zmq(){ - return std::getenv("ZMQ") != nullptr; -} - bool messaging_use_fake(){ char* fake_enabled = std::getenv("CEREAL_FAKE"); return fake_enabled != NULL; } Context * Context::create(){ - Context * c; - if (messaging_use_zmq()){ - c = new ZMQContext(); - } else { - c = new MSGQContext(); - } - return c; + return new Context(); } SubSocket * SubSocket::create(){ - SubSocket * s; if (messaging_use_fake()) { - if (messaging_use_zmq()) { - s = new FakeSubSocket(); - } else { - s = new FakeSubSocket(); - } - } else { - if (messaging_use_zmq()){ - s = new ZMQSubSocket(); - } else { - s = new MSGQSubSocket(); - } + return new FakeSubSocket(); } - - return s; + return new MSGQSubSocket(); } SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate, bool check_endpoint, size_t segment_size){ @@ -60,14 +37,7 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri } PubSocket * PubSocket::create(){ - PubSocket * s; - if (messaging_use_zmq()){ - s = new ZMQPubSocket(); - } else { - s = new MSGQPubSocket(); - } - - return s; + return new PubSocket(); } PubSocket * PubSocket::create(Context * context, std::string endpoint, bool check_endpoint, size_t segment_size){ @@ -85,17 +55,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint, bool chec } Poller * Poller::create(){ - Poller * p; if (messaging_use_fake()) { - p = new FakePoller(); - } else { - if (messaging_use_zmq()){ - p = new ZMQPoller(); - } else { - p = new MSGQPoller(); - } + return new FakePoller(); } - return p; + return new MSGQPoller(); } Poller * Poller::create(std::vector sockets){ diff --git a/msgq/ipc.h b/msgq/ipc.h index 45330e0..62a13e4 100644 --- a/msgq/ipc.h +++ b/msgq/ipc.h @@ -15,23 +15,24 @@ #define MSG_MULTIPLE_PUBLISHERS 100 -bool messaging_use_zmq(); - class Context { public: - virtual void * getRawContext() = 0; static Context * create(); - virtual ~Context(){} + ~Context(){} }; class Message { +private: + char * data = nullptr; + size_t size = 0; public: - virtual void init(size_t size) = 0; - virtual void init(char * data, size_t size) = 0; - virtual void close() = 0; - virtual size_t getSize() = 0; - virtual char * getData() = 0; - virtual ~Message(){} + void init(size_t size); + void init(char * data, size_t size); + void takeOwnership(char * data, size_t size); + void close(); + size_t getSize(){return size;} + char * getData(){return data;} + ~Message(); }; @@ -40,21 +41,22 @@ public: virtual int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0) = 0; virtual void setTimeout(int timeout) = 0; virtual Message *receive(bool non_blocking=false) = 0; - virtual void * getRawSocket() = 0; static SubSocket * create(); static SubSocket * create(Context * context, std::string endpoint, std::string address="127.0.0.1", bool conflate=false, bool check_endpoint=true, size_t segment_size=0); virtual ~SubSocket(){} }; class PubSocket { +private: + struct msgq_queue_t * q = nullptr; public: - virtual int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0) = 0; - virtual int sendMessage(Message *message) = 0; - virtual int send(char *data, size_t size) = 0; - virtual bool all_readers_updated() = 0; + int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0); + int sendMessage(Message *message); + int send(char *data, size_t size); + bool all_readers_updated(); static PubSocket * create(); static PubSocket * create(Context * context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0); - virtual ~PubSocket(){} + ~PubSocket(); }; class Poller { @@ -64,4 +66,4 @@ public: static Poller * create(); static Poller * create(std::vector sockets); virtual ~Poller(){} -}; \ No newline at end of file +}; diff --git a/msgq/ipc.pxd b/msgq/ipc.pxd index 171ddf6..0760561 100644 --- a/msgq/ipc.pxd +++ b/msgq/ipc.pxd @@ -35,8 +35,6 @@ cdef extern from "msgq/impl_fake.h": cdef extern from "msgq/ipc.h": - bool messaging_use_zmq() - cdef cppclass Context: @staticmethod Context * create() diff --git a/msgq/ipc_pyx.pyx b/msgq/ipc_pyx.pyx index 128f352..909262f 100644 --- a/msgq/ipc_pyx.pyx +++ b/msgq/ipc_pyx.pyx @@ -10,7 +10,6 @@ from libc.string cimport strerror from cython.operator import dereference -from .ipc cimport messaging_use_zmq from .ipc cimport Context as cppContext from .ipc cimport SubSocket as cppSubSocket from .ipc cimport PubSocket as cppPubSocket @@ -19,10 +18,6 @@ from .ipc cimport Message as cppMessage from .ipc cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle -def context_is_zmq(): - return messaging_use_zmq() - - class IpcError(Exception): def __init__(self, endpoint=None): suffix = f"with {endpoint.decode('utf-8')}" if endpoint else "" diff --git a/msgq/tests/test_fake.py b/msgq/tests/test_fake.py index 4513973..4970417 100644 --- a/msgq/tests/test_fake.py +++ b/msgq/tests/test_fake.py @@ -1,5 +1,4 @@ import pytest -import os import multiprocessing import platform import msgq @@ -69,8 +68,6 @@ class TestFakeSockets: prefix: Optional[str] = None def setup_method(self): - if "ZMQ" in os.environ: - pytest.skip("FakeSockets not supported on ZMQ") msgq.toggle_fake_events(True) if self.prefix is not None: msgq.set_fake_prefix(self.prefix) diff --git a/msgq/tests/test_messaging.py b/msgq/tests/test_messaging.py index 1f43923..d777232 100644 --- a/msgq/tests/test_messaging.py +++ b/msgq/tests/test_messaging.py @@ -1,4 +1,3 @@ -import os import random import time import string @@ -11,22 +10,12 @@ def random_sock(): def random_bytes(length=1000): return bytes([random.randrange(0xFF) for _ in range(length)]) -def zmq_sleep(t=1): - if "ZMQ" in os.environ: - time.sleep(t) - class TestPubSubSockets: - def setup_method(self): - # ZMQ pub socket takes too long to die - # sleep to prevent multiple publishers error between tests - zmq_sleep() - def test_pub_sub(self): sock = random_sock() pub_sock = msgq.pub_sock(sock) sub_sock = msgq.sub_sock(sock, conflate=False, timeout=None) - zmq_sleep(3) for _ in range(1000): msg = random_bytes() @@ -40,7 +29,6 @@ class TestPubSubSockets: for conflate in [True, False]: num_msgs = random.randint(3, 10) sub_sock = msgq.sub_sock(sock, conflate=conflate, timeout=None) - zmq_sleep() sent_msgs = [] for __ in range(num_msgs): @@ -62,7 +50,6 @@ class TestPubSubSockets: sock = random_sock() timeout = random.randrange(200) sub_sock = msgq.sub_sock(sock, timeout=timeout) - zmq_sleep() start_time = time.monotonic() recvd = sub_sock.receive() diff --git a/msgq/tests/test_poller.py b/msgq/tests/test_poller.py index 6ef2c04..7c5553d 100644 --- a/msgq/tests/test_poller.py +++ b/msgq/tests/test_poller.py @@ -111,7 +111,7 @@ class TestPoller: msg_seen = True i += 1 - if r is None and msg_seen: # ZMQ sometimes receives nothing on the first receive + if r is None and msg_seen: break del pub diff --git a/msgq/visionipc/tests/test_visionipc.py b/msgq/visionipc/tests/test_visionipc.py index 374dfcf..fd4d0b2 100644 --- a/msgq/visionipc/tests/test_visionipc.py +++ b/msgq/visionipc/tests/test_visionipc.py @@ -1,14 +1,8 @@ -import os -import time import random from typing import Optional import numpy as np from msgq.visionipc import VisionIpcServer, VisionIpcClient, VisionStreamType -def zmq_sleep(t=1): - if "ZMQ" in os.environ: - time.sleep(t) - class TestVisionIpc: server: VisionIpcServer @@ -26,7 +20,6 @@ class TestVisionIpc: else: self.client = None - zmq_sleep() return self.server, self.client def test_connect(self): diff --git a/msgq/visionipc/visionipc_server.cc b/msgq/visionipc/visionipc_server.cc index 071461b..b444f81 100644 --- a/msgq/visionipc/visionipc_server.cc +++ b/msgq/visionipc/visionipc_server.cc @@ -10,18 +10,12 @@ #include #include -#include "msgq/ipc.h" #include "msgq/visionipc/visionipc.h" #include "msgq/visionipc/visionipc_server.h" #include "msgq/logger/logger.h" std::string get_endpoint_name(std::string name, VisionStreamType type){ - if (messaging_use_zmq()){ - assert(name == "camerad" || name == "navd"); - return std::to_string(9000 + static_cast(type)); - } else { - return "visionipc_" + name + "_" + std::to_string(type); - } + return "visionipc_" + name + "_" + std::to_string(type); } std::string get_ipc_path(const std::string& name) { @@ -71,7 +65,6 @@ void VisionIpcServer::create_buffers_with_sizes(VisionStreamType type, size_t nu cur_idx[type] = 0; // Create msgq publisher for each of the `name` + type combos - // TODO: compute port number directly if using zmq sockets[type] = PubSocket::create(msg_ctx, get_endpoint_name(name, type), false); } diff --git a/msgq/visionipc/visionipc_tests.cc b/msgq/visionipc/visionipc_tests.cc index ddf790d..6f6b9a9 100644 --- a/msgq/visionipc/visionipc_tests.cc +++ b/msgq/visionipc/visionipc_tests.cc @@ -1,18 +1,9 @@ -#include -#include - #include "catch2/catch.hpp" #include "msgq/visionipc/visionipc_server.h" #include "msgq/visionipc/visionipc_client.h" -static void zmq_sleep(int milliseconds=1000){ - if (messaging_use_zmq()){ - std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); - } -} - TEST_CASE("Connecting"){ VisionIpcServer server("camerad"); server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100); @@ -57,8 +48,6 @@ TEST_CASE("Send single buffer"){ VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false); REQUIRE(client.connect()); - zmq_sleep(); - VisionBuf * buf = server.get_buffer(VISION_STREAM_ROAD); REQUIRE(buf != nullptr); @@ -86,8 +75,6 @@ TEST_CASE("Test no conflate"){ VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false); REQUIRE(client.connect()); - zmq_sleep(); - VisionBuf * buf = server.get_buffer(VISION_STREAM_ROAD); REQUIRE(buf != nullptr); @@ -114,8 +101,6 @@ TEST_CASE("Test conflate"){ VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, true); REQUIRE(client.connect()); - zmq_sleep(); - VisionBuf * buf = server.get_buffer(VISION_STREAM_ROAD); REQUIRE(buf != nullptr); diff --git a/setup.sh b/setup.sh index b71da40..f768b4d 100755 --- a/setup.sh +++ b/setup.sh @@ -8,9 +8,9 @@ PLATFORM=$(uname -s) echo "installing dependencies" if [[ $PLATFORM == "Darwin" ]]; then - if ! command -v python3 &>/dev/null || ! pkg-config --exists libzmq 2>/dev/null; then + if ! command -v python3 &>/dev/null; then export HOMEBREW_NO_AUTO_UPDATE=1 - brew install python3 zeromq + brew install python3 fi elif [[ $PLATFORM == "Linux" ]]; then # for AGNOS since we clear the apt lists @@ -20,7 +20,6 @@ elif [[ $PLATFORM == "Linux" ]]; then sudo apt-get install -y --no-install-recommends \ curl ca-certificates \ - libzmq3-dev \ python3-dev python3-pip python3-venv else echo "WARNING: unsupported platform. skipping apt/brew install."