Files
StarPilot/system/micd.py
firestar5683 d0e1db6766 StarPilot
2026-03-22 03:15:05 -05:00

161 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
import numpy as np
from functools import cache
import threading
import time
from cereal import messaging
from openpilot.common.realtime import Ratekeeper
from openpilot.common.utils import retry
from openpilot.common.swaglog import cloudlog
# Rate passed to the Ratekeeper that paces the soundPressure publishing loop.
RATE = 10
# Number of samples per analysis window; at SAMPLE_RATE this is 100 ms of audio.
FFT_SAMPLES = 1600 # 100ms
# Reference pressure that the RMS amplitude is divided by when converting to dB.
REFERENCE_SPL = 2e-5 # newtons/m^2
# Sample rate requested from the input stream (samples per second).
SAMPLE_RATE = 16000
# Block size requested from the input stream; at SAMPLE_RATE this is 50 ms of audio.
SAMPLE_BUFFER = 800 # 50ms
@cache
def get_a_weighting_filter():
    """Return the A-weighting gain for every FFT bin, scaled so the peak gain is 1.

    The gains follow the A-weighting formula from
    https://en.wikipedia.org/wiki/A-weighting and are evaluated at the bin
    frequencies of an FFT_SAMPLES-point transform sampled at SAMPLE_RATE.
    The result is computed once and memoized; callers must not modify it.
    """
    freqs = np.fft.fftfreq(FFT_SAMPLES, d=1 / SAMPLE_RATE)
    f_sq = freqs ** 2

    numerator = 12194 ** 2 * freqs ** 4
    root_term = np.sqrt((f_sq + 107.7 ** 2) * (f_sq + 737.9 ** 2))
    denominator = (f_sq + 20.6 ** 2) * (f_sq + 12194 ** 2) * root_term

    gain = numerator / denominator
    return gain / np.max(gain)
def calculate_spl(measurements):
    """Compute the RMS sound pressure of a block of samples and its level in dB.

    Args:
        measurements: array-like of amplitude samples. Values are converted to
            float64 before squaring so integer-typed input cannot overflow.

    Returns:
        A ``(sound_pressure, sound_pressure_level)`` tuple: the RMS of the
        samples, and ``20 * log10(rms / REFERENCE_SPL)``. The level is 0 when
        the RMS is not positive. Empty input yields ``(0.0, 0.0)`` instead of
        NaN.
    """
    # https://www.engineeringtoolbox.com/sound-pressure-d_711.html
    samples = np.asarray(measurements, dtype=np.float64)
    if samples.size == 0:
        # np.mean of an empty array is NaN (with a RuntimeWarning); report silence instead.
        return 0.0, 0.0

    sound_pressure = np.sqrt(np.mean(samples ** 2))  # RMS of amplitudes
    if sound_pressure > 0:
        sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL)  # dB
    else:
        sound_pressure_level = 0.0
    return sound_pressure, sound_pressure_level
def apply_a_weighting(measurements: np.ndarray) -> np.ndarray:
    """Return the magnitude of the A-weighted version of ``measurements``.

    The samples are tapered with a Hanning window of matching length, taken to
    the frequency domain, multiplied by the gains from
    ``get_a_weighting_filter()``, transformed back, and reduced to absolute
    values.
    """
    # Taper the block with a Hanning window as long as the block itself
    window = np.hanning(len(measurements))
    spectrum = np.fft.fft(measurements * window)

    # Scale each frequency bin by its A-weighting gain, then return to the time domain
    weighted_spectrum = spectrum * get_a_weighting_filter()
    return np.abs(np.fft.ifft(weighted_spectrum))
