Fix athena like stock to allow uploads cuz im gonna go crazy

This commit is contained in:
DevTekVE
2024-12-14 21:57:42 +01:00
parent 10fc0de603
commit b571e4cff4
+27 -30
View File
@@ -54,6 +54,7 @@ RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
MAX_AGE = 31 * 24 * 3600 # seconds
WS_FRAME_SIZE = 4096
DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds
NetworkType = log.DeviceState.NetworkType
@@ -99,9 +100,9 @@ send_queue: Queue[str] = queue.Queue()
upload_queue: Queue[UploadItem] = queue.Queue()
low_priority_send_queue: Queue[str] = queue.Queue()
log_recv_queue: Queue[str] = queue.Queue()
cancelled_uploads: set[str] = set()
cur_upload_items: dict[int, UploadItem | None] = {}
cur_upload_items_lock = threading.Lock()
def strip_zst_extension(fn: str) -> str:
@@ -129,9 +130,8 @@ class UploadQueueCache:
@staticmethod
def cache(upload_queue: Queue[UploadItem]) -> None:
try:
with upload_queue.mutex:
items = [asdict(item) for item in upload_queue.queue]
queue: list[UploadItem | None] = list(upload_queue.queue)
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)]
Params().put("AthenadUploadQueue", json.dumps(items))
except Exception:
cloudlog.exception("athena.UploadQueueCache.cache.exception")
@@ -201,8 +201,7 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr
upload_queue.put_nowait(item)
UploadQueueCache.cache(upload_queue)
with cur_upload_items_lock:
cur_upload_items[tid] = None
cur_upload_items[tid] = None
for _ in range(RETRY_DELAY):
time.sleep(1)
@@ -213,16 +212,16 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr
def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None:
# Abort transfer if connection changed to metered after starting upload
# or if athenad is shutting down to re-connect the websocket
sm.update(0)
metered = sm['deviceState'].networkMetered
if metered and (not item.allow_cellular):
raise AbortTransferException
if not item.allow_cellular:
if (time.monotonic() - sm.recv_time['deviceState']) > DEVICE_STATE_UPDATE_INTERVAL:
sm.update(0)
if sm['deviceState'].networkMetered:
raise AbortTransferException
if end_event.is_set():
raise AbortTransferException
with cur_upload_items_lock:
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
def upload_handler(end_event: threading.Event) -> None:
@@ -230,10 +229,14 @@ def upload_handler(end_event: threading.Event) -> None:
tid = threading.get_ident()
while not end_event.is_set():
cur_upload_items[tid] = None
try:
with cur_upload_items_lock:
cur_upload_items[tid] = None
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue
# Remove item if too old
age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000)
@@ -412,13 +415,8 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
@dispatcher.add_method
def listUploadQueue() -> list[UploadItemDict]:
with upload_queue.mutex:
items = list(upload_queue.queue)
with cur_upload_items_lock:
items += list(cur_upload_items.values())
return [asdict(item) for item in items]
items = list(upload_queue.queue) + list(cur_upload_items.values())
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)]
@dispatcher.add_method
@@ -426,14 +424,13 @@ def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]:
if not isinstance(upload_id, list):
upload_id = [upload_id]
with upload_queue.mutex:
remaining_items = [item for item in upload_queue.queue if item.id not in upload_id]
if len(remaining_items) == len(upload_queue.queue):
return {"success": 0, "error": "not found"}
uploading_ids = {item.id for item in list(upload_queue.queue)}
cancelled_ids = uploading_ids.intersection(upload_id)
if len(cancelled_ids) == 0:
return {"success": 0, "error": "not found"}
upload_queue.queue.clear()
upload_queue.queue.extend(remaining_items)
return {"success": 1}
cancelled_uploads.update(cancelled_ids)
return {"success": 1}
@dispatcher.add_method
def setRouteViewed(route: str) -> dict[str, int | str]:
@@ -827,4 +824,4 @@ def main(exit_event: threading.Event = None):
if __name__ == "__main__":
main()
main()