unittest forever (#685)

* unittest

* no retry

* rm that
This commit is contained in:
Adeeb Shihadeh
2026-05-14 17:17:14 -07:00
committed by GitHub
parent 1f07e67c03
commit ac86ad0809
7 changed files with 29 additions and 43 deletions

View File

@@ -28,5 +28,5 @@ test:
# *** tests ***
test_runner:
run: msgq/test_runner
pytest:
run: pytest
unittest:
run: python -m unittest discover -t . -s msgq

View File

@@ -1,6 +0,0 @@
import pytest
import msgq
@pytest.fixture(autouse=True)
def msgq_context():
msgq.context = msgq.Context()

View File

@@ -1,6 +1,6 @@
import pytest
import multiprocessing
import platform
import unittest
import msgq
from parameterized import parameterized_class
from typing import Optional
@@ -8,8 +8,8 @@ from typing import Optional
WAIT_TIMEOUT = 5
@pytest.mark.skipif(condition=platform.system() == "Darwin", reason="Events not supported on macOS")
class TestEvents:
@unittest.skipIf(platform.system() == "Darwin", "Events not supported on macOS")
class TestEvents(unittest.TestCase):
def test_mutation(self):
handle = msgq.fake_event_handle("carState")
@@ -32,7 +32,7 @@ class TestEvents:
event.wait(WAIT_TIMEOUT)
assert event.peek()
except RuntimeError:
pytest.fail("event.wait() timed out")
self.fail("event.wait() timed out")
def test_wait_multiprocess(self):
handle = msgq.fake_event_handle("carState")
@@ -47,7 +47,7 @@ class TestEvents:
event.wait(WAIT_TIMEOUT)
assert event.peek()
except RuntimeError:
pytest.fail("event.wait() timed out")
self.fail("event.wait() timed out")
p.kill()
@@ -57,26 +57,28 @@ class TestEvents:
try:
event.wait(0)
pytest.fail("event.wait() did not time out")
self.fail("event.wait() did not time out")
except RuntimeError:
assert not event.peek()
@pytest.mark.skipif(condition=platform.system() == "Darwin", reason="FakeSockets not supported on macOS")
@unittest.skipIf(platform.system() == "Darwin", "FakeSockets not supported on macOS")
@parameterized_class([{"prefix": None}, {"prefix": "test"}])
class TestFakeSockets:
class TestFakeSockets(unittest.TestCase):
prefix: Optional[str] = None
def setup_method(self):
def setUp(self):
super().setUp()
msgq.toggle_fake_events(True)
if self.prefix is not None:
msgq.set_fake_prefix(self.prefix)
else:
msgq.delete_fake_prefix()
def teardown_method(self):
def tearDown(self):
msgq.toggle_fake_events(False)
msgq.delete_fake_prefix()
super().tearDown()
def test_event_handle_init(self):
handle = msgq.fake_event_handle("controlsState", override=True)
@@ -132,7 +134,7 @@ class TestFakeSockets:
_ = sub_sock.receive()
assert not recv_called.peek()
except RuntimeError:
pytest.fail("event.wait() timed out")
self.fail("event.wait() timed out")
def test_synced_pub_sub(self):
def daemon_repub_process_run():
@@ -181,6 +183,6 @@ class TestFakeSockets:
frame = int.from_bytes(msg, 'little')
assert frame == i
except RuntimeError:
pytest.fail("event.wait() timed out")
self.fail("event.wait() timed out")
finally:
p.kill()

View File

@@ -1,8 +1,9 @@
import random
import time
import string
import unittest
import msgq
import pytest
def random_sock():
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
@@ -10,7 +11,7 @@ def random_sock():
def random_bytes(length=1000):
return bytes([random.randrange(0xFF) for _ in range(length)])
class TestPubSubSockets:
class TestPubSubSockets(unittest.TestCase):
def test_pub_sub(self):
sock = random_sock()
@@ -45,13 +46,14 @@ class TestPubSubSockets:
for rec_msg, sent_msg in zip(recvd_msgs, sent_msgs):
assert rec_msg == sent_msg
@pytest.mark.flaky(retries=3, delay=1)
def test_receive_timeout(self):
sock = random_sock()
timeout = random.randrange(200)
sub_sock = msgq.sub_sock(sock, timeout=timeout)
timeout_ms = 50
sub_sock = msgq.sub_sock(sock, timeout=timeout_ms)
start_time = time.monotonic()
recvd = sub_sock.receive()
assert (time.monotonic() - start_time) < (timeout + 0.1)
elapsed = time.monotonic() - start_time
assert recvd is None
assert elapsed >= timeout_ms / 1000
assert elapsed < 5 # this can be noisy due to other load on the system

View File

@@ -1,4 +1,4 @@
import pytest
import unittest
import time
import msgq
import concurrent.futures
@@ -20,7 +20,7 @@ def poller():
return r
class TestPoller:
class TestPoller(unittest.TestCase):
def test_poll_once(self):
context = msgq.Context()
@@ -73,7 +73,7 @@ class TestPoller:
def test_multiple_publishers_exception(self):
context = msgq.Context()
with pytest.raises(msgq.MultiplePublishersError):
with self.assertRaises(msgq.MultiplePublishersError):
pub1 = msgq.PubSocket()
pub1.connect(context, SERVICE_NAME)

View File

@@ -1,10 +1,11 @@
import random
import unittest
from typing import Optional
import numpy as np
from msgq.visionipc import VisionIpcServer, VisionIpcClient, VisionStreamType
class TestVisionIpc:
class TestVisionIpc(unittest.TestCase):
server: VisionIpcServer
client: Optional[VisionIpcClient]

View File

@@ -20,8 +20,6 @@ dependencies = [
"parameterized",
"coverage",
"numpy",
"pytest",
"pytest-retry",
"cppcheck",
"cpplint",
"codespell",
@@ -38,20 +36,9 @@ lint.flake8-implicit-str-concat.allow-multiline=false
line-length = 160
target-version="py311"
[tool.ruff.lint.flake8-tidy-imports.banned-api]
"pytest.main".msg = "pytest.main requires special handling that is easy to mess up!"
"unittest".msg = "Use pytest"
[tool.ty.src]
exclude = ["site_scons/"]
[tool.ty.rules]
# Cython modules are compiled at build time, not available for static analysis
unresolved-import = "ignore"
[tool.pytest.ini_options]
addopts = "--durations=10"
testpaths = [
"msgq/tests",
"msgq/visionipc/tests",
]