diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 55b9476986..efb30c25c3 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -54,6 +54,7 @@ RETRY_DELAY = 10 # seconds MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately MAX_AGE = 31 * 24 * 3600 # seconds WS_FRAME_SIZE = 4096 +DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds NetworkType = log.DeviceState.NetworkType @@ -99,9 +100,9 @@ send_queue: Queue[str] = queue.Queue() upload_queue: Queue[UploadItem] = queue.Queue() low_priority_send_queue: Queue[str] = queue.Queue() log_recv_queue: Queue[str] = queue.Queue() +cancelled_uploads: set[str] = set() cur_upload_items: dict[int, UploadItem | None] = {} -cur_upload_items_lock = threading.Lock() def strip_zst_extension(fn: str) -> str: @@ -129,9 +130,8 @@ class UploadQueueCache: @staticmethod def cache(upload_queue: Queue[UploadItem]) -> None: try: - with upload_queue.mutex: - items = [asdict(item) for item in upload_queue.queue] - + queue: list[UploadItem | None] = list(upload_queue.queue) + items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)] Params().put("AthenadUploadQueue", json.dumps(items)) except Exception: cloudlog.exception("athena.UploadQueueCache.cache.exception") @@ -201,8 +201,7 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr upload_queue.put_nowait(item) UploadQueueCache.cache(upload_queue) - with cur_upload_items_lock: - cur_upload_items[tid] = None + cur_upload_items[tid] = None for _ in range(RETRY_DELAY): time.sleep(1) @@ -213,16 +212,16 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None: # Abort transfer if connection changed to metered after starting upload # or if athenad is shutting down to re-connect the websocket - sm.update(0) - metered = sm['deviceState'].networkMetered - if metered and (not item.allow_cellular): - raise AbortTransferException + if not item.allow_cellular: + if (time.monotonic() - sm.recv_time['deviceState']) > DEVICE_STATE_UPDATE_INTERVAL: + sm.update(0) + if sm['deviceState'].networkMetered: + raise AbortTransferException if end_event.is_set(): raise AbortTransferException - with cur_upload_items_lock: - cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) + cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) def upload_handler(end_event: threading.Event) -> None: @@ -230,10 +229,14 @@ def upload_handler(end_event: threading.Event) -> None: tid = threading.get_ident() while not end_event.is_set(): + cur_upload_items[tid] = None + try: - with cur_upload_items_lock: - cur_upload_items[tid] = None - cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) + cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) + + if item.id in cancelled_uploads: + cancelled_uploads.remove(item.id) + continue # Remove item if too old age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000) @@ -412,13 +415,8 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo @dispatcher.add_method def listUploadQueue() -> list[UploadItemDict]: - with upload_queue.mutex: - items = list(upload_queue.queue) - - with cur_upload_items_lock: - items += list(cur_upload_items.values()) - - return [asdict(item) for item in items] + items = list(upload_queue.queue) + list(cur_upload_items.values()) + return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)] @dispatcher.add_method @@ -426,14 +424,13 @@ def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]: if not isinstance(upload_id, list): upload_id = [upload_id] - with upload_queue.mutex: - remaining_items = [item for item in upload_queue.queue if item.id not in upload_id] - if len(remaining_items) == len(upload_queue.queue): - return {"success": 0, "error": "not found"} + uploading_ids = {item.id for item in list(upload_queue.queue)} + cancelled_ids = uploading_ids.intersection(upload_id) + if len(cancelled_ids) == 0: + return {"success": 0, "error": "not found"} - upload_queue.queue.clear() - upload_queue.queue.extend(remaining_items) - return {"success": 1} + cancelled_uploads.update(cancelled_ids) + return {"success": 1} @dispatcher.add_method def setRouteViewed(route: str) -> dict[str, int | str]: @@ -827,4 +824,4 @@ def main(exit_event: threading.Event = None): if __name__ == "__main__": - main() + main() \ No newline at end of file