Update human_turn_detection.py

This commit is contained in:
herizon1054
2026-05-08 01:00:47 +08:00
committed by GitHub
parent fa0f661e62
commit e3acaa38a9

View File

@@ -5,6 +5,7 @@ from enum import Enum, auto
from openpilot.common.params import Params from openpilot.common.params import Params
# --- 系統與安全常數 ---
LOG_PATH = "/data/media/0/realdata/debug.log" LOG_PATH = "/data/media/0/realdata/debug.log"
PARAM_REFRESH_SEC = 2.0 PARAM_REFRESH_SEC = 2.0
MIN_SPEED_MS = 0.1 MIN_SPEED_MS = 0.1
@@ -13,182 +14,188 @@ MAX_SPEED_MS = 9.72
def _log(message: str) -> None: def _log(message: str) -> None:
try: try:
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
with open(LOG_PATH, "a", encoding="utf-8") as f: with open(LOG_PATH, "a", encoding="utf-8") as f:
f.write(f"{time.time():.3f} {message}\n") f.write(f"{time.time():.3f} {message}\n")
except Exception: except Exception:
pass pass
class HTDState(Enum): class HTDState(Enum):
INACTIVE = auto() INACTIVE = auto()
MANUAL_TURN = auto() MANUAL_TURN = auto()
RAMPING = auto() RAMPING = auto()
class HumanTurnDetection: class HumanTurnDetection:
def __init__(self) -> None: def __init__(self) -> None:
self._params = Params() self._params = Params()
self._last_params_read = 0.0 self._last_params_read = 0.0
# --- UI 控制的變數 --- # --- 1. UI 控制參數 (從 Params 讀取) ---
self._enabled = False # [修正1] 預設 False等第一次 _read_params() 從 Params 讀取 self._enabled = False # [修正1] 預設 False等第一次 _read_params() 從 Params 讀取
self._angle_threshold_deg = 90.0 self._angle_threshold_deg = 90.0
# --- 以下為寫死的系統參數,不再從 Params 讀取 --- # --- 2. 系統內建參數 (不從 Params 讀取) ---
self._angle_release_deg = 20.0 self._angle_release_deg = 20.0
self._torque_start_nm = 2.0 self._torque_start_nm = 2.0
self._torque_release_nm = 0.6 self._torque_release_nm = 0.6
# 安全接管角度鎖:角度大於此值時,即使倒數完畢也拒絕恢復自動駕駛 self._resume_angle_lock_deg = 40.0 # 安全接管角度鎖:大於此值拒絕恢復自
self._resume_angle_lock_deg = 40.0
self._state: HTDState = HTDState.INACTIVE # --- 3. 狀態與計時器 ---
self._state_change_time = 0.0 self._state: HTDState = HTDState.INACTIVE
self._state_change_time = 0.0
self._trigger_start_time = 0.0
# 帶正負號的原始數值,用於方向一致性判斷 # --- 4. 車輛動態資料快取 ---
self._last_angle_raw = 0.0 self._last_angle_raw = 0.0
self._last_torque_raw = 0.0 self._last_torque_raw = 0.0
self._last_angle = 0.0
self._last_torque = 0.0
self._last_pressed = False
self._last_angle = 0.0 # --- 5. 運行過程暫存變數 ---
self._last_torque = 0.0 self._max_turn_angle = 0.0 # 本次彎道最大角度
self._last_pressed = False self._dynamic_delay = 0.5 # 準備接管時的動態等待秒數
self._trigger_start_time = 0.0
# 本次彎道最大角度 def _read_params(self) -> None:
self._max_turn_angle = 0.0 """定期更新外部設定參數"""
# 準備接管時的動態等待秒數 now = time.monotonic()
self._dynamic_delay = 0.5 if now - self._last_params_read < PARAM_REFRESH_SEC:
return
self._last_params_read = now
self._enabled = self._params.get_bool("dp_htd_enabled")
self._angle_threshold_deg = self._get_float("dp_htd_turn_angle_threshold", 90.0)
def _read_params(self) -> None: def _transition(self, new_state: HTDState, reason: str) -> None:
now = time.monotonic() """處理狀態切換並記錄 Log"""
if now - self._last_params_read < PARAM_REFRESH_SEC: if new_state == self._state:
return return
self._last_params_read = now
self._state = new_state
self._state_change_time = time.monotonic()
_log(
f"HTD {new_state.name} reason={reason} angle={self._last_angle:.1f} "
f"torque={self._last_torque:.2f} pressed={self._last_pressed} delay={self._dynamic_delay:.2f}"
)
self._enabled = self._params.get_bool("dp_htd_enabled") def update(
self._angle_threshold_deg = self._get_float("dp_htd_turn_angle_threshold", 90.0) self,
lat_active: bool,
cruise_enabled: bool,
steering_angle_deg: float,
steering_torque_nm: float,
v_ego: float,
steering_pressed: bool = False,
) -> tuple[bool, HTDState]:
self._read_params()
def _transition(self, new_state: HTDState, reason: str) -> None: # --- 更新車輛動態資料 ---
if new_state == self._state: self._last_angle_raw = steering_angle_deg
return self._last_torque_raw = steering_torque_nm
self._state = new_state self._last_angle = abs(steering_angle_deg)
self._state_change_time = time.monotonic() self._last_torque = abs(steering_torque_nm)
_log( self._last_pressed = steering_pressed
f"HTD {new_state.name} reason={reason} angle={self._last_angle:.1f} "
f"torque={self._last_torque:.2f} pressed={self._last_pressed} delay={self._dynamic_delay:.2f}"
)
def update( # --- 守衛條件 (Guard Clauses) ---
self, # [修正2] cruise_enabled 時 HTD 不應介入(巡航中不是「人工轉向」場景)
lat_active: bool, # 同時保留速度範圍與啟用開關的守衛
cruise_enabled: bool, is_invalid_condition = (
steering_angle_deg: float, not self._enabled or
steering_torque_nm: float, cruise_enabled or
v_ego: float, not lat_active or
steering_pressed: bool = False, not (MIN_SPEED_MS <= v_ego <= MAX_SPEED_MS)
) -> tuple[bool, HTDState]: )
self._read_params()
if is_invalid_condition:
if self._state != HTDState.INACTIVE:
self._transition(HTDState.INACTIVE, "disabled_or_invalid_condition")
self._trigger_start_time = 0.0
return True, self._state
# 儲存帶正負號的原始數值 # --- 狀態機邏輯 (State Machine) ---
self._last_angle_raw = steering_angle_deg if self._state == HTDState.INACTIVE:
self._last_torque_raw = steering_torque_nm if self._should_trigger():
self._max_turn_angle = self._last_angle
self._transition(HTDState.MANUAL_TURN, "trigger")
return False, self._state
elif self._state == HTDState.MANUAL_TURN:
self._max_turn_angle = max(self._max_turn_angle, self._last_angle)
if self._should_release():
# 計算動態延遲時間 (0.5 ~ 1.0 秒)
calculated_delay = self._max_turn_angle / 270.0
self._dynamic_delay = max(0.5, min(calculated_delay, 1.0))
self._transition(HTDState.RAMPING, "release")
return False, self._state
# 儲存絕對值用於門檻比對 elif self._state == HTDState.RAMPING:
self._last_angle = abs(steering_angle_deg) # [修正3] retrigger 判斷移至 RAMPING 最頂端,確保每個 tick 都能重新進入 MANUAL_TURN
self._last_torque = abs(steering_torque_nm) if self._should_trigger():
self._last_pressed = steering_pressed self._trigger_start_time = 0.0 # [修正4] retrigger 時重置防抖計時器,避免立刻再觸發
self._transition(HTDState.MANUAL_TURN, "retrigger")
return False, self._state
# [修正2] cruise_enabled 時 HTD 不應介入(巡航中不是「人工轉向」場景) elapsed = time.monotonic() - self._state_change_time
# 同時保留速度範圍與啟用開關的守衛 if elapsed >= self._dynamic_delay:
if (not self._enabled # 角度尚未回正,繼續等待
or cruise_enabled if self._last_angle > self._resume_angle_lock_deg:
or not lat_active return False, self._state
or v_ego < MIN_SPEED_MS
or v_ego > MAX_SPEED_MS):
if self._state != HTDState.INACTIVE:
self._transition(HTDState.INACTIVE, "disabled_or_invalid_condition")
self._trigger_start_time = 0.0
return True, self._state
# ── INACTIVE ────────────────────────────────────────────── # 滿足恢復條件
if self._state == HTDState.INACTIVE: self._max_turn_angle = 0.0
if self._should_trigger(): self._transition(HTDState.INACTIVE, "resume")
self._max_turn_angle = self._last_angle return True, self._state
self._transition(HTDState.MANUAL_TURN, "trigger")
return False, self._state return False, self._state
return True, self._state
# ── MANUAL_TURN ─────────────────────────────────────────── # 預設行為 (理論上不會走到這裡)
if self._state == HTDState.MANUAL_TURN: return True, self._state
self._max_turn_angle = max(self._max_turn_angle, self._last_angle)
if self._should_release(): def _should_trigger(self) -> bool:
calculated_delay = self._max_turn_angle / 270.0 """判斷是否應該觸發手動轉向狀態"""
self._dynamic_delay = max(0.5, min(calculated_delay, 1.0)) direction_match = (self._last_angle_raw * self._last_torque_raw) > 0
self._transition(HTDState.RAMPING, "release") condition_met = (
return False, self._state self._last_pressed
and direction_match
and self._last_torque >= self._torque_start_nm
and self._last_angle >= self._angle_threshold_deg
)
# ── RAMPING ─────────────────────────────────────────────── if condition_met:
# [修正3] retrigger 判斷移至 RAMPING 最頂端,確保每個 tick 都能重新進入 MANUAL_TURN if self._trigger_start_time == 0.0:
if self._should_trigger(): self._trigger_start_time = time.monotonic()
self._trigger_start_time = 0.0 # [修正4] retrigger 時重置防抖計時器,避免立刻再觸發 elif time.monotonic() - self._trigger_start_time >= 0.1:
self._transition(HTDState.MANUAL_TURN, "retrigger") return True
return False, self._state else:
self._trigger_start_time = 0.0
elapsed = time.monotonic() - self._state_change_time return False
if elapsed >= self._dynamic_delay:
if self._last_angle > self._resume_angle_lock_deg:
# 角度尚未回正,繼續等待
return False, self._state
self._max_turn_angle = 0.0 def _should_release(self) -> bool:
self._transition(HTDState.INACTIVE, "resume") """判斷是否應該解除手動轉向狀態"""
return True, self._state # 軌道 A方向盤回正且扭力歸零
perfect_return = (
self._last_torque <= self._torque_release_nm
and self._last_angle <= self._angle_release_deg
)
# 軌道 B駕駛放開方向盤
hands_off = not self._last_pressed
return False, self._state release_condition = perfect_return or hands_off
def _should_trigger(self) -> bool: if release_condition:
direction_match = (self._last_angle_raw * self._last_torque_raw) > 0 self._trigger_start_time = 0.0 # 釋放時同步清除防抖計時
condition_met = ( return release_condition
self._last_pressed
and direction_match
and self._last_torque >= self._torque_start_nm
and self._last_angle >= self._angle_threshold_deg
)
if condition_met: def _get_float(self, key: str, default: float) -> float:
if self._trigger_start_time == 0.0: """安全地從 Params 取得浮點數"""
self._trigger_start_time = time.monotonic() try:
elif time.monotonic() - self._trigger_start_time >= 0.1: val = self._params.get(key)
return True return float(val) if val is not None else default
else: except Exception:
self._trigger_start_time = 0.0 return default
return False
def _should_release(self) -> bool:
# 軌道 A方向盤回正且扭力歸零
perfect_return = (
self._last_torque <= self._torque_release_nm
and self._last_angle <= self._angle_release_deg
)
# 軌道 B駕駛放開方向盤
hands_off = not self._last_pressed
release_condition = perfect_return or hands_off
if release_condition:
self._trigger_start_time = 0.0 # 釋放時同步清除防抖計時
return release_condition
def _get_float(self, key: str, default: float) -> float:
try:
val = self._params.get(key)
if val is None:
return default
return float(val)
except Exception:
return default