Compare commits

...

3 Commits

Author SHA1 Message Date
rav4kumar
e1f2f2b43e clean up 2026-05-15 18:54:31 -07:00
rav4kumar
e66840ac5b so picky 2026-05-15 18:24:26 -07:00
rav4kumar
a23cd7dddc relc 2026-05-15 18:24:00 -07:00
12 changed files with 237 additions and 6 deletions

View File

@@ -342,6 +342,7 @@ struct OnroadEventSP @0xda96579883444c35 {
speedLimitChanged @21;
speedLimitPending @22;
e2eChime @23;
laneChangeRoadEdge @24;
}
}
@@ -448,6 +449,8 @@ struct LiveMapDataSP @0xf416ec09499d9d19 {
struct ModelDataV2SP @0xa1680744031fdb2d {
laneTurnDirection @0 :TurnDirection;
leftLaneChangeEdgeBlock @1 :Bool;
rightLaneChangeEdgeBlock @2 :Bool;
enum TurnDirection {
none @0;

View File

@@ -178,6 +178,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"QuickBootToggle", {PERSISTENT | BACKUP, BOOL, "0"}},
{"QuietMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RainbowMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RoadEdgeLaneChangeEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RocketFuel", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowAdvancedControls", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowTurnSignals", {PERSISTENT | BACKUP, BOOL, "0"}},

View File

@@ -56,7 +56,7 @@ class DesireHelper:
def get_lane_change_direction(CS):
return LaneChangeDirection.left if CS.leftBlinker else LaneChangeDirection.right
def update(self, carstate, lateral_active, lane_change_prob):
def update(self, carstate, lateral_active, lane_change_prob, left_edge_detected=False, right_edge_detected=False):
self.alc.update_params()
self.lane_turn_controller.update_params()
v_ego = carstate.vEgo
@@ -88,8 +88,8 @@ class DesireHelper:
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = (((carstate.leftBlindspot or left_edge_detected) and self.lane_change_direction == LaneChangeDirection.left) or
((carstate.rightBlindspot or right_edge_detected) and self.lane_change_direction == LaneChangeDirection.right))
self.alc.update_lane_change(blindspot_detected, carstate.brakePressed)

View File

@@ -298,9 +298,16 @@ class SelfdriveD(CruiseHelper):
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
mdv2sp = self.sm['modelDataV2SP']
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
(CS.rightBlindspot and direction == LaneChangeDirection.right):
self.events.add(EventName.laneChangeBlocked)
elif (mdv2sp.leftLaneChangeEdgeBlock and direction == LaneChangeDirection.left) or \
(mdv2sp.rightLaneChangeEdgeBlock and direction == LaneChangeDirection.right):
self.events_sp.add(custom.OnroadEventSP.EventName.laneChangeRoadEdge)
else:
if direction == LaneChangeDirection.left:
self.events.add(EventName.preLaneChangeLeft)

View File

@@ -51,11 +51,17 @@ class LaneChangeSettingsLayout(Widget):
description=lambda: tr("Toggle to enable a delay timer for seamless lane changes when blind spot monitoring " +
"(BSM) detects a obstructing vehicle, ensuring safe maneuvering."),
)
self._road_edge_block = toggle_item_sp(
param="RoadEdgeLaneChangeEnabled",
title=lambda: tr("Block Lane Change: Road Edge Detection"),
description=lambda: tr("Blocks the lane change if the model sees a road edge on your signaled side."),
)
items = [
self._lane_change_timer,
LineSeparatorSP(40),
self._bsm_delay,
self._road_edge_block,
]
return items

View File

@@ -34,6 +34,7 @@ from openpilot.sunnypilot.livedelay.helpers import get_lat_delay
from openpilot.sunnypilot.modeld_v2.modeld_base import ModelStateBase
from openpilot.sunnypilot.models.helpers import get_active_bundle
from openpilot.sunnypilot.models.runners.helpers import get_model_runner
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
PROCESS_NAME = "selfdrive.modeld.modeld_tinygrad"
@@ -246,6 +247,9 @@ def main(demo=False):
prev_action = log.ModelDataV2.Action()
DH = DesireHelper()
RELC = RoadEdgeLaneChangeController(DH)
while True:
# Keep receiving frames until we are at least 1 frame ahead of previous extra frame
@@ -349,7 +353,10 @@ def main(demo=False):
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
RELC.update(modelv2_send.modelV2.roadEdgeStds, modelv2_send.modelV2.laneLineProbs, v_ego)
mdv2sp_send.modelDataV2SP.leftLaneChangeEdgeBlock = RELC.left_edge_detected
mdv2sp_send.modelDataV2SP.rightLaneChangeEdgeBlock = RELC.right_edge_detected
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, RELC.left_edge_detected, RELC.right_edge_detected)
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
mdv2sp_send.modelDataV2SP.laneTurnDirection = DH.lane_turn_direction

