mirror of
https://github.com/commaai/msgq.git
synced 2026-06-08 05:54:44 +08:00
@@ -1,7 +1,7 @@
|
||||
# MSGQ: A lock free single producer multi consumer message queue
|
||||
|
||||
## What is this library?
|
||||
MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. MSGQ is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. While MSGQ is the core of this library, this library also allows replacing the MSGQ backend with ZMQ or a spoofed implementation that can be used for deterministic testing. This library also contains visionipc, an IPC system specifically for large contiguous buffers (like images/video).
|
||||
MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. This library also provides a spoofed implementation that can be used for deterministic testing, and visionipc, an IPC system specifically for large contiguous buffers (like images/video).
|
||||
|
||||
## Storage
|
||||
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains:
|
||||
|
||||
@@ -9,13 +9,12 @@ gen_dir = Dir('gen')
|
||||
msgq_objects = env.SharedObject([
|
||||
'msgq/ipc.cc',
|
||||
'msgq/event.cc',
|
||||
'msgq/impl_zmq.cc',
|
||||
'msgq/impl_msgq.cc',
|
||||
'msgq/impl_fake.cc',
|
||||
'msgq/msgq.cc',
|
||||
])
|
||||
msgq = env.Library('msgq', msgq_objects)
|
||||
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])
|
||||
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, common])
|
||||
|
||||
# Build Vision IPC
|
||||
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc']
|
||||
@@ -29,7 +28,7 @@ vipc_objects = env.SharedObject(vipc_sources)
|
||||
visionipc = env.Library('visionipc', vipc_objects)
|
||||
|
||||
|
||||
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"]
|
||||
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common]
|
||||
envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.abspath}/visionipc_pyx.pyx',
|
||||
LIBS=vipc_libs)
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# must be built with scons
|
||||
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
|
||||
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \
|
||||
context_is_zmq
|
||||
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
|
||||
from msgq.ipc_pyx import MultiplePublishersError, IpcError
|
||||
|
||||
from typing import Optional, List, Union
|
||||
@@ -13,7 +12,6 @@ assert set_fake_prefix
|
||||
assert get_fake_prefix
|
||||
assert delete_fake_prefix
|
||||
assert wait_for_one_event
|
||||
assert context_is_zmq
|
||||
|
||||
NO_TRAVERSAL_LIMIT = 2**64-1
|
||||
|
||||
|
||||
@@ -1,14 +1,6 @@
|
||||
import os
|
||||
import pytest
|
||||
import msgq
|
||||
|
||||
@pytest.fixture(params=[False, True], ids=["msgq", "zmq"], autouse=True)
|
||||
def zmq_mode(request):
|
||||
if request.param:
|
||||
os.environ["ZMQ"] = "1"
|
||||
else:
|
||||
os.environ.pop("ZMQ", None)
|
||||
@pytest.fixture(autouse=True)
|
||||
def msgq_context():
|
||||
msgq.context = msgq.Context()
|
||||
assert msgq.context_is_zmq() == request.param
|
||||
yield request.param
|
||||
os.environ.pop("ZMQ", None)
|
||||
|
||||
@@ -8,36 +8,30 @@
|
||||
|
||||
#include "msgq/impl_msgq.h"
|
||||
|
||||
MSGQContext::MSGQContext() {
|
||||
}
|
||||
|
||||
MSGQContext::~MSGQContext() {
|
||||
}
|
||||
|
||||
void MSGQMessage::init(size_t sz) {
|
||||
void Message::init(size_t sz) {
|
||||
size = sz;
|
||||
data = new char[size];
|
||||
}
|
||||
|
||||
void MSGQMessage::init(char * d, size_t sz) {
|
||||
void Message::init(char * d, size_t sz) {
|
||||
size = sz;
|
||||
data = new char[size];
|
||||
memcpy(data, d, size);
|
||||
}
|
||||
|
||||
void MSGQMessage::takeOwnership(char * d, size_t sz) {
|
||||
void Message::takeOwnership(char * d, size_t sz) {
|
||||
size = sz;
|
||||
data = d;
|
||||
}
|
||||
|
||||
void MSGQMessage::close() {
|
||||
void Message::close() {
|
||||
if (size > 0){
|
||||
delete[] data;
|
||||
}
|
||||
size = 0;
|
||||
}
|
||||
|
||||
MSGQMessage::~MSGQMessage() {
|
||||
Message::~Message() {
|
||||
this->close();
|
||||
}
|
||||
|
||||
@@ -67,7 +61,7 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a
|
||||
Message * MSGQSubSocket::receive(bool non_blocking){
|
||||
msgq_msg_t msg;
|
||||
|
||||
MSGQMessage *r = NULL;
|
||||
Message *r = NULL;
|
||||
|
||||
int rc = msgq_msg_recv(&msg, q);
|
||||
|
||||
@@ -92,11 +86,11 @@ Message * MSGQSubSocket::receive(bool non_blocking){
|
||||
}
|
||||
|
||||
if (rc > 0){
|
||||
r = new MSGQMessage;
|
||||
r = new Message;
|
||||
r->takeOwnership(msg.data, msg.size);
|
||||
}
|
||||
|
||||
return (Message*)r;
|
||||
return r;
|
||||
}
|
||||
|
||||
void MSGQSubSocket::setTimeout(int t){
|
||||
@@ -110,14 +104,9 @@ MSGQSubSocket::~MSGQSubSocket(){
|
||||
}
|
||||
}
|
||||
|
||||
int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){
|
||||
int PubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){
|
||||
assert(context);
|
||||
|
||||
// TODO
|
||||
//if (check_endpoint && !service_exists(std::string(endpoint))){
|
||||
// std::cout << "Warning, " << std::string(endpoint) << " is not in service list." << std::endl;
|
||||
//}
|
||||
|
||||
q = new msgq_queue_t;
|
||||
size_t size = segment_size > 0 ? segment_size : DEFAULT_SEGMENT_SIZE;
|
||||
int r = msgq_new_queue(q, endpoint.c_str(), size);
|
||||
@@ -130,7 +119,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MSGQPubSocket::sendMessage(Message *message){
|
||||
int PubSocket::sendMessage(Message *message){
|
||||
msgq_msg_t msg;
|
||||
msg.data = message->getData();
|
||||
msg.size = message->getSize();
|
||||
@@ -138,7 +127,7 @@ int MSGQPubSocket::sendMessage(Message *message){
|
||||
return msgq_msg_send(&msg, q);
|
||||
}
|
||||
|
||||
int MSGQPubSocket::send(char *data, size_t size){
|
||||
int PubSocket::send(char *data, size_t size){
|
||||
msgq_msg_t msg;
|
||||
msg.data = data;
|
||||
msg.size = size;
|
||||
@@ -146,11 +135,11 @@ int MSGQPubSocket::send(char *data, size_t size){
|
||||
return msgq_msg_send(&msg, q);
|
||||
}
|
||||
|
||||
bool MSGQPubSocket::all_readers_updated() {
|
||||
bool PubSocket::all_readers_updated() {
|
||||
return msgq_all_readers_updated(q);
|
||||
}
|
||||
|
||||
MSGQPubSocket::~MSGQPubSocket(){
|
||||
PubSocket::~PubSocket(){
|
||||
if (q != NULL){
|
||||
msgq_close_queue(q);
|
||||
delete q;
|
||||
@@ -160,7 +149,7 @@ MSGQPubSocket::~MSGQPubSocket(){
|
||||
|
||||
void MSGQPoller::registerSocket(SubSocket * socket){
|
||||
assert(num_polls + 1 < MAX_POLLERS);
|
||||
polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket();
|
||||
polls[num_polls].q = static_cast<MSGQSubSocket*>(socket)->getQueue();
|
||||
|
||||
sockets.push_back(socket);
|
||||
num_polls++;
|
||||
|
||||
@@ -8,29 +8,6 @@
|
||||
|
||||
#define MAX_POLLERS 128
|
||||
|
||||
class MSGQContext : public Context {
|
||||
private:
|
||||
void * context = NULL;
|
||||
public:
|
||||
MSGQContext();
|
||||
void * getRawContext() {return context;}
|
||||
~MSGQContext();
|
||||
};
|
||||
|
||||
class MSGQMessage : public Message {
|
||||
private:
|
||||
char * data;
|
||||
size_t size;
|
||||
public:
|
||||
void init(size_t size);
|
||||
void init(char *data, size_t size);
|
||||
void takeOwnership(char *data, size_t size);
|
||||
size_t getSize(){return size;}
|
||||
char * getData(){return data;}
|
||||
void close();
|
||||
~MSGQMessage();
|
||||
};
|
||||
|
||||
class MSGQSubSocket : public SubSocket {
|
||||
private:
|
||||
msgq_queue_t * q = NULL;
|
||||
@@ -38,22 +15,11 @@ private:
|
||||
public:
|
||||
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0);
|
||||
void setTimeout(int timeout);
|
||||
void * getRawSocket() {return (void*)q;}
|
||||
msgq_queue_t * getQueue() {return q;}
|
||||
Message *receive(bool non_blocking=false);
|
||||
~MSGQSubSocket();
|
||||
};
|
||||
|
||||
class MSGQPubSocket : public PubSocket {
|
||||
private:
|
||||
msgq_queue_t * q = NULL;
|
||||
public:
|
||||
int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0);
|
||||
int sendMessage(Message *message);
|
||||
int send(char *data, size_t size);
|
||||
bool all_readers_updated();
|
||||
~MSGQPubSocket();
|
||||
};
|
||||
|
||||
class MSGQPoller : public Poller {
|
||||
private:
|
||||
std::vector<SubSocket*> sockets;
|
||||
|
||||
179
msgq/impl_zmq.cc
179
msgq/impl_zmq.cc
@@ -1,179 +0,0 @@
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <cstdlib>
|
||||
#include <cerrno>
|
||||
#include <unistd.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "msgq/impl_zmq.h"
|
||||
|
||||
static size_t fnv1a_hash(const std::string &str) {
|
||||
const size_t fnv_prime = 0x100000001b3;
|
||||
size_t hash_value = 0xcbf29ce484222325;
|
||||
for (char c : str) {
|
||||
hash_value ^= (unsigned char)c;
|
||||
hash_value *= fnv_prime;
|
||||
}
|
||||
return hash_value;
|
||||
}
|
||||
|
||||
//FIXME: This is a hack to get the port number from the socket name, might have collisions
|
||||
static int get_port(std::string endpoint) {
|
||||
size_t hash_value = fnv1a_hash(endpoint);
|
||||
int start_port = 8023;
|
||||
int max_port = 65535;
|
||||
int port = start_port + (hash_value % (max_port - start_port));
|
||||
return port;
|
||||
}
|
||||
|
||||
ZMQContext::ZMQContext() {
|
||||
context = zmq_ctx_new();
|
||||
}
|
||||
|
||||
ZMQContext::~ZMQContext() {
|
||||
zmq_ctx_term(context);
|
||||
}
|
||||
|
||||
void ZMQMessage::init(size_t sz) {
|
||||
size = sz;
|
||||
data = new char[size];
|
||||
}
|
||||
|
||||
void ZMQMessage::init(char * d, size_t sz) {
|
||||
size = sz;
|
||||
data = new char[size];
|
||||
memcpy(data, d, size);
|
||||
}
|
||||
|
||||
void ZMQMessage::close() {
|
||||
if (size > 0){
|
||||
delete[] data;
|
||||
}
|
||||
size = 0;
|
||||
}
|
||||
|
||||
ZMQMessage::~ZMQMessage() {
|
||||
this->close();
|
||||
}
|
||||
|
||||
|
||||
int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate, bool check_endpoint, size_t segment_size){
|
||||
sock = zmq_socket(context->getRawContext(), ZMQ_SUB);
|
||||
if (sock == NULL){
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
|
||||
|
||||
if (conflate){
|
||||
int arg = 1;
|
||||
zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int));
|
||||
}
|
||||
|
||||
int reconnect_ivl = 500;
|
||||
zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl));
|
||||
|
||||
|
||||
full_endpoint = "tcp://" + address + ":";
|
||||
if (check_endpoint){
|
||||
full_endpoint += std::to_string(get_port(endpoint));
|
||||
} else {
|
||||
full_endpoint += endpoint;
|
||||
}
|
||||
|
||||
return zmq_connect(sock, full_endpoint.c_str());
|
||||
}
|
||||
|
||||
|
||||
Message * ZMQSubSocket::receive(bool non_blocking){
|
||||
zmq_msg_t msg;
|
||||
assert(zmq_msg_init(&msg) == 0);
|
||||
|
||||
int flags = non_blocking ? ZMQ_DONTWAIT : 0;
|
||||
int rc = zmq_msg_recv(&msg, sock, flags);
|
||||
Message *r = NULL;
|
||||
|
||||
if (rc >= 0){
|
||||
// Make a copy to ensure the data is aligned
|
||||
r = new ZMQMessage;
|
||||
r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
|
||||
}
|
||||
|
||||
zmq_msg_close(&msg);
|
||||
return r;
|
||||
}
|
||||
|
||||
void ZMQSubSocket::setTimeout(int timeout){
|
||||
zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
}
|
||||
|
||||
ZMQSubSocket::~ZMQSubSocket(){
|
||||
zmq_close(sock);
|
||||
}
|
||||
|
||||
int ZMQPubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){
|
||||
sock = zmq_socket(context->getRawContext(), ZMQ_PUB);
|
||||
if (sock == NULL){
|
||||
return -1;
|
||||
}
|
||||
|
||||
full_endpoint = "tcp://*:";
|
||||
if (check_endpoint){
|
||||
full_endpoint += std::to_string(get_port(endpoint));
|
||||
} else {
|
||||
full_endpoint += endpoint;
|
||||
}
|
||||
|
||||
// ZMQ pub sockets cannot be shared between processes, so we need to ensure pid stays the same
|
||||
pid = getpid();
|
||||
|
||||
return zmq_bind(sock, full_endpoint.c_str());
|
||||
}
|
||||
|
||||
int ZMQPubSocket::sendMessage(Message *message) {
|
||||
assert(pid == getpid());
|
||||
return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT);
|
||||
}
|
||||
|
||||
int ZMQPubSocket::send(char *data, size_t size) {
|
||||
assert(pid == getpid());
|
||||
return zmq_send(sock, data, size, ZMQ_DONTWAIT);
|
||||
}
|
||||
|
||||
bool ZMQPubSocket::all_readers_updated() {
|
||||
assert(false); // TODO not implemented
|
||||
return false;
|
||||
}
|
||||
|
||||
ZMQPubSocket::~ZMQPubSocket(){
|
||||
zmq_close(sock);
|
||||
}
|
||||
|
||||
|
||||
void ZMQPoller::registerSocket(SubSocket * socket){
|
||||
assert(num_polls + 1 < MAX_POLLERS);
|
||||
polls[num_polls].socket = socket->getRawSocket();
|
||||
polls[num_polls].events = ZMQ_POLLIN;
|
||||
|
||||
sockets.push_back(socket);
|
||||
num_polls++;
|
||||
}
|
||||
|
||||
std::vector<SubSocket*> ZMQPoller::poll(int timeout){
|
||||
std::vector<SubSocket*> r;
|
||||
|
||||
int rc = zmq_poll(polls, num_polls, timeout);
|
||||
if (rc < 0){
|
||||
return r;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < num_polls; i++){
|
||||
if (polls[i].revents){
|
||||
r.push_back(sockets[i]);
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
@@ -1,68 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <zmq.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "msgq/ipc.h"
|
||||
|
||||
#define MAX_POLLERS 128
|
||||
|
||||
class ZMQContext : public Context {
|
||||
private:
|
||||
void * context = NULL;
|
||||
public:
|
||||
ZMQContext();
|
||||
void * getRawContext() {return context;}
|
||||
~ZMQContext();
|
||||
};
|
||||
|
||||
class ZMQMessage : public Message {
|
||||
private:
|
||||
char * data;
|
||||
size_t size;
|
||||
public:
|
||||
void init(size_t size);
|
||||
void init(char *data, size_t size);
|
||||
size_t getSize(){return size;}
|
||||
char * getData(){return data;}
|
||||
void close();
|
||||
~ZMQMessage();
|
||||
};
|
||||
|
||||
class ZMQSubSocket : public SubSocket {
|
||||
private:
|
||||
void * sock;
|
||||
std::string full_endpoint;
|
||||
public:
|
||||
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0);
|
||||
void setTimeout(int timeout);
|
||||
void * getRawSocket() {return sock;}
|
||||
Message *receive(bool non_blocking=false);
|
||||
~ZMQSubSocket();
|
||||
};
|
||||
|
||||
class ZMQPubSocket : public PubSocket {
|
||||
private:
|
||||
void * sock;
|
||||
std::string full_endpoint;
|
||||
int pid = -1;
|
||||
public:
|
||||
int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0);
|
||||
int sendMessage(Message *message);
|
||||
int send(char *data, size_t size);
|
||||
bool all_readers_updated();
|
||||
~ZMQPubSocket();
|
||||
};
|
||||
|
||||
class ZMQPoller : public Poller {
|
||||
private:
|
||||
std::vector<SubSocket*> sockets;
|
||||
zmq_pollitem_t polls[MAX_POLLERS];
|
||||
size_t num_polls = 0;
|
||||
|
||||
public:
|
||||
void registerSocket(SubSocket *socket);
|
||||
std::vector<SubSocket*> poll(int timeout);
|
||||
~ZMQPoller(){}
|
||||
};
|
||||
49
msgq/ipc.cc
49
msgq/ipc.cc
@@ -3,46 +3,23 @@
|
||||
#include <string>
|
||||
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/impl_zmq.h"
|
||||
#include "msgq/impl_msgq.h"
|
||||
#include "msgq/impl_fake.h"
|
||||
|
||||
bool messaging_use_zmq(){
|
||||
return std::getenv("ZMQ") != nullptr;
|
||||
}
|
||||
|
||||
bool messaging_use_fake(){
|
||||
char* fake_enabled = std::getenv("CEREAL_FAKE");
|
||||
return fake_enabled != NULL;
|
||||
}
|
||||
|
||||
Context * Context::create(){
|
||||
Context * c;
|
||||
if (messaging_use_zmq()){
|
||||
c = new ZMQContext();
|
||||
} else {
|
||||
c = new MSGQContext();
|
||||
}
|
||||
return c;
|
||||
return new Context();
|
||||
}
|
||||
|
||||
SubSocket * SubSocket::create(){
|
||||
SubSocket * s;
|
||||
if (messaging_use_fake()) {
|
||||
if (messaging_use_zmq()) {
|
||||
s = new FakeSubSocket<ZMQSubSocket>();
|
||||
} else {
|
||||
s = new FakeSubSocket<MSGQSubSocket>();
|
||||
return new FakeSubSocket<MSGQSubSocket>();
|
||||
}
|
||||
} else {
|
||||
if (messaging_use_zmq()){
|
||||
s = new ZMQSubSocket();
|
||||
} else {
|
||||
s = new MSGQSubSocket();
|
||||
}
|
||||
}
|
||||
|
||||
return s;
|
||||
return new MSGQSubSocket();
|
||||
}
|
||||
|
||||
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate, bool check_endpoint, size_t segment_size){
|
||||
@@ -60,14 +37,7 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri
|
||||
}
|
||||
|
||||
PubSocket * PubSocket::create(){
|
||||
PubSocket * s;
|
||||
if (messaging_use_zmq()){
|
||||
s = new ZMQPubSocket();
|
||||
} else {
|
||||
s = new MSGQPubSocket();
|
||||
}
|
||||
|
||||
return s;
|
||||
return new PubSocket();
|
||||
}
|
||||
|
||||
PubSocket * PubSocket::create(Context * context, std::string endpoint, bool check_endpoint, size_t segment_size){
|
||||
@@ -85,17 +55,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint, bool chec
|
||||
}
|
||||
|
||||
Poller * Poller::create(){
|
||||
Poller * p;
|
||||
if (messaging_use_fake()) {
|
||||
p = new FakePoller();
|
||||
} else {
|
||||
if (messaging_use_zmq()){
|
||||
p = new ZMQPoller();
|
||||
} else {
|
||||
p = new MSGQPoller();
|
||||
return new FakePoller();
|
||||
}
|
||||
}
|
||||
return p;
|
||||
return new MSGQPoller();
|
||||
}
|
||||
|
||||
Poller * Poller::create(std::vector<SubSocket*> sockets){
|
||||
|
||||
34
msgq/ipc.h
34
msgq/ipc.h
@@ -15,23 +15,24 @@
|
||||
|
||||
#define MSG_MULTIPLE_PUBLISHERS 100
|
||||
|
||||
bool messaging_use_zmq();
|
||||
|
||||
class Context {
|
||||
public:
|
||||
virtual void * getRawContext() = 0;
|
||||
static Context * create();
|
||||
virtual ~Context(){}
|
||||
~Context(){}
|
||||
};
|
||||
|
||||
class Message {
|
||||
private:
|
||||
char * data = nullptr;
|
||||
size_t size = 0;
|
||||
public:
|
||||
virtual void init(size_t size) = 0;
|
||||
virtual void init(char * data, size_t size) = 0;
|
||||
virtual void close() = 0;
|
||||
virtual size_t getSize() = 0;
|
||||
virtual char * getData() = 0;
|
||||
virtual ~Message(){}
|
||||
void init(size_t size);
|
||||
void init(char * data, size_t size);
|
||||
void takeOwnership(char * data, size_t size);
|
||||
void close();
|
||||
size_t getSize(){return size;}
|
||||
char * getData(){return data;}
|
||||
~Message();
|
||||
};
|
||||
|
||||
|
||||
@@ -40,21 +41,22 @@ public:
|
||||
virtual int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0) = 0;
|
||||
virtual void setTimeout(int timeout) = 0;
|
||||
virtual Message *receive(bool non_blocking=false) = 0;
|
||||
virtual void * getRawSocket() = 0;
|
||||
static SubSocket * create();
|
||||
static SubSocket * create(Context * context, std::string endpoint, std::string address="127.0.0.1", bool conflate=false, bool check_endpoint=true, size_t segment_size=0);
|
||||
virtual ~SubSocket(){}
|
||||
};
|
||||
|
||||
class PubSocket {
|
||||
private:
|
||||
struct msgq_queue_t * q = nullptr;
|
||||
public:
|
||||
virtual int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0) = 0;
|
||||
virtual int sendMessage(Message *message) = 0;
|
||||
virtual int send(char *data, size_t size) = 0;
|
||||
virtual bool all_readers_updated() = 0;
|
||||
int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0);
|
||||
int sendMessage(Message *message);
|
||||
int send(char *data, size_t size);
|
||||
bool all_readers_updated();
|
||||
static PubSocket * create();
|
||||
static PubSocket * create(Context * context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0);
|
||||
virtual ~PubSocket(){}
|
||||
~PubSocket();
|
||||
};
|
||||
|
||||
class Poller {
|
||||
|
||||
@@ -35,8 +35,6 @@ cdef extern from "msgq/impl_fake.h":
|
||||
|
||||
|
||||
cdef extern from "msgq/ipc.h":
|
||||
bool messaging_use_zmq()
|
||||
|
||||
cdef cppclass Context:
|
||||
@staticmethod
|
||||
Context * create()
|
||||
|
||||
@@ -10,7 +10,6 @@ from libc.string cimport strerror
|
||||
from cython.operator import dereference
|
||||
|
||||
|
||||
from .ipc cimport messaging_use_zmq
|
||||
from .ipc cimport Context as cppContext
|
||||
from .ipc cimport SubSocket as cppSubSocket
|
||||
from .ipc cimport PubSocket as cppPubSocket
|
||||
@@ -19,10 +18,6 @@ from .ipc cimport Message as cppMessage
|
||||
from .ipc cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle
|
||||
|
||||
|
||||
def context_is_zmq():
|
||||
return messaging_use_zmq()
|
||||
|
||||
|
||||
class IpcError(Exception):
|
||||
def __init__(self, endpoint=None):
|
||||
suffix = f"with {endpoint.decode('utf-8')}" if endpoint else ""
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import pytest
|
||||
import os
|
||||
import multiprocessing
|
||||
import platform
|
||||
import msgq
|
||||
@@ -69,8 +68,6 @@ class TestFakeSockets:
|
||||
prefix: Optional[str] = None
|
||||
|
||||
def setup_method(self):
|
||||
if "ZMQ" in os.environ:
|
||||
pytest.skip("FakeSockets not supported on ZMQ")
|
||||
msgq.toggle_fake_events(True)
|
||||
if self.prefix is not None:
|
||||
msgq.set_fake_prefix(self.prefix)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import string
|
||||
@@ -11,22 +10,12 @@ def random_sock():
|
||||
def random_bytes(length=1000):
|
||||
return bytes([random.randrange(0xFF) for _ in range(length)])
|
||||
|
||||
def zmq_sleep(t=1):
|
||||
if "ZMQ" in os.environ:
|
||||
time.sleep(t)
|
||||
|
||||
class TestPubSubSockets:
|
||||
|
||||
def setup_method(self):
|
||||
# ZMQ pub socket takes too long to die
|
||||
# sleep to prevent multiple publishers error between tests
|
||||
zmq_sleep()
|
||||
|
||||
def test_pub_sub(self):
|
||||
sock = random_sock()
|
||||
pub_sock = msgq.pub_sock(sock)
|
||||
sub_sock = msgq.sub_sock(sock, conflate=False, timeout=None)
|
||||
zmq_sleep(3)
|
||||
|
||||
for _ in range(1000):
|
||||
msg = random_bytes()
|
||||
@@ -40,7 +29,6 @@ class TestPubSubSockets:
|
||||
for conflate in [True, False]:
|
||||
num_msgs = random.randint(3, 10)
|
||||
sub_sock = msgq.sub_sock(sock, conflate=conflate, timeout=None)
|
||||
zmq_sleep()
|
||||
|
||||
sent_msgs = []
|
||||
for __ in range(num_msgs):
|
||||
@@ -62,7 +50,6 @@ class TestPubSubSockets:
|
||||
sock = random_sock()
|
||||
timeout = random.randrange(200)
|
||||
sub_sock = msgq.sub_sock(sock, timeout=timeout)
|
||||
zmq_sleep()
|
||||
|
||||
start_time = time.monotonic()
|
||||
recvd = sub_sock.receive()
|
||||
|
||||
@@ -111,7 +111,7 @@ class TestPoller:
|
||||
msg_seen = True
|
||||
i += 1
|
||||
|
||||
if r is None and msg_seen: # ZMQ sometimes receives nothing on the first receive
|
||||
if r is None and msg_seen:
|
||||
break
|
||||
|
||||
del pub
|
||||
|
||||
@@ -1,14 +1,8 @@
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
from typing import Optional
|
||||
import numpy as np
|
||||
from msgq.visionipc import VisionIpcServer, VisionIpcClient, VisionStreamType
|
||||
|
||||
def zmq_sleep(t=1):
|
||||
if "ZMQ" in os.environ:
|
||||
time.sleep(t)
|
||||
|
||||
|
||||
class TestVisionIpc:
|
||||
server: VisionIpcServer
|
||||
@@ -26,7 +20,6 @@ class TestVisionIpc:
|
||||
else:
|
||||
self.client = None
|
||||
|
||||
zmq_sleep()
|
||||
return self.server, self.client
|
||||
|
||||
def test_connect(self):
|
||||
|
||||
@@ -10,19 +10,13 @@
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "msgq/ipc.h"
|
||||
#include "msgq/visionipc/visionipc.h"
|
||||
#include "msgq/visionipc/visionipc_server.h"
|
||||
#include "msgq/logger/logger.h"
|
||||
|
||||
std::string get_endpoint_name(std::string name, VisionStreamType type){
|
||||
if (messaging_use_zmq()){
|
||||
assert(name == "camerad" || name == "navd");
|
||||
return std::to_string(9000 + static_cast<int>(type));
|
||||
} else {
|
||||
return "visionipc_" + name + "_" + std::to_string(type);
|
||||
}
|
||||
}
|
||||
|
||||
std::string get_ipc_path(const std::string& name) {
|
||||
std::string path = "/tmp/";
|
||||
@@ -71,7 +65,6 @@ void VisionIpcServer::create_buffers_with_sizes(VisionStreamType type, size_t nu
|
||||
cur_idx[type] = 0;
|
||||
|
||||
// Create msgq publisher for each of the `name` + type combos
|
||||
// TODO: compute port number directly if using zmq
|
||||
sockets[type] = PubSocket::create(msg_ctx, get_endpoint_name(name, type), false);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,9 @@
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include "catch2/catch.hpp"
|
||||
|
||||
#include "msgq/visionipc/visionipc_server.h"
|
||||
#include "msgq/visionipc/visionipc_client.h"
|
||||
|
||||
|
||||
static void zmq_sleep(int milliseconds=1000){
|
||||
if (messaging_use_zmq()){
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("Connecting"){
|
||||
VisionIpcServer server("camerad");
|
||||
server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100);
|
||||
@@ -57,8 +48,6 @@ TEST_CASE("Send single buffer"){
|
||||
|
||||
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
|
||||
REQUIRE(client.connect());
|
||||
zmq_sleep();
|
||||
|
||||
VisionBuf * buf = server.get_buffer(VISION_STREAM_ROAD);
|
||||
REQUIRE(buf != nullptr);
|
||||
|
||||
@@ -86,8 +75,6 @@ TEST_CASE("Test no conflate"){
|
||||
|
||||
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
|
||||
REQUIRE(client.connect());
|
||||
zmq_sleep();
|
||||
|
||||
VisionBuf * buf = server.get_buffer(VISION_STREAM_ROAD);
|
||||
REQUIRE(buf != nullptr);
|
||||
|
||||
@@ -114,8 +101,6 @@ TEST_CASE("Test conflate"){
|
||||
|
||||
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, true);
|
||||
REQUIRE(client.connect());
|
||||
zmq_sleep();
|
||||
|
||||
VisionBuf * buf = server.get_buffer(VISION_STREAM_ROAD);
|
||||
REQUIRE(buf != nullptr);
|
||||
|
||||
|
||||
5
setup.sh
5
setup.sh
@@ -8,9 +8,9 @@ PLATFORM=$(uname -s)
|
||||
|
||||
echo "installing dependencies"
|
||||
if [[ $PLATFORM == "Darwin" ]]; then
|
||||
if ! command -v python3 &>/dev/null || ! pkg-config --exists libzmq 2>/dev/null; then
|
||||
if ! command -v python3 &>/dev/null; then
|
||||
export HOMEBREW_NO_AUTO_UPDATE=1
|
||||
brew install python3 zeromq
|
||||
brew install python3
|
||||
fi
|
||||
elif [[ $PLATFORM == "Linux" ]]; then
|
||||
# for AGNOS since we clear the apt lists
|
||||
@@ -20,7 +20,6 @@ elif [[ $PLATFORM == "Linux" ]]; then
|
||||
|
||||
sudo apt-get install -y --no-install-recommends \
|
||||
curl ca-certificates \
|
||||
libzmq3-dev \
|
||||
python3-dev python3-pip python3-venv
|
||||
else
|
||||
echo "WARNING: unsupported platform. skipping apt/brew install."
|
||||
|
||||
Reference in New Issue
Block a user