mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-23 06:52:07 +08:00
FW_QUERY_CONFIGS: type annotate (#31265)
* annotate * fix * clean up * test * clean up * space * fix
This commit is contained in:
@@ -97,15 +97,12 @@ class FwQueryConfig:
|
||||
new_request.bus += 4
|
||||
self.requests.append(new_request)
|
||||
|
||||
def get_all_ecus(self, offline_fw_versions: OfflineFwVersions, include_ecu_type: bool = True,
|
||||
include_extra_ecus: bool = True) -> set[EcuAddrSubAddr] | set[AddrType]:
|
||||
def get_all_ecus(self, offline_fw_versions: OfflineFwVersions,
|
||||
include_extra_ecus: bool = True) -> set[EcuAddrSubAddr]:
|
||||
# Add ecus in database + extra ecus
|
||||
brand_ecus = {ecu for ecus in offline_fw_versions.values() for ecu in ecus}
|
||||
|
||||
if include_extra_ecus:
|
||||
brand_ecus |= set(self.extra_ecus)
|
||||
|
||||
if not include_ecu_type:
|
||||
return {(addr, subaddr) for _, addr, subaddr in brand_ecus}
|
||||
|
||||
return brand_ecus
|
||||
|
||||
@@ -8,7 +8,7 @@ import panda.python.uds as uds
|
||||
from cereal import car
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
|
||||
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType
|
||||
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig
|
||||
from openpilot.selfdrive.car.interfaces import get_interface_attr
|
||||
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
|
||||
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
|
||||
@@ -18,7 +18,7 @@ Ecu = car.CarParams.Ecu
|
||||
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa]
|
||||
FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug]
|
||||
|
||||
FW_QUERY_CONFIGS = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True)
|
||||
FW_QUERY_CONFIGS: dict[str, FwQueryConfig] = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True)
|
||||
VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True)
|
||||
|
||||
MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e}
|
||||
@@ -204,7 +204,7 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
|
||||
def get_brand_ecu_matches(ecu_rx_addrs: Set[EcuAddrBusType]) -> dict[str, set[AddrType]]:
|
||||
"""Returns dictionary of brands and matches with ECUs in their FW versions"""
|
||||
|
||||
brand_addrs = {brand: config.get_all_ecus(VERSIONS[brand], include_ecu_type=False) for
|
||||
brand_addrs = {brand: {(addr, subaddr) for _, addr, subaddr in config.get_all_ecus(VERSIONS[brand])} for
|
||||
brand, config in FW_QUERY_CONFIGS.items()}
|
||||
brand_matches: dict[str, set[AddrType]] = {brand: set() for brand, _, _ in REQUESTS}
|
||||
|
||||
@@ -287,8 +287,8 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
|
||||
# Get versions and build capnp list to put into CarParams
|
||||
car_fw = []
|
||||
requests = [(brand, config, r) for brand, config, r in REQUESTS if is_brand(brand, query_brand)]
|
||||
for addr in tqdm(addrs, disable=not progress):
|
||||
for addr_chunk in chunks(addr):
|
||||
for addr_group in tqdm(addrs, disable=not progress): # split by subaddr, if any
|
||||
for addr_chunk in chunks(addr_group):
|
||||
for brand, config, r in requests:
|
||||
# Skip query if no panda available
|
||||
if r.bus > num_pandas * 4 - 1:
|
||||
|
||||
@@ -150,7 +150,7 @@ class TestFwFingerprint(unittest.TestCase):
|
||||
# Ensure each brand has at least 1 ECU to query, and extra ECU retrieval
|
||||
for brand, config in FW_QUERY_CONFIGS.items():
|
||||
self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0)
|
||||
self.assertEqual(config.get_all_ecus({}, include_ecu_type=True), set(config.extra_ecus))
|
||||
self.assertEqual(config.get_all_ecus({}), set(config.extra_ecus))
|
||||
self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0)
|
||||
|
||||
def test_fw_request_ecu_whitelist(self):
|
||||
|
||||
Reference in New Issue
Block a user