View File

@@ -0,0 +1,84 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import numpy as np
from openpilot.common.constants import CV
from openpilot.common.realtime import DT_MDL
from openpilot.common.params import Params
NEARSIDE_PROB = 0.2
EDGE_PROB = 0.35
EDGE_REACTION_TIME = 1.0
EDGE_CLEAR_TIME = 0.3
MIN_SPEED = 20 * CV.MPH_TO_MS
class RoadEdgeLaneChangeController:
def __init__(self, desire_helper):
self.DH = desire_helper
self.params = Params()
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
self.param_read_counter = 0
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def read_params(self) -> None:
self.enabled = self.params.get_bool("RoadEdgeLaneChangeEnabled")
def update_params(self) -> None:
if self.param_read_counter % 50 == 0:
self.read_params()
self.param_read_counter += 1
def reset(self) -> None:
self.left_edge_detected = False
self.right_edge_detected = False
self.left_edge_timer = 0.0
self.right_edge_timer = 0.0
self.left_clear_timer = 0.0
self.right_clear_timer = 0.0
def update(self, road_edge_stds, lane_line_probs, v_ego: float) -> None:
self.update_params()
if not self.enabled or v_ego < MIN_SPEED:
self.reset()
return
left_edge_prob = np.clip(1.0 - road_edge_stds[0], 0.0, 1.0)
right_edge_prob = np.clip(1.0 - road_edge_stds[1], 0.0, 1.0)
left_lane_prob = lane_line_probs[0]
right_lane_prob = lane_line_probs[3]
left_cond = left_edge_prob > EDGE_PROB and left_lane_prob < NEARSIDE_PROB and right_lane_prob >= left_lane_prob
right_cond = right_edge_prob > EDGE_PROB and right_lane_prob < NEARSIDE_PROB and left_lane_prob >= right_lane_prob
if left_cond:
self.left_edge_timer = min(self.left_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.left_clear_timer = 0.0
if self.left_edge_timer > EDGE_REACTION_TIME:
self.left_edge_detected = True
else:
self.left_clear_timer += DT_MDL
if self.left_clear_timer > EDGE_CLEAR_TIME:
self.left_edge_timer = 0.0
self.left_edge_detected = False
if right_cond:
self.right_edge_timer = min(self.right_edge_timer + DT_MDL, EDGE_REACTION_TIME + EDGE_CLEAR_TIME)
self.right_clear_timer = 0.0
if self.right_edge_timer > EDGE_REACTION_TIME:
self.right_edge_detected = True
else:
self.right_clear_timer += DT_MDL
if self.right_clear_timer > EDGE_CLEAR_TIME:
self.right_edge_timer = 0.0
self.right_edge_detected = False

View File

@@ -5,6 +5,8 @@ from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.lane_turn_desire import LaneTurnController, LANE_CHANGE_SPEED_MIN
from openpilot.sunnypilot.selfdrive.controls.lib.auto_lane_change import AutoLaneChangeMode
from openpilot.sunnypilot.selfdrive.controls.lib.relc import RoadEdgeLaneChangeController
TurnDirection = custom.ModelDataV2SP.TurnDirection
@@ -107,7 +109,11 @@ def set_lane_turn_params():
])
def test_desire_helper_integration(carstate, lateral_active, lane_change_prob, expected_desire, set_lane_turn_params):
dh = DesireHelper()
relc = RoadEdgeLaneChangeController(dh)
relc.enabled = True
dh.alc.lane_change_set_timer = AutoLaneChangeMode.NUDGE
for _ in range(10):
dh.update(carstate, lateral_active, lane_change_prob)
dh.update(carstate, lateral_active, lane_change_prob,
left_edge_detected=relc.left_edge_detected, right_edge_detected=relc.right_edge_detected)
assert dh.desire == expected_desire # The first four tests were unit tests to test the controller, where this tests the integration in desire helpers

