mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-13 19:15:02 +08:00
722 lines
20 KiB
Python
722 lines
20 KiB
Python
import importlib
|
|
import os
|
|
import signal
|
|
import stat as statmod
|
|
import struct
|
|
import threading
|
|
import time
|
|
import subprocess
|
|
from pathlib import Path
|
|
from collections.abc import Callable, ValuesView
|
|
from abc import ABC, abstractmethod
|
|
from multiprocessing import Process
|
|
from types import SimpleNamespace
|
|
|
|
from setproctitle import setproctitle
|
|
|
|
from cereal import car, log
|
|
import cereal.messaging as messaging
|
|
import openpilot.system.sentry as sentry
|
|
from openpilot.common.basedir import BASEDIR
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.swaglog import cloudlog
|
|
from openpilot.common.watchdog import WATCHDOG_FN
|
|
|
|
ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None
|
|
|
|
DEBUG_ENV_KEYS = (
|
|
"XDG_RUNTIME_DIR",
|
|
"WAYLAND_DISPLAY",
|
|
"WAYLAND_SOCKET",
|
|
"QT_QPA_PLATFORM",
|
|
"QT_WAYLAND_SHELL_INTEGRATION",
|
|
"QT_WAYLAND_DISABLE_WINDOWDECORATION",
|
|
)
|
|
|
|
|
|
def _debug_dump_dir() -> Path:
|
|
for candidate in (Path("/data/log"), Path("/tmp")):
|
|
if candidate.is_dir() and os.access(candidate, os.W_OK):
|
|
return candidate
|
|
return Path.cwd()
|
|
|
|
|
|
def _truncate_text(text: str, max_chars: int = 32768) -> str:
|
|
if len(text) <= max_chars:
|
|
return text
|
|
return text[:max_chars] + "\n<truncated>\n"
|
|
|
|
|
|
def _read_text_file(path: Path, max_bytes: int = 16384) -> str:
|
|
try:
|
|
data = path.read_bytes()
|
|
except OSError as e:
|
|
return f"<read failed: {e}>"
|
|
|
|
if len(data) > max_bytes:
|
|
data = data[:max_bytes] + b"\n<truncated>\n"
|
|
|
|
return data.decode("utf-8", errors="replace")
|
|
|
|
|
|
def _read_proc_cmdline(proc_dir: Path, max_bytes: int = 4096) -> str:
|
|
try:
|
|
data = (proc_dir / "cmdline").read_bytes()
|
|
except OSError as e:
|
|
return f"<read failed: {e}>"
|
|
|
|
cmdline = data.replace(b"\0", b" ").decode("utf-8", errors="replace").strip()
|
|
return _truncate_text(cmdline or "<empty>", max_bytes)
|
|
|
|
|
|
def _read_proc_environ(proc_dir: Path) -> dict[str, str]:
|
|
try:
|
|
data = (proc_dir / "environ").read_bytes()
|
|
except OSError:
|
|
return {}
|
|
|
|
env = {}
|
|
for item in data.split(b"\0"):
|
|
if not item or b"=" not in item:
|
|
continue
|
|
key, value = item.split(b"=", 1)
|
|
env[key.decode("utf-8", errors="replace")] = value.decode("utf-8", errors="replace")
|
|
return env
|
|
|
|
|
|
def _format_env_subset(env: dict[str, str]) -> str:
|
|
if not env:
|
|
return "<empty>"
|
|
|
|
lines = [f"{key}={env[key]}" for key in DEBUG_ENV_KEYS if key in env]
|
|
return "\n".join(lines) if lines else "<no relevant keys>"
|
|
|
|
|
|
def _coerce_subprocess_output(data) -> str:
|
|
if data is None:
|
|
return ""
|
|
if isinstance(data, bytes):
|
|
return data.decode("utf-8", errors="replace")
|
|
return data
|
|
|
|
|
|
def _run_debug_command(cmd: str, timeout: float = 1.5, max_chars: int = 32768) -> str:
|
|
try:
|
|
result = subprocess.run(["bash", "-lc", cmd], capture_output=True, text=True, timeout=timeout)
|
|
parts = [f"$ {cmd}", f"<exit {result.returncode}>"]
|
|
if result.stdout:
|
|
parts.append(result.stdout.rstrip())
|
|
if result.stderr:
|
|
parts.extend(["[stderr]", result.stderr.rstrip()])
|
|
if len(parts) == 2:
|
|
parts.append("<no output>")
|
|
return _truncate_text("\n".join(parts), max_chars)
|
|
except subprocess.TimeoutExpired as e:
|
|
stdout = _coerce_subprocess_output(e.stdout).rstrip()
|
|
stderr = _coerce_subprocess_output(e.stderr).rstrip()
|
|
parts = [f"$ {cmd}", f"<timed out after {timeout:.1f}s>"]
|
|
if stdout:
|
|
parts.append(stdout)
|
|
if stderr:
|
|
parts.extend(["[stderr]", stderr])
|
|
if len(parts) == 2:
|
|
parts.append("<no output>")
|
|
return _truncate_text("\n".join(parts), max_chars)
|
|
|
|
|
|
def _read_proc_start_wall_time(proc_dir: Path) -> str:
|
|
try:
|
|
stat_text = (proc_dir / "stat").read_text()
|
|
rparen = stat_text.rfind(")")
|
|
fields = stat_text[rparen + 2:].split()
|
|
start_ticks = int(fields[19])
|
|
uptime_s = float(Path("/proc/uptime").read_text().split()[0])
|
|
hz = os.sysconf("SC_CLK_TCK")
|
|
start_time_s = time.time() - uptime_s + (start_ticks / hz)
|
|
return time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(start_time_s))
|
|
except Exception as e:
|
|
return f"<parse failed: {e}>"
|
|
|
|
|
|
def _find_process_dirs(match_terms: tuple[str, ...]) -> list[Path]:
|
|
proc_root = Path("/proc")
|
|
try:
|
|
proc_dirs = sorted((p for p in proc_root.iterdir() if p.name.isdigit()), key=lambda p: int(p.name))
|
|
except OSError:
|
|
return []
|
|
|
|
matches = []
|
|
for proc_dir in proc_dirs:
|
|
comm = _read_text_file(proc_dir / "comm", 256).strip()
|
|
cmdline = _read_proc_cmdline(proc_dir, 2048)
|
|
haystacks = (comm, cmdline)
|
|
if any(term in haystack for term in match_terms for haystack in haystacks):
|
|
matches.append(proc_dir)
|
|
return matches
|
|
|
|
|
|
def _read_symlink(path: Path) -> str:
|
|
try:
|
|
return os.readlink(path)
|
|
except OSError as e:
|
|
return f"<read failed: {e}>"
|
|
|
|
|
|
def _describe_path(path: Path) -> str:
|
|
try:
|
|
st = path.stat()
|
|
except OSError as e:
|
|
return f"path={path}\n<stat failed: {e}>"
|
|
|
|
mode = statmod.filemode(st.st_mode)
|
|
if statmod.S_ISSOCK(st.st_mode):
|
|
path_type = "socket"
|
|
elif statmod.S_ISDIR(st.st_mode):
|
|
path_type = "dir"
|
|
elif statmod.S_ISREG(st.st_mode):
|
|
path_type = "file"
|
|
else:
|
|
path_type = "other"
|
|
|
|
return "\n".join([
|
|
f"path={path}",
|
|
f"type={path_type}",
|
|
f"mode={mode}",
|
|
f"inode={st.st_ino}",
|
|
f"size={st.st_size}",
|
|
f"mtime={time.strftime('%Y-%m-%dT%H:%M:%S%z', time.localtime(st.st_mtime))}",
|
|
f"mtime_ns={st.st_mtime_ns}",
|
|
f"ctime_ns={st.st_ctime_ns}",
|
|
])
|
|
|
|
|
|
def _proc_net_unix_matches(path: Path) -> str:
|
|
path_str = str(path)
|
|
try:
|
|
rows = [line for line in Path("/proc/net/unix").read_text().splitlines() if path_str in line]
|
|
except OSError as e:
|
|
return f"<read failed: {e}>"
|
|
|
|
return "\n".join(rows) if rows else "<no matches>"
|
|
|
|
|
|
def _resolve_wayland_socket(runtime_dir: Path | None, display: str | None) -> Path | None:
|
|
if not display:
|
|
return runtime_dir / "wayland-0" if runtime_dir is not None else None
|
|
|
|
socket_path = Path(display)
|
|
if socket_path.is_absolute():
|
|
return socket_path
|
|
if runtime_dir is None:
|
|
return None
|
|
return runtime_dir / socket_path
|
|
|
|
|
|
def _append_ui_watchdog_context(lines: list[str], ui_proc_dir: Path) -> None:
|
|
ui_env = _read_proc_environ(ui_proc_dir)
|
|
weston_procs = _find_process_dirs(("weston",))
|
|
weston_envs = [_read_proc_environ(proc_dir) for proc_dir in weston_procs]
|
|
|
|
lines.extend([
|
|
"== ui proc cmdline ==",
|
|
_read_proc_cmdline(ui_proc_dir),
|
|
"",
|
|
"== ui proc exe ==",
|
|
_read_symlink(ui_proc_dir / "exe"),
|
|
"",
|
|
"== ui proc env ==",
|
|
_format_env_subset(ui_env),
|
|
"",
|
|
])
|
|
|
|
for proc_dir, env in zip(weston_procs, weston_envs):
|
|
lines.extend([
|
|
f"== weston process {proc_dir.name} ==",
|
|
"-- cmdline --",
|
|
_read_proc_cmdline(proc_dir),
|
|
"",
|
|
"-- exe --",
|
|
_read_symlink(proc_dir / "exe"),
|
|
"",
|
|
"-- start wall time --",
|
|
_read_proc_start_wall_time(proc_dir),
|
|
"",
|
|
"-- status --",
|
|
_read_text_file(proc_dir / "status"),
|
|
"",
|
|
"-- wchan --",
|
|
_read_text_file(proc_dir / "wchan", 1024),
|
|
"",
|
|
"-- syscall --",
|
|
_read_text_file(proc_dir / "syscall", 2048),
|
|
"",
|
|
"-- env --",
|
|
_format_env_subset(env),
|
|
"",
|
|
])
|
|
|
|
runtime_dirs: set[Path] = set()
|
|
socket_paths: set[Path] = set()
|
|
|
|
for env in (ui_env, *weston_envs):
|
|
runtime_dir = Path(env["XDG_RUNTIME_DIR"]) if env.get("XDG_RUNTIME_DIR") else None
|
|
if runtime_dir is not None:
|
|
runtime_dirs.add(runtime_dir)
|
|
socket_path = _resolve_wayland_socket(runtime_dir, env.get("WAYLAND_DISPLAY"))
|
|
if socket_path is not None:
|
|
socket_paths.add(socket_path)
|
|
|
|
if not runtime_dirs:
|
|
for candidate in (Path("/run/user/1000"), Path("/tmp")):
|
|
if candidate.exists():
|
|
runtime_dirs.add(candidate)
|
|
|
|
for runtime_dir in sorted(runtime_dirs):
|
|
lines.extend([
|
|
f"== runtime dir {runtime_dir} ==",
|
|
_describe_path(runtime_dir),
|
|
"",
|
|
])
|
|
|
|
candidates = {path for path in socket_paths if path.parent == runtime_dir}
|
|
if runtime_dir.exists():
|
|
candidates.update(runtime_dir.glob("wayland-*"))
|
|
candidates.update(runtime_dir.glob("weston*"))
|
|
|
|
for candidate in sorted(candidates):
|
|
lines.extend([
|
|
f"-- wayland path {candidate.name} --",
|
|
_describe_path(candidate),
|
|
"-- /proc/net/unix --",
|
|
_proc_net_unix_matches(candidate),
|
|
"",
|
|
])
|
|
|
|
for socket_path in sorted(socket_paths):
|
|
if socket_path.parent not in runtime_dirs:
|
|
lines.extend([
|
|
f"== wayland path {socket_path} ==",
|
|
_describe_path(socket_path),
|
|
"-- /proc/net/unix --",
|
|
_proc_net_unix_matches(socket_path),
|
|
"",
|
|
])
|
|
|
|
commands = [
|
|
(
|
|
"== systemctl weston ==",
|
|
"systemctl show weston.service weston-ready.service "
|
|
"--property=Id,ActiveState,SubState,MainPID,ExecMainStartTimestamp,ExecMainStartTimestampMonotonic,ExecMainStatus "
|
|
"--no-pager 2>/dev/null || true",
|
|
1.5,
|
|
),
|
|
(
|
|
"== journalctl weston ==",
|
|
"journalctl -u weston.service -u weston-ready.service -n 120 --no-pager 2>&1 || true",
|
|
1.5,
|
|
),
|
|
(
|
|
"== recent swaglog weston/ui ==",
|
|
"grep -RinE 'Watchdog timeout for ui|weston|Wayland|EGL|gbm|drm|GPU|segfault|SIGSEGV|OOM|fatal' "
|
|
"/data/log/swaglog* 2>/dev/null | tail -n 160 || true",
|
|
1.5,
|
|
),
|
|
(
|
|
"== dmesg weston/gpu ==",
|
|
"dmesg 2>/dev/null | grep -Ei 'drm|gpu|msm|kgsl|weston|wayland|segfault|oom|hang' | tail -n 160 || true",
|
|
1.5,
|
|
),
|
|
]
|
|
|
|
for title, cmd, timeout in commands:
|
|
lines.extend([
|
|
title,
|
|
_run_debug_command(cmd, timeout=timeout),
|
|
"",
|
|
])
|
|
|
|
|
|
def _capture_watchdog_debug_dump(name: str, pid: int, reason: str, dt: float) -> None:
|
|
try:
|
|
proc_dir = Path(f"/proc/{pid}")
|
|
if not proc_dir.exists():
|
|
cloudlog.error(f"watchdog debug dump skipped for {name}: /proc/{pid} no longer exists")
|
|
return
|
|
|
|
dump_path = _debug_dump_dir() / f"{name}_watchdog_dump_{pid}_{time.monotonic_ns()}.log"
|
|
lines = [
|
|
f"name={name}",
|
|
f"pid={pid}",
|
|
f"dt={dt:.3f}",
|
|
f"reason={reason}",
|
|
f"wall_time={time.strftime('%Y-%m-%dT%H:%M:%S%z')}",
|
|
f"watchdog_file={WATCHDOG_FN}{pid}",
|
|
"",
|
|
"== /proc/status ==",
|
|
_read_text_file(proc_dir / "status"),
|
|
"",
|
|
"== /proc/wchan ==",
|
|
_read_text_file(proc_dir / "wchan", 1024),
|
|
"",
|
|
"== /proc/syscall ==",
|
|
_read_text_file(proc_dir / "syscall", 2048),
|
|
"",
|
|
]
|
|
|
|
watchdog_file = Path(f"{WATCHDOG_FN}{pid}")
|
|
lines.extend([
|
|
"== watchdog file ==",
|
|
_describe_path(watchdog_file),
|
|
"",
|
|
])
|
|
|
|
if name == "ui":
|
|
_append_ui_watchdog_context(lines, proc_dir)
|
|
|
|
task_dir = proc_dir / "task"
|
|
try:
|
|
task_entries = sorted(task_dir.iterdir(), key=lambda p: int(p.name))
|
|
except OSError as e:
|
|
task_entries = []
|
|
lines.extend([
|
|
"== /proc/task ==",
|
|
f"<read failed: {e}>",
|
|
"",
|
|
])
|
|
|
|
for entry in task_entries:
|
|
lines.extend([
|
|
f"== thread {entry.name} ==",
|
|
"-- comm --",
|
|
_read_text_file(entry / "comm", 1024),
|
|
"",
|
|
"-- wchan --",
|
|
_read_text_file(entry / "wchan", 1024),
|
|
"",
|
|
"-- syscall --",
|
|
_read_text_file(entry / "syscall", 2048),
|
|
"",
|
|
"-- status --",
|
|
_read_text_file(entry / "status"),
|
|
"",
|
|
"-- stack --",
|
|
_read_text_file(entry / "stack"),
|
|
"",
|
|
])
|
|
|
|
try:
|
|
dump_path.write_text("\n".join(lines))
|
|
cloudlog.error(f"Wrote watchdog debug dump for {name} to {dump_path}")
|
|
except OSError as e:
|
|
cloudlog.error(f"failed to write watchdog debug dump for {name} to {dump_path}: {e}")
|
|
except Exception:
|
|
cloudlog.exception(f"failed to capture watchdog debug dump for {name} pid={pid}")
|
|
|
|
|
|
def launcher(proc: str, name: str, nice: int | None = None) -> None:
|
|
try:
|
|
if nice is not None:
|
|
os.nice(nice)
|
|
|
|
# import the process
|
|
mod = importlib.import_module(proc)
|
|
|
|
# rename the process
|
|
setproctitle(proc)
|
|
|
|
# create new context since we forked
|
|
messaging.reset_context()
|
|
|
|
# add daemon name tag to logs
|
|
cloudlog.bind(daemon=name)
|
|
sentry.set_tag("daemon", name)
|
|
|
|
# exec the process
|
|
mod.main()
|
|
except KeyboardInterrupt:
|
|
cloudlog.warning(f"child {proc} got SIGINT")
|
|
except Exception:
|
|
# can't install the crash handler because sys.excepthook doesn't play nice
|
|
# with threads, so catch it here.
|
|
sentry.capture_exception()
|
|
raise
|
|
|
|
|
|
def nativelauncher(pargs: list[str], cwd: str, name: str, nice: int | None = None) -> None:
|
|
if nice is not None:
|
|
os.nice(nice)
|
|
|
|
os.environ['MANAGER_DAEMON'] = name
|
|
|
|
# exec the process
|
|
os.chdir(cwd)
|
|
os.execvp(pargs[0], pargs)
|
|
|
|
|
|
def join_process(process: Process, timeout: float) -> None:
|
|
# Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382
|
|
# We have to poll the exitcode instead
|
|
t = time.monotonic()
|
|
while time.monotonic() - t < timeout and process.exitcode is None:
|
|
time.sleep(0.001)
|
|
|
|
|
|
class ManagerProcess(ABC):
|
|
daemon = False
|
|
sigkill = False
|
|
should_run: Callable[[bool, Params, car.CarParams, SimpleNamespace], bool]
|
|
proc: Process | None = None
|
|
enabled = True
|
|
name = ""
|
|
|
|
last_watchdog_time = 0
|
|
watchdog_max_dt: int | None = None
|
|
watchdog_seen = False
|
|
shutting_down = False
|
|
|
|
@abstractmethod
|
|
def prepare(self) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def start(self) -> None:
|
|
pass
|
|
|
|
def restart(self) -> None:
|
|
self.stop(sig=signal.SIGKILL)
|
|
self.start()
|
|
|
|
def capture_watchdog_debug_dump(self, reason: str, dt: float) -> None:
|
|
if self.proc is None or self.proc.pid is None:
|
|
return
|
|
|
|
_capture_watchdog_debug_dump(self.name, self.proc.pid, reason, dt)
|
|
|
|
def capture_watchdog_debug_dump_async(self, reason: str, dt: float) -> None:
|
|
if self.proc is None or self.proc.pid is None:
|
|
return
|
|
|
|
# Capture watchdog evidence in the background so managerState stays responsive.
|
|
thread = threading.Thread(
|
|
name=f"{self.name}_watchdog_dump",
|
|
target=_capture_watchdog_debug_dump,
|
|
args=(self.name, self.proc.pid, reason, dt),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def check_watchdog(self, started: bool) -> None:
|
|
if self.watchdog_max_dt is None or self.proc is None:
|
|
return
|
|
|
|
try:
|
|
fn = WATCHDOG_FN + str(self.proc.pid)
|
|
with open(fn, "rb") as f:
|
|
self.last_watchdog_time = struct.unpack('Q', f.read())[0]
|
|
self.watchdog_seen = True
|
|
except Exception:
|
|
if not self.watchdog_seen:
|
|
return
|
|
|
|
dt = time.monotonic() - self.last_watchdog_time / 1e9
|
|
if dt > self.watchdog_max_dt and ENABLE_WATCHDOG:
|
|
self.capture_watchdog_debug_dump_async(f"watchdog_timeout started={started}", dt)
|
|
cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
|
|
self.restart()
|
|
|
|
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
|
|
if self.proc is None:
|
|
return None
|
|
|
|
if self.proc.exitcode is None:
|
|
if not self.shutting_down:
|
|
cloudlog.info(f"killing {self.name}")
|
|
if sig is None:
|
|
sig = signal.SIGKILL if self.sigkill else signal.SIGINT
|
|
self.signal(sig)
|
|
self.shutting_down = True
|
|
|
|
if not block:
|
|
return None
|
|
|
|
join_process(self.proc, 5)
|
|
|
|
# If process failed to die send SIGKILL
|
|
if self.proc.exitcode is None and retry:
|
|
cloudlog.info(f"killing {self.name} with SIGKILL")
|
|
self.signal(signal.SIGKILL)
|
|
self.proc.join()
|
|
|
|
ret = self.proc.exitcode
|
|
cloudlog.info(f"{self.name} is dead with {ret}")
|
|
|
|
if self.proc.exitcode is not None:
|
|
self.shutting_down = False
|
|
self.proc = None
|
|
|
|
return ret
|
|
|
|
def signal(self, sig: int) -> None:
|
|
if self.proc is None:
|
|
return
|
|
|
|
# Don't signal if already exited
|
|
if self.proc.exitcode is not None and self.proc.pid is not None:
|
|
return
|
|
|
|
# Can't signal if we don't have a pid
|
|
if self.proc.pid is None:
|
|
return
|
|
|
|
cloudlog.info(f"sending signal {sig} to {self.name}")
|
|
os.kill(self.proc.pid, sig)
|
|
|
|
def get_process_state_msg(self):
|
|
state = log.ManagerState.ProcessState.new_message()
|
|
state.name = self.name
|
|
if self.proc:
|
|
state.running = self.proc.is_alive()
|
|
state.shouldBeRunning = self.proc is not None and not self.shutting_down
|
|
state.pid = self.proc.pid or 0
|
|
state.exitCode = self.proc.exitcode or 0
|
|
return state
|
|
|
|
|
|
class NativeProcess(ManagerProcess):
|
|
def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, nice=None):
|
|
self.name = name
|
|
self.cwd = cwd
|
|
self.cmdline = cmdline
|
|
self.should_run = should_run
|
|
self.enabled = enabled
|
|
self.sigkill = sigkill
|
|
self.watchdog_max_dt = watchdog_max_dt
|
|
self.nice = nice
|
|
self.launcher = nativelauncher
|
|
|
|
def prepare(self) -> None:
|
|
pass
|
|
|
|
def start(self) -> None:
|
|
# In case we only tried a non blocking stop we need to stop it before restarting
|
|
if self.shutting_down:
|
|
self.stop()
|
|
|
|
if self.proc is not None:
|
|
return
|
|
|
|
cwd = os.path.join(BASEDIR, self.cwd)
|
|
cloudlog.info(f"starting process {self.name}")
|
|
self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name, self.nice))
|
|
self.proc.start()
|
|
self.last_watchdog_time = 0
|
|
self.watchdog_seen = False
|
|
self.shutting_down = False
|
|
|
|
|
|
class PythonProcess(ManagerProcess):
|
|
def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, nice=None):
|
|
self.name = name
|
|
self.module = module
|
|
self.should_run = should_run
|
|
self.enabled = enabled
|
|
self.sigkill = sigkill
|
|
self.watchdog_max_dt = watchdog_max_dt
|
|
self.nice = nice
|
|
self.launcher = launcher
|
|
|
|
def prepare(self) -> None:
|
|
if self.enabled:
|
|
cloudlog.info(f"preimporting {self.module}")
|
|
importlib.import_module(self.module)
|
|
|
|
def start(self) -> None:
|
|
# In case we only tried a non blocking stop we need to stop it before restarting
|
|
if self.shutting_down:
|
|
self.stop()
|
|
|
|
if self.proc is not None:
|
|
return
|
|
|
|
# TODO: this is just a workaround for this tinygrad check:
|
|
# https://github.com/tinygrad/tinygrad/blob/ac9c96dae1656dc220ee4acc39cef4dd449aa850/tinygrad/device.py#L26
|
|
name = self.name if "modeld" not in self.name else "MainProcess"
|
|
|
|
cloudlog.info(f"starting python {self.module}")
|
|
self.proc = Process(name=name, target=self.launcher, args=(self.module, self.name, self.nice))
|
|
self.proc.start()
|
|
self.last_watchdog_time = 0
|
|
self.watchdog_seen = False
|
|
self.shutting_down = False
|
|
|
|
|
|
class DaemonProcess(ManagerProcess):
|
|
"""Python process that has to stay running across manager restart.
|
|
This is used for athena so you don't lose SSH access when restarting manager."""
|
|
def __init__(self, name, module, param_name, enabled=True):
|
|
self.name = name
|
|
self.module = module
|
|
self.param_name = param_name
|
|
self.enabled = enabled
|
|
self.params = None
|
|
|
|
@staticmethod
|
|
def should_run(started, params, CP, starpilot_toggles):
|
|
return True
|
|
|
|
def prepare(self) -> None:
|
|
pass
|
|
|
|
def start(self) -> None:
|
|
if self.params is None:
|
|
self.params = Params()
|
|
|
|
pid = self.params.get(self.param_name)
|
|
if pid is not None:
|
|
try:
|
|
os.kill(int(pid), 0)
|
|
with open(f'/proc/{pid}/cmdline') as f:
|
|
if self.module in f.read():
|
|
# daemon is running
|
|
return
|
|
except (OSError, FileNotFoundError):
|
|
# process is dead
|
|
pass
|
|
|
|
cloudlog.info(f"starting daemon {self.name}")
|
|
proc = subprocess.Popen(['python', '-m', self.module],
|
|
stdin=open('/dev/null'),
|
|
stdout=open('/dev/null', 'w'),
|
|
stderr=open('/dev/null', 'w'),
|
|
preexec_fn=os.setpgrp)
|
|
|
|
self.params.put(self.param_name, proc.pid)
|
|
|
|
def stop(self, retry=True, block=True, sig=None) -> None:
|
|
pass
|
|
|
|
|
|
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None,
|
|
not_run: list[str] | None=None, starpilot_toggles: SimpleNamespace=None) -> list[ManagerProcess]:
|
|
if not_run is None:
|
|
not_run = []
|
|
|
|
running = []
|
|
for p in procs:
|
|
# Reap crashed processes so they can be cleanly restarted below.
|
|
if p.proc is not None and p.proc.exitcode is not None and not p.shutting_down:
|
|
cloudlog.error(f"Process {p.name} crashed with exitcode {p.proc.exitcode}, restarting")
|
|
p.stop(retry=False)
|
|
|
|
if p.enabled and p.name not in not_run and p.should_run(started, params, CP, starpilot_toggles):
|
|
running.append(p)
|
|
else:
|
|
p.stop(block=False)
|
|
|
|
p.check_watchdog(started)
|
|
|
|
for p in running:
|
|
p.start()
|
|
|
|
return running
|