From 2a39a830686bfb368d4119bdc7a477331bddd6b8 Mon Sep 17 00:00:00 2001 From: janpoo6427 Date: Fri, 5 Jun 2026 14:11:40 +0900 Subject: [PATCH] Update desire helper Update line ok Fix corner long distance variables for left and right Add new persistent parameters to params_keys.h Added new persistent parameters for side gap margin, BSD hold time, and object clear time. Refactor lane change logic in desire_helper.py Remove unnecessary reset of lane change completion flag and add safety check for completed direction. --- common/params_keys.h | 6 + .../opendbc/car/hyundai/hyundaicanfd.py | 23 +- selfdrive/controls/lib/desire_helper.py | 167 ++++++------ .../controls/lib/desire_lib/side_state.py | 237 ++++++++++++------ 4 files changed, 255 insertions(+), 178 deletions(-) diff --git a/common/params_keys.h b/common/params_keys.h index 45650114d..adc3fbe67 100644 --- a/common/params_keys.h +++ b/common/params_keys.h @@ -303,6 +303,12 @@ inline static std::unordered_map keys = { {"LaneLineCheck", {PERSISTENT, INT, "0"}}, {"SideObjectDetectDisplay", {PERSISTENT, INT, "0"}}, {"MaxAngleFrames", {PERSISTENT, INT, "89"}}, + + {"SideGapMargin", {PERSISTENT, INT, "30"}}, + {"BsdHoldSec", {PERSISTENT, INT, "25"}}, + {"BsdClearSec", {PERSISTENT, INT, "10"}}, + {"ObjectClearSec", {PERSISTENT, INT, "5"}}, + {"SoftHoldMode", {PERSISTENT, INT, "0"}}, {"SoftHoldMode", {PERSISTENT, INT, "0"}}, diff --git a/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py b/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py index 10e47ac46..103c0c9a5 100644 --- a/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py +++ b/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py @@ -788,12 +788,13 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, values["LANELINE_CURVATURE"] = (min(abs(curvature), 15) + (-1 if curvature < 0 else 0)) if lat_active else 0 values["LANELINE_CURVATURE_DIRECTION"] = 1 if curvature < 0 and lat_active else 0 - lane_color = 6 if md is not None and md.meta.laneChangeAvailableLeft else 2 + # LANELINE 경고 판단 (CAN raw 기준) if lane_line_check >= 1: - lane_line_warn_left = CS.out.leftLaneLine % 10 not in (0, 5) # 실선이면 주황 + lane_line_warn_left = CS.out.leftLaneLine % 10 not in (0, 5) + lane_line_warn_right = CS.out.rightLaneLine % 10 not in (0, 5) else: - lane_line_warn_left = CS.out.leftLaneLine >= 20 # 황색(10~)/주황(20~) 차단 - lane_line_warn_right = CS.out.rightLaneLine >= 20 + lane_line_warn_left = CS.out.leftLaneLine // 10 == 2 # 황색(10~)/주황(20~) 차단 + lane_line_warn_right = CS.out.rightLaneLine // 10 == 2 lca_avail_left = (md is not None and md.meta.laneChangeAvailableLeft) lca_avail_right = (md is not None and md.meta.laneChangeAvailableRight) @@ -811,7 +812,7 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, if hud_control.leftLaneDepart: values["LANELINE_LEFT"] = 4 if (frame // 50) % 2 == 0 else 1 else: - values["LANELINE_LEFT"] = lane_color if hud_control.leftLaneVisible else 0 + values["LANELINE_LEFT"] = lane_color # 오른쪽 if lca_avail_right: @@ -819,18 +820,18 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, elif lane_line_warn_right or CS.out.rightBlindspot or (side_object_right and side_object_detect_display): lane_color = 4 else: - lane_line_warn_right = CS.out.rightLaneLine // 10 == 2 - lane_color = 4 if lane_line_warn_right or CS.out.rightBlindspot else lane_color + lane_color = 2 + if hud_control.rightLaneDepart: values["LANELINE_RIGHT"] = 4 if (frame // 50) % 2 == 0 else 1 else: - values["LANELINE_RIGHT"] = lane_color if hud_control.rightLaneVisible else 0 + values["LANELINE_RIGHT"] = lane_color values["LCA_LEFT_ARROW"] = 2 if CS.out.leftBlinker else 0 values["LCA_RIGHT_ARROW"] = 2 if CS.out.rightBlinker else 0 - values["LCA_LEFT_ICON"] = (1 if CS.out.leftBlindspot else 2) if lat_active else 0 - values["LCA_RIGHT_ICON"] = (1 if CS.out.rightBlindspot else 2) if lat_active else 0 + values["LCA_LEFT_ICON"] = (1 if not lca_avail_left else 2) if lat_active else 1 if lat_enabled else 0 + values["LCA_RIGHT_ICON"] = (1 if not lca_avail_right else 2) if lat_active else 1 if lat_enabled else 0 values["LANE_LEFT"] = 1 if desire in (1, 3) else 0 values["LANE_RIGHT"] = 1 if desire in (2, 4) else 0 @@ -926,4 +927,4 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, values["SPEED_LIMIT"] = 100 ret.append(packer.make_can_msg("HDA_INFO_4A3", CAN.CAM, values)) - return ret + return ret \ No newline at end of file diff --git a/selfdrive/controls/lib/desire_helper.py b/selfdrive/controls/lib/desire_helper.py index ceb84bdc0..5ae24f0d5 100644 --- a/selfdrive/controls/lib/desire_helper.py +++ b/selfdrive/controls/lib/desire_helper.py @@ -18,7 +18,6 @@ class DesireHelper: self.params = Params() self.frame = 0 - # FSM core self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.lane_change_timer = 0.0 @@ -33,24 +32,20 @@ class DesireHelper: self.desire_disable_count = 0 self.turn_disable_count = 0 - # per-side states self.left = SideState("left") self.right = SideState("right") - # blinker/ATC state (원본 변수들 유지) self.blinker_ignore = False self.driver_blinker_state = BLINKER_NONE self.carrot_blinker_state = BLINKER_NONE self.carrot_lane_change_count = 0 self.carrot_cmd_index_last = 0 self.atc_type = "" - self.atc_active = 0 # 0: 없음, 1: ATC 동작, 2: 충돌 + self.atc_active = 0 - # auto lane change self.auto_lane_change_enable = False self.unsafe_cancel_timer = 0.0 - # keep pulse self.keep_pulse_timer = 0.0 self.lane_change_need_torque = 0 @@ -59,12 +54,15 @@ class DesireHelper: self.lane_change_delay = 0.0 self.model_turn_speed_factor = 0.0 self.model_turn_speed = 200.0 + self.side_gap_margin = 3.0 + self.bsd_clear_sec = 0.0 + self.object_clear_sec = 0.3 + self.bsd_hold_sec = 1.5 + self.ignore_bsd = False - # misc self.prev_desire_enabled = False self.desire_log = "" - # externally readable flags self.lane_change_available_left = False self.lane_change_available_right = False @@ -76,8 +74,6 @@ class DesireHelper: self.lane_change_cooldown_timer = 0.0 self.lane_appeared_block_timer = 0.0 - # ───────────────────────────────────────────── - # params/model # ───────────────────────────────────────────── def _update_params_periodic(self): if self.frame % 100 == 0: @@ -103,24 +99,19 @@ class DesireHelper: def _check_desire_state(self, modeldata, carstate, maneuver_type): desire_state = modeldata.meta.desireState - orientation_rate = abs(modeldata.orientationRate.z[5]) + orientation_rate = abs(modeldata.orientationRate.z[5]) orientation_rate_future = abs(modeldata.orientationRate.z[15]) - self.turn_desire_state = (desire_state[1] + desire_state[2]) > 0.1 - if maneuver_type == "turn" and abs(carstate.steeringAngleDeg) > 80 and orientation_rate_future < orientation_rate: self.turn_disable_count = int(10.0 / DT_MDL) else: self.turn_disable_count = max(0, self.turn_disable_count - 1) - # ───────────────────────────────────────────── - # blinkers/ATC (원본 로직 유지, side 계산은 별개) # ───────────────────────────────────────────── def _update_driver_blinker(self, carstate): st = carstate.leftBlinker * 1 + carstate.rightBlinker * 2 changed = st != self.driver_blinker_state self.driver_blinker_state = st - enabled = st in (BLINKER_LEFT, BLINKER_RIGHT) if self.lane_change_need_torque < 0: enabled = False @@ -130,86 +121,87 @@ class DesireHelper: atc_type = carrotMan.atcType atc_blinker_state = BLINKER_NONE - # 유지 카운트는 DesireHelper에서 관리 if self.carrot_lane_change_count > 0: atc_blinker_state = self.carrot_blinker_state elif carrotMan.carrotCmdIndex != self.carrot_cmd_index_last and carrotMan.carrotCmd == "LANECHANGE": - self.carrot_cmd_index_last = carrotMan.carrotCmdIndex + self.carrot_cmd_index_last = carrotMan.carrotCmdIndex self.carrot_lane_change_count = int(0.2 / DT_MDL) - self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT - atc_blinker_state = self.carrot_blinker_state + self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT + atc_blinker_state = self.carrot_blinker_state elif atc_type in ("turn left", "turn right"): if self.atc_active != 2: - atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT - self.atc_active = 1 - self.blinker_ignore = False + atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT + self.atc_active = 1 + self.blinker_ignore = False elif atc_type in ("fork left", "fork right", "atc left", "atc right"): if self.atc_active != 2: atc_blinker_state = BLINKER_LEFT if atc_type in ("fork left", "atc left") else BLINKER_RIGHT - self.atc_active = 1 + self.atc_active = 1 else: self.atc_active = 0 - # 충돌 시 ATC 무효 if driver_blinker_state != BLINKER_NONE and atc_blinker_state != BLINKER_NONE and driver_blinker_state != atc_blinker_state: atc_blinker_state = BLINKER_NONE - self.atc_active = 2 + self.atc_active = 2 atc_desire_enabled = atc_blinker_state in (BLINKER_LEFT, BLINKER_RIGHT) - # blinker_ignore if driver_blinker_state == BLINKER_NONE: self.blinker_ignore = False if self.blinker_ignore: - atc_blinker_state = BLINKER_NONE + atc_blinker_state = BLINKER_NONE atc_desire_enabled = False - # 타입 변경 1프레임 무시 if self.atc_type != atc_type: atc_desire_enabled = False self.atc_type = atc_type return atc_blinker_state, atc_desire_enabled - # ───────────────────────────────────────────── - # per-side processing (핵심: 좌/우 모두 매 프레임 계산) # ───────────────────────────────────────────── def _process_sides(self, carstate, modeldata, radarState): - # geometry (좌/우) - # left: outer laneLines[0], current laneLines[1], edge[0], cur_prob laneLineProbs[1] self.left.update_lane_geometry( modeldata.laneLines[0], modeldata.laneLineProbs[0], - modeldata.laneLines[1], - modeldata.roadEdges[0], + modeldata.laneLines[1], modeldata.roadEdges[0], cur_prob=modeldata.laneLineProbs[1], ) - # right: outer laneLines[3], current laneLines[2], edge[1], cur_prob laneLineProbs[2] self.right.update_lane_geometry( modeldata.laneLines[3], modeldata.laneLineProbs[3], - modeldata.laneLines[2], - modeldata.roadEdges[1], + modeldata.laneLines[2], modeldata.roadEdges[1], cur_prob=modeldata.laneLineProbs[2], ) - # lane line info (HUD용 raw는 기존대로 leftLaneLine/rightLaneLine) self.left.update_lane_line_info(carstate.leftLaneLine) self.right.update_lane_line_info(carstate.rightLaneLine) - # BSD 설정 - ignore_bsd = (self.laneChangeBsd < 0) - - # obstacles v_ego = carstate.vEgo - self.left.update_obstacles(v_ego, radarState.leadLeft, carstate.leftBlindspot, ignore_bsd, bsd_hold_sec=2.0) - self.right.update_obstacles(v_ego, radarState.leadRight, carstate.rightBlindspot, ignore_bsd, bsd_hold_sec=2.0) + + self.left.update_obstacles( + v_ego, radarState.leadLeft, carstate.leftBlindspot, self.ignore_bsd, + bsd_hold_sec = self.bsd_hold_sec, + side_gap_margin = self.side_gap_margin, + corner_long_dist_f = float(carstate.leftLongDist), + corner_long_dist_r = float(carstate.leftRearLongDist), + corner_lat_dist = float(carstate.leftLatDist), + object_clear_sec = self.object_clear_sec, + ) + self.right.update_obstacles( + v_ego, radarState.leadRight, carstate.rightBlindspot, self.ignore_bsd, + bsd_hold_sec = self.bsd_hold_sec, + side_gap_margin = self.side_gap_margin, + corner_long_dist_f = float(carstate.rightLongDist), + corner_long_dist_r = float(carstate.rightRearLongDist), + corner_lat_dist = float(carstate.rightLatDist), + object_clear_sec = self.object_clear_sec, + ) # compute available (include BSD+object) if self.lane_line_check >= 1: left_line_ok = self.left.lane_line_info_mod in (0, 5) right_line_ok = self.right.lane_line_info_mod in (0, 5) else: - left_line_ok = self.left.lane_line_info_raw < 20 - right_line_ok = self.right.lane_line_info_raw < 20 + left_line_ok = self.left.lane_line_info_raw // 10 != 2 + right_line_ok = self.right.lane_line_info_raw // 10 != 2 self.left.compute_lane_change_available(lane_line_info_lt_20=left_line_ok, bsd_level=self.lane_change_bsd, bsd_clear_sec=self.bsd_clear_sec) self.right.compute_lane_change_available(lane_line_info_lt_20=right_line_ok, bsd_level=self.lane_change_bsd, bsd_clear_sec=self.bsd_clear_sec) @@ -217,22 +209,18 @@ class DesireHelper: self.left.update_triggers() self.right.update_triggers() - # externally readable - self.lane_change_available_left = self.left.lane_change_available + self.lane_change_available_left = self.left.lane_change_available self.lane_change_available_right = self.right.lane_change_available def _get_selected_side(self, blinker_state: int) -> SideState: return self.left if blinker_state == BLINKER_LEFT else self.right - # ───────────────────────────────────────────── - # main update # ───────────────────────────────────────────── def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMan, radarState): self.frame += 1 self._update_params_periodic() self._make_model_turn_speed(modeldata) - # counts self.carrot_lane_change_count = max(0, self.carrot_lane_change_count - 1) self.lane_change_delay_timer = max(0.0, self.lane_change_delay_timer - DT_MDL) self.unsafe_cancel_timer = max(0.0, self.unsafe_cancel_timer - DT_MDL) @@ -244,18 +232,14 @@ class DesireHelper: v_ego = carstate.vEgo below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN - # per-side compute (좌/우 모두) self._process_sides(carstate, modeldata, radarState) - - # desire state from model self._check_desire_state(modeldata, carstate, self.maneuver_type) - # blinkers driver_st, driver_changed, driver_enabled = self._update_driver_blinker(carstate) atc_st, atc_enabled = self._update_atc_blinker(carrotMan, driver_st) desire_enabled = driver_enabled or atc_enabled - blinker_state = driver_st if driver_enabled else atc_st + blinker_state = driver_st if driver_enabled else atc_st # ── 깜빡이 노이즈 필터링 및 반대 방향 전환 감지 ── if desire_enabled: @@ -264,11 +248,6 @@ class DesireHelper: (blinker_state == BLINKER_RIGHT and self.completed_direction == LaneChangeDirection.left): self.lane_change_completed_on_blinker = False - # 깜빡이가 새로 켜진 순간(재입력 의사 발생) 영구 락아웃을 방지하기 위해 - # 완료 플래그는 해제해 주되, 쿨다운 타이머(cooldown_timer)는 그대로 유지하여 안전을 검증합니다. - if not self.prev_desire_enabled: - self.lane_change_completed_on_blinker = False - else: self.blinker_off_timer += DT_MDL if self.blinker_off_timer > 0.5: @@ -320,12 +299,11 @@ class DesireHelper: # ── auto lane change trigger ────────────────────────────────── auto_lane_change_trigger = False + if desire_enabled and side is not None: - # carrot_lane_change_count>0이면 강제 허용 if self.carrot_lane_change_count > 0: auto_lane_change_trigger = side.lane_change_available else: - # 기존 조건: edge_available + (trigger or appeared) + not side_object_detected auto_lane_change_trigger = ( self.auto_lane_change_enable and (not atc_lane_change_manual_only) and @@ -341,36 +319,30 @@ class DesireHelper: # ───────────────────────── FSM ───────────────────────── if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX: - self.lane_change_state = LaneChangeState.off + self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none - self.turn_direction = TurnDirection.none - self.maneuver_type = "none" + self.turn_direction = TurnDirection.none + self.maneuver_type = "none" elif self.desire_disable_count > 0: - self.lane_change_state = LaneChangeState.off + self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none - self.turn_direction = TurnDirection.none - self.maneuver_type = "none" + self.turn_direction = TurnDirection.none + self.maneuver_type = "none" else: - # classify maneuver type using selected side if desire_enabled and side is not None: new_type = classify_maneuver_type( - blinker_state=blinker_state, - carstate=carstate, - side=side, + blinker_state=blinker_state, carstate=carstate, side=side, turn_desire_state=self.turn_desire_state, - atc_type=self.atc_type, - old_type=self.maneuver_type, + atc_type=self.atc_type, old_type=self.maneuver_type, ) else: new_type = "none" - # switching rules if self.maneuver_type == "lane_change" and new_type == "turn" and self.lane_change_state not in ( - LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting - ): - self.maneuver_type = "turn" + LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting): + self.maneuver_type = "turn" self.lane_change_state = LaneChangeState.off elif self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): self.maneuver_type = new_type @@ -379,16 +351,17 @@ class DesireHelper: if desire_enabled and self.maneuver_type == "turn" and self.enable_turn_desires: self.lane_change_state = LaneChangeState.off if self.turn_disable_count > 0: - self.turn_direction = TurnDirection.none + self.turn_direction = TurnDirection.none self.lane_change_direction = LaneChangeDirection.none else: - self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight + self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight self.lane_change_direction = self.turn_direction # ─ Lane change FSM ─ else: self.turn_direction = TurnDirection.none + # ── off 상태 ────────────────────────────────────────────── if self.lane_change_state == LaneChangeState.off: reentry = ( not self.prev_desire_enabled or @@ -413,8 +386,6 @@ class DesireHelper: lane_exist_counter_side = side.lane_exist_count.counter lane_change_available_geom = side.lane_change_available_geom - self.auto_lane_change_enable = False if (lane_exist_counter_side > 0 or lane_change_available_geom) else True - self.next_lane_change = False if avail_just_cleared: self.auto_lane_change_enable = True @@ -424,13 +395,13 @@ class DesireHelper: # ── preLaneChange 상태 ─────────────────────────────────── elif self.lane_change_state == LaneChangeState.preLaneChange: if side is None: - self.lane_change_state = LaneChangeState.off + self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none + else: self.lane_change_direction = LaneChangeDirection.left if blinker_state == BLINKER_LEFT else LaneChangeDirection.right - # torque direction cond - torque_cond = (carstate.steeringTorque > 0) if blinker_state == BLINKER_LEFT else (carstate.steeringTorque < 0) + torque_cond = (carstate.steeringTorque > 0) if blinker_state == BLINKER_LEFT else (carstate.steeringTorque < 0) torque_applied = carstate.steeringPressed and torque_cond bsd_detected = side.bsd_hold_counter > 0 @@ -458,7 +429,7 @@ class DesireHelper: self.auto_lane_change_enable = True if not desire_enabled or below_lane_change_speed: - self.lane_change_state = LaneChangeState.off + self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.auto_lane_change_enable = False @@ -520,6 +491,8 @@ class DesireHelper: avail = side.lane_change_available_hold if side is not None else False if not avail: + # 방향을 none으로 지우기 전에 완료된 방향을 먼저 안전하게 저장합니다. + self.completed_direction = self.lane_change_direction self.lane_change_direction = LaneChangeDirection.none self.lane_change_state = LaneChangeState.laneChangeFinishing else: @@ -531,7 +504,9 @@ class DesireHelper: elif self.lane_change_state == LaneChangeState.laneChangeFinishing: self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0) if self.lane_change_ll_prob > 0.99: - self.completed_direction = self.lane_change_direction + # 다른 예외 조건(prob 감소 등)으로 진입했을 때를 위해 안전장치 추가 + if self.lane_change_direction != LaneChangeDirection.none: + self.completed_direction = self.lane_change_direction self.lane_change_direction = LaneChangeDirection.none self.lane_change_state = LaneChangeState.off @@ -547,30 +522,30 @@ class DesireHelper: else: self.lane_change_timer += DT_MDL - # commit last per-side + # ── commit last ────────────────────────────────────────────── self.left.commit_last() self.right.commit_last() self.prev_desire_enabled = desire_enabled - # 반대 방향 토크로 cancel (기존 유지) + # ── 반대 방향 토크 cancel ──────────────────────────────────── steering_pressed_cancel = carstate.steeringPressed and ( (carstate.steeringTorque < 0 and blinker_state == BLINKER_LEFT) or (carstate.steeringTorque > 0 and blinker_state == BLINKER_RIGHT) ) if steering_pressed_cancel and self.lane_change_state != LaneChangeState.off: self.lane_change_direction = LaneChangeDirection.none - self.lane_change_state = LaneChangeState.off - self.blinker_ignore = True + self.lane_change_state = LaneChangeState.off + self.blinker_ignore = True - # final desire + # ── final desire ───────────────────────────────────────────── if self.turn_direction != TurnDirection.none: - self.desire = TURN_DESIRES[self.turn_direction] + self.desire = TURN_DESIRES[self.turn_direction] self.lane_change_direction = self.turn_direction else: self.desire = DESIRES[self.lane_change_direction][self.lane_change_state] - # keep pulse + # ── keep pulse ─────────────────────────────────────────────── if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting): self.keep_pulse_timer = 0.0 elif self.lane_change_state == LaneChangeState.preLaneChange: diff --git a/selfdrive/controls/lib/desire_lib/side_state.py b/selfdrive/controls/lib/desire_lib/side_state.py index 6bbc8c14e..d59954597 100644 --- a/selfdrive/controls/lib/desire_lib/side_state.py +++ b/selfdrive/controls/lib/desire_lib/side_state.py @@ -1,10 +1,7 @@ from dataclasses import dataclass, field from collections import deque -from typing import Optional - import numpy as np from openpilot.common.realtime import DT_MDL -from openpilot.common.constants import CV from .lane_math import calculate_lane_width from .hysteresis import ExistCounter @@ -14,47 +11,51 @@ from .hysteresis import ExistCounter class SideState: name: str # "left" / "right" - # lane/edge distances + # ── 차선/도로 경계 거리 ──────────────────────────────────────── lane_width: float = 0.0 lane_width_diff: float = 0.0 dist_to_edge: float = 0.0 dist_to_edge_far: float = 0.0 - # current lane prob (ego lane line prob on that side) + # ── 자차 차선 확률 ───────────────────────────────────────────── cur_prob: float = 1.0 current_lane_missing: bool = False - # counters + # ── 히스테리시스 카운터 ──────────────────────────────────────── lane_exist_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) lane_width_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) - edge_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) + edge_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) - # availability - lane_available: bool = False - edge_available: bool = False + # ── 차선/경계 가용 여부 ──────────────────────────────────────── + lane_available: bool = False + edge_available: bool = False + lane_change_available_last: bool = False - # smoothing + # ── 차선 폭 스무딩 ───────────────────────────────────────────── lane_width_queue: deque = field(default_factory=lambda: deque(maxlen=int(1.0 / DT_MDL))) + lane_width_sum: float = 0.0 - # lane line info - lane_line_info_raw: int = 0 - lane_line_info_mod: int = 0 - last_lane_line_mod: int = 0 + # ── 차선선 정보 ──────────────────────────────────────────────── + lane_line_info_raw: int = 0 + lane_line_info_mod: int = 0 + last_lane_line_mod: int = 0 lane_line_info_edge_detect: bool = False - # transitions - lane_available_last: bool = False - edge_available_last: bool = False + # ── 상태 전환 ────────────────────────────────────────────────── + lane_available_last: bool = False + edge_available_last: bool = False lane_available_trigger: bool = False - lane_appeared: bool = False + lane_appeared: bool = False - # obstacles - object_detected_count: int = 0 - side_object_detected: bool = False + # ── 측방 장애물 감지 ─────────────────────────────────────────── + object_detected_count: int = 0 + side_object_detected: bool = False + object_clear_count: int = 0 - # BSD hold (after detection) - bsd_hold_counter: int = 0 + # ── BSD hold ─────────────────────────────────────────────────── + bsd_hold_counter: int = 0 bsd_detected_now: bool = False + bsd_clear_count: int = field(default_factory=lambda: int(10.0 / DT_MDL)) # ── 차선 변경 가능 여부 (용도별 3종) ────────────────────────── # lane_change_available_geom : 기하학적 조건만 (차선 폭·도로 경계) @@ -104,84 +105,179 @@ class SideState: self.lane_exist_count.update(bool(lane_valid)) - # running mean (O(1)) + # running mean O(1) if len(self.lane_width_queue) == self.lane_width_queue.maxlen: self.lane_width_sum -= self.lane_width_queue.popleft() self.lane_width_queue.append(lane_w) self.lane_width_sum += lane_w self.lane_width = self.lane_width_sum / len(self.lane_width_queue) - self.lane_width_diff = (self.lane_width_queue[-1] - self.lane_width_queue[0]) if len(self.lane_width_queue) >= 2 else 0.0 - - self.dist_to_edge = float(dist_edge) + self.lane_width_diff = ( + (self.lane_width_queue[-1] - self.lane_width_queue[0]) + if len(self.lane_width_queue) >= 2 else 0.0 + ) + self.dist_to_edge = float(dist_edge) self.dist_to_edge_far = float(dist_edge_far) min_lane_width = 2.5 self.lane_width_count.update(self.lane_width > min_lane_width) self.edge_count.update(self.dist_to_edge > min_lane_width) - available_count = int(0.2 / DT_MDL) - self.lane_available = self.lane_width_count.counter > available_count - self.edge_available = (self.edge_count.counter > available_count) and (self.dist_to_edge_far > min_lane_width) + available_count = int(0.2 / DT_MDL) + self.lane_available = self.lane_width_count.counter > available_count + self.edge_available = ( + (self.edge_count.counter > available_count) and + (self.dist_to_edge_far > min_lane_width) + ) - self.cur_prob = float(cur_prob) + self.cur_prob = float(cur_prob) self.current_lane_missing = self.cur_prob < 0.3 + def update_lane_line_info(self, lane_line_info_raw: int): self.lane_line_info_raw = int(lane_line_info_raw) mod = self.lane_line_info_raw % 10 - # edge_detect: 0/5�� �ٲ�� ���� (������ ��/�찡 ���� self.lane_line_info ������ ���׼�) - self.lane_line_info_edge_detect = (mod in (0, 5)) and (self.last_lane_line_mod not in (0, 5)) + # 0/5 로 바뀌는 순간만 edge_detect = True + self.lane_line_info_edge_detect = ( + (mod in (0, 5)) and (self.last_lane_line_mod not in (0, 5)) + ) self.last_lane_line_mod = mod self.lane_line_info_mod = mod + + # ════════════════════════════════════════════════════════════════ + # 측방 장애물 / BSD 업데이트 + # ════════════════════════════════════════════════════════════════ def update_obstacles(self, - v_ego: float, - radar_obj, # radarState.leadLeft / leadRight - blindspot: bool, # carstate.leftBlindspot/rightBlindspot - ignore_bsd: bool, - bsd_hold_sec: float = 2.0): - # object_detected (radar ���) - if radar_obj is not None and radar_obj.status: - d = radar_obj.dRel - v = radar_obj.vLead - side_object_dist = d + v * 4.0 - else: - side_object_dist = 255.0 + v_ego: float, + radar_obj, + blindspot: bool, + ignore_bsd: bool, + bsd_hold_sec: float = 2.5, + side_gap_margin: float = 3.0, + corner_long_dist_f: float = 0.0, + corner_long_dist_r: float = 0.0, + corner_lat_dist: float = 0.0, + object_clear_sec: float = 0.5): + gap = float(np.clip(side_gap_margin, 1.0, 6.0)) + + # ── 1) 사이드 레이더 (leadLeft / leadRight) + radar_detected = self._radar_block(radar_obj, v_ego, gap) + + # ── 2) 측전방 코너 레이더 + front_detected = self._corner_block_front(corner_long_dist_f, corner_lat_dist, v_ego, gap) + + # ── 3) 측후방 코너 레이더 + rear_detected = self._corner_block_rear(corner_long_dist_r, v_ego, gap) + + # ── 4) 코너 레이더 활성 여부 (거리값 유효 여부로 판단) + corner_radar_active = (corner_long_dist_f > 0) or (corner_long_dist_r > 0) + self.corner_radar_active = corner_radar_active + + # ── 5) BSD → object 연동 + bsd_now = bool(blindspot) and (not ignore_bsd) + + bsd_as_object = bsd_now + + object_detected = radar_detected or front_detected or rear_detected or bsd_as_object + + # ── 6) 디바운싱 + CLEAR_FRAMES = max(1, int(object_clear_sec / DT_MDL)) - object_detected = side_object_dist < (v_ego * 3.0) if object_detected: - self.object_detected_count = max(1, self.object_detected_count + 1) + self.object_detected_count = 1 + self.object_clear_count = 0 + self.side_object_detected = True else: - self.object_detected_count = min(-1, self.object_detected_count - 1) + self.object_clear_count += 1 + if self.object_clear_count >= CLEAR_FRAMES: + self.object_detected_count = 0 + self.side_object_detected = False - self.side_object_detected = self.object_detected_count > int(-0.3 / DT_MDL) + # ── 7) BSD hold + self.bsd_detected_now = bsd_now - # BSD hold (�䱸����: ���� �� 2�� ����) - self.bsd_detected_now = bool(blindspot) - if self.bsd_detected_now and not ignore_bsd: - self.bsd_hold_counter = int(bsd_hold_sec / DT_MDL) + # 코너 레이더 없는 차량은 hold 시간을 늘려 보수적으로 동작 + effective_hold_sec = bsd_hold_sec if corner_radar_active else max(bsd_hold_sec, 3.5) + + if self.bsd_detected_now: + self.bsd_hold_counter = int(effective_hold_sec / DT_MDL) + self.bsd_clear_count = 0 else: - self.bsd_hold_counter = max(0, self.bsd_hold_counter - 1) + if self.bsd_hold_counter > 0: + self.bsd_hold_counter -= 1 + else: + self.bsd_clear_count += 1 - def compute_lane_change_available(self, lane_line_info_lt_20: bool, ignore_bsd: bool): - # geometric availability - self.lane_change_available_geom = (self.lane_available or self.edge_available) and lane_line_info_lt_20 - # include bsd/object into lane_change_available (�䱸����) - bsd_active = (self.bsd_hold_counter > 0) and (not ignore_bsd) - self.lane_change_available = self.lane_change_available_geom and (not self.side_object_detected) and (not bsd_active) + # ════════════════════════════════════════════════════════════════ + # 차선 변경 가능 여부 판단 + # ════════════════════════════════════════════════════════════════ + + def compute_lane_change_available(self, + lane_line_info_lt_20: bool, + bsd_level: int, + bsd_clear_sec: float = 1.0): + BSD_CLEAR_FRAMES = max(1, int(bsd_clear_sec / DT_MDL)) + + self.lane_change_available_geom = ( + (self.lane_available or self.edge_available) and lane_line_info_lt_20 + ) + + ignore_bsd = (bsd_level < 0) + bsd_stabilized = ( + (self.bsd_hold_counter == 0) and + (self.bsd_clear_count >= BSD_CLEAR_FRAMES) + ) + bsd_active = (not bsd_stabilized) and (not ignore_bsd) + + # 완전 가능: geom + 장애물 없음 + BSD 없음 + self.lane_change_available = ( + self.lane_change_available_geom + and (not self.side_object_detected) + and (not bsd_active) + ) + + # BSD 무시 버전: geom + 장애물 없음 + self.lane_change_available_no_bsd = ( + self.lane_change_available_geom + and (not self.side_object_detected) + ) + + # hold: geometry flicker 완화용, 장애물/BSD는 즉시 차단 + instant_block = self.side_object_detected or ((bsd_level >= 1) and bsd_active) + hold_base = ( + self.lane_change_available if (bsd_level >= 1) + else self.lane_change_available_no_bsd + ) + avail_false_thresh = int(0.3 / DT_MDL) + + if instant_block: + self.avail_false_count = avail_false_thresh + self.lane_change_available_hold = False + elif hold_base: + self.avail_false_count = 0 + self.lane_change_available_hold = True + else: + self.avail_false_count += 1 + if self.avail_false_count >= avail_false_thresh: + self.lane_change_available_hold = False + + + # ════════════════════════════════════════════════════════════════ + # 트리거 / 커밋 + # ════════════════════════════════════════════════════════════════ def update_triggers(self): - # lane_available_trigger (���� ���� ����) - self.lane_available_trigger = False - if self.lane_width_diff > 0.8 and (self.lane_width < self.dist_to_edge): - self.lane_available_trigger = True - - # lane_appeared (bugfix: == ���� >=�� �ڿ�������) - # + edge�� �ʹ� �ָ�(������) lane_appeared�� �����ϰ� true�� ������ �ʰ� ���� - appeared_now = self.lane_exist_count.counter >= int(0.2 / DT_MDL) - self.lane_appeared = (self.lane_appeared or appeared_now) and (self.dist_to_edge < 4.0) + self.lane_available_trigger = ( + self.lane_width_diff > 0.8 and + self.lane_width < self.dist_to_edge + ) + appeared_now = self.lane_exist_count.counter >= int(0.2 / DT_MDL) + self.lane_appeared = ( + (self.lane_appeared or appeared_now) and + (self.dist_to_edge < 4.0) + ) def commit_last(self): self.lane_available_last = self.lane_available @@ -358,4 +454,3 @@ class SideState: safety_block = d_cur < safety_th return dist_block or ttc_block or safety_block or future_block -