View File

@@ -0,0 +1,99 @@
"""
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pytest
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.sunnypilot.selfdrive.controls.lib.relc import (
RoadEdgeLaneChangeController, EDGE_REACTION_TIME, EDGE_CLEAR_TIME, MIN_SPEED,
)
V_HIGH = MIN_SPEED + 2.0
V_LOW = MIN_SPEED - 1.0
@pytest.fixture
def relc(mocker):
mock_params = mocker.patch("openpilot.sunnypilot.selfdrive.controls.lib.relc.Params")
mock_params.return_value.get_bool.return_value = True
controller = RoadEdgeLaneChangeController(DesireHelper())
controller.enabled = True
return controller
def drive(controller, road_edge_stds, lane_line_probs, seconds, v_ego=V_HIGH):
for _ in range(int(seconds / DT_MDL) + 1):
controller.update(road_edge_stds, lane_line_probs, v_ego)
@pytest.mark.parametrize("road_edge_stds,lane_line_probs,attr", [
([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], "left_edge_detected"),
([0.9, 0.0], [0.8, 0.8, 0.8, 0.0], "right_edge_detected"),
])
def test_edge_detection(relc, road_edge_stds, lane_line_probs, attr):
drive(relc, road_edge_stds, lane_line_probs, EDGE_REACTION_TIME + 0.1)
assert getattr(relc, attr)
def test_edge_detection_requires_time(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME - 0.05)
assert not relc.left_edge_detected
def test_both_edges_detected(relc):
drive(relc, [0.0, 0.0], [0.0, 0.8, 0.8, 0.0], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
assert relc.right_edge_detected
def test_noise_doesnt_clear(relc):
edge = ([0.0, 0.9], [0.0, 0.8, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update(*clear, V_HIGH)
relc.update(*edge, V_HIGH)
assert relc.left_edge_detected
def test_clears_after_window(relc):
edge = ([0.0, 0.9], [0.0, 0.8, 0.8, 0.8])
clear = ([0.9, 0.9], [0.8, 0.8, 0.8, 0.8])
drive(relc, *edge, EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
drive(relc, *clear, EDGE_CLEAR_TIME + 0.05)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_low_speed_skips(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1, v_ego=V_LOW)
assert not relc.left_edge_detected
assert relc.left_edge_timer == 0.0
def test_speed_drop_resets(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.update([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], V_LOW)
assert not relc.left_edge_detected
def test_param_off_resets(relc):
drive(relc, [0.0, 0.9], [0.0, 0.8, 0.8, 0.8], EDGE_REACTION_TIME + 0.1)
assert relc.left_edge_detected
relc.params.get_bool.return_value = False
relc.read_params()
relc.update([0.0, 0.9], [0.0, 0.8, 0.8, 0.8], V_HIGH)
assert not relc.left_edge_detected
assert not relc.right_edge_detected

View File

@@ -243,4 +243,12 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none, AudibleAlert.prompt, 3.),
},
EventNameSP.laneChangeRoadEdge: {
ET.WARNING: Alert(
"Lane Change Unavailable: Road Edge",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 0.1),
},
}

View File

@@ -1114,6 +1114,10 @@
"title": "Record Front Lock",
"description": ""
},
"RoadEdgeLaneChangeEnabled": {
"title": "Block Lane Change: Road Edge Detection",
"description": ""
},
"RoadName": {
"title": "Road Name",
"description": ""

View File

@@ -519,6 +519,12 @@
}
]
},
{
"key": "RoadEdgeLaneChangeEnabled",
"widget": "toggle",
"title": "Block Lane Change: Road Edge Detection",
"description": "Blocks lane change when the model sees a road edge on the side you signal."
},
{
"key": "AutoLaneChangeBsmDelay",
"widget": "toggle",