mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-20 13:32:04 +08:00
Adaptive Coasting Mode (ACM) - 2025/05/20
This commit is contained in:
@@ -119,4 +119,6 @@ inline static std::unordered_map<std::string, uint32_t> keys = {
|
||||
{"Version", PERSISTENT},
|
||||
{"dp_device_last_log", CLEAR_ON_MANAGER_START},
|
||||
{"dp_device_reset_conf", CLEAR_ON_MANAGER_START},
|
||||
{"dp_lon_acm", PERSISTENT},
|
||||
{"dp_lon_acm_downhill", PERSISTENT},
|
||||
};
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
'''
|
||||
MIT Non-Commercial License
|
||||
|
||||
Copyright (c) 2019, dragonpilot
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, for non-commercial purposes only, subject to the following conditions:
|
||||
|
||||
- The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
- Commercial use (e.g., use in a product, service, or activity intended to generate revenue) is prohibited without explicit written permission from dragonpilot. Contact ricklan@gmail.com for inquiries.
|
||||
- Any project that uses the Software must visibly mention the following acknowledgment: "This project uses software from dragonpilot and is licensed under a custom license requiring permission for use."
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
Downhill Coasting allows the vehicle to maintain or slightly increase speed on downhill slopes without braking.
|
||||
'''
|
||||
|
||||
import numpy as np
|
||||
|
||||
SLOPE = -0.04
|
||||
RATIO = 0.9
|
||||
|
||||
TTC = 5.
|
||||
TTC_BP = [3.5, 2.5]
|
||||
MIN_BRAKE_ALLOW_VALS = [0., -0.5]
|
||||
|
||||
class ACM:
|
||||
def __init__(self):
|
||||
self.enabled = False
|
||||
self.downhill_only = False
|
||||
self._is_downhill = False
|
||||
self._is_speed_over_cruise = False
|
||||
self._has_lead = False
|
||||
self._active_prev = False
|
||||
|
||||
self.active = False
|
||||
self.just_disabled = False
|
||||
self.allowed_brake_val = 0.
|
||||
|
||||
def update_states(self, cs, rs, user_ctrl_lon, v_ego, v_cruise):
|
||||
self.lead_ttc = float('inf') # Default if no lead
|
||||
|
||||
if not self.enabled:
|
||||
self.active = False
|
||||
return
|
||||
|
||||
if len(cs.orientationNED) != 3:
|
||||
self.active = False
|
||||
return
|
||||
|
||||
pitch_rad = cs.orientationNED[1]
|
||||
self._is_downhill = np.sin(pitch_rad) < SLOPE
|
||||
self._is_speed_over_cruise = v_ego > (v_cruise * RATIO)
|
||||
|
||||
lead = rs.leadOne
|
||||
if lead and lead.status:
|
||||
self.lead_ttc = lead.dRel / v_ego if v_ego > 0 else float('inf')
|
||||
self.allowed_brake_val = np.interp(self.lead_ttc, TTC_BP, MIN_BRAKE_ALLOW_VALS)
|
||||
self._has_lead = self.lead_ttc < TTC
|
||||
else:
|
||||
self._has_lead = False
|
||||
|
||||
self.active = not user_ctrl_lon and not self._has_lead and self._is_speed_over_cruise and (self._is_downhill if self.downhill_only else True)
|
||||
|
||||
self.just_disabled = self._active_prev and not self.active
|
||||
self._active_prev = self.active
|
||||
|
||||
def update_a_desired_trajectory(self, a_desired_trajectory):
|
||||
if not self.active:
|
||||
return a_desired_trajectory
|
||||
|
||||
# Suppress all braking to allow smooth coasting
|
||||
for i in range(len(a_desired_trajectory)):
|
||||
if a_desired_trajectory[i] < 0 and a_desired_trajectory[i] > self.allowed_brake_val:
|
||||
a_desired_trajectory[i] = 0.0
|
||||
return a_desired_trajectory
|
||||
|
||||
def update_output_a_target(self, output_a_target):
|
||||
if not self.active:
|
||||
return output_a_target
|
||||
|
||||
# Suppress braking
|
||||
if output_a_target < 0 and output_a_target > self.allowed_brake_val:
|
||||
output_a_target = 0.0
|
||||
return output_a_target
|
||||
@@ -14,6 +14,7 @@ from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDX
|
||||
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, get_speed_error
|
||||
from openpilot.selfdrive.car.cruise import V_CRUISE_MAX, V_CRUISE_UNSET
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
from dragonpilot.selfdrive.controls.lib.acm import ACM
|
||||
|
||||
LON_MPC_STEP = 0.2 # first step is 0.2s
|
||||
A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6]
|
||||
@@ -27,6 +28,8 @@ _A_TOTAL_MAX_V = [1.7, 3.2]
|
||||
_A_TOTAL_MAX_BP = [20., 40.]
|
||||
|
||||
class DPFlags:
|
||||
ACM = 1
|
||||
ACM_DOWNHILL = 2
|
||||
pass
|
||||
|
||||
def get_max_accel(v_ego):
|
||||
@@ -86,6 +89,7 @@ class LongitudinalPlanner:
|
||||
self.a_desired_trajectory = np.zeros(CONTROL_N)
|
||||
self.j_desired_trajectory = np.zeros(CONTROL_N)
|
||||
self.solverExecutionTime = 0.0
|
||||
self.acm = ACM()
|
||||
|
||||
@staticmethod
|
||||
def parse_model(model_msg, model_error):
|
||||
@@ -128,6 +132,20 @@ class LongitudinalPlanner:
|
||||
# PCM cruise speed may be updated a few cycles later, check if initialized
|
||||
reset_state = reset_state or not v_cruise_initialized
|
||||
|
||||
# Update ACM status
|
||||
if not sm['selfdriveState'].experimentalMode:
|
||||
if not self.acm.enabled and dp_flags & DPFlags.ACM:
|
||||
self.acm.enabled = True
|
||||
self.acm.downhill_only = dp_flags & DPFlags.ACM_DOWNHILL
|
||||
else:
|
||||
self.acm.enabled = False
|
||||
|
||||
user_control = long_control_off if self.CP.openpilotLongitudinalControl else not sm['selfdriveState'].enabled
|
||||
self.acm.update_states(sm['carControl'], sm['radarState'], user_control, v_ego, v_cruise)
|
||||
|
||||
if self.acm.just_disabled:
|
||||
reset_state = True
|
||||
|
||||
# No change cost when user is controlling the speed, or when standstill
|
||||
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
|
||||
|
||||
@@ -167,6 +185,9 @@ class LongitudinalPlanner:
|
||||
self.a_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
|
||||
self.j_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC[:-1], self.mpc.j_solution)
|
||||
|
||||
# Apply ACM post-processing to the acceleration trajectory if active
|
||||
self.a_desired_trajectory = self.acm.update_a_desired_trajectory(self.a_desired_trajectory)
|
||||
|
||||
# TODO counter is only needed because radar is glitchy, remove once radar is gone
|
||||
self.fcw = self.mpc.crash_cnt > 2 and not sm['carState'].standstill
|
||||
if self.fcw:
|
||||
@@ -181,6 +202,9 @@ class LongitudinalPlanner:
|
||||
output_a_target, self.output_should_stop = get_accel_from_plan(self.v_desired_trajectory, self.a_desired_trajectory,
|
||||
action_t=action_t, vEgoStopping=self.CP.vEgoStopping)
|
||||
|
||||
# Apply ACM to the final output acceleration target as well
|
||||
output_a_target = self.acm.update_output_a_target(output_a_target)
|
||||
|
||||
for idx in range(2):
|
||||
accel_clip[idx] = np.clip(accel_clip[idx], self.prev_accel_clip[idx] - 0.05, self.prev_accel_clip[idx] + 0.05)
|
||||
self.output_a_target = np.clip(output_a_target, accel_clip[0], accel_clip[1])
|
||||
|
||||
@@ -23,6 +23,10 @@ def main():
|
||||
poll='modelV2')
|
||||
|
||||
dp_flags = 0
|
||||
if params.get_bool("dp_lon_acm"):
|
||||
dp_flags |= DPFlags.ACM
|
||||
if params.get_bool("dp_lon_acm_downhill"):
|
||||
dp_flags |= DPFlags.ACM_DOWNHILL
|
||||
while True:
|
||||
sm.update()
|
||||
if sm.updated['modelV2']:
|
||||
|
||||
@@ -139,6 +139,16 @@ void DPPanel::add_longitudinal_toggles() {
|
||||
QString::fromUtf8("🐉 ") + tr("Longitudinal Ctrl"),
|
||||
"",
|
||||
},
|
||||
{
|
||||
"dp_lon_acm",
|
||||
QString::fromUtf8("🚧 ") + tr("Enable Adaptive Coasting Mode (ACM)"),
|
||||
tr("Adaptive Coasting Mode (ACM) reduces braking to allow smoother coasting when appropriate.\nDOES NOT WORK with Experimental Mode enabled."),
|
||||
},
|
||||
{
|
||||
"dp_lon_acm_downhill",
|
||||
QString::fromUtf8(" ") + tr("Downhill Only"),
|
||||
tr("Limited to downhill driving."),
|
||||
},
|
||||
};
|
||||
|
||||
QWidget *label = nullptr;
|
||||
@@ -150,6 +160,9 @@ void DPPanel::add_longitudinal_toggles() {
|
||||
addItem(label);
|
||||
continue;
|
||||
}
|
||||
if ((param == "dp_lon_acm" || param == "dp_lon_acm_downhill") && !vehicle_has_long_ctrl) {
|
||||
continue;
|
||||
}
|
||||
|
||||
has_toggle = true;
|
||||
auto toggle = new ParamControl(param, title, desc, "", this);
|
||||
@@ -287,6 +300,9 @@ void DPPanel::updateStates() {
|
||||
}
|
||||
|
||||
// do state change logic here
|
||||
if (vehicle_has_long_ctrl) {
|
||||
toggles["dp_lon_acm_downhill"]->setVisible(params.getBool("dp_lon_acm"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,8 @@ def manager_init() -> None:
|
||||
("OpenpilotEnabledToggle", "1"),
|
||||
("LongitudinalPersonality", str(log.LongitudinalPersonality.standard)),
|
||||
("DisableLogging", "0"),
|
||||
("dp_lon_acm", "0"),
|
||||
("dp_lon_acm_downhill", "0"),
|
||||
]
|
||||
|
||||
if params.get_bool("RecordFrontLock"):
|
||||
|
||||
Reference in New Issue
Block a user