mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-03 04:22:09 +08:00
laikad: unit test refactor (#28295)
Refactor laika tests to use replay_process old-commit-hash: 1f0ff21eee457f635059325bdcde61fc1bc6f0b4
This commit is contained in:
@@ -2,12 +2,9 @@
|
||||
import time
|
||||
import unittest
|
||||
from cereal import log
|
||||
import cereal.messaging as messaging
|
||||
from common.params import Params
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
#from unittest.mock import patch
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
from laika.constants import SECS_IN_DAY
|
||||
@@ -19,45 +16,22 @@ from laika.raw_gnss import GNSSMeasurement, read_raw_ublox, read_raw_qcom
|
||||
from selfdrive.locationd.laikad import EPHEMERIS_CACHE, Laikad
|
||||
from selfdrive.test.openpilotci import get_url
|
||||
from tools.lib.logreader import LogReader
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
|
||||
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
|
||||
from selfdrive.test.process_replay.process_replay import get_process_config, replay_process
|
||||
|
||||
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12))
|
||||
|
||||
|
||||
def get_ublox_gnss(ubloxraw):
|
||||
with OpenpilotPrefix():
|
||||
managed_processes['ubloxd'].start()
|
||||
timeout_ms = 30
|
||||
pm = messaging.PubMaster(['ubloxRaw'])
|
||||
sock = messaging.sub_sock('ubloxGnss', timeout=timeout_ms)
|
||||
|
||||
log_msgs = []
|
||||
log_t = []
|
||||
for x in tqdm(ubloxraw):
|
||||
pm.send(x.which(), x.as_builder())
|
||||
ret = messaging.recv_one(sock)
|
||||
if ret is not None:
|
||||
msg = log.Event.new_message(ubloxGnss=ret.ubloxGnss.to_dict())
|
||||
msg.logMonoTime = x.logMonoTime
|
||||
log_msgs.append(msg)
|
||||
log_t.append(1e-9 * x.logMonoTime)
|
||||
assert managed_processes['ubloxd'].get_process_state_msg().running
|
||||
assert len(log_msgs) > 1 or len(ubloxraw) == 0
|
||||
managed_processes['ubloxd'].stop()
|
||||
return log_t, log_msgs
|
||||
|
||||
|
||||
|
||||
def get_log(segs=range(0)):
|
||||
def get_log_ublox(segs=range(0)):
|
||||
logs = []
|
||||
for i in segs:
|
||||
logs.extend(LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", i)))
|
||||
|
||||
raw_logs = [m for m in logs if m.which() == 'ubloxRaw']
|
||||
all_logs = get_ublox_gnss(raw_logs)[1]
|
||||
ublox_cfg = get_process_config("ubloxd")
|
||||
all_logs = replay_process(ublox_cfg, logs)
|
||||
low_gnss = []
|
||||
for m in all_logs:
|
||||
if m.ubloxGnss.which() != 'measurementReport':
|
||||
if m.which() != "ubloxGnss" or m.ubloxGnss.which() != 'measurementReport':
|
||||
continue
|
||||
|
||||
MAX_MEAS = 7
|
||||
@@ -72,6 +46,7 @@ def get_log(segs=range(0)):
|
||||
low_gnss.append(m)
|
||||
return all_logs, low_gnss
|
||||
|
||||
|
||||
def get_log_qcom(segs=range(0)):
|
||||
logs = []
|
||||
for i in segs:
|
||||
@@ -79,13 +54,16 @@ def get_log_qcom(segs=range(0)):
|
||||
all_logs = [m for m in logs if m.which() == 'qcomGnss']
|
||||
return all_logs
|
||||
|
||||
|
||||
def verify_messages(lr, laikad, return_one_success=False):
|
||||
good_msgs = []
|
||||
for m in lr:
|
||||
if m.which() == 'ubloxGnss':
|
||||
gnss_msg = m.ubloxGnss
|
||||
else:
|
||||
elif m.which() == 'qcomGnss':
|
||||
gnss_msg = m.qcomGnss
|
||||
else:
|
||||
continue
|
||||
msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=True)
|
||||
if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0:
|
||||
good_msgs.append(msg)
|
||||
@@ -101,7 +79,7 @@ def get_first_gps_time(logs):
|
||||
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
|
||||
if len(new_meas) > 0:
|
||||
return new_meas[0].recv_time
|
||||
else:
|
||||
elif m.which() == "qcomGnss":
|
||||
if m.qcomGnss.which == 'measurementReport':
|
||||
new_meas = read_raw_qcom(m.qcomGnss.measurementReport)
|
||||
if len(new_meas) > 0:
|
||||
@@ -116,14 +94,11 @@ def get_measurement_mock(gpstime, sat_ephemeris):
|
||||
return meas
|
||||
|
||||
|
||||
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12))
|
||||
|
||||
|
||||
class TestLaikad(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
logs, low_gnss = get_log(range(1))
|
||||
logs, low_gnss = get_log_ublox(range(1))
|
||||
cls.logs = logs
|
||||
cls.low_gnss = low_gnss
|
||||
cls.logs_qcom = get_log_qcom(range(1))
|
||||
@@ -190,6 +165,9 @@ class TestLaikad(unittest.TestCase):
|
||||
self.assertFalse(all(laikad.kf_valid(m.logMonoTime * 1e-9)))
|
||||
kf_valid = False
|
||||
for m in self.logs:
|
||||
if m.which() != 'ubloxGnss':
|
||||
continue
|
||||
|
||||
laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
|
||||
kf_valid = all(laikad.kf_valid(m.logMonoTime * 1e-9))
|
||||
if kf_valid:
|
||||
@@ -247,6 +225,9 @@ class TestLaikad(unittest.TestCase):
|
||||
seen_internet_eph = False
|
||||
|
||||
for m in logs:
|
||||
if m.which() != 'ubloxGnss' and m.which() != 'qcomGnss':
|
||||
continue
|
||||
|
||||
gnss_msg = m.qcomGnss if use_qcom else m.ubloxGnss
|
||||
out_msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=False)
|
||||
if laikad.orbit_fetch_future is not None:
|
||||
@@ -301,8 +282,6 @@ class TestLaikad(unittest.TestCase):
|
||||
self.assertTrue(any([x.source=='cache' for x in msg.gnssMeasurements.ephemerisStatuses]))
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
|
||||
|
||||
#TODO test cache with only orbits
|
||||
#with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method:
|
||||
# # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
|
||||
@@ -333,5 +312,7 @@ class TestLaikad(unittest.TestCase):
|
||||
def dict_has_values(self, dct):
|
||||
self.assertGreater(len(dct), 0)
|
||||
self.assertGreater(min([len(v) for v in dct.values()]), 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user