mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-28 01:52:06 +08:00
Revert "athenad: fix thread safety issues in upload handing" (#34224)
Revert "athenad: fix thread safety issues in upload handing (#34199)"
This reverts commit dcb3113c4b.
This commit is contained in:
+22
-25
@@ -100,9 +100,9 @@ send_queue: Queue[str] = queue.Queue()
|
||||
upload_queue: Queue[UploadItem] = queue.Queue()
|
||||
low_priority_send_queue: Queue[str] = queue.Queue()
|
||||
log_recv_queue: Queue[str] = queue.Queue()
|
||||
cancelled_uploads: set[str] = set()
|
||||
|
||||
cur_upload_items: dict[int, UploadItem | None] = {}
|
||||
cur_upload_items_lock = threading.Lock()
|
||||
|
||||
|
||||
def strip_zst_extension(fn: str) -> str:
|
||||
@@ -130,9 +130,8 @@ class UploadQueueCache:
|
||||
@staticmethod
|
||||
def cache(upload_queue: Queue[UploadItem]) -> None:
|
||||
try:
|
||||
with upload_queue.mutex:
|
||||
items = [asdict(item) for item in upload_queue.queue]
|
||||
|
||||
queue: list[UploadItem | None] = list(upload_queue.queue)
|
||||
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)]
|
||||
Params().put("AthenadUploadQueue", json.dumps(items))
|
||||
except Exception:
|
||||
cloudlog.exception("athena.UploadQueueCache.cache.exception")
|
||||
@@ -199,13 +198,11 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr
|
||||
progress=0,
|
||||
current=False
|
||||
)
|
||||
|
||||
with cur_upload_items_lock:
|
||||
upload_queue.put_nowait(item)
|
||||
cur_upload_items[tid] = None
|
||||
|
||||
upload_queue.put_nowait(item)
|
||||
UploadQueueCache.cache(upload_queue)
|
||||
|
||||
cur_upload_items[tid] = None
|
||||
|
||||
for _ in range(RETRY_DELAY):
|
||||
time.sleep(1)
|
||||
if end_event.is_set():
|
||||
@@ -224,8 +221,7 @@ def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None:
|
||||
if end_event.is_set():
|
||||
raise AbortTransferException
|
||||
|
||||
with cur_upload_items_lock:
|
||||
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
|
||||
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
|
||||
|
||||
|
||||
def upload_handler(end_event: threading.Event) -> None:
|
||||
@@ -233,10 +229,14 @@ def upload_handler(end_event: threading.Event) -> None:
|
||||
tid = threading.get_ident()
|
||||
|
||||
while not end_event.is_set():
|
||||
cur_upload_items[tid] = None
|
||||
|
||||
try:
|
||||
with cur_upload_items_lock:
|
||||
cur_upload_items[tid] = None
|
||||
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
|
||||
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
|
||||
|
||||
if item.id in cancelled_uploads:
|
||||
cancelled_uploads.remove(item.id)
|
||||
continue
|
||||
|
||||
# Remove item if too old
|
||||
age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000)
|
||||
@@ -415,10 +415,8 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
|
||||
|
||||
@dispatcher.add_method
|
||||
def listUploadQueue() -> list[UploadItemDict]:
|
||||
with cur_upload_items_lock, upload_queue.mutex:
|
||||
items = list(upload_queue.queue) + [item for item in cur_upload_items.values() if item is not None]
|
||||
|
||||
return [asdict(item) for item in items]
|
||||
items = list(upload_queue.queue) + list(cur_upload_items.values())
|
||||
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)]
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
@@ -426,14 +424,13 @@ def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]:
|
||||
if not isinstance(upload_id, list):
|
||||
upload_id = [upload_id]
|
||||
|
||||
with upload_queue.mutex:
|
||||
remaining_items = [item for item in upload_queue.queue if item.id not in upload_id]
|
||||
if len(remaining_items) == len(upload_queue.queue):
|
||||
return {"success": 0, "error": "not found"}
|
||||
uploading_ids = {item.id for item in list(upload_queue.queue)}
|
||||
cancelled_ids = uploading_ids.intersection(upload_id)
|
||||
if len(cancelled_ids) == 0:
|
||||
return {"success": 0, "error": "not found"}
|
||||
|
||||
upload_queue.queue.clear()
|
||||
upload_queue.queue.extend(remaining_items)
|
||||
return {"success": 1}
|
||||
cancelled_uploads.update(cancelled_ids)
|
||||
return {"success": 1}
|
||||
|
||||
@dispatcher.add_method
|
||||
def setRouteViewed(route: str) -> dict[str, int | str]:
|
||||
|
||||
@@ -78,6 +78,7 @@ class TestAthenadMethods:
|
||||
|
||||
athenad.upload_queue = queue.Queue()
|
||||
athenad.cur_upload_items.clear()
|
||||
athenad.cancelled_uploads.clear()
|
||||
|
||||
for i in os.listdir(Paths.log_root()):
|
||||
p = os.path.join(Paths.log_root(), i)
|
||||
@@ -281,10 +282,13 @@ class TestAthenadMethods:
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
dispatcher["cancelUpload"](item.id)
|
||||
|
||||
assert item.id in athenad.cancelled_uploads
|
||||
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
assert athenad.upload_queue.qsize() == 0
|
||||
assert len(athenad.cancelled_uploads) == 0
|
||||
|
||||
@with_upload_handler
|
||||
def test_cancel_expiry(self):
|
||||
@@ -327,7 +331,7 @@ class TestAthenadMethods:
|
||||
assert items[0] == asdict(item)
|
||||
assert not items[0]['current']
|
||||
|
||||
dispatcher["cancelUpload"](item.id)
|
||||
athenad.cancelled_uploads.add(item.id)
|
||||
items = dispatcher["listUploadQueue"]()
|
||||
assert len(items) == 0
|
||||
|
||||
@@ -339,7 +343,7 @@ class TestAthenadMethods:
|
||||
athenad.upload_queue.put_nowait(item2)
|
||||
|
||||
# Ensure canceled items are not persisted
|
||||
dispatcher["cancelUpload"](item2.id)
|
||||
athenad.cancelled_uploads.add(item2.id)
|
||||
|
||||
# serialize item
|
||||
athenad.UploadQueueCache.cache(athenad.upload_queue)
|
||||
|
||||
Reference in New Issue
Block a user