class Mic:
    """Microphone daemon: captures audio and publishes raw samples and sound pressure.

    ``callback`` is handed to the input stream; it publishes every audio block on
    'rawAudioData' and refreshes the sound pressure statistics. ``update``
    publishes the most recent statistics on 'soundPressure', paced by the
    Ratekeeper. ``micd_thread`` owns the stream and reopens it when it fails
    or stops.
    """

    def __init__(self):
        self.rk = Ratekeeper(RATE)
        self.pm = messaging.PubMaster(['soundPressure', 'rawAudioData'])

        # Samples received so far that have not yet filled a FFT_SAMPLES window.
        self.measurements = np.empty(0)

        # Latest results; written by callback() and read by update().
        self.sound_pressure = 0
        self.sound_pressure_weighted = 0
        self.sound_pressure_level_weighted = 0

        # Guards the buffer and results shared between callback() and update().
        self.lock = threading.Lock()

    def update(self):
        """Publish the latest sound pressure values, then wait for the next tick."""
        # Copy under the lock so the three values come from the same window.
        with self.lock:
            sound_pressure = self.sound_pressure
            sound_pressure_weighted = self.sound_pressure_weighted
            sound_pressure_level_weighted = self.sound_pressure_level_weighted

        msg = messaging.new_message('soundPressure', valid=True)
        msg.soundPressure.soundPressure = float(sound_pressure)
        msg.soundPressure.soundPressureWeighted = float(sound_pressure_weighted)
        msg.soundPressure.soundPressureWeightedDb = float(sound_pressure_level_weighted)
        self.pm.send('soundPressure', msg)

        self.rk.keep_time()

    def callback(self, indata, frames, time, status):
        """
        Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level.
        Then apply A-weighting to the raw amplitudes and run the same calculations again.

        Logged A-weighted equivalents are rough approximations of the human-perceived loudness.

        Only ``indata`` is used; ``frames``, ``time`` and ``status`` are accepted
        because the stream passes them. Note that the ``time`` parameter hides
        the ``time`` module inside this method.
        """
        # Publish the first channel of the block as 16-bit samples.
        msg = messaging.new_message('rawAudioData', valid=True)
        audio_data_int_16 = (indata[:, 0] * 32767).astype(np.int16)
        msg.rawAudioData.data = audio_data_int_16.tobytes()
        msg.rawAudioData.sampleRate = SAMPLE_RATE
        self.pm.send('rawAudioData', msg)

        with self.lock:
            self.measurements = np.concatenate((self.measurements, indata[:, 0]))

            # Consume the buffer one full FFT_SAMPLES window at a time; any
            # remainder stays buffered for the next call.
            while self.measurements.size >= FFT_SAMPLES:
                measurements = self.measurements[:FFT_SAMPLES]

                self.sound_pressure, _ = calculate_spl(measurements)
                measurements_weighted = apply_a_weighting(measurements)
                self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted)

                self.measurements = self.measurements[FFT_SAMPLES:]

    @retry(attempts=10, delay=3)
    def get_stream(self, sd, device=None):
        """Create a mono input stream at SAMPLE_RATE that feeds ``callback``.

        Args:
            sd: the imported ``sounddevice`` module.
            device: device to open; ``None`` leaves the choice to sounddevice.

        Returns:
            An ``sd.InputStream`` that has been constructed but not entered.
        """
        # reload sounddevice to reinitialize portaudio
        sd._terminate()
        sd._initialize()

        kwargs = {
            "channels": 1,
            "samplerate": SAMPLE_RATE,
            "callback": self.callback,
            "blocksize": SAMPLE_BUFFER,
        }
        if device is not None:
            kwargs["device"] = device
        return sd.InputStream(**kwargs)

    def get_input_devices(self, sd):
        """Return the devices to try, in order: ``None`` first, then input-capable indices.

        Enumeration errors are logged and leave only the entries collected so far.
        """
        # Try default first, then explicit input-capable devices as fallback.
        devices = [None]
        try:
            for i, dev in enumerate(sd.query_devices()):
                if dev.get("max_input_channels", 0) > 0:
                    devices.append(i)
        except Exception:
            cloudlog.exception("micd: failed to enumerate audio devices")

        # Preserve order while deduplicating.
        return list(dict.fromkeys(devices))

    def micd_thread(self):
        """Run forever: open an input stream, publish while it is active, reopen otherwise."""
        # sounddevice must be imported after forking processes
        import sounddevice as sd

        while True:
            stream = None
            for device in self.get_input_devices(sd):
                try:
                    stream = self.get_stream(sd, device=device)
                    break
                except Exception:
                    cloudlog.exception(f"micd: failed to open input stream (device={device})")

            if stream is None:
                cloudlog.error("micd: no valid input device, retrying")
                time.sleep(5)
                continue

            try:
                with stream:
                    cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}, {stream.blocksize=}")
                    # Poll the stream state instead of looping unconditionally:
                    # a stream that has stopped (e.g. after an error in the
                    # callback) raises nothing here, so without this check the
                    # loop would keep publishing stale values and never reopen.
                    while stream.active:
                        self.update()
                cloudlog.error("micd: stream stopped unexpectedly, restarting")
            except Exception:
                cloudlog.exception("micd: stream failed, restarting")

            time.sleep(1)
def main():
    """Entry point: create the Mic daemon and run its capture/publish loop."""
    Mic().micd_thread()
# Start the daemon only when this file is executed as a script.
if __name__ == "__main__":
    main()