diff --git a/cereal/messaging/bridge.cc b/cereal/messaging/bridge.cc index 69ecd188e..fb92c575c 100644 --- a/cereal/messaging/bridge.cc +++ b/cereal/messaging/bridge.cc @@ -33,7 +33,8 @@ void zmq_to_msgq(const std::vector &endpoints, const std::string &i for (auto endpoint : endpoints) { auto pub_sock = new MSGQPubSocket(); auto sub_sock = new ZMQSubSocket(); - pub_sock->connect(pub_context.get(), endpoint); + size_t queue_size = services.at(endpoint).queue_size; + pub_sock->connect(pub_context.get(), endpoint, true, queue_size); sub_sock->connect(sub_context.get(), endpoint, ip, false); poller->registerSocket(sub_sock); diff --git a/cereal/messaging/msgq_to_zmq.cc b/cereal/messaging/msgq_to_zmq.cc index ce626f2aa..7f8c738d4 100644 --- a/cereal/messaging/msgq_to_zmq.cc +++ b/cereal/messaging/msgq_to_zmq.cc @@ -2,6 +2,7 @@ #include +#include "cereal/services.h" #include "common/util.h" extern ExitHandler do_exit; @@ -108,7 +109,8 @@ void MsgqToZmq::zmqMonitorThread() { if (++pair.connected_clients == 1) { // Create new MSGQ subscriber socket and map to ZMQ publisher pair.sub_sock = std::make_unique(); - pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1"); + size_t queue_size = services.at(pair.endpoint).queue_size; + pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1", false, true, queue_size); sub2pub[pair.sub_sock.get()] = pair.pub_sock.get(); registerSockets(); }