MSGQ stability improvements when opening and closing lots of queues

This commit is contained in:
Willem Melching
2019-11-15 14:37:58 -08:00
parent e147abcc05
commit ef64eb27d7
3 changed files with 45 additions and 7 deletions

View File

@@ -119,6 +119,7 @@ void MSGQSubSocket::setTimeout(int t){
}
MSGQSubSocket::~MSGQSubSocket(){
msgq_close_queue(q);
delete q;
}
@@ -149,6 +150,7 @@ int MSGQPubSocket::send(char *data, size_t size){
}
MSGQPubSocket::~MSGQPubSocket(){
msgq_close_queue(q);
delete q;
}

View File

@@ -77,17 +77,24 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
auto fd = open(full_path, O_RDWR | O_CREAT, 0777);
delete[] full_path;
assert(fd >= 0); // TODO: properly handle exit codes
if (fd < 0)
return -1;
int rc = ftruncate(fd, size + sizeof(msgq_header_t));
assert(rc == 0); // TODO: properly handle exit codes
if (rc < 0)
return -1;
char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
assert(mem != NULL); // TODO: properly handle exit codes
if (mem == NULL)
return -1;
q->mmap_p = mem;
msgq_header_t *header = (msgq_header_t *)mem;
// Setup pointers to header segment
@@ -107,18 +114,34 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
q->endpoint = path;
q->read_conflate = false;
q->read_fifo = -1;
return 0;
}
void msgq_close_queue(msgq_queue_t *q){
if (q->read_fifo >= 0){
close(q->read_fifo);
}
for (uint64_t i = 0; i < NUM_READERS; i++){
if (q->read_fifos[i] >= 0){
close(q->read_fifos[i]);
}
}
if (q->mmap_p != NULL){
munmap(q->mmap_p, q->size + sizeof(msgq_header_t));
}
}
void msgq_init_publisher(msgq_queue_t * q) {
std::cout << "Starting publisher" << std::endl;
std::random_device rd;
std::default_random_engine generator(rd());
std::random_device rd("/dev/urandom");
std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint64_t>::max());
uint64_t uid = distribution(generator);
uint64_t uid = distribution(rd);
*q->write_uid = uid;
*q->num_readers = 0;
@@ -134,10 +157,12 @@ void msgq_init_publisher(msgq_queue_t * q) {
}
void msgq_init_subscriber(msgq_queue_t * q) {
std::random_device rd;
std::default_random_engine generator(rd());
assert(q != NULL);
assert(q->num_readers != NULL);
std::random_device rd("/dev/urandom");
std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint64_t>::max());
uint64_t uid = distribution(generator);
uint64_t uid = distribution(rd);
// Get reader id
while (true){
@@ -179,12 +204,16 @@ void msgq_init_subscriber(msgq_queue_t * q) {
std::cout << q->read_fifo_path << std::endl;
int r = mkfifo(q->read_fifo_path.c_str(), 0777);
if (r != 0)
perror("Fifo: ");
assert(r == 0);
q->read_fifo = open(q->read_fifo_path.c_str(), O_RDWR | O_NONBLOCK);
// Fysnc so the fifo shows up in the directory
fsync(open("/dev/shm", O_RDONLY));
auto shm_fd = open("/dev/shm", O_RDONLY);
fsync(shm_fd);
close(shm_fd);
std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << std::endl;
msgq_reset_reader(q);
@@ -271,6 +300,11 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
// Open fifo when not set, or when reader changes
if (q->read_fifos[i] == -1 || q->read_fifos_uid[i] != reader_uid){
// Close old reader fifo
if (q->read_fifos[i] >= 0){
close(q->read_fifos[i]);
}
q->read_fifos_uid[i] = reader_uid;
std::string path = "/dev/shm/fifo-";

View File

@@ -27,6 +27,7 @@ struct msgq_queue_t {
std::atomic<uint64_t> *read_pointers[NUM_READERS];
std::atomic<uint64_t> *read_valids[NUM_READERS];
std::atomic<uint64_t> *read_uids[NUM_READERS];
char * mmap_p;
char * data;
size_t size;
int reader_id;
@@ -62,6 +63,7 @@ int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size);
int msgq_msg_close(msgq_msg_t *msg);
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size);
void msgq_close_queue(msgq_queue_t *q);
void msgq_init_publisher(msgq_queue_t * q);
void msgq_init_subscriber(msgq_queue_t * q);