mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-03 04:22:09 +08:00
athena: persist upload queue to disk (#22659)
* revert submodules * lowercase * addressed comments * add test for cancelled uploads * formatting * catch all exceptions * handle empty param Co-authored-by: Willem Melching <willem.melching@gmail.com> old-commit-hash: 8cb83b29a6b0cb1e5081651e96745c425e7e31f9
This commit is contained in:
@@ -56,6 +56,28 @@ UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', '
|
||||
cur_upload_items = {}
|
||||
|
||||
|
||||
class UploadQueueCache():
|
||||
params = Params()
|
||||
|
||||
@staticmethod
|
||||
def initialize(upload_queue):
|
||||
try:
|
||||
upload_queue_json = UploadQueueCache.params.get("AthenadUploadQueue")
|
||||
if upload_queue_json is not None:
|
||||
for item in json.loads(upload_queue_json):
|
||||
upload_queue.put(UploadItem(**item))
|
||||
except Exception:
|
||||
cloudlog.exception("athena.UploadQueueCache.initialize.exception")
|
||||
|
||||
@staticmethod
|
||||
def cache(upload_queue):
|
||||
try:
|
||||
items = [i._asdict() for i in upload_queue.queue if i.id not in cancelled_uploads]
|
||||
UploadQueueCache.params.put("AthenadUploadQueue", json.dumps(items))
|
||||
except Exception:
|
||||
cloudlog.exception("athena.UploadQueueCache.cache.exception")
|
||||
|
||||
|
||||
def handle_long_poll(ws):
|
||||
end_event = threading.Event()
|
||||
|
||||
@@ -111,6 +133,7 @@ def upload_handler(end_event):
|
||||
|
||||
try:
|
||||
cur_upload_items[tid] = upload_queue.get(timeout=1)._replace(current=True)
|
||||
|
||||
if cur_upload_items[tid].id in cancelled_uploads:
|
||||
cancelled_uploads.remove(cur_upload_items[tid].id)
|
||||
continue
|
||||
@@ -120,6 +143,7 @@ def upload_handler(end_event):
|
||||
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)
|
||||
|
||||
_do_upload(cur_upload_items[tid], cb)
|
||||
UploadQueueCache.cache(upload_queue)
|
||||
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
|
||||
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")
|
||||
|
||||
@@ -131,6 +155,8 @@ def upload_handler(end_event):
|
||||
current=False
|
||||
)
|
||||
upload_queue.put_nowait(item)
|
||||
UploadQueueCache.cache(upload_queue)
|
||||
|
||||
cur_upload_items[tid] = None
|
||||
|
||||
for _ in range(RETRY_DELAY):
|
||||
@@ -248,6 +274,7 @@ def uploadFileToUrl(fn, url, headers):
|
||||
item = item._replace(id=upload_id)
|
||||
|
||||
upload_queue.put_nowait(item)
|
||||
UploadQueueCache.cache(upload_queue)
|
||||
|
||||
return {"enqueued": 1, "item": item._asdict()}
|
||||
|
||||
@@ -280,8 +307,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):
|
||||
|
||||
cloudlog.debug("athena.startLocalProxy.starting")
|
||||
|
||||
params = Params()
|
||||
dongle_id = params.get("DongleId").decode('utf8')
|
||||
dongle_id = Params().get("DongleId").decode('utf8')
|
||||
identity_token = Api(dongle_id).get_token()
|
||||
ws = create_connection(remote_ws_uri,
|
||||
cookie="jwt=" + identity_token,
|
||||
@@ -525,6 +551,7 @@ def backoff(retries):
|
||||
def main():
|
||||
params = Params()
|
||||
dongle_id = params.get("DongleId", encoding='utf-8')
|
||||
UploadQueueCache.initialize(upload_queue)
|
||||
|
||||
ws_uri = ATHENA_HOST + "/ws/v2/" + dongle_id
|
||||
api = Api(dongle_id)
|
||||
|
||||
@@ -50,18 +50,28 @@ class MockApi():
|
||||
|
||||
|
||||
class MockParams():
|
||||
def __init__(self):
|
||||
self.params = {
|
||||
"DongleId": b"0000000000000000",
|
||||
"GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private" # noqa: E501
|
||||
}
|
||||
default_params = {
|
||||
"DongleId": b"0000000000000000",
|
||||
"GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private", # noqa: E501
|
||||
"AthenadUploadQueue": '[]'
|
||||
}
|
||||
params = default_params.copy()
|
||||
|
||||
@staticmethod
|
||||
def restore_defaults():
|
||||
MockParams.params = MockParams.default_params.copy()
|
||||
|
||||
def get(self, k, encoding=None):
|
||||
ret = self.params.get(k)
|
||||
ret = MockParams.params.get(k)
|
||||
if ret is not None and encoding is not None:
|
||||
ret = ret.decode(encoding)
|
||||
return ret
|
||||
|
||||
def put(self, k, v):
|
||||
if k not in MockParams.params:
|
||||
raise KeyError(f"key: {k} not in MockParams")
|
||||
MockParams.params[k] = v
|
||||
|
||||
|
||||
class MockWebsocket():
|
||||
def __init__(self, recv_queue, send_queue):
|
||||
|
||||
@@ -21,19 +21,22 @@ from selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher
|
||||
from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server
|
||||
from cereal import messaging
|
||||
|
||||
|
||||
class TestAthenadMethods(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.SOCKET_PORT = 45454
|
||||
athenad.Params = MockParams
|
||||
athenad.ROOT = tempfile.mkdtemp()
|
||||
athenad.SWAGLOG_DIR = swaglog.SWAGLOG_DIR = tempfile.mkdtemp()
|
||||
athenad.Params = MockParams
|
||||
athenad.Api = MockApi
|
||||
athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT])
|
||||
|
||||
def setUp(self):
|
||||
MockParams.restore_defaults()
|
||||
athenad.upload_queue = queue.Queue()
|
||||
athenad.cur_upload_items.clear()
|
||||
athenad.cancelled_uploads.clear()
|
||||
|
||||
for i in os.listdir(athenad.ROOT):
|
||||
p = os.path.join(athenad.ROOT, i)
|
||||
@@ -249,6 +252,26 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
items = dispatcher["listUploadQueue"]()
|
||||
self.assertEqual(len(items), 0)
|
||||
|
||||
def test_upload_queue_persistence(self):
|
||||
item1 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id1')
|
||||
item2 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id2')
|
||||
|
||||
athenad.upload_queue.put_nowait(item1)
|
||||
athenad.upload_queue.put_nowait(item2)
|
||||
|
||||
# Ensure cancelled items are not persisted
|
||||
athenad.cancelled_uploads.add(item2.id)
|
||||
|
||||
# serialize item
|
||||
athenad.UploadQueueCache.cache(athenad.upload_queue)
|
||||
|
||||
# deserialize item
|
||||
athenad.upload_queue.queue.clear()
|
||||
athenad.UploadQueueCache.initialize(athenad.upload_queue)
|
||||
|
||||
self.assertEqual(athenad.upload_queue.qsize(), 1)
|
||||
self.assertDictEqual(athenad.upload_queue.queue[-1]._asdict(), item1._asdict())
|
||||
|
||||
@mock.patch('selfdrive.athena.athenad.create_connection')
|
||||
def test_startLocalProxy(self, mock_create_connection):
|
||||
end_event = threading.Event()
|
||||
|
||||
@@ -85,6 +85,7 @@ private:
|
||||
std::unordered_map<std::string, uint32_t> keys = {
|
||||
{"AccessToken", CLEAR_ON_MANAGER_START | DONT_LOG},
|
||||
{"AthenadPid", PERSISTENT},
|
||||
{"AthenadUploadQueue", PERSISTENT},
|
||||
{"BootedOnroad", CLEAR_ON_MANAGER_START | CLEAR_ON_IGNITION_OFF},
|
||||
{"CalibrationParams", PERSISTENT},
|
||||
{"CarBatteryCapacity", PERSISTENT},
|
||||
|
||||
Reference in New Issue
Block a user