diff --git a/msgq/ipc.cc b/msgq/ipc.cc index e269d43..b5cc20e 100644 --- a/msgq/ipc.cc +++ b/msgq/ipc.cc @@ -7,21 +7,8 @@ #include "msgq/impl_msgq.h" #include "msgq/impl_fake.h" -#ifdef __APPLE__ -const bool MUST_USE_ZMQ = true; -#else -const bool MUST_USE_ZMQ = false; -#endif - bool messaging_use_zmq(){ - if (std::getenv("ZMQ") || MUST_USE_ZMQ) { - if (std::getenv("OPENPILOT_PREFIX")) { - std::cerr << "OPENPILOT_PREFIX not supported with ZMQ backend\n"; - assert(false); - } - return true; - } - return false; + return std::getenv("ZMQ") != nullptr; } bool messaging_use_fake(){ diff --git a/msgq/msgq.cc b/msgq/msgq.cc index 1928155..92f0ec1 100644 --- a/msgq/msgq.cc +++ b/msgq/msgq.cc @@ -87,12 +87,16 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes std::signal(SIGUSR2, sigusr2_handler); - std::string full_path = "/dev/shm/msgq_"; +#ifdef __APPLE__ + std::string base_path = "/tmp/msgq_"; +#else + std::string base_path = "/dev/shm/msgq_"; +#endif const char* prefix = std::getenv("OPENPILOT_PREFIX"); if (prefix) { - full_path += std::string(prefix) + "/"; + base_path += std::string(prefix) + "/"; } - full_path += path; + std::string full_path = base_path + path; auto fd = open(full_path.c_str(), O_RDWR | O_CREAT, 0664); if (fd < 0) { @@ -168,8 +172,11 @@ void msgq_init_publisher(msgq_queue_t * q) { } static void thread_signal(uint32_t tid) { - #ifndef SYS_tkill - // TODO: this won't work for multithreaded programs + #ifdef __APPLE__ + // macOS doesn't have tkill, rely on polling instead + (void)tid; + #elif !defined(SYS_tkill) + // fallback for systems without tkill kill(tid, SIGUSR2); #else syscall(SYS_tkill, tid, SIGUSR2); diff --git a/msgq/msgq_tests.cc b/msgq/msgq_tests.cc index fa8a883..fc713af 100644 --- a/msgq/msgq_tests.cc +++ b/msgq/msgq_tests.cc @@ -1,6 +1,14 @@ #include "catch2/catch.hpp" #include "msgq/msgq.h" +static void cleanup_test_queue() { +#ifdef __APPLE__ + remove("/tmp/msgq_test_queue"); +#else + remove("/dev/shm/msgq_test_queue"); +#endif +} + TEST_CASE("ALIGN") { REQUIRE(ALIGN(0) == 0); @@ -43,7 +51,7 @@ TEST_CASE("msgq_msg_init_data") TEST_CASE("msgq_init_subscriber") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t q; msgq_new_queue(&q, "test_queue", 1024); REQUIRE(*q.num_readers == 0); @@ -63,7 +71,7 @@ TEST_CASE("msgq_init_subscriber") TEST_CASE("msgq_msg_send first message") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t q; msgq_new_queue(&q, "test_queue", 1024); msgq_init_publisher(&q); @@ -100,7 +108,7 @@ TEST_CASE("msgq_msg_send first message") TEST_CASE("msgq_msg_send test wraparound") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t q; msgq_new_queue(&q, "test_queue", 1024); msgq_init_publisher(&q); @@ -132,7 +140,7 @@ TEST_CASE("msgq_msg_send test wraparound") TEST_CASE("msgq_msg_recv test wraparound") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t q_pub, q_sub; msgq_new_queue(&q_pub, "test_queue", 1024); msgq_new_queue(&q_sub, "test_queue", 1024); @@ -178,7 +186,7 @@ TEST_CASE("msgq_msg_recv test wraparound") TEST_CASE("msgq_msg_send test invalidation") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t q_pub, q_sub; msgq_new_queue(&q_pub, "test_queue", 1024); msgq_new_queue(&q_sub, "test_queue", 1024); @@ -214,7 +222,7 @@ TEST_CASE("msgq_msg_send test invalidation") TEST_CASE("msgq_init_subscriber init 2 subscribers") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t q1, q2; msgq_new_queue(&q1, "test_queue", 1024); msgq_new_queue(&q2, "test_queue", 1024); @@ -237,7 +245,7 @@ TEST_CASE("msgq_init_subscriber init 2 subscribers") TEST_CASE("Write 1 msg, read 1 msg", "[integration]") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); const size_t msg_size = 128; msgq_queue_t writer, reader; @@ -273,7 +281,7 @@ TEST_CASE("Write 1 msg, read 1 msg", "[integration]") TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); const size_t msg_size = 128; msgq_queue_t writer, reader; @@ -310,7 +318,7 @@ TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]") TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); const size_t msg_size = 128; msgq_queue_t writer, reader; @@ -348,7 +356,7 @@ TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]") TEST_CASE("1 publisher, 1 slow subscriber", "[integration]") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t writer, reader; msgq_new_queue(&writer, "test_queue", 1024); @@ -391,7 +399,7 @@ TEST_CASE("1 publisher, 1 slow subscriber", "[integration]") TEST_CASE("1 publisher, 2 subscribers", "[integration]") { - remove("/dev/shm/msgq_test_queue"); + cleanup_test_queue(); msgq_queue_t writer, reader1, reader2; msgq_new_queue(&writer, "test_queue", 1024); diff --git a/test.sh b/test.sh index e0cfe7c..6be3fa8 100755 --- a/test.sh +++ b/test.sh @@ -16,4 +16,5 @@ scons -j8 pre-commit run --all-files # *** test *** +msgq/test_runner pytest