add debug code for livePose

This commit is contained in:
ajouatom
2026-05-21 09:16:55 +09:00
parent c97e48f1ff
commit 047f9e3c55

View File

@@ -33,6 +33,7 @@ POSENET_STD_HIST_HALF = 20
CAM_ODO_POSE_DELAY = 0.1 # dependent on the vision model context frames and temporal frequency (current model is 5 fps with 2 context frames)
CAM_ODO_ROT_STD_MULT = 10
CAM_ODO_TRANS_STD_MULT = 4
INVALID_PRINT_INTERVAL = 1.0
def calculate_invalid_input_decay(invalid_limit, recovery_time, frequency):
@@ -266,6 +267,60 @@ def sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, senso
return all(sensor_alive.values()) and all(sensor_valid.values())
def live_pose_inputs_invalid_reasons(sm, critical_services, observation_input_invalid, input_invalid_threshold, last_input_invalid_reason):
reasons = []
invalid_sm_services = [s for s in sm.services if not sm.valid[s] and s not in sm.ignore_valid]
if invalid_sm_services:
reasons.append(f"sm invalid: {', '.join(invalid_sm_services)}")
invalid_observations = []
for s in critical_services:
invalid_count = observation_input_invalid[s]
threshold = input_invalid_threshold[s]
if invalid_count >= threshold:
reason = last_input_invalid_reason.get(s, "unknown")
invalid_observations.append(f"{s}={invalid_count:.2f}/{threshold:.2f} ({reason})")
if invalid_observations:
reasons.append(f"observation invalid: {', '.join(invalid_observations)}")
return reasons
def live_pose_sensors_invalid_reasons(sensor_alive, sensor_valid, sensor_recv_time, simulation):
reasons = []
cur_time = time.monotonic()
for which in ("accelerometer", "gyroscope"):
if not sensor_alive[which]:
if simulation:
reasons.append(f"{which} not received this cycle")
else:
reasons.append(f"{which} not alive age={cur_time - sensor_recv_time[which]:.3f}s")
if not sensor_valid[which]:
reasons.append(f"{which} msg.valid=False")
return reasons
def print_live_pose_invalid_reasons(inputs_valid, sensors_valid, input_reasons, sensor_reasons, last_print_time):
if inputs_valid and sensors_valid:
return last_print_time
cur_time = time.monotonic()
if cur_time - last_print_time < INVALID_PRINT_INTERVAL:
return last_print_time
reasons = []
if not inputs_valid:
reasons.append("inputs_valid=False: " + ("; ".join(input_reasons) if input_reasons else "unknown"))
if not sensors_valid:
reasons.append("sensors_valid=False: " + ("; ".join(sensor_reasons) if sensor_reasons else "unknown"))
print("[locationd] livePose invalid - " + " | ".join(reasons), flush=True)
return cur_time
def main():
config_realtime_process([0, 1, 2, 3], 5)
@@ -285,6 +340,8 @@ def main():
filter_initialized = False
critcal_services = ["accelerometer", "gyroscope", "cameraOdometry"]
observation_input_invalid = defaultdict(int)
last_input_invalid_reason = {}
last_invalid_print_time = 0.0
input_invalid_limit = {s: round(INPUT_INVALID_LIMIT * (SERVICE_LIST[s].frequency / 20.)) for s in critcal_services}
input_invalid_threshold = {s: input_invalid_limit[s] - 0.5 for s in critcal_services}
@@ -324,9 +381,11 @@ def main():
if res == HandleLogResult.TIMING_INVALID:
cloudlog.warning(f"Observation {which} ignored due to failed timing check")
observation_input_invalid[which] += 1
last_input_invalid_reason[which] = "timing check"
elif res == HandleLogResult.INPUT_INVALID:
cloudlog.warning(f"Observation {which} ignored due to failed sanity check")
observation_input_invalid[which] += 1
last_input_invalid_reason[which] = "sanity check"
elif res == HandleLogResult.SUCCESS:
observation_input_invalid[which] *= input_invalid_decay[which]
else:
@@ -336,6 +395,9 @@ def main():
critical_service_inputs_valid = all(observation_input_invalid[s] < input_invalid_threshold[s] for s in critcal_services)
inputs_valid = sm.all_valid() and critical_service_inputs_valid
sensors_valid = sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, sensor_alive, SIMULATION)
input_invalid_reasons = live_pose_inputs_invalid_reasons(sm, critcal_services, observation_input_invalid, input_invalid_threshold, last_input_invalid_reason)
sensor_invalid_reasons = live_pose_sensors_invalid_reasons(sensor_alive, sensor_valid, sensor_recv_time, SIMULATION)
last_invalid_print_time = print_live_pose_invalid_reasons(inputs_valid, sensors_valid, input_invalid_reasons, sensor_invalid_reasons, last_invalid_print_time)
msg = estimator.get_msg(sensors_valid, inputs_valid, filter_initialized)
pm.send("livePose", msg)