diff --git a/.gitignore b/.gitignore index 9e2fec8..452eb98 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ services.h .sconsign.dblite libcereal_shared.* .mypy_cache/ +messaging/messaging_pyx.html diff --git a/messaging/messaging_pyx.pyx b/messaging/messaging_pyx.pyx index c2aea0d..f3b9cd0 100644 --- a/messaging/messaging_pyx.pyx +++ b/messaging/messaging_pyx.pyx @@ -170,14 +170,19 @@ cdef class SubMaster: vector[string] services vector[string] ignore_alive + cppmap[string, bool] _alive + cppmap[string, bool] _valid + cppmap[string, float] _freq + cppmap[string, float] _rcv_time + cdef readonly: int frame dict updated - dict alive - dict valid - dict rcv_time + #dict alive + #dict valid + #dict rcv_time dict rcv_frame - dict freq + #dict freq dict logMonoTime dict sock dict data @@ -191,30 +196,35 @@ cdef class SubMaster: self.frame = -1 self.updated = {s: False for s in services} - self.rcv_time = {s: 0. for s in services} + #self.rcv_time = {s: 0. for s in services} self.rcv_frame = {s: 0 for s in services} - self.alive = {s: False for s in services} + #self.alive = {s: False for s in services} self.sock = {} - self.freq = {} + #self.freq = {} self.data = {} self.logMonoTime = {} - self.valid = {} + #self.valid = {} + + for s in self.services: + py_str = s.decode('utf8') + + self._alive[s] = False + self._valid[s] = True + self._freq[s] = service_list[py_str].frequency + self._rcv_time[s] = 0. - for s in services: sock = SubSocket() sock.connect(context, s, addr, True) self.poller.registerSocket(sock.socket) - self.sock[s] = sock - self.freq[s] = service_list[s].frequency + self.sock[py_str] = sock data = log.Event.new_message() try: - data.init(s) + data.init(py_str) except capnp.lib.capnp.KjException: - data.init(s, 0) # lists - self.data[s] = getattr(data, s) - self.logMonoTime[s] = 0 - self.valid[s] = True + data.init(py_str, 0) # lists + self.data[py_str] = getattr(data, py_str) + self.logMonoTime[py_str] = 0 def __dealloc__(self): del self.poller @@ -222,48 +232,70 @@ cdef class SubMaster: def __getitem__(self, s): return self.data[s] - def update(self, int timeout=1000): + @property + def alive(self): + return dict(self._alive) + + @property + def valid(self): + return dict(self._valid) + + @property + def rcv_time(self): + return dict(self._rcv_time) + + cpdef update(self, int timeout=1000): cdef vector[string] msgs = self._update(timeout) self.update_msgs(sec_since_boot(), msgs) cdef _update(self, int timeout): cdef vector[string] msgs - with nogil: - result = self.poller.poll(timeout) - for s in result: - msg = s.receive(True) - if msg != NULL: - msgs.push_back(string(msg.getData(), msg.getSize())) + result = self.poller.poll(timeout) + for s in result: + msg = s.receive(True) + if msg != NULL: + msgs.push_back(string(msg.getData(), msg.getSize())) return msgs cdef update_msgs(self, float cur_time, vector[string] msgs): self.frame += 1 self.updated = dict.fromkeys(self.updated, False) + for msg in msgs: msg = log.Event.from_bytes(msg) s = msg.which() self.updated[s] = True - self.rcv_time[s] = cur_time self.rcv_frame[s] = self.frame self.data[s] = getattr(msg, s) self.logMonoTime[s] = msg.logMonoTime - self.valid[s] = msg.valid - for s in self.data: + self._valid[s.encode('utf8')] = msg.valid + self._rcv_time[s.encode('utf8')] = cur_time + + for s in self.services: # arbitrary small number to avoid float comparison. If freq is 0, we can skip the check - if self.freq[s] > 1e-5: + if self._freq[s] > 1e-5: # alive if delay is within 10x the expected frequency - self.alive[s] = (cur_time - self.rcv_time[s]) < (10. / self.freq[s]) + self._alive[s] = (cur_time - self._rcv_time[s]) < (10. / self._freq[s]) else: - self.alive[s] = True + self._alive[s] = True - cpdef all_alive(self, service_list=None): + cdef _check_valid_alive(self, bool check_valid, bool check_alive, vector[string] service_list=[]): + if service_list.size() == 0: + service_list = self.services + for s in service_list: + if (check_alive and not self._alive[s]) or (check_valid and not self._valid[s]): + return False return True - cpdef all_valid(self, service_list=None): - return True - cpdef all_alive_and_valid(self, service_list=None): - return self.all_alive(service_list=service_list) and self.all_valid(service_list=service_list) + cpdef all_alive(self, vector[string] service_list=[]): + return self._check_valid_alive(False, True, service_list) + + cpdef all_valid(self, vector[string] service_list=[]): + return self._check_valid_alive(True, False, service_list) + + cpdef all_alive_and_valid(self, vector[string] service_list=[]): + return self._check_valid_alive(True, True, service_list) diff --git a/messaging/messaging_pyx_setup.py b/messaging/messaging_pyx_setup.py index 992b391..b170923 100644 --- a/messaging/messaging_pyx_setup.py +++ b/messaging/messaging_pyx_setup.py @@ -53,5 +53,6 @@ setup(name='messaging', ] ), nthreads=4, + annotate=True ), ) diff --git a/messaging/tests/test_pub_sub_master.py b/messaging/tests/test_pub_sub_master.py index 7ec824a..bc35202 100755 --- a/messaging/tests/test_pub_sub_master.py +++ b/messaging/tests/test_pub_sub_master.py @@ -18,9 +18,9 @@ class TestSubMaster(unittest.TestCase): def test_init(self): sm = messaging.SubMaster(events) - for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive, - sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]: - self.assertEqual(len(p), len(events)) + #for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive, + # sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]: + # self.assertEqual(len(p), len(events)) def test_init_state(self): socks = random_socks() @@ -32,9 +32,9 @@ class TestSubMaster(unittest.TestCase): self.assertTrue(all(f == 0 for f in sm.rcv_frame.values())) self.assertTrue(all(t == 0 for t in sm.logMonoTime.values())) - for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive, - sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]: - self.assertEqual(len(p), len(socks)) + #for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive, + # sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]: + # self.assertEqual(len(p), len(socks)) def test_getitem(self): sock = "carState"