diff --git a/RELEASES.md b/RELEASES.md index cc21fc1b..a5527dac 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1,3 +1,9 @@ +Carrot2-v9 (2025-12-06) +======================== +* DarkSouls model +* fix Angle Steering(HKG car) +* fix LaneChange desire + Carrot2-v9 (2025-12-06) ======================== * PP(planplus) model diff --git a/opendbc_repo/opendbc/car/hyundai/carcontroller.py b/opendbc_repo/opendbc/car/hyundai/carcontroller.py index dd9ced53..6396aa4f 100644 --- a/opendbc_repo/opendbc/car/hyundai/carcontroller.py +++ b/opendbc_repo/opendbc/car/hyundai/carcontroller.py @@ -53,6 +53,38 @@ def process_hud_alert(enabled, fingerprint, hud_control): return sys_warning, sys_state, left_lane_warning, right_lane_warning +def calc_rate_limit_by_lat_accel(delta_deg: float, + v_ego: float, + wheelbase: float, + max_lat_accel: float, + max_rate_low: float, + max_rate_high: float) -> float: + """ + 반환값: 허용 스티어링휠 각속도 (deg/s) + delta_deg : 현재 스티어링휠 각도 (deg) + v_ego : 속도 (m/s) + wheelbase : 축거 (m) + """ + + # v가 너무 작으면 공식이 터지니까, 저속 전용 상수 사용 + if v_ego < 0.5: + return max_rate_low # 예: 300 deg/s + + delta_rad = np.radians(delta_deg) + + # cos^2 항이 0에 가까워지면 rate가 폭발하니 하한 넣어줌 + cos_delta = np.cos(delta_rad) + cos2 = max(cos_delta * cos_delta, 0.05) # 0.05 정도면 충분 + + # 물리식: |δ̇| <= L * a_lat_max / (v^2 * sec^2(δ)) + # 여기서 sec^2(δ) = 1 / cos^2(δ) + delta_rate_rad_s = (wheelbase * max_lat_accel) / (v_ego * v_ego * cos2) + + delta_rate_deg_s = np.degrees(delta_rate_rad_s) + + # 저속에서는 너무 크지 않게, 고속에서는 너무 작지 않게 clip + # max_rate_high <= delta_rate_deg_s <= max_rate_low + return float(np.clip(delta_rate_deg_s, max_rate_high, max_rate_low)) class CarController(CarControllerBase): def __init__(self, dbc_names, CP): @@ -164,18 +196,47 @@ class CarController(CarControllerBase): self.angle_limit_counter, self.max_angle_frames, MAX_ANGLE_CONSECUTIVE_FRAMES) - apply_angle = apply_std_steer_angle_limits(actuators.steeringAngleDeg, self.apply_angle_last, CS.out.vEgoRaw, - CS.out.steeringAngleDeg, CC.latActive, self.params.ANGLE_LIMITS) + #apply_angle = apply_std_steer_angle_limits(actuators.steeringAngleDeg, self.apply_angle_last, CS.out.vEgoRaw, + # CS.out.steeringAngleDeg, CC.latActive, self.params.ANGLE_LIMITS) - if abs(apply_angle - self.apply_angle_last) > 0.1: - alpha = min(0.1 + 0.9 * CS.out.vEgoRaw / (30.0 * CV.KPH_TO_MS), 1.0) - apply_angle = self.apply_angle_last * (1 - alpha) + apply_angle * alpha + MAX_LAT_ACCEL = 8.0 + MAX_RATE_LOW = 200 # 저속, deg/s + MAX_RATE_HIGH = 40 # 고속, deg/s + UNWIND_SCALE = 1.5 + UNWIND_MAX = 200 + + delta = actuators.steeringAngleDeg - self.apply_angle_last + same_dir = (np.sign(delta) == np.sign(self.apply_angle_last)) or (abs(self.apply_angle_last) < 2.0) + + rate_deg_s = calc_rate_limit_by_lat_accel(self.apply_angle_last, CS.out.vEgoRaw, self.CP.wheelbase, MAX_LAT_ACCEL, MAX_RATE_LOW, MAX_RATE_HIGH) + if not same_dir: + rate_deg_s = min(rate_deg_s * UNWIND_SCALE, UNWIND_MAX) + + rate_deg_per_tick = rate_deg_s * DT_CTRL + apply_angle = np.clip(actuators.steeringAngleDeg, + self.apply_angle_last - rate_deg_per_tick, + self.apply_angle_last + rate_deg_per_tick) + + angle_limits = self.params.ANGLE_LIMITS + apply_angle = np.clip(apply_angle, -angle_limits.STEER_ANGLE_MAX, angle_limits.STEER_ANGLE_MAX) + + #if abs(apply_angle - self.apply_angle_last) > 0.1: + # alpha = min(0.1 + 0.9 * CS.out.vEgoRaw / (30.0 * CV.KPH_TO_MS), 1.0) + # apply_angle = self.apply_angle_last * (1 - alpha) + apply_angle * alpha + + v_ego_kph = CS.out.vEgoRaw * CV.MS_TO_KPH + if abs(apply_angle - self.apply_angle_last) < 0.1: + alpha = min(0.05 + 0.45 * v_ego_kph / 30.0, 0.5) + else: + alpha = 1.0 # min(0.1 + 0.9 * v_ego_kph / 30.0, 1.0) + + apply_angle = self.apply_angle_last * (1 - alpha) + apply_angle * alpha if angle_control: apply_steer_req = CC.latActive if CS.out.steeringPressed: - self.apply_angle_last = actuators.steeringAngleDeg + #self.apply_angle_last = CS.out.steeringAngleDeg self.lkas_max_torque = self.lkas_max_torque = max(self.lkas_max_torque - 20, 25) else: target_torque = self.angle_max_torque diff --git a/selfdrive/carrot/carrot_functions.py b/selfdrive/carrot/carrot_functions.py index f5bb7dac..a32a729c 100644 --- a/selfdrive/carrot/carrot_functions.py +++ b/selfdrive/carrot/carrot_functions.py @@ -353,7 +353,7 @@ class CarrotPlanner: leadOne = radarstate.leadOne self.mySafeFactor = 1.0 - if leadOne.status and leadOne.vLead < 5 and leadOne.aLead < 0.2 and v_ego > 1.0: # 앞차가 매우 느리거나 정지한경우 + if leadOne.status and leadOne.radar and leadOne.vLead < 5 and leadOne.aLead < 0.2 and v_ego > 1.0: # 앞차가 매우 느리거나 정지한경우 self.myDrivingMode = DrivingMode.Safe if self.myDrivingMode == DrivingMode.Eco: # eco self.mySafeFactor = self.myEcoModeFactor diff --git a/selfdrive/carrot/carrot_man.py b/selfdrive/carrot/carrot_man.py index 30d08b4e..24f4d4f1 100644 --- a/selfdrive/carrot/carrot_man.py +++ b/selfdrive/carrot/carrot_man.py @@ -359,12 +359,15 @@ class CarrotMan: self.v_cruise_change = 0 elif self._last_vt == CS.vCruise: self.v_cruise_last = CS.vCruise - elif self.long_active and CC.longActive and self.gas_pressed_count == 0: + elif self.long_active and CC.longActive: # and self.gas_pressed_count == 0: if self.v_cruise_last < CS.vCruise: # 속도가 증가하면 - self.v_cruise_change = 100 + if v_ego_kph < CS.vCruise: + self.v_cruise_change = 100 + else: + self.v_cruise_change = 0 elif self.v_cruise_last > CS.vCruise: # 속도가 감소하면 if v_ego_kph < CS.vCruise: # 주행속도가 느리면 - self.v_cruise_change = 100 + self.v_cruise_change = -100 #100 else: # 주행속도가 빠르면 self.v_cruise_change = -100 @@ -376,20 +379,21 @@ class CarrotMan: self.v_cruise_last = CS.vCruise else: self.v_cruise_change = 0 + v_cruise_apply = max(min(CS.vCruise, v_ego_kph), 20) now = time.monotonic() heading = self.carrot_serv.bearing #nPosAnglePhone lat, lon = self.carrot_serv.vpPosPointLat, self.carrot_serv.vpPosPointLon #self.carrot_serv.estimate_position(self.carrot_serv.phone_latitude, self.carrot_serv.phone_longitude, heading, v_ego, now - self.carrot_serv.last_update_gps_time_phone) vt = carrot_speed.query_target_dist(lat, lon, heading, 0.0) if self.v_cruise_change != 0: - carrot_speed.add_sample(lat, lon, heading, self.v_cruise_last if self.v_cruise_change > 0 else (- self.v_cruise_last)) + carrot_speed.add_sample(lat, lon, heading, v_cruise_apply if self.v_cruise_change > 0 else (- v_cruise_apply)) if self.v_cruise_change > 0: self.v_cruise_change -= 1 if self.v_cruise_change < 0: self.v_cruise_change += 1 else: if self.gas_pressed_count > 0: - vt = max(vt, self.v_cruise_last) + vt = max(vt, v_cruise_apply) carrot_speed.add_sample(lat, lon, heading, vt) self.params_memory.put_int_nonblocking("CarrotSpeed", int(vt)) diff --git a/selfdrive/carrot/carrot_serv.py b/selfdrive/carrot/carrot_serv.py index 2402aec1..bb784ba6 100644 --- a/selfdrive/carrot/carrot_serv.py +++ b/selfdrive/carrot/carrot_serv.py @@ -982,10 +982,14 @@ class CarrotServ: if self.turnSpeedControlMode == 2: if -500 < self.xDistToTurn < 500: speed_n_sources.append((route_speed, "route")) - elif self.turnSpeedControlMode == 3: + elif self.turnSpeedControlMode in [3, 4]: speed_n_sources.append((route_speed, "route")) #speed_n_sources.append((self.calculate_current_speed(dist, speed * self.mapTurnSpeedFactor, 0, 1.2), "route")) + model_turn_speed = max(sm['modelV2'].meta.modelTurnSpeed, self.autoCurveSpeedLowerLimit) + if model_turn_speed < 200 and abs(vturn_speed) < 120: + speed_n_sources.append((model_turn_speed, "model")) + desired_speed, source = min(speed_n_sources, key=lambda x: x[0]) if CS is not None: diff --git a/selfdrive/controls/lib/desire_helper.py b/selfdrive/controls/lib/desire_helper.py index 144202e3..9794aa44 100644 --- a/selfdrive/controls/lib/desire_helper.py +++ b/selfdrive/controls/lib/desire_helper.py @@ -38,12 +38,14 @@ DESIRES = { LaneChangeState.laneChangeFinishing: log.Desire.laneChangeRight, }, } + TURN_DESIRES = { TurnDirection.none: log.Desire.none, TurnDirection.turnLeft: log.Desire.turnLeft, TurnDirection.turnRight: log.Desire.turnRight, } + def calculate_lane_width_frog(lane, current_lane, road_edge): lane_x, lane_y = np.array(lane.x), np.array(lane.y) edge_x, edge_y = np.array(road_edge.x), np.array(road_edge.y) @@ -57,63 +59,78 @@ def calculate_lane_width_frog(lane, current_lane, road_edge): return min(distance_to_lane, distance_to_road_edge), distance_to_road_edge + def calculate_lane_width(lane, lane_prob, current_lane, road_edge): - t = 1.0 # 약 1초 앞의 차선. + # t ≈ 1초 앞 기준으로 차선/도로 가장자리까지의 거리 계산 + t = 1.0 current_lane_y = np.interp(t, ModelConstants.T_IDXS, current_lane.y) lane_y = np.interp(t, ModelConstants.T_IDXS, lane.y) distance_to_lane = abs(current_lane_y - lane_y) - #if lane_prob < 0.3:# 차선이 없으면 없는것으로 간주시킴. - # distance_to_lane = min(2.0, distance_to_lane) + road_edge_y = np.interp(t, ModelConstants.T_IDXS, road_edge.y) distance_to_road_edge = abs(current_lane_y - road_edge_y) distance_to_road_edge_far = abs(current_lane_y - np.interp(2.0, ModelConstants.T_IDXS, road_edge.y)) - return min(distance_to_lane, distance_to_road_edge), distance_to_road_edge, distance_to_road_edge_far, lane_prob > 0.5 + + lane_valid = lane_prob > 0.5 + return min(distance_to_lane, distance_to_road_edge), distance_to_road_edge, distance_to_road_edge_far, lane_valid + class ExistCounter: + """ + 존재 여부가 노이즈처럼 튀는 신호를 히스테리시스로 안정화해 주는 카운터. + - true가 일정 시간 이상 지속되면 counter를 양수로 증가 + - false가 일정 시간 이상 지속되면 counter를 음수로 감소 + """ def __init__(self): self.counter = 0 self.true_count = 0 self.false_count = 0 - self.threshold = int(0.2 / DT_MDL) # 노이즈를 무시하기 위한 임계값 설정 + self.threshold = int(0.2 / DT_MDL) # 약 0.2초 이상 지속 시 유효로 판단 - def update(self, exist_flag): + def update(self, exist_flag: bool): if exist_flag: self.true_count += 1 - self.false_count = 0 # false count 초기화 + self.false_count = 0 if self.true_count >= self.threshold: - self.counter = max(self.counter + 1, 1) + self.counter = max(self.counter + 1, 1) else: self.false_count += 1 - self.true_count = 0 # true count 초기화 + self.true_count = 0 if self.false_count >= self.threshold: - self.counter = min(self.counter - 1, -1) - + self.counter = min(self.counter - 1, -1) return self.true_count + class DesireHelper: def __init__(self): self.params = Params() self.frame = 0 + + # Lane change / turn 상태 self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.lane_change_timer = 0.0 self.lane_change_ll_prob = 1.0 - self.keep_pulse_timer = 0.0 - self.prev_desire_enabled = False + self.lane_change_delay = 0.0 + self.maneuver_type = "none" # "none" / "turn" / "lane_change" + + # Desire / turn 관련 self.desire = log.Desire.none self.turn_direction = TurnDirection.none self.enable_turn_desires = True - self.atc_active = 0 - self.desireLog = "" - self.lane_width_left = 0 - self.lane_width_right = 0 - self.lane_width_left_diff = 0 - self.lane_width_right_diff = 0 - self.distance_to_road_edge_left = 0 - self.distance_to_road_edge_right = 0 - self.distance_to_road_edge_left_far = 0 - self.distance_to_road_edge_right_far = 0 - self.blinker_ignore = False + self.turn_desire_state = False + self.desire_disable_count = 0 # turn 후 일정 시간 동안 차선변경 금지 + self.turn_disable_count = 0 # steeringAngle 매우 클 때 turn 금지 + + # Lane / road edge 상태 + self.lane_width_left = 0.0 + self.lane_width_right = 0.0 + self.lane_width_left_diff = 0.0 + self.lane_width_right_diff = 0.0 + self.distance_to_road_edge_left = 0.0 + self.distance_to_road_edge_right = 0.0 + self.distance_to_road_edge_left_far = 0.0 + self.distance_to_road_edge_right_far = 0.0 self.lane_exist_left_count = ExistCounter() self.lane_exist_right_count = ExistCounter() @@ -121,170 +138,320 @@ class DesireHelper: self.lane_width_right_count = ExistCounter() self.road_edge_left_count = ExistCounter() self.road_edge_right_count = ExistCounter() + self.available_left_lane = False self.available_right_lane = False self.available_left_edge = False self.available_right_edge = False - self.lane_width_left_queue = deque(maxlen=int(1.0/DT_MDL)) - self.lane_width_right_queue = deque(maxlen=int(1.0/DT_MDL)) + + self.lane_width_left_queue = deque(maxlen=int(1.0 / DT_MDL)) + self.lane_width_right_queue = deque(maxlen=int(1.0 / DT_MDL)) self.lane_available_last = False self.edge_available_last = False - self.object_detected_count = 0 self.lane_available_trigger = False self.lane_appeared = False self.lane_line_info = 0 - self.laneChangeNeedTorque = 0 - self.laneChangeBsd = 0 - self.laneChangeDelay = 0 - self.lane_change_delay = 0.0 + # Blinkers / ATC + self.blinker_ignore = False self.driver_blinker_state = BLINKER_NONE - self.atc_type = "" - + self.carrot_blinker_state = BLINKER_NONE self.carrot_lane_change_count = 0 self.carrot_cmd_index_last = 0 - self.carrot_blinker_state = BLINKER_NONE + self.atc_type = "" + self.atc_active = 0 # 0: 없음, 1: ATC 동작 중, 2: 운전자와 ATC 상충 - self.turn_desire_state = False - self.desire_disable_count = 0 - self.turn_disable_count = 0 - self.blindspot_detected_counter = 0 + # Auto lane change 관련 self.auto_lane_change_enable = False self.next_lane_change = False + self.blindspot_detected_counter = 0 + # Keep pulse + self.keep_pulse_timer = 0.0 + + # 파라미터 + self.laneChangeNeedTorque = 0 + self.laneChangeBsd = 0 + self.laneChangeDelay = 0.0 self.modelTurnSpeedFactor = 0.0 self.model_turn_speed = 0.0 - def check_lane_state(self, modeldata): - lane_width_left, self.distance_to_road_edge_left, self.distance_to_road_edge_left_far, lane_prob_left = calculate_lane_width(modeldata.laneLines[0], modeldata.laneLineProbs[0], - modeldata.laneLines[1], modeldata.roadEdges[0]) - lane_width_right, self.distance_to_road_edge_right, self.distance_to_road_edge_right_far, lane_prob_right = calculate_lane_width(modeldata.laneLines[3], modeldata.laneLineProbs[3], - modeldata.laneLines[2], modeldata.roadEdges[1]) + # 기타 + self.prev_desire_enabled = False + self.desireLog = "" + self.object_detected_count = 0 + + # ★ 현재 차선 확률 (Ego 기준 좌/우) + self.cur_left_prob = 1.0 # laneLineProbs[1] + self.cur_right_prob = 1.0 # laneLineProbs[2] + self.current_lane_missing = False + # ───────────────────────────────────────────── + # Config / Model 관련 + # ───────────────────────────────────────────── + + def _update_params_periodic(self): + if self.frame % 100 == 0: + self.laneChangeNeedTorque = self.params.get_int("LaneChangeNeedTorque") + self.laneChangeBsd = self.params.get_int("LaneChangeBsd") + self.laneChangeDelay = self.params.get_float("LaneChangeDelay") * 0.1 + self.modelTurnSpeedFactor = self.params.get_float("ModelTurnSpeedFactor") * 0.1 + + def _make_model_turn_speed(self, modeldata): + if self.modelTurnSpeedFactor > 0: + model_turn_speed = np.interp(self.modelTurnSpeedFactor, + modeldata.velocity.t, + modeldata.velocity.x) * CV.MS_TO_KPH * 1.2 + self.model_turn_speed = self.model_turn_speed * 0.9 + model_turn_speed * 0.1 + else: + self.model_turn_speed = 200.0 + + # ───────────────────────────────────────────── + # Lane / Edge 상태 계산 + # ───────────────────────────────────────────── + + def _check_lane_state(self, modeldata): + # 왼쪽: laneLines[0] vs 현재차선 laneLines[1], roadEdges[0] + lane_width_left, self.distance_to_road_edge_left, self.distance_to_road_edge_left_far, lane_prob_left = \ + calculate_lane_width(modeldata.laneLines[0], modeldata.laneLineProbs[0], + modeldata.laneLines[1], modeldata.roadEdges[0]) + + # 오른쪽: laneLines[3] vs 현재차선 laneLines[2], roadEdges[1] + lane_width_right, self.distance_to_road_edge_right, self.distance_to_road_edge_right_far, lane_prob_right = \ + calculate_lane_width(modeldata.laneLines[3], modeldata.laneLineProbs[3], + modeldata.laneLines[2], modeldata.roadEdges[1]) + + # 차선 존재 카운터 업데이트 self.lane_exist_left_count.update(lane_prob_left) self.lane_exist_right_count.update(lane_prob_right) - + + # 1초 이동 평균 (노이즈 줄이기) self.lane_width_left_queue.append(lane_width_left) self.lane_width_right_queue.append(lane_width_right) - self.lane_width_left = np.mean(self.lane_width_left_queue) - self.lane_width_right = np.mean(self.lane_width_right_queue) + + self.lane_width_left = float(np.mean(self.lane_width_left_queue)) + self.lane_width_right = float(np.mean(self.lane_width_right_queue)) + self.lane_width_left_diff = self.lane_width_left_queue[-1] - self.lane_width_left_queue[0] self.lane_width_right_diff = self.lane_width_right_queue[-1] - self.lane_width_right_queue[0] - + + # 유효 차선/엣지 판단 min_lane_width = 2.5 self.lane_width_left_count.update(self.lane_width_left > min_lane_width) self.lane_width_right_count.update(self.lane_width_right > min_lane_width) self.road_edge_left_count.update(self.distance_to_road_edge_left > min_lane_width) self.road_edge_right_count.update(self.distance_to_road_edge_right > min_lane_width) + available_count = int(0.2 / DT_MDL) self.available_left_lane = self.lane_width_left_count.counter > available_count self.available_right_lane = self.lane_width_right_count.counter > available_count self.available_left_edge = self.road_edge_left_count.counter > available_count and self.distance_to_road_edge_left_far > min_lane_width self.available_right_edge = self.road_edge_right_count.counter > available_count and self.distance_to_road_edge_right_far > min_lane_width - def check_desire_state(self, modeldata, carstate): - desire_state = modeldata.meta.desireState - self.turn_desire_state = (desire_state[1] + desire_state[2]) > 0.1 - if self.turn_desire_state: - self.desire_disable_count = int(2.0/DT_MDL) - else: - self.desire_disable_count = max(0, self.desire_disable_count - 1) + self.cur_left_prob = modeldata.laneLineProbs[1] + self.cur_right_prob = modeldata.laneLineProbs[2] + # ───────────────────────────────────────────── + # 모델 내 desire 상태 (turn 예측 등) + # ───────────────────────────────────────────── + + def _check_desire_state(self, modeldata, carstate, maneuver_type): + desire_state = modeldata.meta.desireState + # turnLeft + turnRight 확률 + self.turn_desire_state = (desire_state[1] + desire_state[2]) > 0.1 + + #if self.turn_desire_state: + # self.desire_disable_count = int(2.0 / DT_MDL) + #else: + # self.desire_disable_count = max(0, self.desire_disable_count - 1) + + # steeringAngle 너무 크면 turn 자체를 일정 시간 막기 if abs(carstate.steeringAngleDeg) > 80: - self.turn_disable_count = int(10.0/DT_MDL) + self.turn_disable_count = int(10.0 / DT_MDL) else: self.turn_disable_count = max(0, self.turn_disable_count - 1) - #print(f"desire_state = {desire_state}, turn_desire_state = {self.turn_desire_state}, disable_count = {self.desire_disable_count}") - def make_model_turn_speed(self, modeldata): - if self.modelTurnSpeedFactor > 0: - model_turn_speed = np.interp(self.modelTurnSpeedFactor, modeldata.velocity.t, modeldata.velocity.x) * CV.MS_TO_KPH * 1.2 - self.model_turn_speed = self.model_turn_speed * 0.9 + model_turn_speed * 0.1 - else: - self.model_turn_speed = 200.0 - - def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMan, radarState): + # ───────────────────────────────────────────── + # Blinkers / ATC 상태 업데이트 + # ───────────────────────────────────────────── - if self.frame % 100 == 0: - self.laneChangeNeedTorque = self.params.get_int("LaneChangeNeedTorque") - self.laneChangeBsd = self.params.get_int("LaneChangeBsd") - self.laneChangeDelay = self.params.get_float("LaneChangeDelay") * 0.1 - self.modelTurnSpeedFactor= self.params.get_float("ModelTurnSpeedFactor") * 0.1 - - self.frame += 1 - - self.make_model_turn_speed(modeldata) - - self.carrot_lane_change_count = max(0, self.carrot_lane_change_count - 1) - self.lane_change_delay = max(0, self.lane_change_delay - DT_MDL) - - v_ego = carstate.vEgo - below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN - - ##### check lane state - self.check_lane_state(modeldata) - self.check_desire_state(modeldata, carstate) - - #### check driver's blinker state + def _update_driver_blinker(self, carstate): driver_blinker_state = carstate.leftBlinker * 1 + carstate.rightBlinker * 2 driver_blinker_changed = driver_blinker_state != self.driver_blinker_state self.driver_blinker_state = driver_blinker_state + driver_desire_enabled = driver_blinker_state in [BLINKER_LEFT, BLINKER_RIGHT] - if self.laneChangeNeedTorque < 0: # 운전자가 깜박이 켜도 차선변경 안함. + if self.laneChangeNeedTorque < 0: + # 운전자 깜빡이를 무시하고 차선변경 안 하는 설정 driver_desire_enabled = False - ignore_bsd = True if self.laneChangeBsd < 0 else False - block_lanechange_bsd = True if self.laneChangeBsd == 1 else False + return driver_blinker_state, driver_blinker_changed, driver_desire_enabled - self.blindspot_detected_counter = max(0, self.blindspot_detected_counter - 1) - - ##### check ATC's blinker state + def _update_atc_blinker(self, carrotMan, v_ego, driver_blinker_state): + """ + ATC에서 온 turn/lanechange 명령 기반 깜빡이 상태 갱신. + """ atc_type = carrotMan.atcType atc_blinker_state = BLINKER_NONE + + # ATC 기반 자동 차선변경 유지 시간 if self.carrot_lane_change_count > 0: atc_blinker_state = self.carrot_blinker_state elif carrotMan.carrotCmdIndex != self.carrot_cmd_index_last and carrotMan.carrotCmd == "LANECHANGE": self.carrot_cmd_index_last = carrotMan.carrotCmdIndex self.carrot_lane_change_count = int(0.2 / DT_MDL) - #print(f"Desire lanechange: {carrotMan.carrotArg}") self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT + atc_blinker_state = self.carrot_blinker_state elif atc_type in ["turn left", "turn right"]: + # 네비 turn 안내: 속도 조건을 턴 쪽으로 강제 if self.atc_active != 2: - below_lane_change_speed = True - self.lane_change_timer = 0.0 atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT self.atc_active = 1 self.blinker_ignore = False elif atc_type in ["fork left", "fork right", "atc left", "atc right"]: + # 분기(lanechange에 가까움) if self.atc_active != 2: - below_lane_change_speed = False - atc_blinker_state = BLINKER_LEFT if atc_type in ["fork left", "atc left"] else BLINKER_RIGHT + atc_blinker_state = BLINKER_LEFT if atc_type in ["fork left", "atc left"] else BLINKER_RIGHT self.atc_active = 1 else: self.atc_active = 0 + + # 운전자 깜빡이와 ATC 깜빡이가 충돌할 경우 ATC 무효화 if driver_blinker_state != BLINKER_NONE and atc_blinker_state != BLINKER_NONE and driver_blinker_state != atc_blinker_state: atc_blinker_state = BLINKER_NONE self.atc_active = 2 + atc_desire_enabled = atc_blinker_state in [BLINKER_LEFT, BLINKER_RIGHT] + # blinker_ignore일 때는 깜빡이 신호를 잠시 무시 if driver_blinker_state == BLINKER_NONE: self.blinker_ignore = False if self.blinker_ignore: driver_blinker_state = BLINKER_NONE atc_blinker_state = BLINKER_NONE - driver_desire_enabled = False - - if self.atc_type != atc_type: atc_desire_enabled = False + # ATC 타입이 바뀌었으면 이번 프레임은 무시 (안정화 목적) + if self.atc_type != atc_type: + atc_desire_enabled = False self.atc_type = atc_type + return atc_blinker_state, atc_desire_enabled + + # ───────────────────────────────────────────── + # Turn / LaneChange 모드 분류 + # ───────────────────────────────────────────── + + def _classify_maneuver_type(self, blinker_state, carstate, old_type): + """ + 깜빡이가 들어왔을 때 이번 조작이 turn인지 lane_change인지 분류. + - 너무 복잡하게 가지 않고, 현재 속도/감속/차선상태/모델 turn 상태 기준으로 점수화. + """ + if blinker_state == BLINKER_NONE: + return "none" + + v_kph = carstate.vEgo * CV.MS_TO_KPH + accel = carstate.aEgo + + # 깜빡이 방향에 따라 참조할 lane/edge 선택 + if blinker_state == BLINKER_LEFT: + lane_exist_counter = self.lane_exist_left_count.counter + lane_available = self.available_left_lane + edge_available = self.available_left_edge + lane_prob_side = self.cur_left_prob + edge_dist = self.distance_to_road_edge_left_far + else: + lane_exist_counter = self.lane_exist_right_count.counter + lane_available = self.available_right_lane + edge_available = self.available_right_edge + lane_prob_side = self.cur_right_prob + edge_dist = self.distance_to_road_edge_right_far + + score_turn = 0 + + if v_kph < 30.0: + score_turn += 1 + elif v_kph < 40.0 and accel < -1.0: + score_turn += 1 + + # 차로가 없고, 로드에지도 여유없고.. + if v_kph < 40.0 and not lane_available and not edge_available: + score_turn += 1 + + # 차선이 잘 안 보이거나(교차로/삼거리 등) + if v_kph < 40.0 and lane_exist_counter < int(0.5 / DT_MDL): + score_turn += 1 + + # steeringAngle이 크면 턴에 가깝다고 본다 + #if abs(carstate.steeringAngleDeg) > 45.0: + # score_turn += 1 + + # 모델이 이미 turn을 예측 중이면 가중치 + if self.turn_desire_state: + score_turn += 1 + + # ATC가 turn 안내 중이면 가중치 + if self.atc_type in ["turn left", "turn right"]: + score_turn += 2 + elif self.atc_type in ["fork left", "fork right", "atc left", "atc right"]: + score_turn -= 2 # fork/atc는 lanechange 쪽에 더 가깝게 + + # ★ road edge가 충분히 멀면(교차로/넓은 공간으로 판단) 턴 쪽으로 가산점 + edge_far = edge_dist > 4.0 # 튜닝 포인트 (4~6m 정도가 무난) + #if edge_far: + # score_turn += 1 + + current_lane_missing = lane_prob_side < 0.3 + self.current_lane_missing = current_lane_missing + # 튜닝 포인트: score_turn 임계값 + if score_turn >= 2: + #if current_lane_missing and edge_far: + if edge_far: + return "turn" + else: + return old_type + else: + return "lane_change" + + # ───────────────────────────────────────────── + # 메인 업데이트 루틴 + # ───────────────────────────────────────────── + + def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMan, radarState): + self.frame += 1 + self._update_params_periodic() + self._make_model_turn_speed(modeldata) + + # 카운터 감소 + self.carrot_lane_change_count = max(0, self.carrot_lane_change_count - 1) + self.lane_change_delay = max(0.0, self.lane_change_delay - DT_MDL) + self.blindspot_detected_counter = max(0, self.blindspot_detected_counter - 1) + + v_ego = carstate.vEgo + below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN + + # Lane / desire 상태 갱신 + self._check_lane_state(modeldata) + self._check_desire_state(modeldata, carstate, self.maneuver_type) + + # 운전자 깜빡이 + driver_blinker_state, driver_blinker_changed, driver_desire_enabled = self._update_driver_blinker(carstate) + + # BSD 설정 + ignore_bsd = (self.laneChangeBsd < 0) + block_lanechange_bsd = (self.laneChangeBsd == 1) + + # ATC 깜빡이 + atc_blinker_state, atc_desire_enabled = self._update_atc_blinker(carrotMan, v_ego, driver_blinker_state) + + # 최종 깜빡이/Desire enabled 판단 desire_enabled = driver_desire_enabled or atc_desire_enabled blinker_state = driver_blinker_state if driver_desire_enabled else atc_blinker_state + # lane_line_info (HUD용 등) lane_line_info = carstate.leftLaneLine if blinker_state == BLINKER_LEFT else carstate.rightLaneLine - lane_line_info_edge_detect = False + # BSD / 주변 차량 감지 if desire_enabled: lane_exist_counter = self.lane_exist_left_count.counter if blinker_state == BLINKER_LEFT else self.lane_exist_right_count.counter lane_available = self.available_left_lane if blinker_state == BLINKER_LEFT else self.available_right_lane @@ -294,9 +461,12 @@ class DesireHelper: radar = radarState.leadLeft if blinker_state == BLINKER_LEFT else radarState.leadRight side_object_dist = radar.dRel + radar.vLead * 4.0 if radar.status else 255 object_detected = side_object_dist < v_ego * 3.0 - self.object_detected_count = max(1, self.object_detected_count + 1) if object_detected else min(-1, self.object_detected_count - 1) - if lane_line_info % 10 in [0, 5] and self.lane_line_info not in [0, 5]: - lane_line_info_edge_detect = True + if object_detected: + self.object_detected_count = max(1, self.object_detected_count + 1) + else: + self.object_detected_count = min(-1, self.object_detected_count - 1) + + lane_line_info_edge_detect = (lane_line_info % 10 in [0, 5] and self.lane_line_info not in [0, 5]) self.lane_line_info = lane_line_info % 10 else: lane_exist_counter = 0 @@ -305,155 +475,183 @@ class DesireHelper: self.lane_appeared = False self.lane_available_trigger = False self.object_detected_count = 0 + lane_line_info_edge_detect = False self.lane_line_info = lane_line_info % 10 - #lane_available_trigger = not self.lane_available_last and lane_available - lane_change_available = (lane_available or edge_available) and lane_line_info < 20 # lane_line_info가 20보다 작으면 흰색라인임. + # 차선/엣지 기반 lane change 가능 여부 + lane_change_available = (lane_available or edge_available) and lane_line_info < 20 # 20 미만이면 흰색 라인 + + # lane_available 변화 & 폭 변화로 lane_available_trigger 계산 self.lane_available_trigger = False - lane_width_diff = self.lane_width_left_diff if atc_blinker_state == BLINKER_LEFT else self.lane_width_right_diff - distance_to_road_edge = self.distance_to_road_edge_left if atc_blinker_state == BLINKER_LEFT else self.distance_to_road_edge_right - lane_width_side = self.lane_width_left if atc_blinker_state == BLINKER_LEFT else self.lane_width_right + if blinker_state == BLINKER_LEFT: + lane_width_diff = self.lane_width_left_diff + distance_to_road_edge = self.distance_to_road_edge_left + lane_width_side = self.lane_width_left + else: + lane_width_diff = self.lane_width_right_diff + distance_to_road_edge = self.distance_to_road_edge_right + lane_width_side = self.lane_width_right + if lane_width_diff > 0.8 and (lane_width_side < distance_to_road_edge): self.lane_available_trigger = True - edge_availabled = not self.edge_available_last and edge_available + + edge_availabled = (not self.edge_available_last and edge_available) side_object_detected = self.object_detected_count > -0.3 / DT_MDL self.lane_appeared = self.lane_appeared and distance_to_road_edge < 4.0 - + # Auto lane change 트리거 if self.carrot_lane_change_count > 0: auto_lane_change_blocked = False auto_lane_change_trigger = lane_change_available else: auto_lane_change_blocked = ((atc_blinker_state == BLINKER_LEFT) and (driver_blinker_state != BLINKER_LEFT)) - #auto_lane_change_trigger = not auto_lane_change_blocked and edge_available and (lane_available_trigger or edge_availabled or lane_appeared) and not side_object_detected - auto_lane_change_trigger = self.auto_lane_change_enable and not auto_lane_change_blocked and edge_available and (self.lane_available_trigger or self.lane_appeared) and not side_object_detected + self.auto_lane_change_enable = self.auto_lane_change_enable and not auto_lane_change_blocked + auto_lane_change_trigger = self.auto_lane_change_enable and edge_available and (self.lane_available_trigger or self.lane_appeared) and not side_object_detected self.desireLog = f"L:{self.auto_lane_change_enable},{auto_lane_change_blocked},E:{lane_available},{edge_available},A:{self.lane_available_trigger},{self.lane_appeared},{lane_width_diff:.1f},{lane_width_side:.1f},{distance_to_road_edge:.1f}={auto_lane_change_trigger}" + # 메인 상태머신 + + # 0) lateral 끊기거나 너무 오래 지속되면 리셋 if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX: - #print("Desire canceled") self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.turn_direction = TurnDirection.none - elif desire_enabled and ((below_lane_change_speed and not carstate.standstill and self.enable_turn_desires) or self.turn_desire_state): - #print("Desire Turning") - self.lane_change_state = LaneChangeState.off - if self.turn_disable_count > 0: - self.turn_direction = TurnDirection.none - self.lane_change_direction = LaneChangeDirection.none - else: - self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight - self.lane_change_direction = self.turn_direction #LaneChangeDirection.none - desire_enabled = False - elif self.desire_disable_count > 0: # Turn 후 일정시간 동안 차선변경 불가능 - #print("Desire after turning") + self.maneuver_type = "none" + + # 1) turn 후 일정시간 동안은 아무 것도 하지 않음 + elif self.desire_disable_count > 0: self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.turn_direction = TurnDirection.none + self.maneuver_type = "none" + else: - #print(f"Desire LaneChange below={below_lane_change_speed}, lane_change_state={self.lane_change_state}, desire_enabled={desire_enabled},{self.prev_desire_enabled} ") - self.turn_direction = TurnDirection.none - # LaneChangeState.off - if self.lane_change_state == LaneChangeState.off and desire_enabled and not self.prev_desire_enabled and not below_lane_change_speed: - self.lane_change_state = LaneChangeState.preLaneChange - self.lane_change_ll_prob = 1.0 - self.lane_change_delay = self.laneChangeDelay + # 깜빡이 켜져 있을 때, 이번 조작이 turn인지 lane_change인지 먼저 분류 + if desire_enabled: + new_type = self._classify_maneuver_type(blinker_state, carstate, self.maneuver_type) + else: + new_type = "none" - # 맨끝차선이 아니면(측면에 차선이 있으면), ATC 자동작동 안함. - #self.auto_lane_change_enable = False if lane_exist_counter > 0 else True - self.auto_lane_change_enable = False if lane_exist_counter > 0 or lane_change_available else True - self.next_lane_change = False - + # ★ 1) 원래 lane_change였는데 새로 보니 turn 조건 + 차선 없음이면 → 강제 전환 허용 + if self.maneuver_type == "lane_change" and new_type == "turn" and self.lane_change_state not in [LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting]: + # 차선변경 도중에도 조건 만족 시 턴으로 스위칭 + self.maneuver_type = "turn" + self.lane_change_state = LaneChangeState.off # FSM 리셋 후 turn 루트로 + # ★ 2) 그 외에는 off/pre 상태에서만 모드 변경 + elif self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): + self.maneuver_type = new_type - # LaneChangeState.preLaneChange - elif self.lane_change_state == LaneChangeState.preLaneChange: - # Set lane change direction - self.lane_change_direction = LaneChangeDirection.left if \ - blinker_state == BLINKER_LEFT else LaneChangeDirection.right - - dir_map = { - LaneChangeDirection.left: (carstate.steeringTorque > 0, carstate.leftBlindspot), - LaneChangeDirection.right: (carstate.steeringTorque < 0, carstate.rightBlindspot), - } - torque_cond, blindspot_cond = dir_map.get(self.lane_change_direction, (False, False)) - torque_applied = carstate.steeringPressed and torque_cond - blindspot_detected = blindspot_cond - - if not lane_available or lane_exist_counter < int(2.0 / DT_MDL): #lane_exist_counter > int(0.2 / DT_MDL) and not lane_change_available: - self.auto_lane_change_enable = True - - if blindspot_detected and not ignore_bsd: - self.blindspot_detected_counter = int(1.5 / DT_MDL) - # BSD검출시.. 아래 두줄로 자동차선변경 해제함.. 위험해서 자동차선변경기능은 안하는걸로... - #self.lane_change_state = LaneChangeState.off - #self.lane_change_direction = LaneChangeDirection.none - if not desire_enabled or below_lane_change_speed: - self.lane_change_state = LaneChangeState.off + # ─ TURN 모드 처리 ─ + if desire_enabled and self.maneuver_type == "turn" and self.enable_turn_desires: # and not carstate.standstill: + self.lane_change_state = LaneChangeState.off + if self.turn_disable_count > 0: + self.turn_direction = TurnDirection.none self.lane_change_direction = LaneChangeDirection.none else: - if (lane_change_available and self.lane_change_delay == 0) or lane_line_info_edge_detect: - if self.blindspot_detected_counter > 0 and not ignore_bsd: # BSD검출시 - if torque_applied and not block_lanechange_bsd: - self.lane_change_state = LaneChangeState.laneChangeStarting - elif self.laneChangeNeedTorque > 0 or self.next_lane_change: # 조향토크필요 - if torque_applied: - self.lane_change_state = LaneChangeState.laneChangeStarting - elif driver_desire_enabled: - self.lane_change_state = LaneChangeState.laneChangeStarting - # ATC작동인경우 차선이 나타나거나 차선이 생기면 차선변경 시작 - # lane_appeared: 차선이 생기는건 안함.. 위험. - elif torque_applied or auto_lane_change_trigger or lane_line_info_edge_detect: - self.lane_change_state = LaneChangeState.laneChangeStarting + self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight + # 호환성을 위해 lane_change_direction도 turn과 동일하게 세팅 + self.lane_change_direction = self.turn_direction - # LaneChangeState.laneChangeStarting - elif self.lane_change_state == LaneChangeState.laneChangeStarting: - # fade out over .5s - self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0) + # ─ Lane Change FSM 처리 ─ + else: + self.turn_direction = TurnDirection.none - # 98% certainty - if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01: - self.lane_change_state = LaneChangeState.laneChangeFinishing - - # LaneChangeState.laneChangeFinishing - elif self.lane_change_state == LaneChangeState.laneChangeFinishing: - # fade in laneline over 1s - self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0) - - if self.lane_change_ll_prob > 0.99: - self.lane_change_direction = LaneChangeDirection.none - if desire_enabled: + # LaneChangeState.off + if self.lane_change_state == LaneChangeState.off: + if desire_enabled and not self.prev_desire_enabled and not below_lane_change_speed: self.lane_change_state = LaneChangeState.preLaneChange - self.next_lane_change = True - else: - self.lane_change_state = LaneChangeState.off + self.lane_change_ll_prob = 1.0 + self.lane_change_delay = self.laneChangeDelay + # 맨 끝 차선이 아니면, ATC 자동 차선변경 비활성 + lane_exist_counter_side = self.lane_exist_left_count.counter if blinker_state == BLINKER_LEFT else self.lane_exist_right_count.counter + self.auto_lane_change_enable = False if lane_exist_counter_side > 0 or lane_change_available else True + self.next_lane_change = False + + # LaneChangeState.preLaneChange + elif self.lane_change_state == LaneChangeState.preLaneChange: + self.lane_change_direction = LaneChangeDirection.left if blinker_state == BLINKER_LEFT else LaneChangeDirection.right + dir_map = { + LaneChangeDirection.left: (carstate.steeringTorque > 0, carstate.leftBlindspot), + LaneChangeDirection.right: (carstate.steeringTorque < 0, carstate.rightBlindspot), + } + torque_cond, blindspot_cond = dir_map.get(self.lane_change_direction, (False, False)) + torque_applied = carstate.steeringPressed and torque_cond + blindspot_detected = blindspot_cond + + # 차선이 일정시간 이상 안보이면 자동차선변경 허용 + lane_exist_counter_side = self.lane_exist_left_count.counter if blinker_state == BLINKER_LEFT else self.lane_exist_right_count.counter + if not lane_available or lane_exist_counter_side < int(2.0 / DT_MDL): + self.auto_lane_change_enable = True + + # BSD + if blindspot_detected and not ignore_bsd: + self.blindspot_detected_counter = int(1.5 / DT_MDL) + + if not desire_enabled or below_lane_change_speed: + self.lane_change_state = LaneChangeState.off + self.lane_change_direction = LaneChangeDirection.none + else: + # 차선변경 시작 조건 + if (lane_change_available and self.lane_change_delay == 0) or lane_line_info_edge_detect: + if self.blindspot_detected_counter > 0 and not ignore_bsd: + if torque_applied and not block_lanechange_bsd: + self.lane_change_state = LaneChangeState.laneChangeStarting + elif self.laneChangeNeedTorque > 0 or self.next_lane_change: + if torque_applied: + self.lane_change_state = LaneChangeState.laneChangeStarting + elif driver_desire_enabled: + self.lane_change_state = LaneChangeState.laneChangeStarting + elif torque_applied or auto_lane_change_trigger or lane_line_info_edge_detect: + self.lane_change_state = LaneChangeState.laneChangeStarting + + # LaneChangeState.laneChangeStarting + elif self.lane_change_state == LaneChangeState.laneChangeStarting: + # 원래 차선라인을 0.5초 동안 서서히 fade-out + self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0) + if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01: + self.lane_change_state = LaneChangeState.laneChangeFinishing + + # LaneChangeState.laneChangeFinishing + elif self.lane_change_state == LaneChangeState.laneChangeFinishing: + # 1초 동안 서서히 lane line 복귀 + self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0) + if self.lane_change_ll_prob > 0.99: + self.lane_change_direction = LaneChangeDirection.none + if desire_enabled: + self.lane_change_state = LaneChangeState.preLaneChange + self.next_lane_change = True + else: + self.lane_change_state = LaneChangeState.off + + # lane_change_timer 관리 if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): self.lane_change_timer = 0.0 else: self.lane_change_timer += DT_MDL - self.lane_available_last = lane_available self.edge_available_last = edge_available self.prev_desire_enabled = desire_enabled - steering_pressed = carstate.steeringPressed and \ - ((carstate.steeringTorque < 0 and blinker_state == BLINKER_LEFT) or (carstate.steeringTorque > 0 and blinker_state == BLINKER_RIGHT)) - if steering_pressed and self.lane_change_state != LaneChangeState.off: + + # 운전자가 반대 방향으로 강하게 조향하면 해당 차선변경/턴 취소 + steering_pressed_cancel = carstate.steeringPressed and \ + ((carstate.steeringTorque < 0 and blinker_state == BLINKER_LEFT) or + (carstate.steeringTorque > 0 and blinker_state == BLINKER_RIGHT)) + if steering_pressed_cancel and self.lane_change_state != LaneChangeState.off: self.lane_change_direction = LaneChangeDirection.none self.lane_change_state = LaneChangeState.off self.blinker_ignore = True + # 최종 desire 결정 if self.turn_direction != TurnDirection.none: self.desire = TURN_DESIRES[self.turn_direction] self.lane_change_direction = self.turn_direction else: self.desire = DESIRES[self.lane_change_direction][self.lane_change_state] - #print(f"desire = {self.desire}") - #self.desireLog = f"desire = {self.desire}" - #self.desireLog = f"rlane={self.distance_to_road_edge_right:.1f},{self.distance_to_road_edge_right_far:.1f}" - - # Send keep pulse once per second during LaneChangeStart.preLaneChange + # keep pulse (LaneChangeState.preLaneChange에서 유지) if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting): self.keep_pulse_timer = 0.0 elif self.lane_change_state == LaneChangeState.preLaneChange: diff --git a/selfdrive/controls/lib/longitudinal_planner.py b/selfdrive/controls/lib/longitudinal_planner.py index f1e3132b..aa734654 100644 --- a/selfdrive/controls/lib/longitudinal_planner.py +++ b/selfdrive/controls/lib/longitudinal_planner.py @@ -26,8 +26,9 @@ ALLOW_THROTTLE_THRESHOLD = 0.5 MIN_ALLOW_THROTTLE_SPEED = 2.5 # Lookup table for turns -_A_TOTAL_MAX_V = [1.7, 3.2] +_A_TOTAL_MAX_V = [2.4, 4.8] #[1.7, 3.2] _A_TOTAL_MAX_BP = [20., 40.] +LAT_WEIGHT = 0.7 def get_max_accel(v_ego): @@ -37,7 +38,7 @@ def get_coast_accel(pitch): return np.sin(pitch) * -5.65 - 0.3 # fitted from data using xx/projects/allow_throttle/compute_coast_accel.py -def limit_accel_in_turns(v_ego, angle_steers, a_target, CP): +def limit_accel_in_turns_org(v_ego, angle_steers, a_target, CP): """ This function returns a limited long acceleration allowed, depending on the existing lateral acceleration this should avoid accelerating when losing the target in turns @@ -48,11 +49,38 @@ def limit_accel_in_turns(v_ego, angle_steers, a_target, CP): if v_ego > 20 or (v_ego > 25 and steer_abs < 3.0): return a_target a_total_max = np.interp(v_ego, _A_TOTAL_MAX_BP, _A_TOTAL_MAX_V) - a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase) + a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase) * LAT_WEIGHT a_x_allowed = math.sqrt(max(a_total_max ** 2 - a_y ** 2, 0.)) return [a_target[0], min(a_target[1], a_x_allowed)] +def limit_accel_in_turns(v_ego, curvature, a_target, a_lat_max): + """ + v_ego : m/s + curvature: 1/m (조향에서 이미 나온 곡률, sign 포함) + a_target : [a_min, a_max] + a_lat_max: 허용 최대 횡가속 (예: 6.0 ~ 8.0 m/s^2) + + return : [a_min, 제한된 a_max] + """ + # 아주 저속이면 굳이 제한 안 걸어도 됨 + if v_ego < 0.1 or a_lat_max <= 0.0: + return a_target + + # 횡가속 + a_y = v_ego * v_ego * curvature # m/s^2 + a_y_abs = abs(a_y) + a_lat_max_abs = abs(a_lat_max) + + # a_total^2 = a_x^2 + a_y^2 <= a_lat_max^2 라고 보고, + # 남은 a_x 여유를 계산 + if a_y_abs >= a_lat_max_abs: + a_x_allowed = 0.0 + else: + a_x_allowed = math.sqrt(a_lat_max_abs**2 - a_y_abs**2) + + # a_target = [min, max] 중에서 max만 줄여줌 + return [a_target[0], min(a_target[1], a_x_allowed)] class LongitudinalPlanner: def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL): @@ -138,7 +166,9 @@ class LongitudinalPlanner: #accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)] accel_limits = [A_CRUISE_MIN, carrot.get_carrot_accel(v_ego)] steer_angle_without_offset = sm['carState'].steeringAngleDeg - sm['liveParameters'].angleOffsetDeg - accel_limits_turns = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_limits, self.CP) + #accel_limits_turns = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_limits, self.CP) + a_lat_max = 4.0 + accel_limits_turns = limit_accel_in_turns(v_ego, sm['controlsState'].desiredCurvature, accel_limits, a_lat_max) else: accel_limits = [ACCEL_MIN, ACCEL_MAX] accel_limits_turns = [ACCEL_MIN, ACCEL_MAX] diff --git a/selfdrive/modeld/modeld.py b/selfdrive/modeld/modeld.py index c2914b89..91866328 100755 --- a/selfdrive/modeld/modeld.py +++ b/selfdrive/modeld/modeld.py @@ -45,11 +45,11 @@ POLICY_METADATA_PATH = Path(__file__).parent / 'models/driving_policy_metadata.p LAT_SMOOTH_SECONDS = 0.13 LONG_SMOOTH_SECONDS = 0.3 MIN_LAT_CONTROL_SPEED = 0.3 -RECOVERY_POWER = 1.0 # The higher this number the more aggressively the model will recover to lanecenter, too high and it will ping-pong + def get_action_from_model(model_output: dict[str, np.ndarray], prev_action: log.ModelDataV2.Action, lat_action_t: float, long_action_t: float, v_ego: float, lat_smooth_seconds: float, vEgoStopping: float) -> log.ModelDataV2.Action: - plan = model_output['plan'][0] + RECOVERY_POWER*model_output['planplus'][0] + plan = model_output['plan'][0] desired_accel, should_stop, _, desired_velocity_now = get_accel_from_plan(plan[:,Plan.VELOCITY][:,0], plan[:,Plan.ACCELERATION][:,0], ModelConstants.T_IDXS, diff --git a/selfdrive/modeld/models/driving_policy.onnx b/selfdrive/modeld/models/driving_policy.onnx index 531af6df..5c519823 100644 Binary files a/selfdrive/modeld/models/driving_policy.onnx and b/selfdrive/modeld/models/driving_policy.onnx differ diff --git a/selfdrive/modeld/models/driving_vision.onnx b/selfdrive/modeld/models/driving_vision.onnx index bad4cbf9..902f1dd3 100644 Binary files a/selfdrive/modeld/models/driving_vision.onnx and b/selfdrive/modeld/models/driving_vision.onnx differ diff --git a/selfdrive/modeld/parse_model_outputs.py b/selfdrive/modeld/parse_model_outputs.py index 0881a1ff..038f51ca 100644 --- a/selfdrive/modeld/parse_model_outputs.py +++ b/selfdrive/modeld/parse_model_outputs.py @@ -113,7 +113,6 @@ class Parser: plan_mhp = self.is_mhp(outs, 'plan', ModelConstants.IDX_N * ModelConstants.PLAN_WIDTH) plan_in_N, plan_out_N = (ModelConstants.PLAN_MHP_N, ModelConstants.PLAN_MHP_SELECTION) if plan_mhp else (0, 0) self.parse_mdn('plan', outs, in_N=plan_in_N, out_N=plan_out_N, out_shape=(ModelConstants.IDX_N, ModelConstants.PLAN_WIDTH)) - self.parse_mdn('planplus', outs, in_N=plan_in_N, out_N=plan_out_N, out_shape=(ModelConstants.IDX_N, ModelConstants.PLAN_WIDTH)) self.parse_categorical_crossentropy('desire_state', outs, out_shape=(ModelConstants.DESIRE_PRED_WIDTH,)) return outs