Files
onepilot/selfdrive/controls/beep.py
T
carrot 0423d12c9c Nuggets In Dijon model, C3xLite, livePose (#221)
* Update carrot_functions.py

* fix.. atc..

* TR16 model.

* fix.. atc

* Adjust interpolation value for t_follow calculation

* fix safeMode..

* fix safeMode2

* fix.. v_cruise

* model_turn_speed..

* fix..

* fix..

* fix.. cruise.py

* fix.. modelTurn..

* fix stopped car safe mode.

* model turn 120%

* remove model turn speed..

* paramsd...

* Revert "remove model turn speed.."

This reverts commit 564e9dd609d63215687551a2bc9bfee0141563e1.

* model_turn_speed...  120 -> 115%

* starting achange cost 30 -> 10

* fix..

* aChangeCostStarting

* fix..

* gwm v7

* Adjust traffic stop distance parameter

* Update carrot_functions.py

* update gwm 250929

* trafficStopDistance adjust

* localizer_roll_std

* scc13

* fix...

* fix.. scc13

* scc14

* bypass scc13

* fix scc13

* TheCoolPeople's Model

* North Nevada Model

* Revert "model_turn_speed...  120 -> 115%"

This reverts commit e842a7e99fd6af82fd64d10bf0a08441e9150e5d.

* Reapply "remove model turn speed.."

This reverts commit 544ac168114814df5293f4a7481ea4769bc4b6b3.

* for c3x lite (#218)

add hardware c3x lite

* NNV(North Nevada) v2

* fix..

* Nuggets In Dijon Model

* toyota accel pid long

* for c3xlite fix (#219)

* LatSmoothSec

* Revert "Reapply "remove model turn speed..""

This reverts commit 2c10aae495cc0b601001634d8ee7eaa07faf0022.

* apply livePose

* releases 251017

---------

Co-authored-by: 「 crwusiz 」 <43285072+crwusiz@users.noreply.github.com>
2025-10-17 12:34:13 +09:00

140 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
import subprocess
import time
from cereal import car, messaging
from openpilot.common.realtime import Ratekeeper
import threading
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
class Beepd:
def __init__(self):
self.current_alert = AudibleAlert.none
self.enable_gpio()
self.startup_beep()
def enable_gpio(self):
# 尝试 export,忽略已 export 的错误
try:
subprocess.run("echo 42 | sudo tee /sys/class/gpio/export",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
except Exception:
pass
subprocess.run("echo \"out\" | sudo tee /sys/class/gpio/gpio42/direction",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
def _beep(self, on):
val = "1" if on else "0"
subprocess.run(f"echo \"{val}\" | sudo tee /sys/class/gpio/gpio42/value",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
def engage(self):
self._beep(True)
time.sleep(0.05)
self._beep(False)
def disengage(self):
for _ in range(2):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def warning(self):
for _ in range(3):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def startup_beep(self):
self._beep(True)
time.sleep(0.1)
self._beep(False)
def ding(self):
self._beep(True)
time.sleep(0.02)
self._beep(False)
def dong(self):
self._beep(True)
time.sleep(0.03)
self._beep(False)
def beep(self):
self._beep(True)
time.sleep(0.04)
self._beep(False)
def dispatch_beep(self, func):
threading.Thread(target=func, daemon=True).start()
def update_alert(self, new_alert):
if new_alert != self.current_alert:
self.current_alert = new_alert
print(f"[BEEP] New alert: {new_alert}")
if new_alert == AudibleAlert.engage:
self.dispatch_beep(self.engage)
elif new_alert == AudibleAlert.disengage:
self.dispatch_beep(self.disengage)
elif new_alert in [AudibleAlert.refuse, AudibleAlert.prompt, AudibleAlert.warningImmediate,AudibleAlert.warningSoft]:
self.dispatch_beep(self.warning)
elif new_alert in [AudibleAlert.longEngaged, AudibleAlert.longDisengaged, AudibleAlert.trafficSignGreen, AudibleAlert.trafficSignChanged, AudibleAlert.trafficError, AudibleAlert.bsdWarning, AudibleAlert.laneChange]:
self.dispatch_beep(self.ding)
elif new_alert in [AudibleAlert.stopStop, AudibleAlert.stopping, AudibleAlert.autoHold, AudibleAlert.engage2, AudibleAlert.disengage2, AudibleAlert.speedDown, AudibleAlert.audioTurn, AudibleAlert.reverseGear]:
self.dispatch_beep(self.dong)
elif new_alert in [AudibleAlert.audio1, AudibleAlert.audio2, AudibleAlert.audio3, AudibleAlert.audio4, AudibleAlert.audio5,
AudibleAlert.audio6, AudibleAlert.audio7, AudibleAlert.audio8, AudibleAlert.audio9, AudibleAlert.audio10]:
self.dispatch_beep(self.beep)
def get_audible_alert(self, sm):
if sm.updated['selfdriveState']:
new_alert = sm['selfdriveState'].alertSound.raw
self.update_alert(new_alert)
def test_beepd_thread(self):
frame = 0
rk = Ratekeeper(20)
pm = messaging.PubMaster(['selfdriveState'])
while True:
cs = messaging.new_message('selfdriveState')
if frame == 40:
cs.selfdriveState.alertSound = AudibleAlert.engage
if frame == 60:
cs.selfdriveState.alertSound = AudibleAlert.disengage
if frame == 80:
cs.selfdriveState.alertSound = AudibleAlert.prompt
pm.send("selfdriveState", cs)
frame += 1
rk.keep_time()
def beepd_thread(self, test=False):
if test:
threading.Thread(target=self.test_beepd_thread, daemon=True).start()
sm = messaging.SubMaster(['selfdriveState'])
rk = Ratekeeper(20)
while True:
sm.update(0)
self.get_audible_alert(sm)
rk.keep_time()
def main():
s = Beepd()
s.beepd_thread(test=False) # 改成 True 可启用模拟测试数据
if __name__ == "__main__":
main()