mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-06-27 09:32:05 +08:00
16ec56398c
* add openpilot prefix to imports
* more
* more
* fix docs
* fix linter
* bump submodules
* fix patched tests
* update dynamic imports
* debug
* Revert "debug"
This reverts commit db5e13b9911cc74438bee123bc3430da6c31b24b.
* fix pm test
old-commit-hash: a9626f95b6
50 lines
1.2 KiB
Python
50 lines
1.2 KiB
Python
import os
|
|
import shutil
|
|
import uuid
|
|
|
|
from typing import List, Optional
|
|
|
|
from openpilot.common.params import Params
|
|
|
|
class OpenpilotPrefix:
    """Context manager that runs the enclosed code under an ``OPENPILOT_PREFIX``.

    On entry the ``OPENPILOT_PREFIX`` environment variable is set to the
    prefix and a directory named after it is created under ``/dev/shm``.
    On exit the environment variable is put back to the value it had before
    entry (or removed if it was unset) and, when ``clean_dirs_on_exit`` is
    true, :meth:`clean_dirs` is called.

    Args:
        prefix: Prefix name to use. A random UUID4 string is generated when
            it is ``None`` or empty.
        clean_dirs_on_exit: Whether ``__exit__`` calls :meth:`clean_dirs`.
    """

    def __init__(self, prefix: Optional[str] = None, clean_dirs_on_exit: bool = True):
        self.prefix = prefix if prefix else str(uuid.uuid4())
        self.msgq_path = os.path.join('/dev/shm', self.prefix)
        self.clean_dirs_on_exit = clean_dirs_on_exit
        # Value of OPENPILOT_PREFIX captured by __enter__; None means "was unset".
        self.original_prefix: Optional[str] = None

    def __enter__(self):
        """Export the prefix, create the msgq directory and return ``self``."""
        # Remember what was there so nested/outer prefixes survive our exit.
        self.original_prefix = os.environ.get('OPENPILOT_PREFIX')
        os.environ['OPENPILOT_PREFIX'] = self.prefix
        try:
            os.mkdir(self.msgq_path)
        except FileExistsError:
            # Re-using an existing prefix directory is fine.
            pass

        return self

    def __exit__(self, exc_type, exc_obj, exc_tb):
        """Optionally clean up, then restore the environment.

        Returns:
            ``False`` so that an exception raised in the ``with`` body propagates.
        """
        try:
            # Cleanup runs while the prefix is still exported.
            # NOTE(review): Params presumably resolves its path from
            # OPENPILOT_PREFIX, hence the ordering - confirm.
            if self.clean_dirs_on_exit:
                self.clean_dirs()
        finally:
            # Always restore the environment, even if cleanup raised.
            if self.original_prefix is None:
                # pop() tolerates the variable having been removed in the body.
                os.environ.pop('OPENPILOT_PREFIX', None)
            else:
                os.environ['OPENPILOT_PREFIX'] = self.original_prefix
        return False

    def clean_dirs(self):
        """Remove the params directory/link and the msgq directory of this prefix."""
        symlink_path = Params().get_param_path()
        # lexists (not exists): a dangling link whose target is already gone
        # must still be removed.
        if os.path.lexists(symlink_path):
            shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
            try:
                os.remove(symlink_path)
            except FileNotFoundError:
                # The path was a real directory and rmtree already deleted it.
                pass
        shutil.rmtree(self.msgq_path, ignore_errors=True)
|
|
|
|
|
|
class DummySocket:
    """In-memory stand-in for a socket that keeps sent messages in a list."""

    def __init__(self):
        # Messages that have been sent and not yet received.
        self.data: List[bytes] = []

    def send(self, data: bytes):
        """Store ``data`` at the end of the internal message list."""
        self.data.append(data)

    def receive(self, non_blocking: bool = False) -> Optional[bytes]:
        """Return a stored message, or ``None`` for a non-blocking call.

        A non-blocking call always yields ``None`` and leaves the stored
        messages untouched. Otherwise the most recently sent message is
        removed and returned; ``IndexError`` is raised when nothing is stored.
        """
        return None if non_blocking else self.data.pop()
|