Merge remote-tracking branch 'sunnypilot/sunnypilot/master' into hkg-angle-steering-2025

# Conflicts:
#	opendbc_repo
This commit is contained in:
Jason Wen
2026-01-26 11:45:29 -05:00
173 changed files with 5029 additions and 3155 deletions
-1
View File
@@ -108,7 +108,6 @@ jobs:
build_mac:
name: build macOS
runs-on: ${{ ((github.repository == 'commaai/openpilot') && ((github.event_name != 'pull_request') || (github.event.pull_request.head.repo.full_name == 'commaai/openpilot'))) && 'namespace-profile-macos-8x14' || 'macos-latest' }}
if: false # There'll be one day that this works. That day is not today.
steps:
- uses: actions/checkout@v4
with:
Vendored
+1 -1
View File
@@ -22,7 +22,7 @@ shopt -s huponexit # kill all child processes when the shell exits
export CI=1
export PYTHONWARNINGS=error
export LOGPRINT=debug
#export LOGPRINT=debug # this has gotten too spammy...
export TEST_DIR=${env.TEST_DIR}
export SOURCE_DIR=${env.SOURCE_DIR}
export GIT_BRANCH=${env.GIT_BRANCH}
+4
View File
@@ -1,3 +1,7 @@
Version 0.10.4 (2026-02-17)
========================
* Lexus LS 2018 support thanks to Hacheoy!
Version 0.10.3 (2025-12-17)
========================
* New driving model #36249
+1
View File
@@ -87,6 +87,7 @@ struct OnroadEvent @0xc4fa6047f024e718 {
laneChange @50;
lowMemory @51;
stockAeb @52;
stockLkas @98;
ldw @53;
carUnrecognized @54;
invalidLkasSetting @55;
+1 -1
View File
@@ -13,7 +13,7 @@ from typing import Optional, List, Union, Dict
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.util import MovingAverage
from openpilot.common.utils import MovingAverage
NO_TRAVERSAL_LIMIT = 2**64-1
+1 -6
View File
@@ -19,11 +19,6 @@ if GetOption('extras'):
# Cython bindings
params_python = envCython.Program('params_pyx.so', 'params_pyx.pyx', LIBS=envCython['LIBS'] + [_common, 'zmq', 'json11'])
SConscript([
'transformations/SConscript',
])
Import('transformations_python')
common_python = [params_python, transformations_python]
common_python = [params_python]
Export('common_python')
+2 -2
View File
@@ -18,8 +18,8 @@ class Api:
return self.service.get_token(payload_extra, expiry_hours)
def api_get(endpoint, method='GET', timeout=None, access_token=None, **params):
return CommaConnectApi(None).api_get(endpoint, method, timeout, access_token, **params)
def api_get(endpoint, method='GET', timeout=None, access_token=None, session=None, **params):
return CommaConnectApi(None).api_get(endpoint, method, timeout, access_token, session, **params)
def get_key_pair() -> tuple[str, str, str] | tuple[None, None, None]:
+4 -2
View File
@@ -51,7 +51,7 @@ class BaseApi:
ascii_encoded_text = normalized_text.encode('ascii', 'ignore')
return ascii_encoded_text.decode()
def api_get(self, endpoint, method='GET', timeout=None, access_token=None, json=None, **params):
def api_get(self, endpoint, method='GET', timeout=None, access_token=None, session=None, json=None, **params):
headers = {}
if access_token is not None:
headers['Authorization'] = "JWT " + access_token
@@ -59,7 +59,9 @@ class BaseApi:
version = self.remove_non_ascii_chars(get_version())
headers['User-Agent'] = self.user_agent + version
return requests.request(method, f"{self.api_host}/{endpoint}", timeout=timeout, headers=headers, json=json, params=params)
# TODO: add session to Api
req = requests if session is None else session
return req.request(method, f"{self.api_host}/{endpoint}", timeout=timeout, headers=headers, json=json, params=params)
@staticmethod
def get_key_pair() -> tuple[str, str, str] | tuple[None, None, None]:
+6 -6
View File
@@ -4,27 +4,27 @@ from openpilot.common.utils import run_cmd, run_cmd_default
@cache
def get_commit(cwd: str = None, branch: str = "HEAD") -> str:
def get_commit(cwd: str | None = None, branch: str = "HEAD") -> str:
return run_cmd_default(["git", "rev-parse", branch], cwd=cwd)
@cache
def get_commit_date(cwd: str = None, commit: str = "HEAD") -> str:
def get_commit_date(cwd: str | None = None, commit: str = "HEAD") -> str:
return run_cmd_default(["git", "show", "--no-patch", "--format='%ct %ci'", commit], cwd=cwd)
@cache
def get_short_branch(cwd: str = None) -> str:
def get_short_branch(cwd: str | None = None) -> str:
return run_cmd_default(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd)
@cache
def get_branch(cwd: str = None) -> str:
def get_branch(cwd: str | None = None) -> str:
return run_cmd_default(["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], cwd=cwd)
@cache
def get_origin(cwd: str = None) -> str:
def get_origin(cwd: str | None = None) -> str:
try:
local_branch = run_cmd(["git", "name-rev", "--name-only", "HEAD"], cwd=cwd)
tracking_remote = run_cmd(["git", "config", "branch." + local_branch + ".remote"], cwd=cwd)
@@ -34,7 +34,7 @@ def get_origin(cwd: str = None) -> str:
@cache
def get_normalized_origin(cwd: str = None) -> str:
def get_normalized_origin(cwd: str | None = None) -> str:
return get_origin(cwd) \
.replace("git@", "", 1) \
.replace(".git", "", 1) \
+1 -1
View File
@@ -1 +1 @@
#define DEFAULT_MODEL "Dark Souls 2 (Default)"
#define DEFAULT_MODEL "WMI (Default)"
+1
View File
@@ -173,6 +173,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"QuickBootToggle", {PERSISTENT | BACKUP, BOOL, "0"}},
{"QuietMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RainbowMode", {PERSISTENT | BACKUP, BOOL, "0"}},
{"RocketFuel", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowAdvancedControls", {PERSISTENT | BACKUP, BOOL, "0"}},
{"ShowTurnSignals", {PERSISTENT | BACKUP, BOOL, "0"}},
{"StandstillTimer", {PERSISTENT | BACKUP, BOOL, "0"}},
+3 -9
View File
@@ -3,15 +3,9 @@ from numbers import Number
class PIDController:
def __init__(self, k_p, k_i, k_d=0., pos_limit=1e308, neg_limit=-1e308, rate=100):
self._k_p = k_p
self._k_i = k_i
self._k_d = k_d
if isinstance(self._k_p, Number):
self._k_p = [[0], [self._k_p]]
if isinstance(self._k_i, Number):
self._k_i = [[0], [self._k_i]]
if isinstance(self._k_d, Number):
self._k_d = [[0], [self._k_d]]
self._k_p: list[list[float]] = [[0], [k_p]] if isinstance(k_p, Number) else k_p
self._k_i: list[list[float]] = [[0], [k_i]] if isinstance(k_i, Number) else k_i
self._k_d: list[list[float]] = [[0], [k_d]] if isinstance(k_d, Number) else k_d
self.set_limits(pos_limit, neg_limit)
+5 -1
View File
@@ -13,7 +13,11 @@ public:
if (prefix.empty()) {
prefix = util::random_string(15);
}
msgq_path = Path::shm_path() + "/" + prefix;
#ifdef __APPLE__
msgq_path = "/tmp/msgq_" + prefix;
#else
msgq_path = "/dev/shm/msgq_" + prefix;
#endif
bool ret = util::create_directories(msgq_path, 0777);
assert(ret);
setenv("OPENPILOT_PREFIX", prefix.c_str(), 1);
+4 -2
View File
@@ -1,4 +1,5 @@
import os
import platform
import shutil
import uuid
@@ -9,9 +10,10 @@ from openpilot.system.hardware.hw import Paths
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT
class OpenpilotPrefix:
def __init__(self, prefix: str = None, create_dirs_on_enter: bool = True, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False):
def __init__(self, prefix: str | None = None, create_dirs_on_enter: bool = True, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False):
self.prefix = prefix if prefix else str(uuid.uuid4().hex[0:15])
self.msgq_path = os.path.join(Paths.shm_path(), "msgq_" + self.prefix)
shm_path = "/tmp" if platform.system() == "Darwin" else "/dev/shm"
self.msgq_path = os.path.join(shm_path, "msgq_" + self.prefix)
self.create_dirs_on_enter = create_dirs_on_enter
self.clean_dirs_on_exit = clean_dirs_on_exit
self.shared_download_cache = shared_download_cache
+1 -1
View File
@@ -6,7 +6,7 @@ import time
from setproctitle import getproctitle
from openpilot.common.util import MovingAverage
from openpilot.common.utils import MovingAverage
from openpilot.system.hardware import PC
-5
View File
@@ -1,5 +0,0 @@
Import('env', 'envCython')
transformations = env.Library('transformations', ['orientation.cc', 'coordinates.cc'])
transformations_python = envCython.Program('transformations.so', 'transformations.pyx')
Export('transformations', 'transformations_python')
@@ -102,3 +102,36 @@ class TestNED:
np.testing.assert_allclose(converter.ned2ecef(ned_offsets_batch),
ecef_positions_offset_batch,
rtol=1e-9, atol=1e-7)
def test_errors(self):
# Test wrong shape/type for geodetic2ecef
# numpy_wrap raises IndexError for scalar input
with np.testing.assert_raises(IndexError):
coord.geodetic2ecef(1.0)
with np.testing.assert_raises_regex(ValueError, "Geodetic must be size 3"):
coord.geodetic2ecef([0, 0])
with np.testing.assert_raises_regex(ValueError, "Geodetic must be size 3"):
coord.geodetic2ecef([0, 0, 0, 0])
with np.testing.assert_raises(TypeError):
coord.geodetic2ecef(['a', 'b', 'c'])
# Test LocalCoord constructor errors
with np.testing.assert_raises(ValueError):
coord.LocalCoord.from_geodetic([0, 0])
with np.testing.assert_raises(ValueError):
coord.LocalCoord.from_geodetic(1)
with np.testing.assert_raises(TypeError):
coord.LocalCoord.from_geodetic(['a', 'b', 'c'])
# Test wrong shape/type for ecef2geodetic
with np.testing.assert_raises(ValueError):
coord.ecef2geodetic([1, 2])
with np.testing.assert_raises(ValueError):
coord.ecef2geodetic([1, 2, 3, 4])
with np.testing.assert_raises(IndexError):
coord.ecef2geodetic(1.0)
@@ -1,4 +1,5 @@
import numpy as np
import pytest
from openpilot.common.transformations.orientation import euler2quat, quat2euler, euler2rot, rot2euler, \
rot2quat, quat2rot, \
@@ -59,3 +60,32 @@ class TestOrientation:
np.testing.assert_allclose(ned_eulers[i], ned_euler_from_ecef(ecef_positions[i], eulers[i]), rtol=1e-7)
#np.testing.assert_allclose(eulers[i], ecef_euler_from_ned(ecef_positions[i], ned_eulers[i]), rtol=1e-7)
# np.testing.assert_allclose(ned_eulers, ned_euler_from_ecef(ecef_positions, eulers), rtol=1e-7)
def test_inputs(self):
with pytest.raises(ValueError):
euler2quat([1, 2])
with pytest.raises(ValueError):
quat2rot([1, 2, 3])
with pytest.raises(IndexError):
rot2quat(np.zeros((2, 2)))
def test_euler_rot_consistency(self):
rpy = [0.1, 0.2, 0.3]
R = euler2rot(rpy)
# R -> q -> R
q = rot2quat(R)
R_new = quat2rot(q)
np.testing.assert_allclose(R, R_new, atol=1e-15)
# q -> R -> Euler (quat2euler) -> R
rpy_new = quat2euler(q)
R_new2 = euler2rot(rpy_new)
np.testing.assert_allclose(R, R_new2, atol=1e-15)
# R -> Euler (rot2euler) -> R
rpy_from_rot = rot2euler(R)
R_new3 = euler2rot(rpy_from_rot)
np.testing.assert_allclose(R, R_new3, atol=1e-15)
@@ -1,72 +0,0 @@
# cython: language_level=3
from libcpp cimport bool
cdef extern from "orientation.cc":
pass
cdef extern from "orientation.hpp":
cdef cppclass Quaternion "Eigen::Quaterniond":
Quaternion()
Quaternion(double, double, double, double)
double w()
double x()
double y()
double z()
cdef cppclass Vector3 "Eigen::Vector3d":
Vector3()
Vector3(double, double, double)
double operator()(int)
cdef cppclass Matrix3 "Eigen::Matrix3d":
Matrix3()
Matrix3(double*)
double operator()(int, int)
Quaternion euler2quat(const Vector3 &)
Vector3 quat2euler(const Quaternion &)
Matrix3 quat2rot(const Quaternion &)
Quaternion rot2quat(const Matrix3 &)
Vector3 rot2euler(const Matrix3 &)
Matrix3 euler2rot(const Vector3 &)
Matrix3 rot_matrix(double, double, double)
Vector3 ecef_euler_from_ned(const ECEF &, const Vector3 &)
Vector3 ned_euler_from_ecef(const ECEF &, const Vector3 &)
cdef extern from "coordinates.cc":
cdef struct ECEF:
double x
double y
double z
cdef struct NED:
double n
double e
double d
cdef struct Geodetic:
double lat
double lon
double alt
bool radians
ECEF geodetic2ecef(const Geodetic &)
Geodetic ecef2geodetic(const ECEF &)
cdef cppclass LocalCoord_c "LocalCoord":
Matrix3 ned2ecef_matrix
Matrix3 ecef2ned_matrix
LocalCoord_c(const Geodetic &, const ECEF &)
LocalCoord_c(const Geodetic &)
LocalCoord_c(const ECEF &)
NED ecef2ned(const ECEF &)
ECEF ned2ecef(const NED &)
NED geodetic2ned(const Geodetic &)
Geodetic ned2geodetic(const NED &)
cdef extern from "coordinates.hpp":
pass
+342
View File
@@ -0,0 +1,342 @@
import numpy as np
# Constants
a = 6378137.0
b = 6356752.3142
esq = 6.69437999014e-3
e1sq = 6.73949674228e-3
def geodetic2ecef_single(g):
"""
Convert geodetic coordinates (latitude, longitude, altitude) to ECEF.
"""
try:
if len(g) != 3:
raise ValueError("Geodetic must be size 3")
except TypeError:
raise ValueError("Geodetic must be a sequence of length 3") from None
lat, lon, alt = g
lat = np.radians(lat)
lon = np.radians(lon)
xi = np.sqrt(1.0 - esq * np.sin(lat)**2)
x = (a / xi + alt) * np.cos(lat) * np.cos(lon)
y = (a / xi + alt) * np.cos(lat) * np.sin(lon)
z = (a / xi * (1.0 - esq) + alt) * np.sin(lat)
return np.array([x, y, z])
def ecef2geodetic_single(e):
"""
Convert ECEF to geodetic coordinates using Ferrari's solution.
"""
x, y, z = e
r = np.sqrt(x**2 + y**2)
Esq = a**2 - b**2
F = 54 * b**2 * z**2
G = r**2 + (1 - esq) * z**2 - esq * Esq
C = (esq**2 * F * r**2) / (G**3)
S = np.cbrt(1 + C + np.sqrt(C**2 + 2 * C))
P = F / (3 * (S + 1 / S + 1)**2 * G**2)
Q = np.sqrt(1 + 2 * esq**2 * P)
r_0 = -(P * esq * r) / (1 + Q) + np.sqrt(0.5 * a**2 * (1 + 1.0 / Q) - P * (1 - esq) * z**2 / (Q * (1 + Q)) - 0.5 * P * r**2)
U = np.sqrt((r - esq * r_0)**2 + z**2)
V = np.sqrt((r - esq * r_0)**2 + (1 - esq) * z**2)
Z_0 = b**2 * z / (a * V)
h = U * (1 - b**2 / (a * V))
lat = np.arctan((z + e1sq * Z_0) / r)
lon = np.arctan2(y, x)
return np.array([np.degrees(lat), np.degrees(lon), h])
def euler2quat_single(euler):
"""
Convert Euler angles (roll, pitch, yaw) to a quaternion.
Rotation order: Z-Y-X (yaw, pitch, roll).
"""
phi, theta, psi = euler
c_phi, s_phi = np.cos(phi / 2), np.sin(phi / 2)
c_theta, s_theta = np.cos(theta / 2), np.sin(theta / 2)
c_psi, s_psi = np.cos(psi / 2), np.sin(psi / 2)
w = c_phi * c_theta * c_psi + s_phi * s_theta * s_psi
x = s_phi * c_theta * c_psi - c_phi * s_theta * s_psi
y = c_phi * s_theta * c_psi + s_phi * c_theta * s_psi
z = c_phi * c_theta * s_psi - s_phi * s_theta * c_psi
if w < 0:
return np.array([-w, -x, -y, -z])
return np.array([w, x, y, z])
def quat2euler_single(q):
"""
Convert a quaternion to Euler angles (roll, pitch, yaw).
"""
w, x, y, z = q
gamma = np.arctan2(2 * (w * x + y * z), 1 - 2 * (x**2 + y**2))
sin_arg = 2 * (w * y - z * x)
sin_arg = np.clip(sin_arg, -1.0, 1.0)
theta = np.arcsin(sin_arg)
psi = np.arctan2(2 * (w * z + x * y), 1 - 2 * (y**2 + z**2))
return np.array([gamma, theta, psi])
def quat2rot_single(q):
"""
Convert a quaternion to a 3x3 rotation matrix.
"""
w, x, y, z = q
xx, yy, zz = x * x, y * y, z * z
xy, xz, yz = x * y, x * z, y * z
wx, wy, wz = w * x, w * y, w * z
mat = np.array([
[1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)],
[2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)],
[2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)]
])
return mat
def rot2quat_single(rot):
"""
Convert a 3x3 rotation matrix to a quaternion.
"""
trace = np.trace(rot)
if trace > 0:
s = 0.5 / np.sqrt(trace + 1.0)
w = 0.25 / s
x = (rot[2, 1] - rot[1, 2]) * s
y = (rot[0, 2] - rot[2, 0]) * s
z = (rot[1, 0] - rot[0, 1]) * s
else:
if rot[0, 0] > rot[1, 1] and rot[0, 0] > rot[2, 2]:
s = 2.0 * np.sqrt(1.0 + rot[0, 0] - rot[1, 1] - rot[2, 2])
w = (rot[2, 1] - rot[1, 2]) / s
x = 0.25 * s
y = (rot[0, 1] + rot[1, 0]) / s
z = (rot[0, 2] + rot[2, 0]) / s
elif rot[1, 1] > rot[2, 2]:
s = 2.0 * np.sqrt(1.0 + rot[1, 1] - rot[0, 0] - rot[2, 2])
w = (rot[0, 2] - rot[2, 0]) / s
x = (rot[0, 1] + rot[1, 0]) / s
y = 0.25 * s
z = (rot[1, 2] + rot[2, 1]) / s
else:
s = 2.0 * np.sqrt(1.0 + rot[2, 2] - rot[0, 0] - rot[1, 1])
w = (rot[1, 0] - rot[0, 1]) / s
x = (rot[0, 2] + rot[2, 0]) / s
y = (rot[1, 2] + rot[2, 1]) / s
z = 0.25 * s
if w < 0:
return np.array([-w, -x, -y, -z])
return np.array([w, x, y, z])
def euler2rot_single(euler):
"""
Convert Euler angles (roll, pitch, yaw) to a 3x3 rotation matrix.
Rotation order: Z-Y-X (yaw, pitch, roll).
"""
phi, theta, psi = euler
cx, sx = np.cos(phi), np.sin(phi)
cy, sy = np.cos(theta), np.sin(theta)
cz, sz = np.cos(psi), np.sin(psi)
Rx = np.array([[1, 0, 0], [0, cx, -sx], [0, sx, cx]])
Ry = np.array([[cy, 0, sy], [0, 1, 0], [-sy, 0, cy]])
Rz = np.array([[cz, -sz, 0], [sz, cz, 0], [0, 0, 1]])
return Rz @ Ry @ Rx
def rot2euler_single(rot):
"""
Convert a 3x3 rotation matrix to Euler angles (roll, pitch, yaw).
"""
return quat2euler_single(rot2quat_single(rot))
def rot_matrix(roll, pitch, yaw):
"""
Create a 3x3 rotation matrix from roll, pitch, and yaw angles.
"""
return euler2rot_single([roll, pitch, yaw])
def axis_angle_to_rot(axis, angle):
"""
Convert an axis-angle representation to a 3x3 rotation matrix.
"""
c = np.cos(angle / 2)
s = np.sin(angle / 2)
q = np.array([c, s*axis[0], s*axis[1], s*axis[2]])
return quat2rot_single(q)
class LocalCoord:
"""
A class to handle conversions between ECEF and local NED coordinates.
"""
def __init__(self, geodetic=None, ecef=None):
"""
Initialize LocalCoord with either geodetic or ECEF coordinates.
"""
if geodetic is not None:
self.init_ecef = geodetic2ecef_single(geodetic)
lat, lon, _ = geodetic
elif ecef is not None:
self.init_ecef = np.array(ecef)
lat, lon, _ = ecef2geodetic_single(ecef)
else:
raise ValueError("Must provide geodetic or ecef")
lat = np.radians(lat)
lon = np.radians(lon)
self.ned2ecef_matrix = np.array([
[-np.sin(lat) * np.cos(lon), -np.sin(lon), -np.cos(lat) * np.cos(lon)],
[-np.sin(lat) * np.sin(lon), np.cos(lon), -np.cos(lat) * np.sin(lon)],
[np.cos(lat), 0, -np.sin(lat)]
])
self.ecef2ned_matrix = self.ned2ecef_matrix.T
@classmethod
def from_geodetic(cls, geodetic):
"""
Create a LocalCoord instance from geodetic coordinates.
"""
return cls(geodetic=geodetic)
@classmethod
def from_ecef(cls, ecef):
"""
Create a LocalCoord instance from ECEF coordinates.
"""
return cls(ecef=ecef)
def ecef2ned_single(self, ecef):
"""
Convert a single ECEF point to NED coordinates relative to the origin.
"""
return self.ecef2ned_matrix @ (ecef - self.init_ecef)
def ned2ecef_single(self, ned):
"""
Convert a single NED point to ECEF coordinates.
"""
return self.ned2ecef_matrix @ ned + self.init_ecef
def geodetic2ned_single(self, geodetic):
"""
Convert a single geodetic point to NED coordinates.
"""
ecef = geodetic2ecef_single(geodetic)
return self.ecef2ned_single(ecef)
def ned2geodetic_single(self, ned):
"""
Convert a single NED point to geodetic coordinates.
"""
ecef = self.ned2ecef_single(ned)
return ecef2geodetic_single(ecef)
@property
def ned_from_ecef_matrix(self):
"""
Returns the rotation matrix from ECEF to NED coordinates.
"""
return self.ecef2ned_matrix
@property
def ecef_from_ned_matrix(self):
"""
Returns the rotation matrix from NED to ECEF coordinates.
"""
return self.ned2ecef_matrix
def ecef_euler_from_ned_single(ecef_init, ned_pose):
"""
Convert NED Euler angles (roll, pitch, yaw) at a given ECEF origin
to equivalent ECEF Euler angles.
"""
converter = LocalCoord(ecef=ecef_init)
zero = np.array(ecef_init)
x0 = converter.ned2ecef_single([1, 0, 0]) - zero
y0 = converter.ned2ecef_single([0, 1, 0]) - zero
z0 = converter.ned2ecef_single([0, 0, 1]) - zero
phi, theta, psi = ned_pose
x1 = axis_angle_to_rot(z0, psi) @ x0
y1 = axis_angle_to_rot(z0, psi) @ y0
z1 = axis_angle_to_rot(z0, psi) @ z0
x2 = axis_angle_to_rot(y1, theta) @ x1
y2 = axis_angle_to_rot(y1, theta) @ y1
z2 = axis_angle_to_rot(y1, theta) @ z1
x3 = axis_angle_to_rot(x2, phi) @ x2
y3 = axis_angle_to_rot(x2, phi) @ y2
x0 = np.array([1.0, 0, 0])
y0 = np.array([0, 1.0, 0])
z0 = np.array([0, 0, 1.0])
psi_out = np.arctan2(np.dot(x3, y0), np.dot(x3, x0))
theta_out = np.arctan2(-np.dot(x3, z0), np.sqrt(np.dot(x3, x0)**2 + np.dot(x3, y0)**2))
y2 = axis_angle_to_rot(z0, psi_out) @ y0
z2 = axis_angle_to_rot(y2, theta_out) @ z0
phi_out = np.arctan2(np.dot(y3, z2), np.dot(y3, y2))
return np.array([phi_out, theta_out, psi_out])
def ned_euler_from_ecef_single(ecef_init, ecef_pose):
"""
Convert ECEF Euler angles (roll, pitch, yaw) at a given ECEF origin
to equivalent NED Euler angles.
"""
converter = LocalCoord(ecef=ecef_init)
x0 = np.array([1.0, 0, 0])
y0 = np.array([0, 1.0, 0])
z0 = np.array([0, 0, 1.0])
phi, theta, psi = ecef_pose
x1 = axis_angle_to_rot(z0, psi) @ x0
y1 = axis_angle_to_rot(z0, psi) @ y0
z1 = axis_angle_to_rot(z0, psi) @ z0
x2 = axis_angle_to_rot(y1, theta) @ x1
y2 = axis_angle_to_rot(y1, theta) @ y1
z2 = axis_angle_to_rot(y1, theta) @ z1
x3 = axis_angle_to_rot(x2, phi) @ x2
y3 = axis_angle_to_rot(x2, phi) @ y2
zero = np.array(ecef_init)
x0 = converter.ned2ecef_single([1, 0, 0]) - zero
y0 = converter.ned2ecef_single([0, 1, 0]) - zero
z0 = converter.ned2ecef_single([0, 0, 1]) - zero
psi_out = np.arctan2(np.dot(x3, y0), np.dot(x3, x0))
theta_out = np.arctan2(-np.dot(x3, z0), np.sqrt(np.dot(x3, x0)**2 + np.dot(x3, y0)**2))
y2 = axis_angle_to_rot(z0, psi_out) @ y0
z2 = axis_angle_to_rot(y2, theta_out) @ z0
phi_out = np.arctan2(np.dot(y3, z2), np.dot(y3, y2))
return np.array([phi_out, theta_out, psi_out])
-173
View File
@@ -1,173 +0,0 @@
# distutils: language = c++
# cython: language_level = 3
from openpilot.common.transformations.transformations cimport Matrix3, Vector3, Quaternion
from openpilot.common.transformations.transformations cimport ECEF, NED, Geodetic
from openpilot.common.transformations.transformations cimport euler2quat as euler2quat_c
from openpilot.common.transformations.transformations cimport quat2euler as quat2euler_c
from openpilot.common.transformations.transformations cimport quat2rot as quat2rot_c
from openpilot.common.transformations.transformations cimport rot2quat as rot2quat_c
from openpilot.common.transformations.transformations cimport euler2rot as euler2rot_c
from openpilot.common.transformations.transformations cimport rot2euler as rot2euler_c
from openpilot.common.transformations.transformations cimport rot_matrix as rot_matrix_c
from openpilot.common.transformations.transformations cimport ecef_euler_from_ned as ecef_euler_from_ned_c
from openpilot.common.transformations.transformations cimport ned_euler_from_ecef as ned_euler_from_ecef_c
from openpilot.common.transformations.transformations cimport geodetic2ecef as geodetic2ecef_c
from openpilot.common.transformations.transformations cimport ecef2geodetic as ecef2geodetic_c
from openpilot.common.transformations.transformations cimport LocalCoord_c
import numpy as np
cimport numpy as np
cdef np.ndarray[double, ndim=2] matrix2numpy(Matrix3 m):
return np.array([
[m(0, 0), m(0, 1), m(0, 2)],
[m(1, 0), m(1, 1), m(1, 2)],
[m(2, 0), m(2, 1), m(2, 2)],
])
cdef Matrix3 numpy2matrix(np.ndarray[double, ndim=2, mode="fortran"] m):
assert m.shape[0] == 3
assert m.shape[1] == 3
return Matrix3(<double*>m.data)
cdef ECEF list2ecef(ecef):
cdef ECEF e
e.x = ecef[0]
e.y = ecef[1]
e.z = ecef[2]
return e
cdef NED list2ned(ned):
cdef NED n
n.n = ned[0]
n.e = ned[1]
n.d = ned[2]
return n
cdef Geodetic list2geodetic(geodetic):
cdef Geodetic g
g.lat = geodetic[0]
g.lon = geodetic[1]
g.alt = geodetic[2]
return g
def euler2quat_single(euler):
cdef Vector3 e = Vector3(euler[0], euler[1], euler[2])
cdef Quaternion q = euler2quat_c(e)
return [q.w(), q.x(), q.y(), q.z()]
def quat2euler_single(quat):
cdef Quaternion q = Quaternion(quat[0], quat[1], quat[2], quat[3])
cdef Vector3 e = quat2euler_c(q)
return [e(0), e(1), e(2)]
def quat2rot_single(quat):
cdef Quaternion q = Quaternion(quat[0], quat[1], quat[2], quat[3])
cdef Matrix3 r = quat2rot_c(q)
return matrix2numpy(r)
def rot2quat_single(rot):
cdef Matrix3 r = numpy2matrix(np.asfortranarray(rot, dtype=np.double))
cdef Quaternion q = rot2quat_c(r)
return [q.w(), q.x(), q.y(), q.z()]
def euler2rot_single(euler):
cdef Vector3 e = Vector3(euler[0], euler[1], euler[2])
cdef Matrix3 r = euler2rot_c(e)
return matrix2numpy(r)
def rot2euler_single(rot):
cdef Matrix3 r = numpy2matrix(np.asfortranarray(rot, dtype=np.double))
cdef Vector3 e = rot2euler_c(r)
return [e(0), e(1), e(2)]
def rot_matrix(roll, pitch, yaw):
return matrix2numpy(rot_matrix_c(roll, pitch, yaw))
def ecef_euler_from_ned_single(ecef_init, ned_pose):
cdef ECEF init = list2ecef(ecef_init)
cdef Vector3 pose = Vector3(ned_pose[0], ned_pose[1], ned_pose[2])
cdef Vector3 e = ecef_euler_from_ned_c(init, pose)
return [e(0), e(1), e(2)]
def ned_euler_from_ecef_single(ecef_init, ecef_pose):
cdef ECEF init = list2ecef(ecef_init)
cdef Vector3 pose = Vector3(ecef_pose[0], ecef_pose[1], ecef_pose[2])
cdef Vector3 e = ned_euler_from_ecef_c(init, pose)
return [e(0), e(1), e(2)]
def geodetic2ecef_single(geodetic):
cdef Geodetic g = list2geodetic(geodetic)
cdef ECEF e = geodetic2ecef_c(g)
return [e.x, e.y, e.z]
def ecef2geodetic_single(ecef):
cdef ECEF e = list2ecef(ecef)
cdef Geodetic g = ecef2geodetic_c(e)
return [g.lat, g.lon, g.alt]
cdef class LocalCoord:
cdef LocalCoord_c * lc
def __init__(self, geodetic=None, ecef=None):
assert (geodetic is not None) or (ecef is not None)
if geodetic is not None:
self.lc = new LocalCoord_c(list2geodetic(geodetic))
elif ecef is not None:
self.lc = new LocalCoord_c(list2ecef(ecef))
@property
def ned2ecef_matrix(self):
return matrix2numpy(self.lc.ned2ecef_matrix)
@property
def ecef2ned_matrix(self):
return matrix2numpy(self.lc.ecef2ned_matrix)
@property
def ned_from_ecef_matrix(self):
return self.ecef2ned_matrix
@property
def ecef_from_ned_matrix(self):
return self.ned2ecef_matrix
@classmethod
def from_geodetic(cls, geodetic):
return cls(geodetic=geodetic)
@classmethod
def from_ecef(cls, ecef):
return cls(ecef=ecef)
def ecef2ned_single(self, ecef):
assert self.lc
cdef ECEF e = list2ecef(ecef)
cdef NED n = self.lc.ecef2ned(e)
return [n.n, n.e, n.d]
def ned2ecef_single(self, ned):
assert self.lc
cdef NED n = list2ned(ned)
cdef ECEF e = self.lc.ned2ecef(n)
return [e.x, e.y, e.z]
def geodetic2ned_single(self, geodetic):
assert self.lc
cdef Geodetic g = list2geodetic(geodetic)
cdef NED n = self.lc.geodetic2ned(g)
return [n.n, n.e, n.d]
def ned2geodetic_single(self, ned):
assert self.lc
cdef NED n = list2ned(ned)
cdef Geodetic g = self.lc.ned2geodetic(n)
return [g.lat, g.lon, g.alt]
def __dealloc__(self):
del self.lc
-46
View File
@@ -1,46 +0,0 @@
import os
import subprocess
def sudo_write(val: str, path: str) -> None:
try:
with open(path, 'w') as f:
f.write(str(val))
except PermissionError:
os.system(f"sudo chmod a+w {path}")
try:
with open(path, 'w') as f:
f.write(str(val))
except PermissionError:
# fallback for debugfs files
os.system(f"sudo su -c 'echo {val} > {path}'")
def sudo_read(path: str) -> str:
try:
return subprocess.check_output(f"sudo cat {path}", shell=True, encoding='utf8').strip()
except Exception:
return ""
class MovingAverage:
def __init__(self, window_size: int):
self.window_size: int = window_size
self.buffer: list[float] = [0.0] * window_size
self.index: int = 0
self.count: int = 0
self.sum: float = 0.0
def add_value(self, new_value: float):
# Update the sum: subtract the value being replaced and add the new value
self.sum -= self.buffer[self.index]
self.buffer[self.index] = new_value
self.sum += new_value
# Update the index in a circular manner
self.index = (self.index + 1) % self.window_size
# Track the number of added values (for partial windows)
self.count = min(self.count + 1, self.window_size)
def get_average(self) -> float:
if self.count == 0:
return float('nan')
return self.sum / self.count
+50 -3
View File
@@ -7,14 +7,61 @@ import time
import functools
from subprocess import Popen, PIPE, TimeoutExpired
import zstandard as zstd
from openpilot.common.swaglog import cloudlog
LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change
def sudo_write(val: str, path: str) -> None:
try:
with open(path, 'w') as f:
f.write(str(val))
except PermissionError:
os.system(f"sudo chmod a+w {path}")
try:
with open(path, 'w') as f:
f.write(str(val))
except PermissionError:
# fallback for debugfs files
os.system(f"sudo su -c 'echo {val} > {path}'")
def sudo_read(path: str) -> str:
try:
return subprocess.check_output(f"sudo cat {path}", shell=True, encoding='utf8').strip()
except Exception:
return ""
class MovingAverage:
def __init__(self, window_size: int):
self.window_size: int = window_size
self.buffer: list[float] = [0.0] * window_size
self.index: int = 0
self.count: int = 0
self.sum: float = 0.0
def add_value(self, new_value: float):
# Update the sum: subtract the value being replaced and add the new value
self.sum -= self.buffer[self.index]
self.buffer[self.index] = new_value
self.sum += new_value
# Update the index in a circular manner
self.index = (self.index + 1) % self.window_size
# Track the number of added values (for partial windows)
self.count = min(self.count + 1, self.window_size)
def get_average(self) -> float:
if self.count == 0:
return float('nan')
return self.sum / self.count
class CallbackReader:
"""Wraps a file, but overrides the read method to also
call a callback function with the number of bytes read so far."""
def __init__(self, f, callback, *args):
self.f = f
self.callback = callback
@@ -107,11 +154,11 @@ def retry(attempts=3, delay=1.0, ignore_failure=False):
try:
return func(*args, **kwargs)
except Exception:
cloudlog.exception(f"{func.__name__} failed, trying again")
print(f"{func.__name__} failed, trying again")
time.sleep(delay)
if ignore_failure:
cloudlog.error(f"{func.__name__} failed after retry")
print(f"{func.__name__} failed after retry")
else:
raise Exception(f"{func.__name__} failed after retry")
return wrapper
+1 -1
View File
@@ -1 +1 @@
#define COMMA_VERSION "0.10.3"
#define COMMA_VERSION "0.10.4"
+1 -1
View File
@@ -16,7 +16,7 @@ export VECLIB_MAXIMUM_THREADS=1
export QCOM_PRIORITY=12
if [ -z "$AGNOS_VERSION" ]; then
export AGNOS_VERSION="15.1"
export AGNOS_VERSION="16"
fi
export STAGING_ROOT="/data/safe_staging"
+1 -1
Submodule panda updated: 5f3c09c910...f9cdec7f7b
+45 -43
View File
@@ -14,7 +14,7 @@ dependencies = [
"pyserial", # pigeond + qcomgpsd
"requests", # many one-off uses
"sympy", # rednose + friends
"crcmod", # cars + qcomgpsd
"crcmod-plus", # cars + qcomgpsd
"tqdm", # cars (fw_versions.py) on start + many one-off uses
# hardwared
@@ -49,7 +49,7 @@ dependencies = [
# logging
"pyzmq",
"sentry-sdk",
"xattr", # used in place of 'os.getxattr' for macos compatibility
"xattr", # used in place of 'os.getxattr' for macOS compatibility
# athena
"PyJWT",
@@ -72,7 +72,7 @@ dependencies = [
"zstandard",
# ui
"raylib < 5.5.0.3", # TODO: unpin when they fix https://github.com/electronstudio/raylib-python-cffi/issues/186
"raylib > 5.5.0.3",
"qrcode",
"mapbox-earcut",
]
@@ -87,7 +87,7 @@ docs = [
testing = [
"coverage",
"hypothesis ==6.47.*",
"mypy",
"ty",
"pytest",
"pytest-cpp",
"pytest-subtests",
@@ -107,15 +107,13 @@ dev = [
"av",
"azure-identity",
"azure-storage-blob",
"dbus-next", # TODO: remove once we moved everything to jeepney
"dictdiffer",
"jeepney",
"matplotlib",
"opencv-python-headless",
"parameterized >=0.8, <0.9",
"pyautogui",
"pygame",
"pyopencl; platform_machine != 'aarch64'", # broken on arm64
"pyopencl",
"pytools>=2025.1.6; platform_machine != 'aarch64'",
"pywinctl",
"pyprof2calltree",
@@ -181,42 +179,6 @@ ignore-words-list = "bu,ro,te,ue,alo,hda,ois,nam,nams,ned,som,parm,setts,inout,w
builtin = "clear,rare,informal,code,names,en-GB_to_en-US"
skip = "./third_party/*, ./tinygrad/*, ./tinygrad_repo/*, ./msgq/*, ./panda/*, ./opendbc/*, ./opendbc_repo/*, ./rednose/*, ./rednose_repo/*, ./teleoprtc/*, ./teleoprtc_repo/*, *.po, uv.lock, *.onnx, ./cereal/gen/*, */c_generated_code/*, docs/assets/*, tools/plotjuggler/layouts/*, selfdrive/assets/offroad/mici_fcc.html"
[tool.mypy]
python_version = "3.11"
exclude = [
"cereal/",
"msgq/",
"msgq_repo/",
"opendbc/",
"opendbc_repo/",
"panda/",
"rednose/",
"rednose_repo/",
"tinygrad/",
"tinygrad_repo/",
"teleoprtc/",
"teleoprtc_repo/",
"third_party/",
]
# third-party packages
ignore_missing_imports=true
# helpful warnings
warn_redundant_casts=true
warn_unreachable=true
warn_unused_ignores=true
# restrict dynamic typing
warn_return_any=true
# allow implicit optionals for default args
implicit_optional = true
local_partial_types=true
explicit_package_bases=true
disable_error_code = "annotation-unchecked"
# https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml
[tool.ruff]
indent-width = 2
@@ -275,3 +237,43 @@ lint.flake8-implicit-str-concat.allow-multiline = false
[tool.ruff.format]
quote-style = "preserve"
[tool.ty.src]
exclude = [
"cereal/",
"msgq/",
"msgq_repo/",
"opendbc/",
"opendbc_repo/",
"panda/",
"rednose/",
"rednose_repo/",
"tinygrad/",
"tinygrad_repo/",
"teleoprtc/",
"teleoprtc_repo/",
"third_party/",
]
[tool.ty.rules]
# Ignore unresolved imports for Cython-compiled modules (.pyx)
unresolved-import = "ignore"
# Ignore unresolved attributes - many from capnp and Cython modules
unresolved-attribute = "ignore"
# Ignore invalid method overrides - signature variance issues
invalid-method-override = "ignore"
# Ignore possibly-missing-attribute - too many false positives
possibly-missing-attribute = "ignore"
# Ignore invalid assignment - often intentional monkey-patching
invalid-assignment = "ignore"
# Ignore no-matching-overload - numpy/ctypes overload matching issues
no-matching-overload = "ignore"
# Ignore invalid-argument-type - many false positives from raylib, ctypes, numpy
invalid-argument-type = "ignore"
# Ignore call-non-callable - false positives from dynamic types
call-non-callable = "ignore"
# Ignore unsupported-operator - false positives from dynamic types
unsupported-operator = "ignore"
# Ignore not-subscriptable - false positives from dynamic types
not-subscriptable = "ignore"
# not-iterable errors are now fixed
+8 -9
View File
@@ -4,18 +4,17 @@
## release checklist
### Go to staging
- [ ] make a GitHub issue to track release
- [ ] make a GitHub issue to track release with this checklist
- [ ] create release master branch
- [ ] update RELEASES.md
- [ ] create a branch from upstream master named `zerotentwo` for release `v0.10.2`
- [ ] revert risky commits (double check with autonomy team)
- [ ] push the new branch
- [ ] push to staging:
- [ ] make sure you are on the newly created release master branch (`zerotentwo`)
- [ ] run `BRANCH=devel-staging release/build_stripped.sh`. Jenkins will then automatically build staging on device, run `test_onroad` and update the staging branch
- [ ] bump version on master: `common/version.h` and `RELEASES.md`
- [ ] build new userdata partition from `release3-staging`
- [ ] post on Discord, tag `@release crew`
Updating staging:
1. either rebase on master or cherry-pick changes
2. run this to update: `BRANCH=devel-staging release/build_devel.sh`
3. build new userdata partition from `release3-staging`
### Go to release
- [ ] before going to release, test the following:
- [ ] update from previous release -> new release
@@ -26,7 +25,7 @@ Updating staging:
- [ ] check sentry, MTBF, etc.
- [ ] stress test passes in production
- [ ] publish the blog post
- [ ] `git reset --hard origin/release3-staging`
- [ ] `git reset --hard origin/release-mici-staging`
- [ ] tag the release: `git tag v0.X.X <commit-hash> && git push origin v0.X.X`
- [ ] create GitHub release
- [ ] final test install on `openpilot.comma.ai`
+6 -6
View File
@@ -55,7 +55,7 @@ function run_tests() {
run "check_nomerge_comments" $DIR/check_nomerge_comments.sh $ALL_FILES
if [[ -z "$FAST" ]]; then
run "mypy" mypy $PYTHON_FILES
run "ty" ty check
run "codespell" codespell $ALL_FILES --ignore-words=$ROOT/.codespellignore
fi
@@ -69,7 +69,7 @@ function help() {
echo ""
echo -e "${BOLD}${UNDERLINE}Tests:${NC}"
echo -e " ${BOLD}ruff${NC}"
echo -e " ${BOLD}mypy${NC}"
echo -e " ${BOLD}ty${NC}"
echo -e " ${BOLD}codespell${NC}"
echo -e " ${BOLD}check_added_large_files${NC}"
echo -e " ${BOLD}check_shebang_scripts_are_executable${NC}"
@@ -81,11 +81,11 @@ function help() {
echo " Specify tests to skip separated by spaces"
echo ""
echo -e "${BOLD}${UNDERLINE}Examples:${NC}"
echo " op lint mypy ruff"
echo " Only run the mypy and ruff tests"
echo " op lint ty ruff"
echo " Only run the ty and ruff tests"
echo ""
echo " op lint --skip mypy ruff"
echo " Skip the mypy and ruff tests"
echo " op lint --skip ty ruff"
echo " Skip the ty and ruff tests"
echo ""
echo " op lint"
echo " Run all the tests"
+19 -54
View File
@@ -1,7 +1,8 @@
from cereal import car, log, custom
import cereal.messaging as messaging
from cereal import car, log
from opendbc.car import DT_CTRL, structs
from opendbc.car.car_helpers import interfaces
from opendbc.car.interfaces import MAX_CTRL_SPEED
from opendbc.car.toyota.values import ToyotaFlags
from openpilot.selfdrive.selfdrived.events import Events
@@ -11,33 +12,6 @@ EventName = log.OnroadEvent.EventName
NetworkLocation = structs.CarParams.NetworkLocation
# TODO: the goal is to abstract this file into the CarState struct and make events generic
class MockCarState:
def __init__(self):
self.sm = messaging.SubMaster(['gpsLocation', 'gpsLocationExternal'])
def update(self, CS: car.CarState, CS_SP: custom.CarStateSP):
self.sm.update(0)
gps_sock = 'gpsLocationExternal' if self.sm.recv_frame['gpsLocationExternal'] > 1 else 'gpsLocation'
CS.vEgo = self.sm[gps_sock].speed
CS.vEgoRaw = self.sm[gps_sock].speed
return CS, CS_SP
BRAND_EXTRA_GEARS = {
'ford': [GearShifter.low, GearShifter.manumatic],
'nissan': [GearShifter.brake],
'chrysler': [GearShifter.low],
'honda': [GearShifter.sport],
'toyota': [GearShifter.sport],
'gm': [GearShifter.sport, GearShifter.low, GearShifter.eco, GearShifter.manumatic],
'volkswagen': [GearShifter.eco, GearShifter.sport, GearShifter.manumatic],
'hyundai': [GearShifter.sport, GearShifter.manumatic]
}
class CarSpecificEvents:
def __init__(self, CP: structs.CarParams):
self.CP = CP
@@ -48,14 +22,12 @@ class CarSpecificEvents:
self.silent_steer_warning = True
def update(self, CS: car.CarState, CS_prev: car.CarState, CC: car.CarControl):
extra_gears = BRAND_EXTRA_GEARS.get(self.CP.brand, None)
if self.CP.brand in ('body', 'mock'):
events = Events()
return Events()
elif self.CP.brand == 'chrysler':
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears)
events = self.create_common_events(CS, CS_prev)
if self.CP.brand == 'chrysler':
# Low speed steer alert hysteresis logic
if self.CP.minSteerSpeed > 0. and CS.vEgo < (self.CP.minSteerSpeed + 0.5):
self.low_speed_alert = True
@@ -65,8 +37,6 @@ class CarSpecificEvents:
events.add(EventName.belowSteerSpeed)
elif self.CP.brand == 'honda':
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears, pcm_enable=False)
if self.CP.pcmCruise and CS.vEgo < self.CP.minEnableSpeed:
events.add(EventName.belowEngageSpeed)
@@ -87,11 +57,9 @@ class CarSpecificEvents:
elif self.CP.brand == 'toyota':
# TODO: when we check for unexpected disengagement, check gear not S1, S2, S3
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears)
if self.CP.openpilotLongitudinalControl:
# Only can leave standstill when planner wants to move
if CS.cruiseState.standstill and not CS.brakePressed and CC.cruiseControl.resume:
if CS.cruiseState.standstill and not CS.brakePressed and (CC.cruiseControl.resume or self.CP.flags & ToyotaFlags.HYBRID.value):
events.add(EventName.resumeRequired)
if CS.vEgo < self.CP.minEnableSpeed:
events.add(EventName.belowEngageSpeed)
@@ -103,8 +71,6 @@ class CarSpecificEvents:
events.add(EventName.manualRestart)
elif self.CP.brand == 'gm':
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears, pcm_enable=self.CP.pcmCruise)
# Enabling at a standstill with brake is allowed
# TODO: verify 17 Volt can enable for the first time at a stop and allow for all GMs
if CS.vEgo < self.CP.minEnableSpeed and not (CS.standstill and CS.brake >= 20 and
@@ -114,8 +80,6 @@ class CarSpecificEvents:
events.add(EventName.resumeRequired)
elif self.CP.brand == 'volkswagen':
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears, pcm_enable=self.CP.pcmCruise)
if self.CP.openpilotLongitudinalControl:
if CS.vEgo < self.CP.minEnableSpeed + 0.5:
events.add(EventName.belowEngageSpeed)
@@ -123,27 +87,26 @@ class CarSpecificEvents:
events.add(EventName.speedTooLow)
# TODO: this needs to be implemented generically in carState struct
# if CC.eps_timer_soft_disable_alert: # type: ignore[attr-defined]
# if CC.eps_timer_soft_disable_alert:
# events.add(EventName.steerTimeLimit)
elif self.CP.brand == 'hyundai':
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears, pcm_enable=self.CP.pcmCruise, allow_button_cancel=False)
else:
events = self.create_common_events(CS, CS_prev, extra_gears=extra_gears)
return events
def create_common_events(self, CS: structs.CarState, CS_prev: car.CarState, extra_gears: list | None = None, pcm_enable=True,
allow_button_cancel=True):
def create_common_events(self, CS: structs.CarState, CS_prev: car.CarState):
events = Events()
CI = interfaces[self.CP.carFingerprint]
# TODO: cleanup the honda-specific logic
pcm_enable = self.CP.pcmCruise and self.CP.brand != 'honda'
# TODO: on some hyundai cars, the cancel button is also the pause/resume button,
# so only use it for cancel when running openpilot longitudinal
allow_button_cancel = self.CP.brand != 'hyundai'
if CS.doorOpen:
events.add(EventName.doorOpen)
if CS.seatbeltUnlatched:
events.add(EventName.seatbeltNotLatched)
if CS.gearShifter != GearShifter.drive and (extra_gears is None or
CS.gearShifter not in extra_gears):
if CS.gearShifter != GearShifter.drive and CS.gearShifter not in CI.DRIVABLE_GEARS:
events.add(EventName.wrongGear)
if CS.gearShifter == GearShifter.reverse:
events.add(EventName.reverseGear)
@@ -157,6 +120,8 @@ class CarSpecificEvents:
events.add(EventName.stockFcw)
if CS.stockAeb:
events.add(EventName.stockAeb)
if CS.stockLkas:
events.add(EventName.stockLkas)
if CS.vEgo > MAX_CTRL_SPEED:
events.add(EventName.speedTooHigh)
if CS.cruiseState.nonAdaptive:
+1 -5
View File
@@ -19,7 +19,6 @@ from opendbc.car.car_helpers import get_car, interfaces
from opendbc.car.interfaces import CarInterfaceBase, RadarInterfaceBase
from openpilot.selfdrive.pandad import can_capnp_to_list, can_list_to_can_capnp
from openpilot.selfdrive.car.cruise import VCruiseHelper
from openpilot.selfdrive.car.car_specific import MockCarState
from openpilot.selfdrive.car.helpers import convert_carControlSP, convert_to_capnp
from openpilot.sunnypilot.mads.helpers import set_alternative_experience, set_car_specific_params
@@ -139,7 +138,7 @@ class Car:
safety_config.safetyModel = structs.CarParams.SafetyModel.noOutput
self.CP.safetyConfigs = [safety_config]
if self.CP.secOcRequired and not is_release:
if self.CP.secOcRequired:
# Copy user key if available
try:
with open("/cache/params/SecOCKey") as f:
@@ -179,7 +178,6 @@ class Car:
self.params.put_nonblocking("CarParamsSPCache", cp_sp_bytes)
self.params.put_nonblocking("CarParamsSPPersistent", cp_sp_bytes)
self.mock_carstate = MockCarState()
self.v_cruise_helper = VCruiseHelper(self.CP, self.CP_SP)
self.is_metric = self.params.get_bool("IsMetric")
@@ -200,8 +198,6 @@ class Car:
# Update carState from CAN
CS, CS_SP = self.CI.update(can_list)
CS_SP = convert_to_capnp(CS_SP)
if self.CP.brand == 'mock':
CS, CS_SP = self.mock_carstate.update(CS, CS_SP)
# Update radar tracks from CAN
RD: structs.RadarDataT | None = self.RI.update(can_list)
@@ -108,11 +108,11 @@ if __name__ == "__main__":
uds_client = UdsClient(panda, 0x7D0, bus=args.bus)
print("\n[START DIAGNOSTIC SESSION]")
session_type : SESSION_TYPE = 0x07 # type: ignore
session_type : SESSION_TYPE = 0x07
uds_client.diagnostic_session_control(session_type)
print("[HARDWARE/SOFTWARE VERSION]")
fw_version_data_id : DATA_IDENTIFIER_TYPE = 0xf100 # type: ignore
fw_version_data_id : DATA_IDENTIFIER_TYPE = 0xf100
fw_version = uds_client.read_data_by_identifier(fw_version_data_id)
print(fw_version)
if fw_version not in SUPPORTED_FW_VERSIONS.keys():
@@ -120,7 +120,7 @@ if __name__ == "__main__":
sys.exit(1)
print("[GET CONFIGURATION]")
config_data_id : DATA_IDENTIFIER_TYPE = 0x0142 # type: ignore
config_data_id : DATA_IDENTIFIER_TYPE = 0x0142
current_config = uds_client.read_data_by_identifier(config_data_id)
config_values = SUPPORTED_FW_VERSIONS[fw_version]
new_config = config_values.default_config if args.default else config_values.tracks_enabled
+5 -5
View File
@@ -55,7 +55,7 @@ if __name__ == "__main__":
sw_ver = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER).decode("utf-8")
component = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.SYSTEM_NAME_OR_ENGINE_TYPE).decode("utf-8")
odx_file = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.ODX_FILE).decode("utf-8").rstrip('\x00')
current_coding = uds_client.read_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING) # type: ignore
current_coding = uds_client.read_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING)
coding_text = current_coding.hex()
print("\nEPS diagnostic data\n")
@@ -126,9 +126,9 @@ if __name__ == "__main__":
new_coding = current_coding[0:coding_byte] + new_byte.to_bytes(1, "little") + current_coding[coding_byte+1:]
try:
seed = uds_client.security_access(ACCESS_TYPE_LEVEL_1.REQUEST_SEED) # type: ignore
seed = uds_client.security_access(ACCESS_TYPE_LEVEL_1.REQUEST_SEED)
key = struct.unpack("!I", seed)[0] + 28183 # yeah, it's like that
uds_client.security_access(ACCESS_TYPE_LEVEL_1.SEND_KEY, struct.pack("!I", key)) # type: ignore
uds_client.security_access(ACCESS_TYPE_LEVEL_1.SEND_KEY, struct.pack("!I", key))
except (NegativeResponseError, MessageTimeoutError):
print("Security access failed!")
print("Open the hood and retry (disables the \"diagnostic firewall\" on newer vehicles)")
@@ -148,7 +148,7 @@ if __name__ == "__main__":
uds_client.write_data_by_identifier(DATA_IDENTIFIER_TYPE.PROGRAMMING_DATE, prog_date)
tester_num = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER)
uds_client.write_data_by_identifier(DATA_IDENTIFIER_TYPE.REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER, tester_num)
uds_client.write_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING, new_coding) # type: ignore
uds_client.write_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING, new_coding)
except (NegativeResponseError, MessageTimeoutError):
print("Writing new configuration failed!")
print("Make sure the comma processes are stopped: tmux kill-session -t comma")
@@ -156,7 +156,7 @@ if __name__ == "__main__":
try:
# Read back result just to make 100% sure everything worked
current_coding_text = uds_client.read_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING).hex() # type: ignore
current_coding_text = uds_client.read_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING).hex()
print(f" New coding: {current_coding_text}")
except (NegativeResponseError, MessageTimeoutError):
print("Reading back updated coding failed!")
-1
View File
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
# type: ignore
'''
System tools like top/htop can only show current cpu usage values, so I write this script to do statistics jobs.
Features:
+2 -4
View File
@@ -99,8 +99,7 @@ def cycle_alerts(duration=200, is_metric=False):
alert = AM.process_alerts(frame, [])
print(alert)
for _ in range(duration):
dat = messaging.new_message()
dat.init('selfdriveState')
dat = messaging.new_message('selfdriveState')
dat.selfdriveState.enabled = False
if alert:
@@ -112,8 +111,7 @@ def cycle_alerts(duration=200, is_metric=False):
dat.selfdriveState.alertSound = alert.audible_alert
pm.send('selfdriveState', dat)
dat = messaging.new_message()
dat.init('deviceState')
dat = messaging.new_message('deviceState')
dat.deviceState.started = True
pm.send('deviceState', dat)
+6 -5
View File
@@ -28,11 +28,12 @@ def get_fingerprint(lr):
# TODO: also print the fw fingerprint merged with the existing ones
# show FW fingerprint
print("\nFW fingerprint:\n")
for f in fw:
print(f" (Ecu.{f.ecu}, {hex(f.address)}, {None if f.subAddress == 0 else f.subAddress}): [")
print(f" {f.fwVersion},")
print(" ],")
if fw:
print("\nFW fingerprint:\n")
for f in fw:
print(f" (Ecu.{f.ecu}, {hex(f.address)}, {None if f.subAddress == 0 else f.subAddress}): [")
print(f" {f.fwVersion},")
print(" ],")
print()
print(f"VIN: {vin}")
-1
View File
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
# type: ignore
import random
from collections import defaultdict
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
# type: ignore
import os
import argparse
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
# type: ignore
from collections import defaultdict
import argparse
+2 -2
View File
@@ -47,7 +47,7 @@ DEBUG = os.getenv("DEBUG") is not None
def is_calibration_valid(rpy: np.ndarray) -> bool:
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1]) # type: ignore
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1])
def sanity_clip(rpy: np.ndarray) -> np.ndarray:
@@ -92,7 +92,7 @@ class Calibrator:
valid_blocks: int = 0,
wide_from_device_euler_init: np.ndarray = WIDE_FROM_DEVICE_EULER_INIT,
height_init: np.ndarray = HEIGHT_INIT,
smooth_from: np.ndarray = None) -> None:
smooth_from: np.ndarray | None = None) -> None:
if not np.isfinite(rpy_init).all():
self.rpy = RPY_INIT.copy()
else:
+1 -1
View File
@@ -94,7 +94,7 @@ class PointBuckets:
def add_point(self, x: float, y: float) -> None:
raise NotImplementedError
def get_points(self, num_points: int = None) -> Any:
def get_points(self, num_points: int | None = None) -> Any:
points = np.vstack([x.arr for x in self.buckets.values()])
if num_points is None:
return points
+2 -2
View File
@@ -127,8 +127,8 @@ class VehicleParamsLearner:
if not self.active:
# Reset time when stopped so uncertainty doesn't grow
self.kf.filter.set_filter_time(t) # type: ignore
self.kf.filter.reset_rewind() # type: ignore
self.kf.filter.set_filter_time(t)
self.kf.filter.reset_rewind()
def get_msg(self, valid: bool, debug: bool = False) -> capnp._DynamicStructBuilder:
x = self.kf.x
+1 -1
View File
@@ -35,7 +35,7 @@ MIN_BUCKET_POINTS = np.array([100, 300, 500, 500, 500, 500, 300, 100])
MIN_ENGAGE_BUFFER = 2 # secs
VERSION = 1 # bump this to invalidate old parameter caches
ALLOWED_CARS = ['toyota', 'hyundai', 'rivian', 'honda']
ALLOWED_CARS = ['toyota', 'hyundai', 'rivian', 'honda', 'volkswagen']
def slope2rot(slope):
+1 -1
View File
@@ -1,7 +1,7 @@
import os
import glob
Import('env', 'envCython', 'arch', 'cereal', 'messaging', 'common', 'visionipc', 'transformations')
Import('env', 'envCython', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
lenv = env.Clone()
lenvCython = envCython.Clone()
+1 -1
View File
@@ -43,7 +43,7 @@ POLICY_PKL_PATH = Path(__file__).parent / 'models/driving_policy_tinygrad.pkl'
VISION_METADATA_PATH = Path(__file__).parent / 'models/driving_vision_metadata.pkl'
POLICY_METADATA_PATH = Path(__file__).parent / 'models/driving_policy_metadata.pkl'
LAT_SMOOTH_SECONDS = 0.1
LAT_SMOOTH_SECONDS = 0.0
LONG_SMOOTH_SECONDS = 0.3
MIN_LAT_CONTROL_SPEED = 0.3
+1 -1
View File
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:f8fe9a71b0fd428a045a82ed50790179f77aa664391198f078e11e7b2cb2c2d7
oid sha256:1edea5bb56f876db4cec97c150799513f6a59373f3ad152d55e4dcaab1b809e3
size 13926324
+1 -1
View File
@@ -14,7 +14,7 @@ with open(os.path.join(BASEDIR, "selfdrive/selfdrived/alerts_offroad.json")) as
OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str | None = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''
+12 -3
View File
@@ -303,6 +303,15 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
ET.NO_ENTRY: NoEntryAlert("Stock AEB: Risk of Collision"),
},
EventName.stockLkas: {
ET.PERMANENT: Alert(
"TAKE CONTROL",
"Stock LKAS: Lane Departure Detected",
AlertStatus.critical, AlertSize.full,
Priority.HIGH, VisualAlert.fcw, AudibleAlert.none, 2.),
ET.NO_ENTRY: NoEntryAlert("Stock LKAS: Lane Departure Detected"),
},
EventName.fcw: {
ET.PERMANENT: Alert(
"BRAKE!",
@@ -758,13 +767,13 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
# - CAN data is received, but some message are not received at the right frequency
# If you're not writing a new car port, this is usually cause by faulty wiring
EventName.canError: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("CAN Error"),
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Unknown Vehicle Variant"),
ET.PERMANENT: Alert(
"CAN Error: Check Connections",
"Unknown Vehicle Variant",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 1., creation_delay=1.),
ET.NO_ENTRY: NoEntryAlert("CAN Error: Check Connections"),
ET.NO_ENTRY: NoEntryAlert("Unknown Vehicle Variant"),
},
EventName.canBusMissing: {
+1 -1
View File
@@ -44,7 +44,7 @@ class FuzzyGenerator:
except capnp.lib.capnp.KjException:
return self.generate_struct(field.schema)
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str = None) -> st.SearchStrategy[dict[str, Any]]:
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str | None = None) -> st.SearchStrategy[dict[str, Any]]:
single_fill: tuple[str, ...] = (event,) if event else (self.draw(st.sampled_from(schema.union_fields)),) if schema.union_fields else ()
fields_to_generate = schema.non_union_fields + single_fill
return st.fixed_dictionaries({field: self.generate_field(schema.fields[field]) for field in fields_to_generate if not field.endswith('DEPRECATED')})
+2 -1
View File
@@ -1,5 +1,6 @@
from collections import defaultdict
from collections.abc import Callable
from typing import cast
import capnp
import functools
import traceback
@@ -69,7 +70,7 @@ def migrate(lr: LogIterable, migration_funcs: list[MigrationFunc]):
if migration.product in grouped: # skip if product already exists
continue
sorted_indices = sorted(ii for i in migration.inputs for ii in grouped[i])
sorted_indices = sorted(ii for i in cast(list[str], migration.inputs) for ii in grouped.get(i, []))
msg_gen = [(i, lr[i]) for i in sorted_indices]
r_ops, a_ops, d_ops = migration(msg_gen)
replace_ops.extend(r_ops)
@@ -614,9 +614,9 @@ def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args,
def replay_process(
cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] = None,
fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None,
captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False
cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] | None = None,
fingerprint: str | None = None, return_all_logs: bool = False, custom_params: dict[str, Any] | None = None,
captured_output_store: dict[str, dict[str, str]] | None = None, disable_progress: bool = False
) -> list[capnp._DynamicStructReader]:
if isinstance(cfg, Iterable):
cfgs = list(cfg)
+1 -1
View File
@@ -1 +1 @@
b508f43fb0481bce0859c9b6ab4f45ee690b8dab
b259f6f8f099a9d82e4c65dd5deae2e4e293007b
+1 -1
View File
@@ -16,7 +16,7 @@ from openpilot.tools.lib.openpilotci import get_url
def regen_segment(
lr: LogIterable, frs: dict[str, Any] = None,
lr: LogIterable, frs: dict[str, Any] | None = None,
processes: Iterable[ProcessConfig] = CONFIGS, disable_tqdm: bool = False
) -> list[capnp._DynamicStructReader]:
all_msgs = sorted(lr, key=lambda m: m.logMonoTime)
+1 -1
View File
@@ -19,7 +19,7 @@ SOURCES: list[AzureContainer] = [
DEST = OpenpilotCIContainer
def upload_route(path: str, exclude_patterns: Iterable[str] = None) -> None:
def upload_route(path: str, exclude_patterns: Iterable[str] | None = None) -> None:
if exclude_patterns is None:
exclude_patterns = [r'dcamera\.hevc']
+1 -1
View File
@@ -39,7 +39,7 @@ class HomeLayout(Widget):
self.current_state = HomeLayoutState.HOME
self.last_refresh = 0
self.settings_callback: callable | None = None
self.settings_callback: Callable[[], None] | None = None
self.update_available = False
self.alert_count = 0
+3 -5
View File
@@ -145,20 +145,18 @@ class SettingsLayout(Widget):
if panel.instance:
panel.instance.render(content_rect)
def _handle_mouse_release(self, mouse_pos: MousePos) -> bool:
def _handle_mouse_release(self, mouse_pos: MousePos) -> None:
# Check close button
if rl.check_collision_point_rec(mouse_pos, self._close_btn_rect):
if self._close_callback:
self._close_callback()
return True
return
# Check navigation buttons
for panel_type, panel_info in self._panels.items():
if rl.check_collision_point_rec(mouse_pos, panel_info.button_rect):
self.set_current_panel(panel_type)
return True
return False
return
def set_current_panel(self, panel_type: PanelType):
if panel_type != self._current_panel:
+3 -1
View File
@@ -1,5 +1,6 @@
from enum import IntEnum
import os
import requests
import threading
import time
@@ -29,6 +30,7 @@ class PrimeState:
def __init__(self):
self._params = Params()
self._lock = threading.Lock()
self._session = requests.Session() # reuse session to reduce SSL handshake overhead
self.prime_type: PrimeType = self._load_initial_state()
self._running = False
@@ -50,7 +52,7 @@ class PrimeState:
try:
identity_token = get_token(dongle_id)
response = api_get(f"v1.1/devices/{dongle_id}", timeout=self.API_TIMEOUT, access_token=identity_token)
response = api_get(f"v1.1/devices/{dongle_id}", timeout=self.API_TIMEOUT, access_token=identity_token, session=self._session)
if response.status_code == 200:
data = response.json()
is_paired = data.get("is_paired", False)
+15 -14
View File
@@ -169,8 +169,8 @@ class TrainingGuideDMTutorial(Widget):
def _update_state(self):
super()._update_state()
if device.awake:
ui_state.params.put_bool("IsDriverViewEnabled", True)
if device.awake and not ui_state.params.get_bool("IsDriverViewEnabled"):
ui_state.params.put_bool_nonblocking("IsDriverViewEnabled", True)
sm = ui_state.sm
if sm.recv_frame.get("driverMonitoringState", 0) == 0:
@@ -240,19 +240,20 @@ class TrainingGuideDMTutorial(Widget):
ring_color,
)
self._back_button.render(rl.Rectangle(
self._rect.x + 8,
self._rect.y + self._rect.height - self._back_button.rect.height,
self._back_button.rect.width,
self._back_button.rect.height,
))
if self._dialog._camera_view.frame:
self._back_button.render(rl.Rectangle(
self._rect.x + 8,
self._rect.y + self._rect.height - self._back_button.rect.height,
self._back_button.rect.width,
self._back_button.rect.height,
))
self._good_button.render(rl.Rectangle(
self._rect.x + self._rect.width - self._good_button.rect.width - 8,
self._rect.y + self._rect.height - self._good_button.rect.height,
self._good_button.rect.width,
self._good_button.rect.height,
))
self._good_button.render(rl.Rectangle(
self._rect.x + self._rect.width - self._good_button.rect.width - 8,
self._rect.y + self._rect.height - self._good_button.rect.height,
self._good_button.rect.width,
self._good_button.rect.height,
))
# rounded border
rl.draw_rectangle_rounded_lines_ex(self._rect, 0.2 * 1.02, 10, 50, rl.BLACK)
@@ -1,3 +1,4 @@
import requests
import threading
import time
import pyray as rl
@@ -44,6 +45,7 @@ class FirehoseLayoutBase(Widget):
def __init__(self):
super().__init__()
self._params = Params()
self._session = requests.Session() # reuse session to reduce SSL handshake overhead
self._segment_count = self._get_segment_count()
self._scroll_panel = GuiScrollPanel2(horizontal=False)
@@ -203,7 +205,7 @@ class FirehoseLayoutBase(Widget):
if not dongle_id or dongle_id == UNREGISTERED_DONGLE_ID:
return
identity_token = get_token(dongle_id)
response = api_get(f"v1/devices/{dongle_id}/firehose_stats", access_token=identity_token)
response = api_get(f"v1/devices/{dongle_id}/firehose_stats", access_token=identity_token, session=self._session)
if response.status_code == 200:
data = response.json()
self._segment_count = data.get("firehose", 0)
@@ -36,8 +36,6 @@ class WifiIcon(Widget):
super().__init__()
self.set_rect(rl.Rectangle(0, 0, 89, 64))
self._wifi_slash_txt = gui_app.texture("icons_mici/settings/network/wifi_strength_slash.png", 89, 64)
self._wifi_none_txt = gui_app.texture("icons_mici/settings/network/wifi_strength_none.png", 89, 64)
self._wifi_low_txt = gui_app.texture("icons_mici/settings/network/wifi_strength_low.png", 89, 64)
self._wifi_medium_txt = gui_app.texture("icons_mici/settings/network/wifi_strength_medium.png", 89, 64)
self._wifi_full_txt = gui_app.texture("icons_mici/settings/network/wifi_strength_full.png", 89, 64)
@@ -57,17 +55,13 @@ class WifiIcon(Widget):
return
# Determine which wifi strength icon to use
strength = round(self._network.strength / 100 * 4)
if strength == 4:
strength = round(self._network.strength / 100 * 2)
if strength == 2:
strength_icon = self._wifi_full_txt
elif strength == 3:
elif strength == 1:
strength_icon = self._wifi_medium_txt
elif strength == 2:
strength_icon = self._wifi_low_txt
elif self._network.strength < 0:
strength_icon = self._wifi_slash_txt
else:
strength_icon = self._wifi_none_txt
strength_icon = self._wifi_low_txt
icon_x = int(self._rect.x + (self._rect.width - strength_icon.width * self._scale) // 2)
icon_y = int(self._rect.y + (self._rect.height - strength_icon.height * self._scale) // 2)
@@ -388,7 +382,7 @@ class WifiUIMici(BigMultiOptionDialog):
else:
network_button = WifiItem(network)
self.add_button(network_button)
self._scroller.add_widget(network_button)
# remove networks no longer present
self._scroller._items[:] = [btn for btn in self._scroller._items if btn.option in self._networks]
@@ -402,11 +396,10 @@ class WifiUIMici(BigMultiOptionDialog):
self._wifi_manager.connect_to_network(ssid, password)
self._update_buttons()
def _on_option_selected(self, option: str, smooth_scroll: bool = True):
super()._on_option_selected(option, smooth_scroll)
def _on_option_selected(self, option: str):
super()._on_option_selected(option)
# only open if button is already selected
if option in self._networks and option == self._selected_option:
if option in self._networks:
self._network_info_page.set_current_network(self._networks[option])
self._open_network_manage_page()
@@ -453,7 +446,7 @@ class WifiUIMici(BigMultiOptionDialog):
current_selection = self.get_selected_option()
if self._restore_selection and current_selection in self._networks:
self._scroller._layout()
BigMultiOptionDialog._on_option_selected(self, current_selection, smooth_scroll=False)
BigMultiOptionDialog._on_option_selected(self, current_selection)
self._restore_selection = None
super()._render(_)
@@ -20,6 +20,7 @@ from openpilot.common.transformations.orientation import rot_from_euler
from enum import IntEnum
if gui_app.sunnypilot_ui():
from openpilot.selfdrive.ui.sunnypilot.mici.onroad.hud_renderer import HudRendererSP as HudRenderer
from openpilot.selfdrive.ui.sunnypilot.ui_state import OnroadTimerStatus
OpState = log.SelfdriveState.OpenpilotState
+3 -1
View File
@@ -183,11 +183,13 @@ class HudRenderer(Widget):
def _draw_steering_wheel(self, rect: rl.Rectangle) -> None:
wheel_txt = self._txt_wheel_critical if self._show_wheel_critical else self._txt_wheel
bsm_detected = self._has_blind_spot_detected() if gui_app.sunnypilot_ui() else False
if self._show_wheel_critical:
self._wheel_alpha_filter.update(255)
self._wheel_y_filter.update(0)
else:
if ui_state.status == UIStatus.DISENGAGED:
if ui_state.status == UIStatus.DISENGAGED or bsm_detected:
self._wheel_alpha_filter.update(0)
self._wheel_y_filter.update(wheel_txt.height / 2)
else:
+8 -8
View File
@@ -71,7 +71,7 @@ class BigCircleButton(Widget):
class BigCircleToggle(BigCircleButton):
def __init__(self, icon: str, toggle_callback: Callable = None):
def __init__(self, icon: str, toggle_callback: Callable | None = None):
super().__init__(icon, False)
self._toggle_callback = toggle_callback
@@ -251,7 +251,7 @@ class BigButton(Widget):
class BigToggle(BigButton):
def __init__(self, text: str, value: str = "", initial_state: bool = False, toggle_callback: Callable = None):
def __init__(self, text: str, value: str = "", initial_state: bool = False, toggle_callback: Callable | None = None):
super().__init__(text, value, "")
self._checked = initial_state
self._toggle_callback = toggle_callback
@@ -288,8 +288,8 @@ class BigToggle(BigButton):
class BigMultiToggle(BigToggle):
def __init__(self, text: str, options: list[str], toggle_callback: Callable = None,
select_callback: Callable = None):
def __init__(self, text: str, options: list[str], toggle_callback: Callable | None = None,
select_callback: Callable | None = None):
super().__init__(text, "", toggle_callback=toggle_callback)
assert len(options) > 0
self._options = options
@@ -327,8 +327,8 @@ class BigMultiToggle(BigToggle):
class BigMultiParamToggle(BigMultiToggle):
def __init__(self, text: str, param: str, options: list[str], toggle_callback: Callable = None,
select_callback: Callable = None):
def __init__(self, text: str, param: str, options: list[str], toggle_callback: Callable | None = None,
select_callback: Callable | None = None):
super().__init__(text, options, toggle_callback, select_callback)
self._param = param
@@ -345,7 +345,7 @@ class BigMultiParamToggle(BigMultiToggle):
class BigParamControl(BigToggle):
def __init__(self, text: str, param: str, toggle_callback: Callable = None):
def __init__(self, text: str, param: str, toggle_callback: Callable | None = None):
super().__init__(text, "", toggle_callback=toggle_callback)
self.param = param
self.params = Params()
@@ -361,7 +361,7 @@ class BigParamControl(BigToggle):
# TODO: param control base class
class BigCircleParamControl(BigCircleToggle):
def __init__(self, icon: str, param: str, toggle_callback: Callable = None):
def __init__(self, icon: str, param: str, toggle_callback: Callable | None = None):
super().__init__(icon, toggle_callback)
self._param = param
self.params = Params()
+38 -17
View File
@@ -4,17 +4,17 @@ import pyray as rl
from typing import Union
from collections.abc import Callable
from typing import cast
from openpilot.selfdrive.ui.mici.widgets.side_button import SideButton
from openpilot.system.ui.widgets import Widget, NavWidget, DialogResult
from openpilot.system.ui.widgets.label import UnifiedLabel, gui_label
from openpilot.system.ui.widgets.mici_keyboard import MiciKeyboard
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.lib.wrap_text import wrap_text
from openpilot.system.ui.lib.application import gui_app, FontWeight, MousePos
from openpilot.system.ui.lib.application import gui_app, FontWeight, MousePos, MouseEvent
from openpilot.system.ui.widgets.scroller import Scroller
from openpilot.system.ui.widgets.slider import RedBigSlider, BigSlider
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.ui.mici.widgets.button import BigButton
from openpilot.selfdrive.ui.mici.widgets.side_button import SideButton
DEBUG = False
@@ -137,7 +137,7 @@ class BigInputDialog(BigDialogBase):
hint: str,
default_text: str = "",
minimum_length: int = 1,
confirm_callback: Callable[[str], None] = None):
confirm_callback: Callable[[str], None] | None = None):
super().__init__(None, None)
self._hint_label = UnifiedLabel(hint, font_size=35, text_color=rl.Color(255, 255, 255, int(255 * 0.35)),
font_weight=FontWeight.MEDIUM)
@@ -317,39 +317,36 @@ class BigMultiOptionDialog(BigDialogBase):
BACK_TOUCH_AREA_PERCENTAGE = 0.1
def __init__(self, options: list[str], default: str | None,
right_btn: str | None = 'check', right_btn_callback: Callable[[], None] = None):
right_btn: str | None = 'check', right_btn_callback: Callable[[], None] | None = None):
super().__init__(right_btn, right_btn_callback=right_btn_callback)
self._options = options
if default is not None:
assert default in options
self._default_option: str = default or (options[0] if len(options) > 0 else "")
self._selected_option: str = self._default_option
self._default_option: str | None = default
self._selected_option: str = self._default_option or (options[0] if len(options) > 0 else "")
self._last_selected_option: str = self._selected_option
# Widget doesn't differentiate between click and drag
self._can_click = True
self._scroller = Scroller([], horizontal=False, pad_start=100, pad_end=100, spacing=0, snap_items=True)
if self._right_btn is not None:
self._scroller.set_enabled(lambda: not cast(Widget, self._right_btn).is_pressed)
for option in options:
self.add_button(BigDialogOptionButton(option))
def add_button(self, button: BigDialogOptionButton):
def click_callback(_btn=button):
self._on_option_selected(_btn.option)
button.set_click_callback(click_callback)
self._scroller.add_widget(button)
self._scroller.add_widget(BigDialogOptionButton(option))
def show_event(self):
super().show_event()
self._scroller.show_event()
self._on_option_selected(self._default_option)
if self._default_option is not None:
self._on_option_selected(self._default_option)
def get_selected_option(self) -> str:
return self._selected_option
def _on_option_selected(self, option: str, smooth_scroll: bool = True):
def _on_option_selected(self, option: str):
y_pos = 0.0
for btn in self._scroller._items:
btn = cast(BigDialogOptionButton, btn)
@@ -365,11 +362,35 @@ class BigMultiOptionDialog(BigDialogBase):
y_pos = rect_center_y - (btn.rect.y + height / 2)
break
self._scroller.scroll_to(-y_pos, smooth=smooth_scroll)
self._scroller.scroll_to(-y_pos)
def _selected_option_changed(self):
pass
def _handle_mouse_press(self, mouse_pos: MousePos):
super()._handle_mouse_press(mouse_pos)
self._can_click = True
def _handle_mouse_event(self, mouse_event: MouseEvent) -> None:
super()._handle_mouse_event(mouse_event)
# # TODO: add generic _handle_mouse_click handler to Widget
if not self._scroller.scroll_panel.is_touch_valid():
self._can_click = False
def _handle_mouse_release(self, mouse_pos: MousePos):
super()._handle_mouse_release(mouse_pos)
if not self._can_click:
return
# select current option
for btn in self._scroller._items:
btn = cast(BigDialogOptionButton, btn)
if btn.option == self._selected_option:
self._on_option_selected(btn.option)
break
def _update_state(self):
super()._update_state()
@@ -193,7 +193,7 @@ class DeviceLayoutSP(DeviceLayout):
# Text & Color
offroad_mode_btn_text = tr("Exit Always Offroad") if always_offroad else tr("Enable Always Offroad")
offroad_mode_btn_style = ButtonStyle.NORMAL if always_offroad else ButtonStyle.DANGER
offroad_mode_btn_style = ButtonStyle.PRIMARY if always_offroad else ButtonStyle.DANGER
self._always_offroad_btn.action_item.left_button.set_text(offroad_mode_btn_text)
self._always_offroad_btn.action_item.left_button.set_button_style(offroad_mode_btn_style)
@@ -37,7 +37,7 @@ from openpilot.system.ui.widgets.scroller_tici import Scroller
OP.PANEL_COLOR = rl.Color(10, 10, 10, 255)
ICON_SIZE = 70
OP.PanelType = IntEnum( # type: ignore
OP.PanelType = IntEnum(
"PanelType",
[es.name for es in OP.PanelType] + [
"SUNNYLINK",
@@ -4,27 +4,147 @@ Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from openpilot.common.params import Params
from cereal import car
from enum import IntEnum
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp, simple_button_item_sp, option_item_sp, LineSeparatorSP
from openpilot.system.ui.widgets.scroller_tici import Scroller
from openpilot.system.ui.widgets import Widget
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.steering_sub_layouts.lane_change_settings import LaneChangeSettingsLayout
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.steering_sub_layouts.mads_settings import MadsSettingsLayout
from openpilot.selfdrive.ui.sunnypilot.layouts.settings.steering_sub_layouts.torque_settings import TorqueSettingsLayout
class PanelType(IntEnum):
STEERING = 0
MADS = 1
LANE_CHANGE = 2
TORQUE_CONTROL = 3
class SteeringLayout(Widget):
def __init__(self):
super().__init__()
self._params = Params()
self._current_panel = PanelType.STEERING
self._lane_change_settings_layout = LaneChangeSettingsLayout(lambda: self._set_current_panel(PanelType.STEERING))
self._mads_settings_layout = MadsSettingsLayout(lambda: self._set_current_panel(PanelType.STEERING))
self._torque_control_layout = TorqueSettingsLayout(lambda: self._set_current_panel(PanelType.STEERING))
items = self._initialize_items()
self._scroller = Scroller(items, line_separator=True, spacing=0)
self._scroller = Scroller(items, line_separator=False, spacing=0)
def _initialize_items(self):
items = [
self._mads_base_desc = tr("Enable the beloved MADS feature. " +
"Disable toggle to revert back to stock sunnypilot engagement/disengagement.")
self._mads_limited_desc = tr("This platform supports limited MADS settings.")
self._mads_full_desc = tr("This platform supports all MADS settings.")
self._mads_check_compat_desc = tr("Start the vehicle to check vehicle compatibility.")
self._mads_toggle = toggle_item_sp(
param="Mads",
title=lambda: tr("Modular Assistive Driving System (MADS)"),
description=self._mads_base_desc,
)
self._mads_settings_button = simple_button_item_sp(
button_text=lambda: tr("Customize MADS"),
button_width=800,
callback=lambda: self._set_current_panel(PanelType.MADS)
)
self._lane_change_settings_button = simple_button_item_sp(
button_text=lambda: tr("Customize Lane Change"),
button_width=800,
callback=lambda: self._set_current_panel(PanelType.LANE_CHANGE)
)
self._blinker_control_toggle = toggle_item_sp(
param="BlinkerPauseLateralControl",
description=lambda: tr("Pause lateral control with blinker when traveling below the desired speed selected."),
title=lambda: tr("Pause Lateral Control with Blinker"),
)
self._blinker_control_options = option_item_sp(
param="BlinkerMinLateralControlSpeed",
title=lambda: tr("Minimum Speed to Pause Lateral Control"),
min_value=0,
max_value=255,
value_change_step=5,
description="",
label_callback=lambda speed: f'{speed} {"km/h" if ui_state.is_metric else "mph"}',
)
self._torque_control_toggle = toggle_item_sp(
param="EnforceTorqueControl",
title=lambda: tr("Enforce Torque Lateral Control"),
description=lambda: tr("Enable this to enforce sunnypilot to steer with Torque lateral control."),
)
self._torque_customization_button = simple_button_item_sp(
button_text=lambda: tr("Customize Torque Params"),
button_width=850,
callback=lambda: self._set_current_panel(PanelType.TORQUE_CONTROL)
)
self._nnlc_toggle = toggle_item_sp(
param="NeuralNetworkLateralControl",
title=lambda: tr("Neural Network Lateral Control (NNLC)"),
description=""
)
items = [
self._mads_toggle,
self._mads_settings_button,
LineSeparatorSP(40),
self._lane_change_settings_button,
LineSeparatorSP(40),
self._blinker_control_toggle,
self._blinker_control_options,
LineSeparatorSP(40),
self._torque_control_toggle,
self._torque_customization_button,
LineSeparatorSP(40),
self._nnlc_toggle,
]
return items
def _set_current_panel(self, panel: PanelType):
self._current_panel = panel
def _update_state(self):
super()._update_state()
torque_allowed = True
if ui_state.CP is not None:
mads_main_desc = self._mads_limited_desc if self._mads_settings_layout._mads_limited_settings() else self._mads_full_desc
self._mads_toggle.set_description(f"<b>{mads_main_desc}</b><br><br>{self._mads_base_desc}")
if ui_state.CP.steerControlType == car.CarParams.SteerControlType.angle:
ui_state.params.remove("EnforceTorqueControl")
ui_state.params.remove("NeuralNetworkLateralControl")
torque_allowed = False
else:
self._mads_toggle.set_description(f"<b>{self._mads_check_compat_desc}</b><br><br>{self._mads_base_desc}")
ui_state.params.remove("EnforceTorqueControl")
ui_state.params.remove("NeuralNetworkLateralControl")
torque_allowed = False
self._mads_toggle.action_item.set_enabled(ui_state.is_offroad())
self._mads_settings_button.action_item.set_enabled(ui_state.is_offroad() and self._mads_toggle.action_item.get_state())
self._blinker_control_options.set_visible(self._blinker_control_toggle.action_item.get_state())
enforce_torque_enabled = self._torque_control_toggle.action_item.get_state()
nnlc_enabled = self._nnlc_toggle.action_item.get_state()
self._nnlc_toggle.action_item.set_enabled(ui_state.is_offroad() and torque_allowed and not enforce_torque_enabled)
self._torque_control_toggle.action_item.set_enabled(ui_state.is_offroad() and torque_allowed and not nnlc_enabled)
self._torque_customization_button.action_item.set_enabled(self._torque_control_toggle.action_item.get_state())
def _render(self, rect):
self._scroller.render(rect)
if self._current_panel == PanelType.LANE_CHANGE:
self._lane_change_settings_layout.render(rect)
elif self._current_panel == PanelType.MADS:
self._mads_settings_layout.render(rect)
elif self._current_panel == PanelType.TORQUE_CONTROL:
self._torque_control_layout.render(rect)
else:
self._scroller.render(rect)
def show_event(self):
self._set_current_panel(PanelType.STEERING)
self._scroller.show_event()
@@ -0,0 +1,81 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from collections.abc import Callable
import pyray as rl
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp, option_item_sp, LineSeparatorSP
from openpilot.system.ui.widgets.network import NavButton
from openpilot.system.ui.widgets.scroller_tici import Scroller
from openpilot.system.ui.widgets import Widget
from openpilot.sunnypilot.selfdrive.controls.lib.auto_lane_change import AutoLaneChangeMode
class LaneChangeSettingsLayout(Widget):
def __init__(self, back_btn_callback: Callable):
super().__init__()
self._back_button = NavButton(tr("Back"))
self._back_button.set_click_callback(back_btn_callback)
items = self._initialize_items()
self._scroller = Scroller(items, line_separator=False, spacing=0)
def _initialize_items(self):
self._lane_change_timer = option_item_sp(
title=lambda: tr("Auto Lane Change by Blinker"),
param="AutoLaneChangeTimer",
description=lambda: tr("Set a timer to delay the auto lane change operation when the blinker is used. " +
"No nudge on the steering wheel is required to auto lane change if a timer is set. Default is Nudge.<br>" +
"Please use caution when using this feature. Only use the blinker when traffic and road conditions permit."),
min_value=-1,
max_value=5,
value_change_step=1,
label_callback=(lambda x:
tr("Off") if x == -1 else
tr("Nudge") if x == 0 else
tr("Nudgeless") if x == 1 else
f"0.5 {tr('s')}" if x == 2 else
f"1 {tr('s')}" if x == 3 else
f"2 {tr('s')}" if x == 4 else
f"3 {tr('s')}")
)
self._bsm_delay = toggle_item_sp(
param="AutoLaneChangeBsmDelay",
title=lambda: tr("Auto Lane Change: Delay with Blind Spot"),
description=lambda: tr("Toggle to enable a delay timer for seamless lane changes when blind spot monitoring " +
"(BSM) detects a obstructing vehicle, ensuring safe maneuvering."),
)
items = [
self._lane_change_timer,
LineSeparatorSP(40),
self._bsm_delay,
]
return items
def _update_state(self):
super()._update_state()
self._update_toggles()
def _render(self, rect):
self._back_button.set_position(self._rect.x, self._rect.y + 20)
self._back_button.render()
# subtract button
content_rect = rl.Rectangle(rect.x, rect.y + self._back_button.rect.height + 40, rect.width, rect.height - self._back_button.rect.height - 40)
self._scroller.render(content_rect)
def show_event(self):
self._scroller.show_event()
def _update_toggles(self):
enable_bsm = ui_state.CP and ui_state.CP.enableBsm
if not enable_bsm and ui_state.params.get_bool("AutoLaneChangeBsmDelay"):
ui_state.params.remove("AutoLaneChangeBsmDelay")
self._bsm_delay.action_item.set_enabled(enable_bsm and ui_state.params.get("AutoLaneChangeTimer", return_default=True) > AutoLaneChangeMode.NUDGE)
@@ -0,0 +1,135 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from collections.abc import Callable
import pyray as rl
from opendbc.sunnypilot.car.tesla.values import TeslaFlagsSP
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.multilang import tr, tr_noop
from openpilot.system.ui.widgets import Widget
from openpilot.system.ui.widgets.network import NavButton
from openpilot.system.ui.widgets.scroller_tici import Scroller
from openpilot.system.ui.sunnypilot.widgets.list_view import multiple_button_item_sp, toggle_item_sp
MADS_STEERING_MODE_OPTIONS = [
(tr("Remain Active"), tr_noop("Remain Active: ALC will remain active when the brake pedal is pressed.")),
(tr("Pause"), tr_noop("Pause: ALC will pause when the brake pedal is pressed.")),
(tr("Disengage"), tr_noop("Disengage: ALC will disengage when the brake pedal is pressed.")),
]
MADS_MAIN_CRUISE_BASE_DESC = tr("Note: For vehicles without LFA/LKAS button, disabling this will prevent lateral control engagement.")
MADS_UNIFIED_ENGAGEMENT_MODE_BASE_DESC = "{engage}<br><h4>{note}</h4>".format(
engage=tr("Engage lateral and longitudinal control with cruise control engagement."),
note=tr("Note: Once lateral control is engaged via UEM, it will remain engaged until it is manually disabled via the MADS button or car shut off."),
)
STATUS_CHECK_COMPATIBILITY = tr("Start the vehicle to check vehicle compatibility.")
DEFAULT_TO_OFF = tr("This feature defaults to OFF, and does not allow selection due to vehicle limitations.")
DEFAULT_TO_ON = tr("This feature defaults to ON, and does not allow selection due to vehicle limitations.")
STATUS_DISENGAGE_ONLY = tr("This platform only supports Disengage mode due to vehicle limitations.")
class MadsSettingsLayout(Widget):
def __init__(self, back_btn_callback: Callable):
super().__init__()
self._back_button = NavButton(tr("Back"))
self._back_button.set_click_callback(back_btn_callback)
self._initialize_items()
self._scroller = Scroller(self.items, line_separator=True, spacing=0)
def _initialize_items(self):
self._main_cruise_toggle = toggle_item_sp(
title=lambda: tr("Toggle with Main Cruise"),
description=MADS_MAIN_CRUISE_BASE_DESC,
param="MadsMainCruiseAllowed",
)
self._unified_engagement_toggle = toggle_item_sp(
title=lambda: tr("Unified Engagement Mode (UEM)"),
description=MADS_UNIFIED_ENGAGEMENT_MODE_BASE_DESC,
param="MadsUnifiedEngagementMode"
)
self._steering_mode = multiple_button_item_sp(
param="MadsSteeringMode",
title=lambda: tr("Steering Mode on Brake Pedal"),
description="",
buttons=[opt[0] for opt in MADS_STEERING_MODE_OPTIONS],
inline=False,
button_width=350,
callback=self._update_steering_mode_description,
)
self.items = [
self._main_cruise_toggle,
self._unified_engagement_toggle,
self._steering_mode,
]
def _update_state(self):
super()._update_state()
self._update_toggles()
def _render(self, rect):
self._back_button.set_position(self._rect.x, self._rect.y + 20)
self._back_button.render()
# subtract button
content_rect = rl.Rectangle(rect.x, rect.y + self._back_button.rect.height + 40, rect.width, rect.height - self._back_button.rect.height - 40)
self._scroller.render(content_rect)
def show_event(self):
self._scroller.show_event()
@staticmethod
def _mads_limited_settings() -> bool:
brand = ""
if ui_state.is_offroad():
bundle = ui_state.params.get("CarPlatformBundle")
if bundle:
brand = bundle.get("brand", "")
if not brand:
brand = ui_state.CP.brand if ui_state.CP else ""
if brand == "rivian":
return True
elif brand == "tesla":
return not (ui_state.CP_SP and ui_state.CP_SP.flags & TeslaFlagsSP.HAS_VEHICLE_BUS)
return False
def _update_steering_mode_description(self, button_index: int):
base_desc = tr("Choose how Automatic Lane Centering (ALC) behaves after the brake pedal is manually pressed in sunnypilot.")
result = base_desc + "<br><br>"
for opt in MADS_STEERING_MODE_OPTIONS:
desc = "<b>" + opt[1] + "</b>" if button_index == MADS_STEERING_MODE_OPTIONS.index(opt) else opt[1]
result += desc + "<br>"
self._steering_mode.set_description(result)
self._steering_mode.show_description(True)
def _update_toggles(self):
self._update_steering_mode_description(self._steering_mode.action_item.get_selected_button())
if self._mads_limited_settings():
ui_state.params.remove("MadsMainCruiseAllowed")
ui_state.params.put_bool("MadsUnifiedEngagementMode", True)
ui_state.params.put("MadsSteeringMode", 2)
self._main_cruise_toggle.action_item.set_enabled(False)
self._main_cruise_toggle.action_item.set_state(False)
self._main_cruise_toggle.set_description("<b>" + DEFAULT_TO_OFF + "</b><br>" + MADS_MAIN_CRUISE_BASE_DESC)
self._unified_engagement_toggle.action_item.set_enabled(False)
self._unified_engagement_toggle.action_item.set_state(True)
self._unified_engagement_toggle.set_description("<b>" + DEFAULT_TO_ON + "</b><br>" + MADS_UNIFIED_ENGAGEMENT_MODE_BASE_DESC)
self._steering_mode.action_item.set_enabled(False)
self._steering_mode.set_description(STATUS_DISENGAGE_ONLY)
self._steering_mode.action_item.set_selected_button(2)
else:
self._main_cruise_toggle.action_item.set_enabled(True)
self._main_cruise_toggle.set_description(MADS_MAIN_CRUISE_BASE_DESC)
self._unified_engagement_toggle.action_item.set_enabled(True)
self._unified_engagement_toggle.set_description(MADS_UNIFIED_ENGAGEMENT_MODE_BASE_DESC)
self._steering_mode.action_item.set_enabled(True)
@@ -0,0 +1,115 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from collections.abc import Callable
import pyray as rl
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp, option_item_sp
from openpilot.system.ui.widgets.network import NavButton
from openpilot.system.ui.widgets.scroller_tici import Scroller
from openpilot.system.ui.widgets import Widget
class TorqueSettingsLayout(Widget):
def __init__(self, back_btn_callback: Callable):
super().__init__()
self._back_button = NavButton(tr("Back"))
self._back_button.set_click_callback(back_btn_callback)
items = self._initialize_items()
self._scroller = Scroller(items, line_separator=True, spacing=0)
def _initialize_items(self):
self._self_tune_toggle = toggle_item_sp(
param="LiveTorqueParamsToggle",
title=lambda: tr("Self-Tune"),
description=lambda: tr("Enables self-tune for Torque lateral control for platforms that do not use " +
"Torque lateral control by default."),
)
self._relaxed_tune_toggle = toggle_item_sp(
param="LiveTorqueParamsRelaxedToggle",
title=lambda: tr("Less Restrict Settings for Self-Tune (Beta)"),
description=lambda: tr("Less strict settings when using Self-Tune. This allows torqued to be more " +
"forgiving when learning values."),
)
self._custom_tune_toggle = toggle_item_sp(
param="CustomTorqueParams",
title=lambda: tr("Enable Custom Tuning"),
description=lambda: tr("Enables custom tuning for Torque lateral control. " +
"Modifying Lateral Acceleration Factor and Friction below will override the offline values " +
"indicated in the YAML files within \"opendbc/car/torque_data\". " +
"The values will also be used live when \"Manual Real-Time Tuning\" toggle is enabled."),
)
self._torque_prams_override_toggle = toggle_item_sp(
param="TorqueParamsOverrideEnabled",
title=lambda: tr("Manual Real-Time Tuning"),
description=lambda: tr("Enforces the torque lateral controller to use the fixed values instead of the learned " +
"values from Self-Tune. Enabling this toggle overrides Self-Tune values."),
)
self._torque_lat_accel_factor = option_item_sp(
title=lambda: tr("Lateral Acceleration Factor"),
param="TorqueParamsOverrideLatAccelFactor",
description="",
min_value=1,
max_value=500,
value_change_step=1,
label_callback=(lambda x: f"{x/100} m/s^2"),
use_float_scaling=True
)
self._torque_friction = option_item_sp(
title=lambda: tr("Friction"),
param="TorqueParamsOverrideFriction",
description="",
min_value=1,
max_value=100,
value_change_step=1,
label_callback=(lambda x: f"{x/100}"),
use_float_scaling=True
)
items = [
self._self_tune_toggle,
self._relaxed_tune_toggle,
self._custom_tune_toggle,
self._torque_prams_override_toggle,
self._torque_lat_accel_factor,
self._torque_friction,
]
return items
def _update_state(self):
super()._update_state()
if not ui_state.params.get_bool("LiveTorqueParamsToggle"):
ui_state.params.remove("LiveTorqueParamsRelaxedToggle")
self._relaxed_tune_toggle.action_item.set_state(False)
self._self_tune_toggle.action_item.set_enabled(ui_state.is_offroad())
self._relaxed_tune_toggle.action_item.set_enabled(ui_state.is_offroad() and self._self_tune_toggle.action_item.get_state())
self._custom_tune_toggle.action_item.set_enabled(ui_state.is_offroad())
custom_tune_enabled = self._custom_tune_toggle.action_item.get_state()
self._torque_prams_override_toggle.set_visible(custom_tune_enabled)
self._torque_lat_accel_factor.set_visible(custom_tune_enabled)
self._torque_friction.set_visible(custom_tune_enabled)
self._torque_prams_override_toggle.action_item.set_enabled(ui_state.is_offroad())
sliders_enabled = self._torque_prams_override_toggle.action_item.get_state() or ui_state.is_offroad()
self._torque_lat_accel_factor.action_item.set_enabled(sliders_enabled)
self._torque_friction.action_item.set_enabled(sliders_enabled)
title_text = tr("Real-Time & Offline") if ui_state.params.get("TorqueParamsOverrideEnabled") else tr("Offline Only")
self._torque_lat_accel_factor.set_title(lambda: tr("Lateral Acceleration Factor") + " (" + title_text + ")")
self._torque_friction.set_title(lambda: tr("Friction") + " (" + title_text + ")")
def _render(self, rect):
self._back_button.set_position(self._rect.x, self._rect.y + 20)
self._back_button.render()
# subtract button
content_rect = rl.Rectangle(rect.x, rect.y + self._back_button.rect.height + 40, rect.width, rect.height - self._back_button.rect.height - 40)
self._scroller.render(content_rect)
def show_event(self):
self._scroller.show_event()
@@ -12,7 +12,7 @@ from openpilot.selfdrive.ui.sunnypilot.mici.layouts.sunnylink import SunnylinkLa
ICON_SIZE = 70
OP.PanelType = IntEnum( # type: ignore
OP.PanelType = IntEnum(
"PanelType",
[es.name for es in OP.PanelType] + [
"SUNNYLINK",
@@ -0,0 +1,27 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.mici.onroad.hud_renderer import HudRenderer
from openpilot.selfdrive.ui.sunnypilot.onroad.blind_spot_indicators import BlindSpotIndicators
class HudRendererSP(HudRenderer):
def __init__(self):
super().__init__()
self.blind_spot_indicators = BlindSpotIndicators()
def _update_state(self) -> None:
super()._update_state()
self.blind_spot_indicators.update()
def _render(self, rect: rl.Rectangle) -> None:
super()._render(rect)
self.blind_spot_indicators.render(rect)
def _has_blind_spot_detected(self) -> bool:
return self.blind_spot_indicators.detected
@@ -0,0 +1,49 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import gui_app
from openpilot.common.filter_simple import FirstOrderFilter
class BlindSpotIndicators:
def __init__(self):
self._txt_blind_spot_left: rl.Texture = gui_app.texture('icons_mici/onroad/blind_spot_left.png', 108, 128)
self._txt_blind_spot_right: rl.Texture = gui_app.texture('icons_mici/onroad/blind_spot_right.png', 108, 128)
self._blind_spot_left_alpha_filter = FirstOrderFilter(0, 0.15, 1 / gui_app.target_fps)
self._blind_spot_right_alpha_filter = FirstOrderFilter(0, 0.15, 1 / gui_app.target_fps)
def update(self) -> None:
sm = ui_state.sm
CS = sm['carState']
self._blind_spot_left_alpha_filter.update(1.0 if CS.leftBlindspot else 0.0)
self._blind_spot_right_alpha_filter.update(1.0 if CS.rightBlindspot else 0.0)
@property
def detected(self) -> bool:
return self._blind_spot_left_alpha_filter.x > 0.01 or self._blind_spot_right_alpha_filter.x > 0.01
def render(self, rect: rl.Rectangle) -> None:
BLIND_SPOT_MARGIN_X = 20 # Distance from edge of screen
BLIND_SPOT_Y_OFFSET = 100 # Distance from top of screen
if self._blind_spot_left_alpha_filter.x > 0.01:
pos_x = int(rect.x + BLIND_SPOT_MARGIN_X)
pos_y = int(rect.y + BLIND_SPOT_Y_OFFSET)
alpha = int(255 * self._blind_spot_left_alpha_filter.x)
color = rl.Color(255, 255, 255, alpha)
rl.draw_texture(self._txt_blind_spot_left, pos_x, pos_y, color)
if self._blind_spot_right_alpha_filter.x > 0.01:
pos_x = int(rect.x + rect.width - BLIND_SPOT_MARGIN_X - self._txt_blind_spot_right.width)
pos_y = int(rect.y + BLIND_SPOT_Y_OFFSET)
alpha = int(255 * self._blind_spot_right_alpha_filter.x)
color = rl.Color(255, 255, 255, alpha)
rl.draw_texture(self._txt_blind_spot_right, pos_x, pos_y, color)
@@ -6,15 +6,36 @@ See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.selfdrive.ui.onroad.hud_renderer import HudRenderer
from openpilot.selfdrive.ui.sunnypilot.onroad.developer_ui import DeveloperUiRenderer
from openpilot.selfdrive.ui.sunnypilot.onroad.road_name import RoadNameRenderer
from openpilot.selfdrive.ui.sunnypilot.onroad.rocket_fuel import RocketFuel
from openpilot.selfdrive.ui.sunnypilot.onroad.speed_limit import SpeedLimitRenderer
from openpilot.selfdrive.ui.sunnypilot.onroad.turn_signal import TurnSignalController
class HudRendererSP(HudRenderer):
def __init__(self):
super().__init__()
self.developer_ui = DeveloperUiRenderer()
self.road_name_renderer = RoadNameRenderer()
self.rocket_fuel = RocketFuel()
self.speed_limit_renderer = SpeedLimitRenderer()
self.turn_signal_controller = TurnSignalController()
def _update_state(self) -> None:
super()._update_state()
self.road_name_renderer.update()
self.speed_limit_renderer.update()
self.turn_signal_controller.update()
def _render(self, rect: rl.Rectangle) -> None:
super()._render(rect)
self.developer_ui.render(rect)
self.road_name_renderer.render(rect)
self.speed_limit_renderer.render(rect)
self.turn_signal_controller.render(rect)
if ui_state.rocket_fuel:
self.rocket_fuel.render(rect, ui_state.sm)
@@ -18,11 +18,12 @@ class RainbowPath:
BASE_ALPHA = 0.8
ALPHA_FADE = 0.3 # Alpha reduction from bottom to top
def __init__(self, num_segments: int = None, speed: float = None, saturation: float = None, lightness: float = None):
self.num_segments = num_segments if num_segments is not None else self.DEFAULT_NUM_SEGMENTS
self.speed = speed if speed is not None else self.DEFAULT_SPEED
self.saturation = saturation if saturation is not None else self.DEFAULT_SATURATION
self.lightness = lightness if lightness is not None else self.DEFAULT_LIGHTNESS
def __init__(self, num_segments: int = DEFAULT_NUM_SEGMENTS, speed: float = DEFAULT_SPEED,
saturation: float = DEFAULT_SATURATION, lightness: float = DEFAULT_LIGHTNESS):
self.num_segments = num_segments
self.speed = speed
self.saturation = saturation
self.lightness = lightness
def set_speed(self, speed: float):
self.speed = speed
@@ -0,0 +1,56 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.widgets import Widget
class RoadNameRenderer(Widget):
def __init__(self):
super().__init__()
self.road_name = ""
self.is_metric = False
self.font_demi = gui_app.font(FontWeight.SEMI_BOLD)
def update(self):
sm = ui_state.sm
if sm.recv_frame["carState"] < ui_state.started_frame:
return
self.is_metric = ui_state.is_metric
if sm.updated["liveMapDataSP"]:
lmd = sm["liveMapDataSP"]
self.road_name = lmd.roadName
def _render(self, rect: rl.Rectangle):
if not self.road_name:
return
text = self.road_name
text_size = measure_text_cached(self.font_demi, text, 46)
padding = 40
rect_width = max(200, min(text_size.x + padding, rect.width - 40))
road_rect = rl.Rectangle(rect.x + rect.width / 2 - rect_width / 2, rect.y - 4, rect_width, 60)
rl.draw_rectangle_rounded(road_rect, 0.2, 10, rl.Color(0, 0, 0, 120))
max_text_width = road_rect.width - 20
if text_size.x > max_text_width:
while text_size.x > max_text_width and len(text) > 3:
text = text[:-1]
text_size = measure_text_cached(self.font_demi, text + "...", 46)
text = text + "..."
sz = measure_text_cached(self.font_demi, text, 46)
origin = rl.Vector2(road_rect.x + road_rect.width / 2 - sz.x / 2, road_rect.y + road_rect.height / 2 - sz.y / 2)
rl.draw_text_ex(self.font_demi, text, origin, 46, 0, rl.Color(255, 255, 255, 200))
@@ -0,0 +1,45 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
class RocketFuel:
def __init__(self):
self.vc_accel = 0.0
def render(self, rect: rl.Rectangle, sm) -> None:
vc_accel0 = sm['carState'].aEgo
# Smooth the acceleration
self.vc_accel = self.vc_accel + (vc_accel0 - self.vc_accel) / 5.0
hha = 0.0
color = rl.Color(0, 0, 0, 0) # Transparent by default
if self.vc_accel > 0:
hha = 0.85 - 0.1 / self.vc_accel # only extend up to 85%
color = rl.Color(0, 245, 0, 200)
elif self.vc_accel < 0:
hha = 0.85 + 0.1 / self.vc_accel # only extend up to 85%
color = rl.Color(245, 0, 0, 200)
if hha < 0:
hha = 0.0
hha = hha * rect.height
wp = 28.0
# Draw
rect_h = rect.height
if self.vc_accel > 0:
ra_y = rect_h / 2.0 - hha / 2.0
else:
ra_y = rect_h / 2.0
if hha > 0:
rl.draw_rectangle(int(rect.x), int(rect.y + ra_y), int(wp), int(hha / 2.0), color)
@@ -0,0 +1,281 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from dataclasses import dataclass
import math
import pyray as rl
from cereal import custom
from openpilot.common.constants import CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.ui.onroad.hud_renderer import UI_CONFIG
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.common import Mode as SpeedLimitMode
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.lib.text_measure import measure_text_cached
from openpilot.system.ui.widgets import Widget
METER_TO_FOOT = 3.28084
METER_TO_MILE = 0.000621371
AHEAD_THRESHOLD = 5
AssistState = custom.LongitudinalPlanSP.SpeedLimit.AssistState
SpeedLimitSource = custom.LongitudinalPlanSP.SpeedLimit.Source
@dataclass(frozen=True)
class Colors:
WHITE = rl.WHITE
BLACK = rl.BLACK
RED = rl.RED
GREY = rl.Color(145, 155, 149, 255)
DARK_GREY = rl.Color(77, 77, 77, 255)
SUB_BG = rl.Color(0, 0, 0, 180)
MUTCD_LINES = rl.Color(255, 255, 255, 100)
class SpeedLimitRenderer(Widget):
def __init__(self):
super().__init__()
self.speed_limit = 0.0
self.speed_limit_last = 0.0
self.speed_limit_offset = 0.0
self.speed_limit_valid = False
self.speed_limit_last_valid = False
self.speed_limit_final_last = 0.0
self.speed_limit_source = SpeedLimitSource.none
self.speed_limit_assist_state = AssistState.disabled
self.speed_limit_ahead = 0.0
self.speed_limit_ahead_dist = 0.0
self.speed_limit_ahead_dist_prev = 0.0
self.speed_limit_ahead_valid = False
self.speed_limit_ahead_frame = 0
self.assist_frame = 0
self.speed = 0.0
self.set_speed = 0.0
self.font_bold = gui_app.font(FontWeight.BOLD)
self.font_demi = gui_app.font(FontWeight.SEMI_BOLD)
self.font_norm = gui_app.font(FontWeight.NORMAL)
self._sign_alpha_filter = FirstOrderFilter(1.0, 0.5, 1 / gui_app.target_fps)
arrow_size = 90
self._arrow_up = gui_app.texture("../../sunnypilot/selfdrive/assets/img_plus_arrow_up.png", arrow_size, arrow_size)
self._arrow_down = gui_app.texture("../../sunnypilot/selfdrive/assets/img_minus_arrow_down.png", arrow_size, arrow_size)
@property
def speed_conv(self):
return CV.MS_TO_KPH if ui_state.is_metric else CV.MS_TO_MPH
def update(self):
sm = ui_state.sm
if sm.recv_frame["carState"] < ui_state.started_frame:
return
if sm.updated["longitudinalPlanSP"]:
lp_sp = sm["longitudinalPlanSP"]
resolver = lp_sp.speedLimit.resolver
assist = lp_sp.speedLimit.assist
self.speed_limit = resolver.speedLimit * self.speed_conv
self.speed_limit_last = resolver.speedLimitLast * self.speed_conv
self.speed_limit_offset = resolver.speedLimitOffset * self.speed_conv
self.speed_limit_valid = resolver.speedLimitValid
self.speed_limit_last_valid = resolver.speedLimitLastValid
self.speed_limit_final_last = resolver.speedLimitFinalLast * self.speed_conv
self.speed_limit_source = resolver.source
self.speed_limit_assist_state = assist.state
if sm.updated["liveMapDataSP"]:
lmd = sm["liveMapDataSP"]
self.speed_limit_ahead_valid = lmd.speedLimitAheadValid
self.speed_limit_ahead = lmd.speedLimitAhead * self.speed_conv
self.speed_limit_ahead_dist = lmd.speedLimitAheadDistance
if self.speed_limit_ahead_dist < self.speed_limit_ahead_dist_prev and self.speed_limit_ahead_frame < AHEAD_THRESHOLD:
self.speed_limit_ahead_frame += 1
elif self.speed_limit_ahead_dist > self.speed_limit_ahead_dist_prev and self.speed_limit_ahead_frame > 0:
self.speed_limit_ahead_frame -= 1
self.speed_limit_ahead_dist_prev = self.speed_limit_ahead_dist
cs = sm["carState"]
self.set_speed = cs.cruiseState.speed * self.speed_conv
v_ego = cs.vEgoCluster if cs.vEgoCluster != 0.0 else cs.vEgo
self.speed = max(0.0, v_ego * self.speed_conv)
@staticmethod
def _draw_text_centered(font, text, size, pos_center, color):
sz = measure_text_cached(font, text, size)
rl.draw_text_ex(font, text, rl.Vector2(pos_center.x - sz.x / 2, pos_center.y - sz.y / 2), size, 0, color)
def _render(self, rect: rl.Rectangle):
width = UI_CONFIG.set_speed_width_metric if ui_state.is_metric else UI_CONFIG.set_speed_width_imperial
x = rect.x + 60 + width + 30 - 6
y = rect.y + 45 - 6
sign_rect = rl.Rectangle(x, y, width, UI_CONFIG.set_speed_height + 6 * 2)
if self.speed_limit_assist_state == AssistState.preActive:
self.assist_frame += 1
pulse_value = 0.65 + 0.35 * math.sin(self.assist_frame * math.pi / gui_app.target_fps)
alpha = self._sign_alpha_filter.update(pulse_value)
else:
self.assist_frame = 0
alpha = self._sign_alpha_filter.update(1.0)
if ui_state.speed_limit_mode != SpeedLimitMode.off:
self._draw_sign_main(sign_rect, alpha)
if self.speed_limit_assist_state == AssistState.preActive:
self._draw_pre_active_arrow(sign_rect)
else:
self._draw_ahead_info(sign_rect)
def _draw_sign_main(self, rect, alpha=1.0):
speed_limit_warning_enabled = ui_state.speed_limit_mode >= SpeedLimitMode.warning
has_limit = self.speed_limit_valid or self.speed_limit_last_valid
is_overspeed = has_limit and round(self.speed_limit_final_last) < round(self.speed)
limit_str = str(round(self.speed_limit_last)) if has_limit else "---"
sub_text = ""
if self.speed_limit_offset != 0:
sign = "" if self.speed_limit_offset > 0 else "-"
sub_text = f"{sign}{round(abs(self.speed_limit_offset))}"
txt_color = Colors.BLACK
if speed_limit_warning_enabled and is_overspeed:
txt_color = Colors.RED
elif not self.speed_limit_valid:
txt_color = Colors.GREY
if ui_state.is_metric:
self._render_vienna(rect, limit_str, sub_text, txt_color, has_limit, alpha)
else:
self._render_mutcd(rect, limit_str, sub_text, txt_color, has_limit, alpha)
def _draw_pre_active_arrow(self, sign_rect):
set_speed_rounded = round(self.set_speed)
limit_rounded = round(self.speed_limit_final_last)
bounce_frequency = 2.0 * math.pi / (gui_app.target_fps * 2.5)
bounce_offset = int(20 * math.sin(self.assist_frame * bounce_frequency))
sign_margin = 12
arrow_spacing = int(sign_margin * 1.4)
arrow_x = sign_rect.x + sign_rect.width + arrow_spacing
if set_speed_rounded < limit_rounded:
arrow_y = sign_rect.y + (sign_rect.height - self._arrow_up.height) / 2 + bounce_offset
rl.draw_texture(self._arrow_up, int(arrow_x), int(arrow_y), rl.WHITE)
elif set_speed_rounded > limit_rounded:
arrow_y = sign_rect.y + (sign_rect.height - self._arrow_down.height) / 2 - bounce_offset
rl.draw_texture(self._arrow_down, int(arrow_x), int(arrow_y), rl.WHITE)
def _render_vienna(self, rect, val, sub, color, has_limit, alpha=1.0):
center = rl.Vector2(rect.x + rect.width / 2, rect.y + rect.height / 2)
radius = (rect.width + 18) / 2
white = rl.Color(255, 255, 255, int(255 * alpha))
red = rl.Color(255, 0, 0, int(255 * alpha))
if hasattr(color, 'r'):
text_color = rl.Color(color.r, color.g, color.b, int(255 * alpha))
else:
text_color = rl.Color(color[0], color[1], color[2], int(255 * alpha))
black = rl.Color(0, 0, 0, int(255 * alpha))
dark_grey = rl.Color(77, 77, 77, int(255 * alpha))
rl.draw_circle_v(center, radius, white)
rl.draw_ring(center, radius * 0.80, radius, 0, 360, 36, red)
f_size = 70 if len(val) >= 3 else 85
self._draw_text_centered(self.font_bold, val, f_size, center, text_color)
if sub and has_limit:
s_radius = radius * 0.4
s_center = rl.Vector2(rect.x + rect.width - s_radius / 2, rect.y + s_radius / 2)
rl.draw_circle_v(s_center, s_radius, black)
rl.draw_ring(s_center, s_radius - 3, s_radius, 0, 360, 36, dark_grey)
f_scale = 0.5 if len(sub) < 3 else 0.45
self._draw_text_centered(self.font_bold, sub, int(s_radius * 2 * f_scale), s_center, white)
def _render_mutcd(self, rect, val, sub, color, has_limit, alpha=1.0):
white = rl.Color(255, 255, 255, int(255 * alpha))
black = rl.Color(0, 0, 0, int(255 * alpha))
dark_grey = rl.Color(77, 77, 77, int(255 * alpha))
if hasattr(color, 'r'):
text_color = rl.Color(color.r, color.g, color.b, int(255 * alpha))
else:
text_color = rl.Color(color[0], color[1], color[2], int(255 * alpha))
rl.draw_rectangle_rounded(rect, 0.35, 10, white)
inner = rl.Rectangle(rect.x + 10, rect.y + 10, rect.width - 20, rect.height - 20)
rl.draw_rectangle_rounded_lines_ex(inner, 0.35, 10, 4, black)
self._draw_text_centered(self.font_demi, "SPEED", 40, rl.Vector2(rect.x + rect.width / 2, rect.y + 40), black)
self._draw_text_centered(self.font_demi, "LIMIT", 40, rl.Vector2(rect.x + rect.width / 2, rect.y + 80), black)
self._draw_text_centered(self.font_bold, val, 90, rl.Vector2(rect.x + rect.width / 2, rect.y + 150), text_color)
if sub and has_limit:
box_sz = rect.width * 0.3
overlap = box_sz * 0.2
s_rect = rl.Rectangle(rect.x + rect.width - box_sz / 1.5 + overlap, rect.y - box_sz / 1.25 + overlap, box_sz, box_sz)
rl.draw_rectangle_rounded(s_rect, 0.35, 10, black)
rl.draw_rectangle_rounded_lines_ex(s_rect, 0.35, 10, 6, dark_grey)
f_scale = 0.6 if len(sub) < 3 else 0.475
self._draw_text_centered(self.font_bold, sub, int(box_sz * f_scale), rl.Vector2(s_rect.x + box_sz / 2, s_rect.y + box_sz / 2), white)
def _draw_ahead_info(self, sign_rect):
source_is_map = self.speed_limit_source == SpeedLimitSource.map
valid = self.speed_limit_ahead_valid and self.speed_limit_ahead > 0 and self.speed_limit_ahead != self.speed_limit
if not (valid and source_is_map):
return
rect = rl.Rectangle(sign_rect.x + (sign_rect.width - 170) / 2, sign_rect.y + sign_rect.height + 10, 170, 160)
rl.draw_rectangle_rounded(rect, 0.35, 10, Colors.SUB_BG)
rl.draw_rectangle_rounded_lines_ex(rect, 0.35, 10, 3, Colors.MUTCD_LINES)
mid_x = rect.x + rect.width / 2
self._draw_text_centered(self.font_demi, "AHEAD", 40, rl.Vector2(mid_x, rect.y + 28), Colors.GREY)
self._draw_text_centered(self.font_bold, str(round(self.speed_limit_ahead)), 70, rl.Vector2(mid_x, rect.y + 82), Colors.WHITE)
self._draw_text_centered(self.font_norm, self._format_dist(self.speed_limit_ahead_dist), 36, rl.Vector2(mid_x, rect.y + 134), Colors.GREY)
@staticmethod
def _format_dist(d):
# metric
if ui_state.is_metric:
if d < 50:
return tr("Near")
if d >= 1000:
return f"{d / 1000:.1f} km"
d_rounded = round(d, -1) if d < 200 else round(d, -2)
return f"{int(d_rounded)} m"
# imperial
d_ft = d * METER_TO_FOOT
if d_ft < 100:
return tr("Near")
if d_ft >= 900:
return f"{d * METER_TO_MILE:.1f} mi"
if d_ft < 500:
return f"{int(round(d_ft / 50) * 50)} ft"
return f"{int(round(d_ft / 100) * 100)} ft"
@@ -0,0 +1,160 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import pyray as rl
import time
from dataclasses import dataclass
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.selfdrive.ui.mici.onroad.alert_renderer import IconSide, TURN_SIGNAL_BLINK_PERIOD
from openpilot.system.ui.lib.application import gui_app
from openpilot.system.ui.widgets import Widget
from openpilot.common.filter_simple import FirstOrderFilter
@dataclass(frozen=True)
class TurnSignalConfig:
left_x: int = 80
left_y: int = 190
right_x: int = 80
right_y: int = 190
size: int = 150
class TurnSignalWidget(Widget):
def __init__(self, direction: IconSide):
super().__init__()
self._direction = direction
self._active = False
self._type = 'signal'
self._turn_signal_timer = 0.0
self._turn_signal_alpha_filter = FirstOrderFilter(0.0, 0.3, 1 / gui_app.target_fps)
self._signal_texture = gui_app.texture(f'icons_mici/onroad/turn_signal_{direction}.png', 120, 109)
self._blind_spot_texture = gui_app.texture(f'icons_mici/onroad/blind_spot_{direction}.png', 120, 109)
self._texture = self._signal_texture
def _render(self, _):
if not self._active:
return
if self._type == 'signal':
if time.monotonic() - self._turn_signal_timer > TURN_SIGNAL_BLINK_PERIOD:
self._turn_signal_timer = time.monotonic()
self._turn_signal_alpha_filter.x = 255 * 2
else:
self._turn_signal_alpha_filter.update(255 * 0.2)
icon_alpha = int(min(self._turn_signal_alpha_filter.x, 255))
else:
icon_alpha = 255
self._texture = self._blind_spot_texture if self._type == 'blind_spot' else self._signal_texture
if self._texture:
pos_x = int(self._rect.x + (self._rect.width - self._texture.width) / 2)
pos_y = int(self._rect.y + (self._rect.height - self._texture.height) / 2)
color = rl.Color(255, 255, 255, icon_alpha)
rl.draw_texture(self._texture, pos_x, pos_y, color)
def activate(self, _type: str = 'signal'):
if not self._active or self._type != _type:
self._turn_signal_timer = 0.0
self._active = True
self._type = _type
def deactivate(self):
self._active = False
self._turn_signal_timer = 0.0
class TurnSignalController:
def __init__(self, config: TurnSignalConfig | None = None):
self._config = config or TurnSignalConfig()
self._left_signal = TurnSignalWidget(direction=IconSide.left)
self._right_signal = TurnSignalWidget(direction=IconSide.right)
self._last_icon_side = None
def update(self):
sm = ui_state.sm
ss = sm['selfdriveState']
event_name = ss.alertType.split('/')[0] if ss.alertType else ''
if event_name == 'preLaneChangeLeft':
self._last_icon_side = IconSide.left
self._left_signal.activate('signal')
self._right_signal.deactivate()
elif event_name == 'preLaneChangeRight':
self._last_icon_side = IconSide.right
self._right_signal.activate('signal')
self._left_signal.deactivate()
elif event_name == 'laneChange':
if self._last_icon_side == IconSide.left:
self._left_signal.activate('signal')
self._right_signal.deactivate()
elif self._last_icon_side == IconSide.right:
self._right_signal.activate('signal')
self._left_signal.deactivate()
elif event_name == 'laneChangeBlocked':
CS = sm['carState']
if CS.leftBlinker:
icon_side = IconSide.left
elif CS.rightBlinker:
icon_side = IconSide.right
else:
icon_side = self._last_icon_side
if icon_side == IconSide.left:
self._left_signal.activate('blind_spot')
self._right_signal.deactivate()
elif icon_side == IconSide.right:
self._right_signal.activate('blind_spot')
self._left_signal.deactivate()
else:
self._last_icon_side = None
CS = sm['carState']
if CS.leftBlindspot:
self._left_signal.activate('blind_spot')
elif CS.leftBlinker:
self._left_signal.activate('signal')
else:
self._left_signal.deactivate()
if CS.rightBlindspot:
self._right_signal.activate('blind_spot')
elif CS.rightBlinker:
self._right_signal.activate('signal')
else:
self._right_signal.deactivate()
def render(self, rect: rl.Rectangle):
x = rect.x + rect.width / 2
left_x = x - self._config.left_x - self._config.size
left_y = rect.y + self._config.left_y
right_x = x + self._config.right_x
right_y = rect.y + self._config.right_y
if self._left_signal._active:
self._left_signal.render(rl.Rectangle(left_x, left_y, self._config.size, self._config.size))
if self._right_signal._active:
self._right_signal.render(rl.Rectangle(right_x, right_y, self._config.size, self._config.size))
@property
def config(self) -> TurnSignalConfig:
return self._config
@config.setter
def config(self, new_config: TurnSignalConfig):
self._config = new_config
+2
View File
@@ -122,10 +122,12 @@ class UIStateSP:
self.CP_SP = messaging.log_from_bytes(CP_SP_bytes, custom.CarParamsSP)
self.sunnylink_enabled = self.params.get_bool("SunnylinkEnabled")
self.developer_ui = self.params.get("DevUIInfo")
self.rocket_fuel = self.params.get_bool("RocketFuel")
self.rainbow_path = self.params.get_bool("RainbowMode")
self.chevron_metrics = self.params.get("ChevronInfo")
self.active_bundle = self.params.get("ModelManager_ActiveBundle")
self.custom_interactive_timeout = self.params.get("InteractivityTimeout", return_default=True)
self.speed_limit_mode = self.params.get("SpeedLimitMode", return_default=True)
# Onroad Screen Brightness
self.onroad_brightness = int(float(self.params.get("OnroadScreenOffBrightness", return_default=True)))
+3
View File
@@ -0,0 +1,3 @@
# Multilanguage
[![languages](https://raw.githubusercontent.com/commaai/openpilot/badges/translation_badge.svg)](#)
+3 -3
View File
@@ -200,7 +200,7 @@ msgstr "CONNECT"
#: system/ui/widgets/network.py:369
#, python-format
msgid "CONNECTING..."
msgstr "CONNECTING..."
msgstr "连接中..."
#: system/ui/widgets/confirm_dialog.py:23 system/ui/widgets/option_dialog.py:35
#: system/ui/widgets/keyboard.py:81 system/ui/widgets/network.py:318
@@ -1001,7 +1001,7 @@ msgstr "用于“{}”"
#: selfdrive/ui/onroad/hud_renderer.py:177
#, python-format
msgid "km/h"
msgstr "公里/时"
msgstr "km/h"
#: system/ui/widgets/network.py:204
#, python-format
@@ -1021,7 +1021,7 @@ msgstr "计量"
#: selfdrive/ui/onroad/hud_renderer.py:177
#, python-format
msgid "mph"
msgstr "英里/时"
msgstr "mph"
#: selfdrive/ui/layouts/settings/software.py:20
#, python-format
+3 -3
View File
@@ -200,7 +200,7 @@ msgstr "CONNECT"
#: system/ui/widgets/network.py:369
#, python-format
msgid "CONNECTING..."
msgstr "CONNECTING..."
msgstr "連線中..."
#: system/ui/widgets/confirm_dialog.py:23 system/ui/widgets/option_dialog.py:35
#: system/ui/widgets/keyboard.py:81 system/ui/widgets/network.py:318
@@ -1000,7 +1000,7 @@ msgstr "適用於「{}」"
#: selfdrive/ui/onroad/hud_renderer.py:177
#, python-format
msgid "km/h"
msgstr "公里/時"
msgstr "km/h"
#: system/ui/widgets/network.py:204
#, python-format
@@ -1020,7 +1020,7 @@ msgstr "計量"
#: selfdrive/ui/onroad/hud_renderer.py:177
#, python-format
msgid "mph"
msgstr "英里/時"
msgstr "mph"
#: selfdrive/ui/layouts/settings/software.py:20
#, python-format
+1 -1
View File
@@ -18,7 +18,7 @@ OPENAI_PROMPT = "You are a professional translator from English to {language} (I
"The following sentence or word is in the GUI of a software called openpilot, translate it accordingly."
def get_language_files(languages: list[str] = None) -> dict[str, pathlib.Path]:
def get_language_files(languages: list[str] | None = None) -> dict[str, pathlib.Path]:
files = {}
with open(TRANSLATIONS_LANGUAGES) as fp:
+2 -1
View File
@@ -1,3 +1,4 @@
SConscript(['common/transformations/SConscript'])
SConscript(['modeld/SConscript'])
SConscript(['modeld_v2/SConscript'])
SConscript(['selfdrive/locationd/SConscript'])
SConscript(['selfdrive/locationd/SConscript'])
@@ -0,0 +1,4 @@
Import('env')
transformations = env.Library('transformations', ['orientation.cc', 'coordinates.cc'])
Export('transformations')
@@ -1,6 +1,6 @@
#define _USE_MATH_DEFINES
#include "common/transformations/coordinates.hpp"
#include "sunnypilot/common/transformations/coordinates.hpp"
#include <iostream>
#include <cmath>
@@ -4,8 +4,8 @@
#include <cmath>
#include <eigen3/Eigen/Dense>
#include "common/transformations/orientation.hpp"
#include "common/transformations/coordinates.hpp"
#include "sunnypilot/common/transformations/orientation.hpp"
#include "sunnypilot/common/transformations/coordinates.hpp"
Eigen::Quaterniond ensure_unique(const Eigen::Quaterniond &quat) {
if (quat.w() > 0){
@@ -141,4 +141,3 @@ Eigen::Vector3d ned_euler_from_ecef(const ECEF &ecef_init, const Eigen::Vector3d
return {phi, theta, psi};
}
@@ -1,6 +1,6 @@
#pragma once
#include <eigen3/Eigen/Dense>
#include "common/transformations/coordinates.hpp"
#include "sunnypilot/common/transformations/coordinates.hpp"
Eigen::Quaterniond ensure_unique(const Eigen::Quaterniond &quat);
+2 -2
View File
@@ -55,7 +55,7 @@ def cleanup_old_osm_data(files_to_remove: list[str]) -> None:
shutil.rmtree(file, ignore_errors=False)
def request_refresh_osm_location_data(nations: list[str], states: list[str] = None) -> None:
def request_refresh_osm_location_data(nations: list[str], states: list[str] | None = None) -> None:
params.put("OsmDownloadedDate", str(datetime.now().timestamp()))
params.put_bool("OsmDbUpdatesCheck", False)
@@ -68,7 +68,7 @@ def request_refresh_osm_location_data(nations: list[str], states: list[str] = No
mem_params.put("OSMDownloadLocations", osm_download_locations)
def filter_nations_and_states(nations: list[str], states: list[str] = None) -> tuple[list[str], list[str]]:
def filter_nations_and_states(nations: list[str], states: list[str] | None = None) -> tuple[list[str], list[str]]:
"""Filters and prepares nation and state data for OSM map download.
If the nation is 'US' and a specific state is provided, the nation 'US' is removed from the list.
+1 -1
View File
@@ -1,4 +1,4 @@
Import('env', 'envCython', 'arch', 'cereal', 'messaging', 'common', 'visionipc', 'transformations')
Import('env', 'envCython', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
lenv = env.Clone()
lenvCython = envCython.Clone()
+1 -1
View File
@@ -1,7 +1,7 @@
import os
import glob
Import('env', 'envCython', 'arch', 'cereal', 'messaging', 'common', 'visionipc', 'transformations')
Import('env', 'envCython', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
lenv = env.Clone()
lenvCython = envCython.Clone()
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
# type: ignore
import os
import time
+1 -1
View File
@@ -1 +1 @@
6168bc755ea17aececa535e8b94e6c798e5e855bfc47be19220d5cbc08483332
0cde93a9d20f35619182e704261ef8f6a849149fe5849ba2d355b0b2c89af53c
+4 -4
View File
@@ -8,18 +8,18 @@
#include <string>
#include "cereal/messaging/messaging.h"
#include "common/transformations/coordinates.hpp"
#include "common/transformations/orientation.hpp"
#include "common/params.h"
#include "common/swaglog.h"
#include "common/timing.h"
#include "common/util.h"
#include "sunnypilot/common/transformations/coordinates.hpp"
#include "sunnypilot/common/transformations/orientation.hpp"
#include "sunnypilot/system/sensord/sensors/constants.h"
#define VISION_DECIMATION 2
#define SENSOR_DECIMATION 10
#include "sunnypilot/selfdrive/locationd/models/live_kf.h"
#define VISION_DECIMATION 2
#define SENSOR_DECIMATION 10
#define POSENET_STD_HIST_HALF 20
enum LocalizerGnssSource {
+2 -2
View File
@@ -24,11 +24,11 @@ class SunnylinkApi(BaseApi):
self.spinner = None
self.params = Params()
def api_get(self, endpoint, method='GET', timeout=10, access_token=None, json=None, **kwargs):
def api_get(self, endpoint, method='GET', timeout=10, access_token=None, session=None, json=None, **kwargs):
if not self.params.get_bool("SunnylinkEnabled"):
return None
return super().api_get(endpoint, method, timeout, access_token, json, **kwargs)
return super().api_get(endpoint, method, timeout, access_token, session, json, **kwargs)
def resume_queued(self, timeout=10, **kwargs):
sunnylinkId, commaId = self._resolve_dongle_ids()
+1 -1
View File
@@ -281,7 +281,7 @@ def startLocalProxy(global_end_event: threading.Event, remote_ws_uri: str, local
return start_local_proxy_shim(global_end_event, local_port, ws)
def main(exit_event: threading.Event = None):
def main(exit_event: threading.Event | None = None):
try:
set_core_affinity([0, 1, 2, 3])
except Exception:

Some files were not shown because too many files have changed in this diff Show More