msgq: add (janky) macOS support (#667)

* start mac MSGQ support

* lil more
This commit is contained in:
Adeeb Shihadeh
2025-12-27 16:38:49 -08:00
committed by GitHub
parent f3f440102e
commit 349bf449b7
4 changed files with 33 additions and 30 deletions

View File

@@ -7,21 +7,8 @@
#include "msgq/impl_msgq.h"
#include "msgq/impl_fake.h"
#ifdef __APPLE__
const bool MUST_USE_ZMQ = true;
#else
const bool MUST_USE_ZMQ = false;
#endif
bool messaging_use_zmq(){
if (std::getenv("ZMQ") || MUST_USE_ZMQ) {
if (std::getenv("OPENPILOT_PREFIX")) {
std::cerr << "OPENPILOT_PREFIX not supported with ZMQ backend\n";
assert(false);
}
return true;
}
return false;
return std::getenv("ZMQ") != nullptr;
}
bool messaging_use_fake(){

View File

@@ -87,12 +87,16 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR2, sigusr2_handler);
std::string full_path = "/dev/shm/msgq_";
#ifdef __APPLE__
std::string base_path = "/tmp/msgq_";
#else
std::string base_path = "/dev/shm/msgq_";
#endif
const char* prefix = std::getenv("OPENPILOT_PREFIX");
if (prefix) {
full_path += std::string(prefix) + "/";
base_path += std::string(prefix) + "/";
}
full_path += path;
std::string full_path = base_path + path;
auto fd = open(full_path.c_str(), O_RDWR | O_CREAT, 0664);
if (fd < 0) {
@@ -168,8 +172,11 @@ void msgq_init_publisher(msgq_queue_t * q) {
}
static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
#ifdef __APPLE__
// macOS doesn't have tkill, rely on polling instead
(void)tid;
#elif !defined(SYS_tkill)
// fallback for systems without tkill
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);

View File

@@ -1,6 +1,14 @@
#include "catch2/catch.hpp"
#include "msgq/msgq.h"
static void cleanup_test_queue() {
#ifdef __APPLE__
remove("/tmp/msgq_test_queue");
#else
remove("/dev/shm/msgq_test_queue");
#endif
}
TEST_CASE("ALIGN")
{
REQUIRE(ALIGN(0) == 0);
@@ -43,7 +51,7 @@ TEST_CASE("msgq_msg_init_data")
TEST_CASE("msgq_init_subscriber")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t q;
msgq_new_queue(&q, "test_queue", 1024);
REQUIRE(*q.num_readers == 0);
@@ -63,7 +71,7 @@ TEST_CASE("msgq_init_subscriber")
TEST_CASE("msgq_msg_send first message")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t q;
msgq_new_queue(&q, "test_queue", 1024);
msgq_init_publisher(&q);
@@ -100,7 +108,7 @@ TEST_CASE("msgq_msg_send first message")
TEST_CASE("msgq_msg_send test wraparound")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t q;
msgq_new_queue(&q, "test_queue", 1024);
msgq_init_publisher(&q);
@@ -132,7 +140,7 @@ TEST_CASE("msgq_msg_send test wraparound")
TEST_CASE("msgq_msg_recv test wraparound")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t q_pub, q_sub;
msgq_new_queue(&q_pub, "test_queue", 1024);
msgq_new_queue(&q_sub, "test_queue", 1024);
@@ -178,7 +186,7 @@ TEST_CASE("msgq_msg_recv test wraparound")
TEST_CASE("msgq_msg_send test invalidation")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t q_pub, q_sub;
msgq_new_queue(&q_pub, "test_queue", 1024);
msgq_new_queue(&q_sub, "test_queue", 1024);
@@ -214,7 +222,7 @@ TEST_CASE("msgq_msg_send test invalidation")
TEST_CASE("msgq_init_subscriber init 2 subscribers")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t q1, q2;
msgq_new_queue(&q1, "test_queue", 1024);
msgq_new_queue(&q2, "test_queue", 1024);
@@ -237,7 +245,7 @@ TEST_CASE("msgq_init_subscriber init 2 subscribers")
TEST_CASE("Write 1 msg, read 1 msg", "[integration]")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
const size_t msg_size = 128;
msgq_queue_t writer, reader;
@@ -273,7 +281,7 @@ TEST_CASE("Write 1 msg, read 1 msg", "[integration]")
TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
const size_t msg_size = 128;
msgq_queue_t writer, reader;
@@ -310,7 +318,7 @@ TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]")
TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
const size_t msg_size = 128;
msgq_queue_t writer, reader;
@@ -348,7 +356,7 @@ TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]")
TEST_CASE("1 publisher, 1 slow subscriber", "[integration]")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t writer, reader;
msgq_new_queue(&writer, "test_queue", 1024);
@@ -391,7 +399,7 @@ TEST_CASE("1 publisher, 1 slow subscriber", "[integration]")
TEST_CASE("1 publisher, 2 subscribers", "[integration]")
{
remove("/dev/shm/msgq_test_queue");
cleanup_test_queue();
msgq_queue_t writer, reader1, reader2;
msgq_new_queue(&writer, "test_queue", 1024);

View File

@@ -16,4 +16,5 @@ scons -j8
pre-commit run --all-files
# *** test ***
msgq/test_runner
pytest