mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-27 08:52:05 +08:00
cleanup azure handling (#31034)
* wip cleanup * Wip * fixes * fix * cleanup * keep this for now * dest
This commit is contained in:
@@ -3,33 +3,21 @@ import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from functools import lru_cache
|
||||
from typing import Iterable, Optional
|
||||
from typing import Iterable, List, Optional
|
||||
|
||||
from azure.storage.blob import ContainerClient
|
||||
from tqdm import tqdm
|
||||
|
||||
from openpilot.selfdrive.car.tests.routes import routes as test_car_models_routes
|
||||
from openpilot.selfdrive.test.process_replay.test_processes import source_segments as replay_segments
|
||||
from openpilot.tools.lib.openpilotci import (DATA_CI_ACCOUNT, DATA_CI_ACCOUNT_URL, OPENPILOT_CI_CONTAINER,
|
||||
DATA_CI_CONTAINER, get_azure_credential, get_container_sas, upload_file)
|
||||
from openpilot.tools.lib.azure_container import AzureContainer
|
||||
from openpilot.tools.lib.openpilotcontainers import DataCIContainer, DataProdContainer, OpenpilotCIContainer
|
||||
|
||||
DATA_PROD_ACCOUNT = "commadata2"
|
||||
DATA_PROD_CONTAINER = "commadata2"
|
||||
|
||||
SOURCES = [
|
||||
(DATA_PROD_ACCOUNT, DATA_PROD_CONTAINER),
|
||||
(DATA_CI_ACCOUNT, DATA_CI_CONTAINER),
|
||||
SOURCES: List[AzureContainer] = [
|
||||
DataProdContainer,
|
||||
DataCIContainer
|
||||
]
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_azure_keys():
|
||||
dest_container = ContainerClient(DATA_CI_ACCOUNT_URL, OPENPILOT_CI_CONTAINER, credential=get_azure_credential())
|
||||
dest_key = get_container_sas(DATA_CI_ACCOUNT, OPENPILOT_CI_CONTAINER)
|
||||
source_keys = [get_container_sas(*s) for s in SOURCES]
|
||||
return dest_container, dest_key, source_keys
|
||||
|
||||
DEST = OpenpilotCIContainer
|
||||
|
||||
def upload_route(path: str, exclude_patterns: Optional[Iterable[str]] = None) -> None:
|
||||
if exclude_patterns is None:
|
||||
@@ -41,11 +29,11 @@ def upload_route(path: str, exclude_patterns: Optional[Iterable[str]] = None) ->
|
||||
for file in os.listdir(path):
|
||||
if any(re.search(pattern, file) for pattern in exclude_patterns):
|
||||
continue
|
||||
upload_file(os.path.join(path, file), f"{destpath}/{file}")
|
||||
DEST.upload_file(os.path.join(path, file), f"{destpath}/{file}")
|
||||
|
||||
|
||||
def sync_to_ci_public(route: str) -> bool:
|
||||
dest_container, dest_key, source_keys = get_azure_keys()
|
||||
dest_container, dest_key = DEST.get_client_and_key()
|
||||
key_prefix = route.replace('|', '/')
|
||||
dongle_id = key_prefix.split('/')[0]
|
||||
|
||||
@@ -53,14 +41,15 @@ def sync_to_ci_public(route: str) -> bool:
|
||||
return True
|
||||
|
||||
print(f"Uploading {route}")
|
||||
for (source_account, source_bucket), source_key in zip(SOURCES, source_keys, strict=True):
|
||||
for source_container in SOURCES:
|
||||
# assumes az login has been run
|
||||
print(f"Trying {source_account}/{source_bucket}")
|
||||
print(f"Trying {source_container.ACCOUNT}/{source_container.CONTAINER}")
|
||||
_, source_key = source_container.get_client_and_key()
|
||||
cmd = [
|
||||
"azcopy",
|
||||
"copy",
|
||||
f"https://{source_account}.blob.core.windows.net/{source_bucket}/{key_prefix}?{source_key}",
|
||||
f"https://{DATA_CI_ACCOUNT}.blob.core.windows.net/{OPENPILOT_CI_CONTAINER}/{dongle_id}?{dest_key}",
|
||||
f"{source_container.BASE_URL}{key_prefix}?{source_key}",
|
||||
f"{DEST.BASE_URL}{dongle_id}?{dest_key}",
|
||||
"--recursive=true",
|
||||
"--overwrite=false",
|
||||
"--exclude-pattern=*/dcamera.hevc",
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import IO, Union
|
||||
|
||||
from azure.storage.blob import ContainerClient
|
||||
|
||||
TOKEN_PATH = Path("/data/azure_token")
|
||||
|
||||
@lru_cache
|
||||
def get_azure_credential():
|
||||
if "AZURE_TOKEN" in os.environ:
|
||||
return os.environ["AZURE_TOKEN"]
|
||||
elif TOKEN_PATH.is_file():
|
||||
return TOKEN_PATH.read_text().strip()
|
||||
else:
|
||||
from azure.identity import AzureCliCredential
|
||||
return AzureCliCredential()
|
||||
|
||||
@lru_cache
|
||||
def get_container_sas(account_name: str, container_name: str):
|
||||
from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
|
||||
start_time = datetime.utcnow()
|
||||
expiry_time = start_time + timedelta(hours=1)
|
||||
blob_service = BlobServiceClient(
|
||||
account_url=f"https://{account_name}.blob.core.windows.net",
|
||||
credential=get_azure_credential(),
|
||||
)
|
||||
return generate_container_sas(
|
||||
account_name,
|
||||
container_name,
|
||||
user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
|
||||
permission=ContainerSasPermissions(read=True, write=True, list=True),
|
||||
expiry=expiry_time,
|
||||
)
|
||||
|
||||
class AzureContainer:
|
||||
def __init__(self, account, container):
|
||||
self.ACCOUNT = account
|
||||
self.CONTAINER = container
|
||||
|
||||
@property
|
||||
def ACCOUNT_URL(self) -> str:
|
||||
return f"https://{self.ACCOUNT}.blob.core.windows.net"
|
||||
|
||||
@property
|
||||
def BASE_URL(self) -> str:
|
||||
return f"{self.ACCOUNT_URL}/{self.CONTAINER}/"
|
||||
|
||||
def get_client_and_key(self):
|
||||
client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential())
|
||||
key = get_container_sas(self.ACCOUNT, self.CONTAINER)
|
||||
return client, key
|
||||
|
||||
def get_url(self, route_name: str, segment_num, log_type="rlog") -> str:
|
||||
ext = "hevc" if log_type.endswith('camera') else "bz2"
|
||||
return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"
|
||||
|
||||
def upload_bytes(self, data: Union[bytes, IO], blob_name: str) -> str:
|
||||
from azure.storage.blob import BlobClient
|
||||
blob = BlobClient(
|
||||
account_url=self.ACCOUNT_URL,
|
||||
container_name=self.CONTAINER,
|
||||
blob_name=blob_name,
|
||||
credential=get_azure_credential(),
|
||||
overwrite=False,
|
||||
)
|
||||
blob.upload_blob(data)
|
||||
return self.BASE_URL + blob_name
|
||||
|
||||
def upload_file(self, path: Union[str, os.PathLike], blob_name: str) -> str:
|
||||
with open(path, "rb") as f:
|
||||
return self.upload_bytes(f, blob_name)
|
||||
Executable → Regular
+8
-62
@@ -1,66 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import IO, Union
|
||||
from openpilot.tools.lib.openpilotcontainers import OpenpilotCIContainer
|
||||
|
||||
DATA_CI_ACCOUNT = "commadataci"
|
||||
DATA_CI_ACCOUNT_URL = f"https://{DATA_CI_ACCOUNT}.blob.core.windows.net"
|
||||
OPENPILOT_CI_CONTAINER = "openpilotci"
|
||||
DATA_CI_CONTAINER = "commadataci"
|
||||
BASE_URL = f"{DATA_CI_ACCOUNT_URL}/{OPENPILOT_CI_CONTAINER}/"
|
||||
def get_url(*args, **kwargs):
|
||||
return OpenpilotCIContainer.get_url(*args, **kwargs)
|
||||
|
||||
TOKEN_PATH = Path("/data/azure_token")
|
||||
def upload_file(*args, **kwargs):
|
||||
return OpenpilotCIContainer.upload_file(*args, **kwargs)
|
||||
|
||||
def upload_bytes(*args, **kwargs):
|
||||
return OpenpilotCIContainer.upload_bytes(*args, **kwargs)
|
||||
|
||||
def get_url(route_name: str, segment_num, log_type="rlog") -> str:
|
||||
ext = "hevc" if log_type.endswith('camera') else "bz2"
|
||||
return BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_azure_credential():
|
||||
if "AZURE_TOKEN" in os.environ:
|
||||
return os.environ["AZURE_TOKEN"]
|
||||
elif TOKEN_PATH.is_file():
|
||||
return TOKEN_PATH.read_text().strip()
|
||||
else:
|
||||
from azure.identity import AzureCliCredential
|
||||
return AzureCliCredential()
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_container_sas(account_name: str, container_name: str):
|
||||
from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
|
||||
start_time = datetime.utcnow()
|
||||
expiry_time = start_time + timedelta(hours=1)
|
||||
blob_service = BlobServiceClient(
|
||||
account_url=f"https://{account_name}.blob.core.windows.net",
|
||||
credential=get_azure_credential(),
|
||||
)
|
||||
return generate_container_sas(
|
||||
account_name,
|
||||
container_name,
|
||||
user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
|
||||
permission=ContainerSasPermissions(read=True, write=True, list=True),
|
||||
expiry=expiry_time,
|
||||
)
|
||||
|
||||
|
||||
def upload_bytes(data: Union[bytes, IO], blob_name: str) -> str:
|
||||
from azure.storage.blob import BlobClient
|
||||
blob = BlobClient(
|
||||
account_url=DATA_CI_ACCOUNT_URL,
|
||||
container_name=OPENPILOT_CI_CONTAINER,
|
||||
blob_name=blob_name,
|
||||
credential=get_azure_credential(),
|
||||
overwrite=False,
|
||||
)
|
||||
blob.upload_blob(data)
|
||||
return BASE_URL + blob_name
|
||||
|
||||
|
||||
def upload_file(path: Union[str, os.PathLike], blob_name: str) -> str:
|
||||
with open(path, "rb") as f:
|
||||
return upload_bytes(f, blob_name)
|
||||
BASE_URL = OpenpilotCIContainer.BASE_URL
|
||||
|
||||
Executable
+6
@@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from openpilot.tools.lib.azure_container import AzureContainer
|
||||
|
||||
OpenpilotCIContainer = AzureContainer("commadataci", "openpilotci")
|
||||
DataCIContainer = AzureContainer("commadataci", "commadataci")
|
||||
DataProdContainer = AzureContainer("commadata2", "commadata2")
|
||||
Reference in New Issue
Block a user