Update desire helper

Update line ok

Fix corner long distance variables for left and right

Add new persistent parameters to params_keys.h

Added new persistent parameters for side gap margin, BSD hold time, and object clear time.

Refactor lane change logic in desire_helper.py

Remove unnecessary reset of lane change completion flag and add safety check for completed direction.
This commit is contained in:
janpoo6427
2026-06-05 14:11:40 +09:00
parent cdd2d36971
commit 2a39a83068
4 changed files with 255 additions and 178 deletions

View File

@@ -304,6 +304,12 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"SideObjectDetectDisplay", {PERSISTENT, INT, "0"}},
{"MaxAngleFrames", {PERSISTENT, INT, "89"}},
{"SideGapMargin", {PERSISTENT, INT, "30"}},
{"BsdHoldSec", {PERSISTENT, INT, "25"}},
{"BsdClearSec", {PERSISTENT, INT, "10"}},
{"ObjectClearSec", {PERSISTENT, INT, "5"}},
{"SoftHoldMode", {PERSISTENT, INT, "0"}},
{"SoftHoldMode", {PERSISTENT, INT, "0"}},
{"LatMpcPathCost", {PERSISTENT, INT, "200"}},

View File

@@ -788,12 +788,13 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control,
values["LANELINE_CURVATURE"] = (min(abs(curvature), 15) + (-1 if curvature < 0 else 0)) if lat_active else 0
values["LANELINE_CURVATURE_DIRECTION"] = 1 if curvature < 0 and lat_active else 0
lane_color = 6 if md is not None and md.meta.laneChangeAvailableLeft else 2
# LANELINE 경고 판단 (CAN raw 기준)
if lane_line_check >= 1:
lane_line_warn_left = CS.out.leftLaneLine % 10 not in (0, 5) # 실선이면 주황
lane_line_warn_left = CS.out.leftLaneLine % 10 not in (0, 5)
lane_line_warn_right = CS.out.rightLaneLine % 10 not in (0, 5)
else:
lane_line_warn_left = CS.out.leftLaneLine >= 20 # 황색(10~)/주황(20~) 차단
lane_line_warn_right = CS.out.rightLaneLine >= 20
lane_line_warn_left = CS.out.leftLaneLine // 10 == 2 # 황색(10~)/주황(20~) 차단
lane_line_warn_right = CS.out.rightLaneLine // 10 == 2
lca_avail_left = (md is not None and md.meta.laneChangeAvailableLeft)
lca_avail_right = (md is not None and md.meta.laneChangeAvailableRight)
@@ -811,7 +812,7 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control,
if hud_control.leftLaneDepart:
values["LANELINE_LEFT"] = 4 if (frame // 50) % 2 == 0 else 1
else:
values["LANELINE_LEFT"] = lane_color if hud_control.leftLaneVisible else 0
values["LANELINE_LEFT"] = lane_color
# 오른쪽
if lca_avail_right:
@@ -819,18 +820,18 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control,
elif lane_line_warn_right or CS.out.rightBlindspot or (side_object_right and side_object_detect_display):
lane_color = 4
else:
lane_line_warn_right = CS.out.rightLaneLine // 10 == 2
lane_color = 4 if lane_line_warn_right or CS.out.rightBlindspot else lane_color
lane_color = 2
if hud_control.rightLaneDepart:
values["LANELINE_RIGHT"] = 4 if (frame // 50) % 2 == 0 else 1
else:
values["LANELINE_RIGHT"] = lane_color if hud_control.rightLaneVisible else 0
values["LANELINE_RIGHT"] = lane_color
values["LCA_LEFT_ARROW"] = 2 if CS.out.leftBlinker else 0
values["LCA_RIGHT_ARROW"] = 2 if CS.out.rightBlinker else 0
values["LCA_LEFT_ICON"] = (1 if CS.out.leftBlindspot else 2) if lat_active else 0
values["LCA_RIGHT_ICON"] = (1 if CS.out.rightBlindspot else 2) if lat_active else 0
values["LCA_LEFT_ICON"] = (1 if not lca_avail_left else 2) if lat_active else 1 if lat_enabled else 0
values["LCA_RIGHT_ICON"] = (1 if not lca_avail_right else 2) if lat_active else 1 if lat_enabled else 0
values["LANE_LEFT"] = 1 if desire in (1, 3) else 0
values["LANE_RIGHT"] = 1 if desire in (2, 4) else 0

View File

@@ -18,7 +18,6 @@ class DesireHelper:
self.params = Params()
self.frame = 0
# FSM core
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.lane_change_timer = 0.0
@@ -33,24 +32,20 @@ class DesireHelper:
self.desire_disable_count = 0
self.turn_disable_count = 0
# per-side states
self.left = SideState("left")
self.right = SideState("right")
# blinker/ATC state (원본 변수들 유지)
self.blinker_ignore = False
self.driver_blinker_state = BLINKER_NONE
self.carrot_blinker_state = BLINKER_NONE
self.carrot_lane_change_count = 0
self.carrot_cmd_index_last = 0
self.atc_type = ""
self.atc_active = 0 # 0: 없음, 1: ATC 동작, 2: 충돌
self.atc_active = 0
# auto lane change
self.auto_lane_change_enable = False
self.unsafe_cancel_timer = 0.0
# keep pulse
self.keep_pulse_timer = 0.0
self.lane_change_need_torque = 0
@@ -59,12 +54,15 @@ class DesireHelper:
self.lane_change_delay = 0.0
self.model_turn_speed_factor = 0.0
self.model_turn_speed = 200.0
self.side_gap_margin = 3.0
self.bsd_clear_sec = 0.0
self.object_clear_sec = 0.3
self.bsd_hold_sec = 1.5
self.ignore_bsd = False
# misc
self.prev_desire_enabled = False
self.desire_log = ""
# externally readable flags
self.lane_change_available_left = False
self.lane_change_available_right = False
@@ -76,8 +74,6 @@ class DesireHelper:
self.lane_change_cooldown_timer = 0.0
self.lane_appeared_block_timer = 0.0
# ─────────────────────────────────────────────
# params/model
# ─────────────────────────────────────────────
def _update_params_periodic(self):
if self.frame % 100 == 0:
@@ -103,24 +99,19 @@ class DesireHelper:
def _check_desire_state(self, modeldata, carstate, maneuver_type):
desire_state = modeldata.meta.desireState
orientation_rate = abs(modeldata.orientationRate.z[5])
orientation_rate = abs(modeldata.orientationRate.z[5])
orientation_rate_future = abs(modeldata.orientationRate.z[15])
self.turn_desire_state = (desire_state[1] + desire_state[2]) > 0.1
if maneuver_type == "turn" and abs(carstate.steeringAngleDeg) > 80 and orientation_rate_future < orientation_rate:
self.turn_disable_count = int(10.0 / DT_MDL)
else:
self.turn_disable_count = max(0, self.turn_disable_count - 1)
# ─────────────────────────────────────────────
# blinkers/ATC (원본 로직 유지, side 계산은 별개)
# ─────────────────────────────────────────────
def _update_driver_blinker(self, carstate):
st = carstate.leftBlinker * 1 + carstate.rightBlinker * 2
changed = st != self.driver_blinker_state
self.driver_blinker_state = st
enabled = st in (BLINKER_LEFT, BLINKER_RIGHT)
if self.lane_change_need_torque < 0:
enabled = False
@@ -130,86 +121,87 @@ class DesireHelper:
atc_type = carrotMan.atcType
atc_blinker_state = BLINKER_NONE
# 유지 카운트는 DesireHelper에서 관리
if self.carrot_lane_change_count > 0:
atc_blinker_state = self.carrot_blinker_state
elif carrotMan.carrotCmdIndex != self.carrot_cmd_index_last and carrotMan.carrotCmd == "LANECHANGE":
self.carrot_cmd_index_last = carrotMan.carrotCmdIndex
self.carrot_cmd_index_last = carrotMan.carrotCmdIndex
self.carrot_lane_change_count = int(0.2 / DT_MDL)
self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT
atc_blinker_state = self.carrot_blinker_state
self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT
atc_blinker_state = self.carrot_blinker_state
elif atc_type in ("turn left", "turn right"):
if self.atc_active != 2:
atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT
self.atc_active = 1
self.blinker_ignore = False
atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT
self.atc_active = 1
self.blinker_ignore = False
elif atc_type in ("fork left", "fork right", "atc left", "atc right"):
if self.atc_active != 2:
atc_blinker_state = BLINKER_LEFT if atc_type in ("fork left", "atc left") else BLINKER_RIGHT
self.atc_active = 1
self.atc_active = 1
else:
self.atc_active = 0
# 충돌 시 ATC 무효
if driver_blinker_state != BLINKER_NONE and atc_blinker_state != BLINKER_NONE and driver_blinker_state != atc_blinker_state:
atc_blinker_state = BLINKER_NONE
self.atc_active = 2
self.atc_active = 2
atc_desire_enabled = atc_blinker_state in (BLINKER_LEFT, BLINKER_RIGHT)
# blinker_ignore
if driver_blinker_state == BLINKER_NONE:
self.blinker_ignore = False
if self.blinker_ignore:
atc_blinker_state = BLINKER_NONE
atc_blinker_state = BLINKER_NONE
atc_desire_enabled = False
# 타입 변경 1프레임 무시
if self.atc_type != atc_type:
atc_desire_enabled = False
self.atc_type = atc_type
return atc_blinker_state, atc_desire_enabled
# ─────────────────────────────────────────────
# per-side processing (핵심: 좌/우 모두 매 프레임 계산)
# ─────────────────────────────────────────────
def _process_sides(self, carstate, modeldata, radarState):
# geometry (좌/우)
# left: outer laneLines[0], current laneLines[1], edge[0], cur_prob laneLineProbs[1]
self.left.update_lane_geometry(
modeldata.laneLines[0], modeldata.laneLineProbs[0],
modeldata.laneLines[1],
modeldata.roadEdges[0],
modeldata.laneLines[1], modeldata.roadEdges[0],
cur_prob=modeldata.laneLineProbs[1],
)
# right: outer laneLines[3], current laneLines[2], edge[1], cur_prob laneLineProbs[2]
self.right.update_lane_geometry(
modeldata.laneLines[3], modeldata.laneLineProbs[3],
modeldata.laneLines[2],
modeldata.roadEdges[1],
modeldata.laneLines[2], modeldata.roadEdges[1],
cur_prob=modeldata.laneLineProbs[2],
)
# lane line info (HUD용 raw는 기존대로 leftLaneLine/rightLaneLine)
self.left.update_lane_line_info(carstate.leftLaneLine)
self.right.update_lane_line_info(carstate.rightLaneLine)
# BSD 설정
ignore_bsd = (self.laneChangeBsd < 0)
# obstacles
v_ego = carstate.vEgo
self.left.update_obstacles(v_ego, radarState.leadLeft, carstate.leftBlindspot, ignore_bsd, bsd_hold_sec=2.0)
self.right.update_obstacles(v_ego, radarState.leadRight, carstate.rightBlindspot, ignore_bsd, bsd_hold_sec=2.0)
self.left.update_obstacles(
v_ego, radarState.leadLeft, carstate.leftBlindspot, self.ignore_bsd,
bsd_hold_sec = self.bsd_hold_sec,
side_gap_margin = self.side_gap_margin,
corner_long_dist_f = float(carstate.leftLongDist),
corner_long_dist_r = float(carstate.leftRearLongDist),
corner_lat_dist = float(carstate.leftLatDist),
object_clear_sec = self.object_clear_sec,
)
self.right.update_obstacles(
v_ego, radarState.leadRight, carstate.rightBlindspot, self.ignore_bsd,
bsd_hold_sec = self.bsd_hold_sec,
side_gap_margin = self.side_gap_margin,
corner_long_dist_f = float(carstate.rightLongDist),
corner_long_dist_r = float(carstate.rightRearLongDist),
corner_lat_dist = float(carstate.rightLatDist),
object_clear_sec = self.object_clear_sec,
)
# compute available (include BSD+object)
if self.lane_line_check >= 1:
left_line_ok = self.left.lane_line_info_mod in (0, 5)
right_line_ok = self.right.lane_line_info_mod in (0, 5)
else:
left_line_ok = self.left.lane_line_info_raw < 20
right_line_ok = self.right.lane_line_info_raw < 20
left_line_ok = self.left.lane_line_info_raw // 10 != 2
right_line_ok = self.right.lane_line_info_raw // 10 != 2
self.left.compute_lane_change_available(lane_line_info_lt_20=left_line_ok, bsd_level=self.lane_change_bsd, bsd_clear_sec=self.bsd_clear_sec)
self.right.compute_lane_change_available(lane_line_info_lt_20=right_line_ok, bsd_level=self.lane_change_bsd, bsd_clear_sec=self.bsd_clear_sec)
@@ -217,22 +209,18 @@ class DesireHelper:
self.left.update_triggers()
self.right.update_triggers()
# externally readable
self.lane_change_available_left = self.left.lane_change_available
self.lane_change_available_left = self.left.lane_change_available
self.lane_change_available_right = self.right.lane_change_available
def _get_selected_side(self, blinker_state: int) -> SideState:
return self.left if blinker_state == BLINKER_LEFT else self.right
# ─────────────────────────────────────────────
# main update
# ─────────────────────────────────────────────
def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMan, radarState):
self.frame += 1
self._update_params_periodic()
self._make_model_turn_speed(modeldata)
# counts
self.carrot_lane_change_count = max(0, self.carrot_lane_change_count - 1)
self.lane_change_delay_timer = max(0.0, self.lane_change_delay_timer - DT_MDL)
self.unsafe_cancel_timer = max(0.0, self.unsafe_cancel_timer - DT_MDL)
@@ -244,18 +232,14 @@ class DesireHelper:
v_ego = carstate.vEgo
below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
# per-side compute (좌/우 모두)
self._process_sides(carstate, modeldata, radarState)
# desire state from model
self._check_desire_state(modeldata, carstate, self.maneuver_type)
# blinkers
driver_st, driver_changed, driver_enabled = self._update_driver_blinker(carstate)
atc_st, atc_enabled = self._update_atc_blinker(carrotMan, driver_st)
desire_enabled = driver_enabled or atc_enabled
blinker_state = driver_st if driver_enabled else atc_st
blinker_state = driver_st if driver_enabled else atc_st
# ── 깜빡이 노이즈 필터링 및 반대 방향 전환 감지 ──
if desire_enabled:
@@ -264,11 +248,6 @@ class DesireHelper:
(blinker_state == BLINKER_RIGHT and self.completed_direction == LaneChangeDirection.left):
self.lane_change_completed_on_blinker = False
# 깜빡이가 새로 켜진 순간(재입력 의사 발생) 영구 락아웃을 방지하기 위해
# 완료 플래그는 해제해 주되, 쿨다운 타이머(cooldown_timer)는 그대로 유지하여 안전을 검증합니다.
if not self.prev_desire_enabled:
self.lane_change_completed_on_blinker = False
else:
self.blinker_off_timer += DT_MDL
if self.blinker_off_timer > 0.5:
@@ -320,12 +299,11 @@ class DesireHelper:
# ── auto lane change trigger ──────────────────────────────────
auto_lane_change_trigger = False
if desire_enabled and side is not None:
# carrot_lane_change_count>0이면 강제 허용
if self.carrot_lane_change_count > 0:
auto_lane_change_trigger = side.lane_change_available
else:
# 기존 조건: edge_available + (trigger or appeared) + not side_object_detected
auto_lane_change_trigger = (
self.auto_lane_change_enable and
(not atc_lane_change_manual_only) and
@@ -341,36 +319,30 @@ class DesireHelper:
# ───────────────────────── FSM ─────────────────────────
if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX:
self.lane_change_state = LaneChangeState.off
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.turn_direction = TurnDirection.none
self.maneuver_type = "none"
self.turn_direction = TurnDirection.none
self.maneuver_type = "none"
elif self.desire_disable_count > 0:
self.lane_change_state = LaneChangeState.off
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.turn_direction = TurnDirection.none
self.maneuver_type = "none"
self.turn_direction = TurnDirection.none
self.maneuver_type = "none"
else:
# classify maneuver type using selected side
if desire_enabled and side is not None:
new_type = classify_maneuver_type(
blinker_state=blinker_state,
carstate=carstate,
side=side,
blinker_state=blinker_state, carstate=carstate, side=side,
turn_desire_state=self.turn_desire_state,
atc_type=self.atc_type,
old_type=self.maneuver_type,
atc_type=self.atc_type, old_type=self.maneuver_type,
)
else:
new_type = "none"
# switching rules
if self.maneuver_type == "lane_change" and new_type == "turn" and self.lane_change_state not in (
LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting
):
self.maneuver_type = "turn"
LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting):
self.maneuver_type = "turn"
self.lane_change_state = LaneChangeState.off
elif self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange):
self.maneuver_type = new_type
@@ -379,16 +351,17 @@ class DesireHelper:
if desire_enabled and self.maneuver_type == "turn" and self.enable_turn_desires:
self.lane_change_state = LaneChangeState.off
if self.turn_disable_count > 0:
self.turn_direction = TurnDirection.none
self.turn_direction = TurnDirection.none
self.lane_change_direction = LaneChangeDirection.none
else:
self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight
self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight
self.lane_change_direction = self.turn_direction
# ─ Lane change FSM ─
else:
self.turn_direction = TurnDirection.none
# ── off 상태 ──────────────────────────────────────────────
if self.lane_change_state == LaneChangeState.off:
reentry = (
not self.prev_desire_enabled or
@@ -413,8 +386,6 @@ class DesireHelper:
lane_exist_counter_side = side.lane_exist_count.counter
lane_change_available_geom = side.lane_change_available_geom
self.auto_lane_change_enable = False if (lane_exist_counter_side > 0 or lane_change_available_geom) else True
self.next_lane_change = False
if avail_just_cleared:
self.auto_lane_change_enable = True
@@ -424,13 +395,13 @@ class DesireHelper:
# ── preLaneChange 상태 ───────────────────────────────────
elif self.lane_change_state == LaneChangeState.preLaneChange:
if side is None:
self.lane_change_state = LaneChangeState.off
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
else:
self.lane_change_direction = LaneChangeDirection.left if blinker_state == BLINKER_LEFT else LaneChangeDirection.right
# torque direction cond
torque_cond = (carstate.steeringTorque > 0) if blinker_state == BLINKER_LEFT else (carstate.steeringTorque < 0)
torque_cond = (carstate.steeringTorque > 0) if blinker_state == BLINKER_LEFT else (carstate.steeringTorque < 0)
torque_applied = carstate.steeringPressed and torque_cond
bsd_detected = side.bsd_hold_counter > 0
@@ -458,7 +429,7 @@ class DesireHelper:
self.auto_lane_change_enable = True
if not desire_enabled or below_lane_change_speed:
self.lane_change_state = LaneChangeState.off
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.auto_lane_change_enable = False
@@ -520,6 +491,8 @@ class DesireHelper:
avail = side.lane_change_available_hold if side is not None else False
if not avail:
# 방향을 none으로 지우기 전에 완료된 방향을 먼저 안전하게 저장합니다.
self.completed_direction = self.lane_change_direction
self.lane_change_direction = LaneChangeDirection.none
self.lane_change_state = LaneChangeState.laneChangeFinishing
else:
@@ -531,7 +504,9 @@ class DesireHelper:
elif self.lane_change_state == LaneChangeState.laneChangeFinishing:
self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0)
if self.lane_change_ll_prob > 0.99:
self.completed_direction = self.lane_change_direction
# 다른 예외 조건(prob 감소 등)으로 진입했을 때를 위해 안전장치 추가
if self.lane_change_direction != LaneChangeDirection.none:
self.completed_direction = self.lane_change_direction
self.lane_change_direction = LaneChangeDirection.none
self.lane_change_state = LaneChangeState.off
@@ -547,30 +522,30 @@ class DesireHelper:
else:
self.lane_change_timer += DT_MDL
# commit last per-side
# ── commit last ──────────────────────────────────────────────
self.left.commit_last()
self.right.commit_last()
self.prev_desire_enabled = desire_enabled
# 반대 방향 토크 cancel (기존 유지)
# ── 반대 방향 토크 cancel ────────────────────────────────────
steering_pressed_cancel = carstate.steeringPressed and (
(carstate.steeringTorque < 0 and blinker_state == BLINKER_LEFT) or
(carstate.steeringTorque > 0 and blinker_state == BLINKER_RIGHT)
)
if steering_pressed_cancel and self.lane_change_state != LaneChangeState.off:
self.lane_change_direction = LaneChangeDirection.none
self.lane_change_state = LaneChangeState.off
self.blinker_ignore = True
self.lane_change_state = LaneChangeState.off
self.blinker_ignore = True
# final desire
# ── final desire ─────────────────────────────────────────────
if self.turn_direction != TurnDirection.none:
self.desire = TURN_DESIRES[self.turn_direction]
self.desire = TURN_DESIRES[self.turn_direction]
self.lane_change_direction = self.turn_direction
else:
self.desire = DESIRES[self.lane_change_direction][self.lane_change_state]
# keep pulse
# ── keep pulse ───────────────────────────────────────────────
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting):
self.keep_pulse_timer = 0.0
elif self.lane_change_state == LaneChangeState.preLaneChange:

