From c8e0990dc4cab26e222f73bd508c86443d703b77 Mon Sep 17 00:00:00 2001 From: herizon1054 Date: Thu, 21 May 2026 02:13:40 +0800 Subject: [PATCH] Update accel_controller.py --- .../lib/accel_personality/accel_controller.py | 307 +++++------------- 1 file changed, 79 insertions(+), 228 deletions(-) diff --git a/dragonpilot/selfdrive/controls/lib/accel_personality/accel_controller.py b/dragonpilot/selfdrive/controls/lib/accel_personality/accel_controller.py index 522abdb805..93b7412874 100644 --- a/dragonpilot/selfdrive/controls/lib/accel_personality/accel_controller.py +++ b/dragonpilot/selfdrive/controls/lib/accel_personality/accel_controller.py @@ -10,276 +10,127 @@ import numpy as np from openpilot.common.realtime import DT_MDL from openpilot.common.params import Params -# 定義加速度個性化設定的列舉型態 (Eco, Normal, Sport) +# 定義加速度性格的列舉值 AccelPersonality = custom.LongitudinalPlanDP.AccelerationPersonality ACCEL_PERSONALITY_OPTIONS = [AccelPersonality.eco, AccelPersonality.normal, AccelPersonality.sport] - -# ============================================================================== -# 加速度與減速度設定檔 (Profiles & Breakpoints) -# ============================================================================== - -# 加速度上限的中斷點 (車速, 單位: m/s) -A_MAX_BP = [0.0, 4.0, 8.0, 16.0, 40.0] -# 各種個性化設定下的最大加速度值 (對應 A_MAX_BP) -A_MAX_V = { - AccelPersonality.eco: [1.80, 1.80, 1.20, 0.40, 0.08], - AccelPersonality.normal: [1.80, 1.80, 1.35, 0.50, 0.15], - AccelPersonality.sport: [1.80, 1.80, 1.50, 0.70, 0.25], +# Acceleration Profiles +MAX_ACCEL_PROFILES = { + AccelPersonality.eco: [1.00, 0.60, 1.00, 1.40, 1.20, 1.00, 0.80, 0.60, 0.50, 0.40, 0.12, 0.08], + AccelPersonality.normal: [1.50, 0.80, 1.20, 1.80, 1.40, 1.20, 1.00, 0.80, 0.70, 0.60, 0.24, 0.10], + AccelPersonality.sport: [2.00, 1.00, 1.40, 2.00, 1.60, 1.40, 1.20, 1.00, 0.90, 0.80, 0.36, 0.12], } +MAX_ACCEL_BREAKPOINTS = [0.0, 0.5, 1.0, 4.0, 6.0, 9.0, 11.0, 16.0, 20.0, 25.0, 30.0, 55.0] -# 滑行阻力 (Coast Drag) 的中斷點 (車速, 單位: m/s) -COAST_DRAG_BP = [0.0, 10.0, 25.0, 40.0] -# 各種個性化設定下的滑行減速度值 (對應 COAST_DRAG_BP,模擬放開油門時的自然減速) -COAST_DRAG_V = { - AccelPersonality.eco: [-0.03, -0.05, -0.08, -0.12], - AccelPersonality.normal: [-0.04, -0.07, -0.12, -0.18], - AccelPersonality.sport: [-0.06, -0.10, -0.18, -0.28], +# Braking Profiles +MIN_ACCEL_PROFILES = { + AccelPersonality.eco: [-.003, -0.25, -0.35, -0.44, -2.0], + AccelPersonality.normal: [-.004, -0.27, -0.37, -0.46, -2.0], + AccelPersonality.sport: [-.005, -0.29, -0.39, -0.48, -2.0], } +MIN_ACCEL_BREAKPOINTS = [3, 4.5, 7., 9., 25] -# 煞車底線 (A_MIN Floor) 的中斷點 (車速, 單位: m/s) -A_MIN_FLOOR_BP = [0.0, 5.0, 15.0, 40.0] -# 各種個性化設定下的最大允許減速度值 (對應 A_MIN_FLOOR_BP) -A_MIN_FLOOR_V = { - AccelPersonality.eco: [-0.20, -0.35, -0.55, -0.50], - AccelPersonality.normal: [-0.25, -0.45, -0.75, -0.65], - AccelPersonality.sport: [-0.35, -0.65, -1.00, -0.95], -} -# ============================================================================== -# 控制模型常數設定 -# ============================================================================== -DEFICIT_TO_FLOOR = 8.5 # 當車速低於巡航速度在此範圍內時,逐漸過渡到煞車底線 -COAST_DEADBAND = 1.0 # 巡航死區 (m/s),在此速差範圍內優先進入滑行狀態以維持車速穩定 -RAMP_OFF_RANGE = 5.0 # 接近巡航速度時,加速度上限開始線性遞減的緩衝範圍 (m/s) +DECEL_SMOOTH_ALPHA = 0.40 # Very aggressive smoothing for decel (lower = smoother) +ACCEL_SMOOTH_ALPHA = 0.90 # Less aggressive for accel (higher = more responsive) -# 非對稱變化率限制 (Rate Limiting) -A_MIN_TIGHTEN_RATE = 1.5 # 煞車加重時的變化率上限 (m/s³,對應原本的 MAX_DECEL_INCREASE_RATE) -A_MIN_RELAX_RATE = 0.6 # 煞車放鬆時的變化率上限 (m/s³,對應原本的 MAX_DECEL_DECREASE_RATE) -A_MAX_RATE = 0.8 # 加速度上限的變化率 (m/s³) +# Asymmetric rate limiting +MAX_DECEL_INCREASE_RATE = 1.3 # When braking harder (m/s² per second) +MAX_DECEL_DECREASE_RATE = 1.0 # When releasing brake (m/s² per second) -# 動態安全廊道間距 (Dynamic Safety Corridor Gap) -# 確保最小加速度永遠嚴格小於最大加速度,防止解算器崩潰 (Solver Crash) -MIN_MAX_GAP = 0.05 - -# 參數重新讀取的幀數間隔 (每秒更新一次 Params) -PARAM_REFRESH_FRAMES = max(1, int(1.0 / DT_MDL)) class AccelPersonalityController: - """ - 升級版縱向加減速控制器 (新版模型 tn) - 引入巡航速度 (v_cruise) 依賴性,將減速邏輯區分為滑行阻力與煞車底線, - 並透過接近緩和與巡航死區控制,大幅提升車輛在接近目標車速時的平穩度與舒適性。 - """ def __init__(self): self.params = Params() self.frame = 0 - self._first = True # 標記是否為首次執行,用於初始化加減速狀態 - - # 從系統參數讀取當前的個性化設定與啟用狀態 - val = self.params.get('AccelPersonality') - self._personality = val if val is not None else AccelPersonality.normal + self.last_max_accel = 2.0 + self.last_min_accel = -0.01 + self.first_run = True + self._accel_personality = self.params.get('AccelPersonality') or AccelPersonality.normal self._enabled = self.params.get_bool('AccelPersonalityEnabled') - # 初始化巡航速度與當前加減速限制值 - self._v_cruise = 0.0 - self._a_min = -0.05 - self._a_max = 1.50 - - # 效能優化快取機制 (Caching) - self._cache_v: float | None = None - self._cache_v_cruise: float | None = None - self._cache_a_min = self._a_min - self._cache_a_max = self._a_max - def update(self, sm=None): - """ - 更新控制器狀態,讀取最新車輛巡航速度並定期刷新系統參數 - """ self.frame += 1 - # 每個 cycle 重設快取 - self._cache_v = None - self._cache_v_cruise = None - - # 從開源車輛狀態 (carState) 獲取設定的巡航速度,並將時速 (km/h) 轉換為秒速 (m/s) - if sm is not None: - try: - self._v_cruise = float(sm['carState'].vCruise) * (1000.0 / 3600.0) - except Exception: - pass - - # 定期刷新外部參數,避免每幀讀取 Params 造成 I/O 負擔 - if self.frame % PARAM_REFRESH_FRAMES == 0: - val = self.params.get('AccelPersonality') - self._personality = val if val is not None else AccelPersonality.normal + if self.frame % int(1.0 / DT_MDL) == 0: + self._accel_personality = self.params.get('AccelPersonality') or AccelPersonality.normal self._enabled = self.params.get_bool('AccelPersonalityEnabled') @property def accel_personality(self) -> int: - """ - 獲取當前的加速度個性化設定型態 - """ - return self._personality + return self._accel_personality def get_accel_personality(self) -> int: - """ - 以整數形式獲取當前個性化設定 - """ - return int(self._personality) + return int(self._accel_personality) def set_accel_personality(self, personality: int): - """ - 設定新的加速度個性化設定並寫入系統參數 - """ if personality in ACCEL_PERSONALITY_OPTIONS: - self._personality = personality + self._accel_personality = personality self.params.put('AccelPersonality', personality) def cycle_accel_personality(self) -> int: - """ - 循環切換加速個性化設定 (Eco -> Normal -> Sport -> Eco) - """ - idx = ACCEL_PERSONALITY_OPTIONS.index(self._personality) if self._personality in ACCEL_PERSONALITY_OPTIONS else 0 - nxt = ACCEL_PERSONALITY_OPTIONS[(idx + 1) % len(ACCEL_PERSONALITY_OPTIONS)] - self.set_accel_personality(nxt) - return int(nxt) - - def is_enabled(self) -> bool: - """ - 檢查縱向個性化控制是否啟用 - """ - return self._enabled - - def set_enabled(self, enabled: bool): - """ - 設定縱向個性化控制的啟用狀態並寫入參數 - """ - self._enabled = bool(enabled) - self.params.put_bool('AccelPersonalityEnabled', self._enabled) - - def toggle_enabled(self) -> bool: - """ - 切換啟用狀態開關 - """ - self.set_enabled(not self._enabled) - return self._enabled - - def reset(self, personality: int | None = None): - """ - 重設控制器至初始狀態 - """ - if personality is None or personality not in ACCEL_PERSONALITY_OPTIONS: - personality = AccelPersonality.normal - self._personality = personality - self.params.put('AccelPersonality', self._personality) - self.frame = 0 - self._first = True - self._a_min = -0.05 - self._a_max = 1.50 - self._cache_v = None - self._cache_v_cruise = None + current = self._accel_personality + next_personality = ACCEL_PERSONALITY_OPTIONS[(ACCEL_PERSONALITY_OPTIONS.index(current) + 1) % len(ACCEL_PERSONALITY_OPTIONS)] + self.set_accel_personality(next_personality) + return int(next_personality) def get_accel_limits(self, v_ego: float) -> tuple[float, float]: - """ - 獲取當前車速下的動態加減速限制值 (考慮快取優化機制) - """ v_ego = max(0.0, v_ego) - # 快取檢查:若車速變化極小且巡航速度未變,直接返回上一次的計算結果以節省運算資源 - if (self._cache_v is not None - and abs(self._cache_v - v_ego) < 0.01 - and self._cache_v_cruise == self._v_cruise): - return self._cache_a_min, self._cache_a_max + target_max = np.interp(v_ego, MAX_ACCEL_BREAKPOINTS, MAX_ACCEL_PROFILES[self.accel_personality]) + target_min = np.interp(v_ego, MIN_ACCEL_BREAKPOINTS, MIN_ACCEL_PROFILES[self.accel_personality]) - # 執行實際的步進計算 - self._cache_a_min, self._cache_a_max = self._step(v_ego) - self._cache_v = v_ego - self._cache_v_cruise = self._v_cruise - return self._cache_a_min, self._cache_a_max + if self.first_run: + self.last_max_accel, self.last_min_accel = target_max, target_min + self.first_run = False + return float(target_min), float(target_max) + + # Smoothing + self.last_max_accel = ACCEL_SMOOTH_ALPHA * target_max + (1 - ACCEL_SMOOTH_ALPHA) * self.last_max_accel + smoothed_decel = DECEL_SMOOTH_ALPHA * target_min + (1 - DECEL_SMOOTH_ALPHA) * self.last_min_accel + + # Rate Limiting (Asymmetric) + raw_change = smoothed_decel - self.last_min_accel + + if raw_change < 0: + limit = MAX_DECEL_INCREASE_RATE * DT_MDL + decel_change = np.clip(raw_change, -limit, limit) + else: + limit = MAX_DECEL_DECREASE_RATE * DT_MDL + decel_change = np.clip(raw_change, -limit, limit) + + self.last_min_accel += decel_change + + # Dynamic Safety Corridor: Ensure min is always strictly less than max. + # We maintain a gap of at least 0.1, or 5% of the current max acceleration. + # This scaling gap prevents solver crashes at high acceleration values. + gap = max(0.1, abs(self.last_max_accel) * 0.05) + + if self.last_min_accel > self.last_max_accel - gap: + self.last_min_accel = self.last_max_accel - gap + + return float(self.last_min_accel), float(self.last_max_accel) def get_min_accel(self, v_ego: float) -> float: - """ - 獲取當前車速下的最小加速度 (減速下限) - """ return self.get_accel_limits(v_ego)[0] def get_max_accel(self, v_ego: float) -> float: - """ - 獲取當前車速下的最大加速度 (加速上限) - """ return self.get_accel_limits(v_ego)[1] - def _ramp_off(self, v_ego: float) -> float: - """ - 接近緩和機制:當車速接近巡航目標速度時,線性調降加速度上限,避免衝過頭 - """ - if self._v_cruise <= 0.0: - return 1.0 - return float(np.clip((self._v_cruise - v_ego) / RAMP_OFF_RANGE, 0.0, 1.0)) + def is_enabled(self) -> bool: + return self._enabled - def _target_max(self, v_ego: float) -> float: - """ - 計算基礎最大加速度目標值,並結合接近緩和增益 - """ - base = float(np.interp(v_ego, A_MAX_BP, A_MAX_V[self._personality])) - return base * self._ramp_off(v_ego) + def set_enabled(self, enabled: bool): + self._enabled = enabled + self.params.put_bool('AccelPersonalityEnabled', enabled) - def _target_min(self, v_ego: float) -> float: - """ - 計算動態減速度目標值(核心升級:滑行與煞車分離) - - 超速或未設定巡航速時:僅提供基礎滑行阻力 (Coast Drag) - - 低於巡航速時:根據速差以指數曲線 (1.5次方) 平滑過渡至煞車底線 (Floor) - """ - coast = float(np.interp(v_ego, COAST_DRAG_BP, COAST_DRAG_V[self._personality])) - if self._v_cruise <= 0.0 or v_ego >= self._v_cruise: - return coast + def toggle_enabled(self) -> bool: + current = self._enabled + self.set_enabled(not current) + return not current - floor = float(np.interp(v_ego, A_MIN_FLOOR_BP, A_MIN_FLOOR_V[self._personality])) - deficit = self._v_cruise - v_ego - # 引入 1.5 次方曲線,讓煞車力道的介入更為平滑線性,提升舒適度 - t = float(np.clip(deficit / DEFICIT_TO_FLOOR, 0.0, 1.0)) ** 1.5 - return coast + t * (floor - coast) - - def _apply_coast_deadband(self, v_ego: float, t_min: float, t_max: float) -> tuple[float, float]: - """ - 巡航死區控制:當車速與目標巡航速度高度接近 (1.0 m/s 內) 時, - 限制加速度上限並將減速下限設為滑行阻力,防止加減速頻繁交替 (Ping-Pong 效應) - """ - if self._v_cruise <= 0.0 or abs(v_ego - self._v_cruise) >= COAST_DEADBAND: - return t_min, t_max - coast = float(np.interp(v_ego, COAST_DRAG_BP, COAST_DRAG_V[self._personality])) - return coast, max(0.05, t_max * 0.25) - - def _rate_limit(self, last: float, target: float, rate_down: float, rate_up: float) -> float: - """ - 線性變化率限制器 (Rate Limiter):根據變化方向應用非對稱變化率上限,確保加減速過渡平順 - """ - rate = rate_up if target > last else rate_down - step = rate * DT_MDL - return float(np.clip(target, last - step, last + step)) - - def _step(self, v_ego: float) -> tuple[float, float]: - """ - 核心加減速限制步進計算 - """ - # 1. 計算目標極值 - t_max = self._target_max(v_ego) - t_min = self._target_min(v_ego) - - # 2. 應用巡航死區修正 - t_min, t_max = self._apply_coast_deadband(v_ego, t_min, t_max) - - # 3. 首次執行直接賦值 - if self._first: - self._a_min, self._a_max = t_min, t_max - self._first = False - return self._a_min, self._a_max - - # 4. 應用嚴格的線性變化率限制 (替代舊版的 EMA 濾波) - new_min = self._rate_limit(self._a_min, t_min, rate_down=A_MIN_TIGHTEN_RATE, rate_up=A_MIN_RELAX_RATE) - new_max = self._rate_limit(self._a_max, t_max, rate_down=A_MAX_RATE, rate_up=A_MAX_RATE) - - # 5. 安全廊道約束:確保 min 永遠小於 max 減去最小間距,防止解算器異常 - new_min = min(new_min, new_max - MIN_MAX_GAP) - - self._a_min, self._a_max = new_min, new_max - return self._a_min, self._a_max + def reset(self): + self._accel_personality = AccelPersonality.normal + self.params.put('AccelPersonality', AccelPersonality.normal) + self.frame = 0 + self.last_max_accel = 2.0 + self.last_min_accel = -0.01 + self.first_run = True