0423d12c9c
* Update carrot_functions.py * fix.. atc.. * TR16 model. * fix.. atc * Adjust interpolation value for t_follow calculation * fix safeMode.. * fix safeMode2 * fix.. v_cruise * model_turn_speed.. * fix.. * fix.. * fix.. cruise.py * fix.. modelTurn.. * fix stopped car safe mode. * model turn 120% * remove model turn speed.. * paramsd... * Revert "remove model turn speed.." This reverts commit 564e9dd609d63215687551a2bc9bfee0141563e1. * model_turn_speed... 120 -> 115% * starting achange cost 30 -> 10 * fix.. * aChangeCostStarting * fix.. * gwm v7 * Adjust traffic stop distance parameter * Update carrot_functions.py * update gwm 250929 * trafficStopDistance adjust * localizer_roll_std * scc13 * fix... * fix.. scc13 * scc14 * bypass scc13 * fix scc13 * TheCoolPeople's Model * North Nevada Model * Revert "model_turn_speed... 120 -> 115%" This reverts commit e842a7e99fd6af82fd64d10bf0a08441e9150e5d. * Reapply "remove model turn speed.." This reverts commit 544ac168114814df5293f4a7481ea4769bc4b6b3. * for c3x lite (#218) add hardware c3x lite * NNV(North Nevada) v2 * fix.. * Nuggets In Dijon Model * toyota accel pid long * for c3xlite fix (#219) * LatSmoothSec * Revert "Reapply "remove model turn speed.."" This reverts commit 2c10aae495cc0b601001634d8ee7eaa07faf0022. * apply livePose * releases 251017 --------- Co-authored-by: 「 crwusiz 」 <43285072+crwusiz@users.noreply.github.com>
140 lines
4.3 KiB
Python
Executable File
140 lines
4.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import subprocess
|
|
import time
|
|
from cereal import car, messaging
|
|
from openpilot.common.realtime import Ratekeeper
|
|
import threading
|
|
|
|
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
|
|
|
|
class Beepd:
|
|
def __init__(self):
|
|
self.current_alert = AudibleAlert.none
|
|
self.enable_gpio()
|
|
self.startup_beep()
|
|
|
|
def enable_gpio(self):
|
|
# 尝试 export,忽略已 export 的错误
|
|
try:
|
|
subprocess.run("echo 42 | sudo tee /sys/class/gpio/export",
|
|
shell=True,
|
|
stderr=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
encoding='utf8')
|
|
except Exception:
|
|
pass
|
|
subprocess.run("echo \"out\" | sudo tee /sys/class/gpio/gpio42/direction",
|
|
shell=True,
|
|
stderr=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
encoding='utf8')
|
|
|
|
def _beep(self, on):
|
|
val = "1" if on else "0"
|
|
subprocess.run(f"echo \"{val}\" | sudo tee /sys/class/gpio/gpio42/value",
|
|
shell=True,
|
|
stderr=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
encoding='utf8')
|
|
|
|
def engage(self):
|
|
self._beep(True)
|
|
time.sleep(0.05)
|
|
self._beep(False)
|
|
|
|
def disengage(self):
|
|
for _ in range(2):
|
|
self._beep(True)
|
|
time.sleep(0.01)
|
|
self._beep(False)
|
|
time.sleep(0.01)
|
|
|
|
def warning(self):
|
|
for _ in range(3):
|
|
self._beep(True)
|
|
time.sleep(0.01)
|
|
self._beep(False)
|
|
time.sleep(0.01)
|
|
|
|
def startup_beep(self):
|
|
self._beep(True)
|
|
time.sleep(0.1)
|
|
self._beep(False)
|
|
|
|
def ding(self):
|
|
self._beep(True)
|
|
time.sleep(0.02)
|
|
self._beep(False)
|
|
|
|
def dong(self):
|
|
self._beep(True)
|
|
time.sleep(0.03)
|
|
self._beep(False)
|
|
|
|
def beep(self):
|
|
self._beep(True)
|
|
time.sleep(0.04)
|
|
self._beep(False)
|
|
|
|
def dispatch_beep(self, func):
|
|
threading.Thread(target=func, daemon=True).start()
|
|
|
|
def update_alert(self, new_alert):
|
|
if new_alert != self.current_alert:
|
|
self.current_alert = new_alert
|
|
print(f"[BEEP] New alert: {new_alert}")
|
|
if new_alert == AudibleAlert.engage:
|
|
self.dispatch_beep(self.engage)
|
|
elif new_alert == AudibleAlert.disengage:
|
|
self.dispatch_beep(self.disengage)
|
|
elif new_alert in [AudibleAlert.refuse, AudibleAlert.prompt, AudibleAlert.warningImmediate,AudibleAlert.warningSoft]:
|
|
self.dispatch_beep(self.warning)
|
|
elif new_alert in [AudibleAlert.longEngaged, AudibleAlert.longDisengaged, AudibleAlert.trafficSignGreen, AudibleAlert.trafficSignChanged, AudibleAlert.trafficError, AudibleAlert.bsdWarning, AudibleAlert.laneChange]:
|
|
self.dispatch_beep(self.ding)
|
|
elif new_alert in [AudibleAlert.stopStop, AudibleAlert.stopping, AudibleAlert.autoHold, AudibleAlert.engage2, AudibleAlert.disengage2, AudibleAlert.speedDown, AudibleAlert.audioTurn, AudibleAlert.reverseGear]:
|
|
self.dispatch_beep(self.dong)
|
|
elif new_alert in [AudibleAlert.audio1, AudibleAlert.audio2, AudibleAlert.audio3, AudibleAlert.audio4, AudibleAlert.audio5,
|
|
AudibleAlert.audio6, AudibleAlert.audio7, AudibleAlert.audio8, AudibleAlert.audio9, AudibleAlert.audio10]:
|
|
self.dispatch_beep(self.beep)
|
|
|
|
def get_audible_alert(self, sm):
|
|
if sm.updated['selfdriveState']:
|
|
new_alert = sm['selfdriveState'].alertSound.raw
|
|
self.update_alert(new_alert)
|
|
|
|
def test_beepd_thread(self):
|
|
frame = 0
|
|
rk = Ratekeeper(20)
|
|
pm = messaging.PubMaster(['selfdriveState'])
|
|
while True:
|
|
cs = messaging.new_message('selfdriveState')
|
|
if frame == 40:
|
|
cs.selfdriveState.alertSound = AudibleAlert.engage
|
|
if frame == 60:
|
|
cs.selfdriveState.alertSound = AudibleAlert.disengage
|
|
if frame == 80:
|
|
cs.selfdriveState.alertSound = AudibleAlert.prompt
|
|
|
|
pm.send("selfdriveState", cs)
|
|
frame += 1
|
|
rk.keep_time()
|
|
|
|
def beepd_thread(self, test=False):
|
|
if test:
|
|
threading.Thread(target=self.test_beepd_thread, daemon=True).start()
|
|
|
|
sm = messaging.SubMaster(['selfdriveState'])
|
|
rk = Ratekeeper(20)
|
|
|
|
while True:
|
|
sm.update(0)
|
|
self.get_audible_alert(sm)
|
|
rk.keep_time()
|
|
|
|
def main():
|
|
s = Beepd()
|
|
s.beepd_thread(test=False) # 改成 True 可启用模拟测试数据
|
|
|
|
if __name__ == "__main__":
|
|
main()
|