View File

@@ -1,10 +1,7 @@
from dataclasses import dataclass, field
from collections import deque
from typing import Optional
import numpy as np
from openpilot.common.realtime import DT_MDL
from openpilot.common.constants import CV
from .lane_math import calculate_lane_width
from .hysteresis import ExistCounter
@@ -14,47 +11,51 @@ from .hysteresis import ExistCounter
class SideState:
name: str # "left" / "right"
# lane/edge distances
# ── 차선/도로 경계 거리 ────────────────────────────────────────
lane_width: float = 0.0
lane_width_diff: float = 0.0
dist_to_edge: float = 0.0
dist_to_edge_far: float = 0.0
# current lane prob (ego lane line prob on that side)
# ── 자차 차선 확률 ─────────────────────────────────────────────
cur_prob: float = 1.0
current_lane_missing: bool = False
# counters
# ── 히스테리시스 카운터 ────────────────────────────────────────
lane_exist_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2))
lane_width_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2))
edge_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2))
edge_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2))
# availability
lane_available: bool = False
edge_available: bool = False
# ── 차선/경계 가용 여부 ────────────────────────────────────────
lane_available: bool = False
edge_available: bool = False
lane_change_available_last: bool = False
# smoothing
# ── 차선 폭 스무딩 ─────────────────────────────────────────────
lane_width_queue: deque = field(default_factory=lambda: deque(maxlen=int(1.0 / DT_MDL)))
lane_width_sum: float = 0.0
# lane line info
lane_line_info_raw: int = 0
lane_line_info_mod: int = 0
last_lane_line_mod: int = 0
# ── 차선선 정보 ────────────────────────────────────────────────
lane_line_info_raw: int = 0
lane_line_info_mod: int = 0
last_lane_line_mod: int = 0
lane_line_info_edge_detect: bool = False
# transitions
lane_available_last: bool = False
edge_available_last: bool = False
# ── 상태 전환 ──────────────────────────────────────────────────
lane_available_last: bool = False
edge_available_last: bool = False
lane_available_trigger: bool = False
lane_appeared: bool = False
lane_appeared: bool = False
# obstacles
object_detected_count: int = 0
side_object_detected: bool = False
# ── 측방 장애물 감지 ───────────────────────────────────────────
object_detected_count: int = 0
side_object_detected: bool = False
object_clear_count: int = 0
# BSD hold (after detection)
bsd_hold_counter: int = 0
# ── BSD hold ───────────────────────────────────────────────────
bsd_hold_counter: int = 0
bsd_detected_now: bool = False
bsd_clear_count: int = field(default_factory=lambda: int(10.0 / DT_MDL))
# ── 차선 변경 가능 여부 (용도별 3종) ──────────────────────────
# lane_change_available_geom : 기하학적 조건만 (차선 폭·도로 경계)
@@ -104,84 +105,179 @@ class SideState:
self.lane_exist_count.update(bool(lane_valid))
# running mean (O(1))
# running mean O(1)
if len(self.lane_width_queue) == self.lane_width_queue.maxlen:
self.lane_width_sum -= self.lane_width_queue.popleft()
self.lane_width_queue.append(lane_w)
self.lane_width_sum += lane_w
self.lane_width = self.lane_width_sum / len(self.lane_width_queue)
self.lane_width_diff = (self.lane_width_queue[-1] - self.lane_width_queue[0]) if len(self.lane_width_queue) >= 2 else 0.0
self.dist_to_edge = float(dist_edge)
self.lane_width_diff = (
(self.lane_width_queue[-1] - self.lane_width_queue[0])
if len(self.lane_width_queue) >= 2 else 0.0
)
self.dist_to_edge = float(dist_edge)
self.dist_to_edge_far = float(dist_edge_far)
min_lane_width = 2.5
self.lane_width_count.update(self.lane_width > min_lane_width)
self.edge_count.update(self.dist_to_edge > min_lane_width)
available_count = int(0.2 / DT_MDL)
self.lane_available = self.lane_width_count.counter > available_count
self.edge_available = (self.edge_count.counter > available_count) and (self.dist_to_edge_far > min_lane_width)
available_count = int(0.2 / DT_MDL)
self.lane_available = self.lane_width_count.counter > available_count
self.edge_available = (
(self.edge_count.counter > available_count) and
(self.dist_to_edge_far > min_lane_width)
)
self.cur_prob = float(cur_prob)
self.cur_prob = float(cur_prob)
self.current_lane_missing = self.cur_prob < 0.3
def update_lane_line_info(self, lane_line_info_raw: int):
self.lane_line_info_raw = int(lane_line_info_raw)
mod = self.lane_line_info_raw % 10
# edge_detect: 0/5<><35> <20>ٲ<EFBFBD><D9B2> <20><><EFBFBD><EFBFBD> (<28><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><>/<2F><20><><EFBFBD><EFBFBD> self.lane_line_info <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD>׼<EFBFBD>)
self.lane_line_info_edge_detect = (mod in (0, 5)) and (self.last_lane_line_mod not in (0, 5))
# 0/5 로 바뀌는 순간만 edge_detect = True
self.lane_line_info_edge_detect = (
(mod in (0, 5)) and (self.last_lane_line_mod not in (0, 5))
)
self.last_lane_line_mod = mod
self.lane_line_info_mod = mod
# ════════════════════════════════════════════════════════════════
# 측방 장애물 / BSD 업데이트
# ════════════════════════════════════════════════════════════════
def update_obstacles(self,
v_ego: float,
radar_obj, # radarState.leadLeft / leadRight
blindspot: bool, # carstate.leftBlindspot/rightBlindspot
ignore_bsd: bool,
bsd_hold_sec: float = 2.0):
# object_detected (radar <20><><EFBFBD>)
if radar_obj is not None and radar_obj.status:
d = radar_obj.dRel
v = radar_obj.vLead
side_object_dist = d + v * 4.0
else:
side_object_dist = 255.0
v_ego: float,
radar_obj,
blindspot: bool,
ignore_bsd: bool,
bsd_hold_sec: float = 2.5,
side_gap_margin: float = 3.0,
corner_long_dist_f: float = 0.0,
corner_long_dist_r: float = 0.0,
corner_lat_dist: float = 0.0,
object_clear_sec: float = 0.5):
gap = float(np.clip(side_gap_margin, 1.0, 6.0))
# ── 1) 사이드 레이더 (leadLeft / leadRight)
radar_detected = self._radar_block(radar_obj, v_ego, gap)
# ── 2) 측전방 코너 레이더
front_detected = self._corner_block_front(corner_long_dist_f, corner_lat_dist, v_ego, gap)
# ── 3) 측후방 코너 레이더
rear_detected = self._corner_block_rear(corner_long_dist_r, v_ego, gap)
# ── 4) 코너 레이더 활성 여부 (거리값 유효 여부로 판단)
corner_radar_active = (corner_long_dist_f > 0) or (corner_long_dist_r > 0)
self.corner_radar_active = corner_radar_active
# ── 5) BSD → object 연동
bsd_now = bool(blindspot) and (not ignore_bsd)
bsd_as_object = bsd_now
object_detected = radar_detected or front_detected or rear_detected or bsd_as_object
# ── 6) 디바운싱
CLEAR_FRAMES = max(1, int(object_clear_sec / DT_MDL))
object_detected = side_object_dist < (v_ego * 3.0)
if object_detected:
self.object_detected_count = max(1, self.object_detected_count + 1)
self.object_detected_count = 1
self.object_clear_count = 0
self.side_object_detected = True
else:
self.object_detected_count = min(-1, self.object_detected_count - 1)
self.object_clear_count += 1
if self.object_clear_count >= CLEAR_FRAMES:
self.object_detected_count = 0
self.side_object_detected = False
self.side_object_detected = self.object_detected_count > int(-0.3 / DT_MDL)
# ── 7) BSD hold
self.bsd_detected_now = bsd_now
# BSD hold (<28><EFBFBD><E4B1B8><EFBFBD><EFBFBD>: <20><><EFBFBD><EFBFBD> <20><> 2<><32> <20><><EFBFBD><EFBFBD>)
self.bsd_detected_now = bool(blindspot)
if self.bsd_detected_now and not ignore_bsd:
self.bsd_hold_counter = int(bsd_hold_sec / DT_MDL)
# 코너 레이더 없는 차량은 hold 시간을 늘려 보수적으로 동작
effective_hold_sec = bsd_hold_sec if corner_radar_active else max(bsd_hold_sec, 3.5)
if self.bsd_detected_now:
self.bsd_hold_counter = int(effective_hold_sec / DT_MDL)
self.bsd_clear_count = 0
else:
self.bsd_hold_counter = max(0, self.bsd_hold_counter - 1)
if self.bsd_hold_counter > 0:
self.bsd_hold_counter -= 1
else:
self.bsd_clear_count += 1
def compute_lane_change_available(self, lane_line_info_lt_20: bool, ignore_bsd: bool):
# geometric availability
self.lane_change_available_geom = (self.lane_available or self.edge_available) and lane_line_info_lt_20
# include bsd/object into lane_change_available (<28><EFBFBD><E4B1B8><EFBFBD><EFBFBD>)
bsd_active = (self.bsd_hold_counter > 0) and (not ignore_bsd)
self.lane_change_available = self.lane_change_available_geom and (not self.side_object_detected) and (not bsd_active)
# ════════════════════════════════════════════════════════════════
# 차선 변경 가능 여부 판단
# ════════════════════════════════════════════════════════════════
def compute_lane_change_available(self,
lane_line_info_lt_20: bool,
bsd_level: int,
bsd_clear_sec: float = 1.0):
BSD_CLEAR_FRAMES = max(1, int(bsd_clear_sec / DT_MDL))
self.lane_change_available_geom = (
(self.lane_available or self.edge_available) and lane_line_info_lt_20
)
ignore_bsd = (bsd_level < 0)
bsd_stabilized = (
(self.bsd_hold_counter == 0) and
(self.bsd_clear_count >= BSD_CLEAR_FRAMES)
)
bsd_active = (not bsd_stabilized) and (not ignore_bsd)
# 완전 가능: geom + 장애물 없음 + BSD 없음
self.lane_change_available = (
self.lane_change_available_geom
and (not self.side_object_detected)
and (not bsd_active)
)
# BSD 무시 버전: geom + 장애물 없음
self.lane_change_available_no_bsd = (
self.lane_change_available_geom
and (not self.side_object_detected)
)
# hold: geometry flicker 완화용, 장애물/BSD는 즉시 차단
instant_block = self.side_object_detected or ((bsd_level >= 1) and bsd_active)
hold_base = (
self.lane_change_available if (bsd_level >= 1)
else self.lane_change_available_no_bsd
)
avail_false_thresh = int(0.3 / DT_MDL)
if instant_block:
self.avail_false_count = avail_false_thresh
self.lane_change_available_hold = False
elif hold_base:
self.avail_false_count = 0
self.lane_change_available_hold = True
else:
self.avail_false_count += 1
if self.avail_false_count >= avail_false_thresh:
self.lane_change_available_hold = False
# ════════════════════════════════════════════════════════════════
# 트리거 / 커밋
# ════════════════════════════════════════════════════════════════
def update_triggers(self):
# lane_available_trigger (<28><><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD>)
self.lane_available_trigger = False
if self.lane_width_diff > 0.8 and (self.lane_width < self.dist_to_edge):
self.lane_available_trigger = True
# lane_appeared (bugfix: == <20><><EFBFBD><EFBFBD> >=<3D><> <20>ڿ<EFBFBD><DABF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>)
# + edge<67><65> <20>ʹ<EFBFBD> <20>ָ<EFBFBD>(<28><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>) lane_appeared<65><64> <20><><EFBFBD><EFBFBD><EFBFBD>ϰ<EFBFBD> true<75><65> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20>ʰ<EFBFBD> <20><><EFBFBD><EFBFBD>
appeared_now = self.lane_exist_count.counter >= int(0.2 / DT_MDL)
self.lane_appeared = (self.lane_appeared or appeared_now) and (self.dist_to_edge < 4.0)
self.lane_available_trigger = (
self.lane_width_diff > 0.8 and
self.lane_width < self.dist_to_edge
)
appeared_now = self.lane_exist_count.counter >= int(0.2 / DT_MDL)
self.lane_appeared = (
(self.lane_appeared or appeared_now) and
(self.dist_to_edge < 4.0)
)
def commit_last(self):
self.lane_available_last = self.lane_available
@@ -358,4 +454,3 @@ class SideState:
safety_block = d_cur < safety_th
return dist_block or ttc_block or safety_block or future_block