Files
StarPilot/system/manager/process_config.py
firestar5683 c744f66b2a Optimus Prime
2026-05-28 10:21:01 -05:00

163 lines
8.6 KiB
Python

import os
import operator
import platform
from types import SimpleNamespace
from cereal import car
from openpilot.common.params import Params
from openpilot.system.hardware import HARDWARE, PC, TICI
from openpilot.system.manager.process import PythonProcess, NativeProcess, DaemonProcess
WEBCAM = os.getenv("USE_WEBCAM") is not None
UI_WATCHDOG_MAX_DT = int(os.getenv("UI_WATCHDOG_MAX_DT", "10"))
def driverview(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started or params.get_bool("IsDriverViewEnabled")
def notcar(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and CP.notCar
def iscar(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and not CP.notCar
def logging(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
run = (not CP.notCar) or not params.get_bool("DisableLogging")
return started and run
def ublox_available() -> bool:
return os.path.exists('/dev/ttyHS0') and not os.path.exists('/persist/comma/use-quectel-gps')
def ublox(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
use_ublox = ublox_available()
if use_ublox != params.get_bool("UbloxAvailable"):
params.put_bool("UbloxAvailable", use_ublox)
return started and use_ublox
def joystick(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and params.get_bool("JoystickDebugMode")
def not_joystick(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and not params.get_bool("JoystickDebugMode")
def long_maneuver(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and params.get_bool("LongitudinalManeuverMode") and not params.get_bool("LateralManeuverMode")
def lat_maneuver(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and params.get_bool("LateralManeuverMode") and not params.get_bool("LongitudinalManeuverMode")
def not_long_maneuver(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and not params.get_bool("LongitudinalManeuverMode")
def qcomgps(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and not ublox_available()
def always_run(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return True
def only_onroad(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started
def only_offroad(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return not started
def or_(*fns):
return lambda *args: operator.or_(*(fn(*args) for fn in fns))
def and_(*fns):
return lambda *args: operator.and_(*(fn(*args) for fn in fns))
# StarPilot variables
def allow_logging(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return not starpilot_toggles.no_logging
def allow_uploads(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return params.get_bool("AlwaysAllowUploads") or not starpilot_toggles.no_uploads or starpilot_toggles.no_onroad_uploads
def run_speed_limit_filler(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return starpilot_toggles.speed_limit_filler
def run_speed_limit_vision(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return starpilot_toggles.vision_speed_limit_detection
def run_navigationd(started: bool, params: Params, CP: car.CarParams, starpilot_toggles: SimpleNamespace) -> bool:
return started and params.get("NavDestination") is not None
procs = [
DaemonProcess("manage_athenad", "system.athena.manage_athenad", "AthenadPid"),
NativeProcess("loggerd", "system/loggerd", ["./loggerd"], and_(allow_logging, logging)),
NativeProcess("encoderd", "system/loggerd", ["./encoderd"], and_(allow_logging, only_onroad)),
NativeProcess("stream_encoderd", "system/loggerd", ["./encoderd", "--stream"], notcar),
PythonProcess("logmessaged", "system.logmessaged", always_run),
NativeProcess("camerad", "system/camerad", ["./camerad"], driverview, enabled=not WEBCAM),
PythonProcess("webcamerad", "tools.webcam.camerad", driverview, enabled=WEBCAM),
PythonProcess("proclogd", "system.proclogd", and_(allow_logging, only_onroad), enabled=platform.system() != "Darwin"),
PythonProcess("journald", "system.journald", and_(allow_logging, only_onroad), platform.system() != "Darwin"),
PythonProcess("micd", "system.micd", iscar),
PythonProcess("timed", "system.timed", always_run, enabled=not PC),
PythonProcess("modeld", "selfdrive.modeld.modeld", only_onroad),
PythonProcess("dmonitoringmodeld", "selfdrive.modeld.dmonitoringmodeld", driverview, enabled=(WEBCAM or not PC)),
PythonProcess("sensord", "system.sensord.sensord", only_onroad, enabled=not PC),
PythonProcess("soundd", "selfdrive.ui.soundd", driverview),
PythonProcess("locationd", "selfdrive.locationd.locationd", only_onroad),
NativeProcess("_pandad", "selfdrive/pandad", ["./pandad"], always_run, enabled=False),
PythonProcess("calibrationd", "selfdrive.locationd.calibrationd", only_onroad),
PythonProcess("torqued", "selfdrive.locationd.torqued", only_onroad),
PythonProcess("controlsd", "selfdrive.controls.controlsd", and_(not_joystick, iscar)),
PythonProcess("joystickd", "tools.joystick.joystickd", or_(joystick, notcar)),
PythonProcess("selfdrived", "selfdrive.selfdrived.selfdrived", only_onroad),
PythonProcess("card", "selfdrive.car.card", only_onroad),
PythonProcess("deleter", "system.loggerd.deleter", always_run),
PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview, enabled=(WEBCAM or not PC)),
PythonProcess("qcomgpsd", "system.qcomgpsd.qcomgpsd", qcomgps, enabled=TICI),
PythonProcess("pandad", "selfdrive.pandad.pandad", always_run),
PythonProcess("paramsd", "selfdrive.locationd.paramsd", only_onroad),
PythonProcess("lagd", "selfdrive.locationd.lagd", only_onroad),
PythonProcess("ubloxd", "system.ubloxd.ubloxd", ublox, enabled=TICI),
PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
PythonProcess("lateral_maneuversd", "tools.lateral_maneuvers.lateral_maneuversd", lat_maneuver),
PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
PythonProcess("hardwared", "system.hardware.hardwared", always_run),
PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),
PythonProcess("updated", "system.updated.updated", always_run, enabled=not PC),
PythonProcess("uploader", "system.loggerd.uploader", allow_uploads),
PythonProcess("statsd", "system.statsd", always_run),
PythonProcess("feedbackd", "selfdrive.ui.feedback.feedbackd", only_onroad),
# debug procs
NativeProcess("bridge", "cereal/messaging", ["./bridge"], notcar),
PythonProcess("webrtcd", "system.webrtc.webrtcd", notcar),
PythonProcess("webjoystick", "tools.bodyteleop.web", notcar),
PythonProcess("joystick", "tools.joystick.joystick_control", and_(joystick, iscar)),
]
# StarPilot variables
procs += [
PythonProcess("the_pond", "starpilot.system.the_pond.the_pond", always_run, nice=19),
PythonProcess("galaxy", "starpilot.system.galaxy.galaxy", always_run, nice=19),
]
device_type = HARDWARE.get_device_type()
if device_type in ("tici", "tizi"):
procs.append(NativeProcess("ui", "selfdrive/ui", ["./ui"], always_run, watchdog_max_dt=UI_WATCHDOG_MAX_DT))
else:
# C4 (mici) runs the Python raylib UI path.
procs.append(PythonProcess("ui", "selfdrive.ui.ui", always_run, watchdog_max_dt=UI_WATCHDOG_MAX_DT))
procs += [
PythonProcess("device_syncd", "starpilot.system.device_syncd", always_run),
PythonProcess("starpilot_process", "starpilot.starpilot_process", always_run),
PythonProcess("mapd", "starpilot.navigation.mapd_wrapper", always_run, nice=19),
PythonProcess("navigationd", "starpilot.navigation.navigationd", run_navigationd, nice=19),
PythonProcess("speed_limit_filler", "starpilot.system.speed_limit_filler", run_speed_limit_filler, nice=19),
PythonProcess("speed_limit_vision", "starpilot.system.speed_limit_vision", run_speed_limit_vision, nice=19),
]
managed_processes = {p.name: p for p in procs}