mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-27 17:02:04 +08:00
athenad: update typing
This commit is contained in:
@@ -75,7 +75,7 @@ class UploadFile:
|
||||
allow_cellular: bool
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: Dict) -> UploadFile:
|
||||
def from_dict(cls, d: dict) -> UploadFile:
|
||||
return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False))
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ class UploadItem:
|
||||
allow_cellular: bool = False
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: Dict) -> UploadItem:
|
||||
def from_dict(cls, d: dict) -> UploadItem:
|
||||
return cls(d["path"], d["url"], d["headers"], d["created_at"], d["id"], d["retry_count"], d["current"],
|
||||
d["progress"], d["allow_cellular"])
|
||||
|
||||
@@ -309,7 +309,7 @@ def _do_upload(upload_item: UploadItem, callback: Optional[Callable] = None) ->
|
||||
|
||||
# security: user should be able to request any message from their car
|
||||
@dispatcher.add_method
|
||||
def getMessage(service: str, timeout: int = 1000) -> Dict:
|
||||
def getMessage(service: str, timeout: int = 1000) -> dict:
|
||||
if service is None or service not in service_list:
|
||||
raise Exception("invalid service")
|
||||
|
||||
@@ -320,7 +320,7 @@ def getMessage(service: str, timeout: int = 1000) -> Dict:
|
||||
raise TimeoutError
|
||||
|
||||
# this is because capnp._DynamicStructReader doesn't have typing information
|
||||
return cast(Dict, ret.to_dict())
|
||||
return cast(dict, ret.to_dict())
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
|
||||
Reference in New Issue
Block a user