Files
StarPilot/selfdrive/monitoring/dmonitoringd.py
firestar5683 5d86b7ac5b IsRHD
2026-05-31 21:03:42 -05:00

72 lines
2.5 KiB
Python

#!/usr/bin/env python3
import cereal.messaging as messaging
from opendbc.car import structs
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process
from openpilot.selfdrive.monitoring.helpers import DriverMonitoring
GearShifter = structs.CarState.GearShifter
def get_rhd_override(params):
return params.get_bool("IsRHD") if params.get_bool("IsRHDOverride") else None
def dmonitoringd_thread():
config_realtime_process([0, 1, 2, 3], 5)
params = Params()
pm = messaging.PubMaster(['driverMonitoringState'])
sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'selfdriveState', 'modelV2'], poll='driverStateV2')
DM = DriverMonitoring(
rhd_saved=params.get_bool("IsRhdDetected"),
always_on=params.get_bool("AlwaysOnDM"),
rhd_override=get_rhd_override(params),
)
demo_mode=False
sm = sm.extend(['starpilotCarState'])
driver_view_enabled = params.get_bool("IsDriverViewEnabled")
# 20Hz <- dmonitoringmodeld
while True:
sm.update()
if not sm.updated['driverStateV2']:
# iterate when model has new output
continue
valid = sm.all_checks()
if demo_mode and sm.valid['driverStateV2']:
DM.run_step(sm, demo=demo_mode)
elif valid:
DM.run_step(sm, demo=demo_mode)
elif driver_view_enabled:
DM.face_detected = sm['driverStateV2'].leftDriverData.faceProb > DM.settings._FACE_THRESHOLD or sm['driverStateV2'].rightDriverData.faceProb > DM.settings._FACE_THRESHOLD
# publish
dat = DM.get_state_packet(valid=valid or driver_view_enabled)
pm.send('driverMonitoringState', dat)
# load live always-on toggle
if sm['driverStateV2'].frameId % 40 == 1:
DM.always_on = params.get_bool("AlwaysOnDM")
DM.wheel_on_right_default = params.get_bool("IsRhdDetected")
DM.wheel_on_right_override = get_rhd_override(params)
demo_mode = params.get_bool("IsDriverViewEnabled") and sm["carState"].gearShifter != GearShifter.reverse
# save rhd virtual toggle every 5 mins
if (DM.wheel_on_right_override is None and sm['driverStateV2'].frameId % 6000 == 0 and not demo_mode and
DM.wheelpos.prob_offseter.filtered_stat.n > DM.settings._WHEELPOS_FILTER_MIN_COUNT and
DM.wheel_on_right == (DM.wheelpos.prob_offseter.filtered_stat.M > DM.settings._WHEELPOS_THRESHOLD)):
params.put_bool_nonblocking("IsRhdDetected", DM.wheel_on_right)
params.put_bool_nonblocking("IsRHD", DM.wheel_on_right)
def main():
dmonitoringd_thread()
if __name__ == '__main__':
main()