mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-03 12:32:06 +08:00
firmware fingerprinting: order brand requests (#23311)
Co-authored-by: Shane Smiskol <shane@smiskol.com> old-commit-hash: 89d1d9f6df6e1fbe65609fa8f93702479620a50e
This commit is contained in:
@@ -8,7 +8,7 @@ from system.version import is_comma_remote, is_tested_branch
|
||||
from selfdrive.car.interfaces import get_interface_attr
|
||||
from selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
|
||||
from selfdrive.car.vin import get_vin, VIN_UNKNOWN
|
||||
from selfdrive.car.fw_versions import get_fw_versions, match_fw_to_car, get_present_ecus
|
||||
from selfdrive.car.fw_versions import get_fw_versions_ordered, match_fw_to_car, get_present_ecus
|
||||
from system.swaglog import cloudlog
|
||||
import cereal.messaging as messaging
|
||||
from selfdrive.car import gen_empty_fingerprint
|
||||
@@ -99,7 +99,7 @@ def fingerprint(logcan, sendcan):
|
||||
cloudlog.warning("Getting VIN & FW versions")
|
||||
_, vin = get_vin(logcan, sendcan, bus)
|
||||
ecu_rx_addrs = get_present_ecus(logcan, sendcan)
|
||||
car_fw = get_fw_versions(logcan, sendcan)
|
||||
car_fw = get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs)
|
||||
|
||||
exact_fw_match, fw_candidates = match_fw_to_car(car_fw)
|
||||
else:
|
||||
|
||||
@@ -225,6 +225,15 @@ def build_fw_dict(fw_versions, filter_brand=None):
|
||||
return fw_versions_dict
|
||||
|
||||
|
||||
def get_brand_addrs():
|
||||
versions = get_interface_attr('FW_VERSIONS', ignore_none=True)
|
||||
brand_addrs = defaultdict(set)
|
||||
for brand, cars in versions.items():
|
||||
for fw in cars.values():
|
||||
brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()}
|
||||
return brand_addrs
|
||||
|
||||
|
||||
def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None):
|
||||
"""Do a fuzzy FW match. This function will return a match, and the number of firmware version
|
||||
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars
|
||||
@@ -236,7 +245,7 @@ def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None):
|
||||
# time and only one is in our database.
|
||||
exclude_types = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug]
|
||||
|
||||
# Build lookup table from (addr, subaddr, fw) to list of candidate cars
|
||||
# Build lookup table from (addr, sub_addr, fw) to list of candidate cars
|
||||
all_fw_versions = defaultdict(list)
|
||||
for candidate, fw_by_addr in FW_VERSIONS.items():
|
||||
if candidate == exclude:
|
||||
@@ -361,24 +370,59 @@ def get_present_ecus(logcan, sendcan):
|
||||
return ecu_responses
|
||||
|
||||
|
||||
def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progress=False):
|
||||
ecu_types = {}
|
||||
def get_brand_ecu_matches(ecu_rx_addrs):
|
||||
"""Returns dictionary of brands and matches with ECUs in their FW versions"""
|
||||
|
||||
brand_addrs = get_brand_addrs()
|
||||
brand_matches = {r.brand: set() for r in REQUESTS}
|
||||
|
||||
brand_rx_offsets = set((r.brand, r.rx_offset) for r in REQUESTS)
|
||||
for addr, sub_addr, _ in ecu_rx_addrs:
|
||||
# Since we can't know what request an ecu responded to, add matches for all possible rx offsets
|
||||
for brand, rx_offset in brand_rx_offsets:
|
||||
a = (uds.get_rx_addr_for_tx_addr(addr, -rx_offset), sub_addr)
|
||||
if a in brand_addrs[brand]:
|
||||
brand_matches[brand].add(a)
|
||||
|
||||
return brand_matches
|
||||
|
||||
|
||||
def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, debug=False, progress=False):
|
||||
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
|
||||
|
||||
all_car_fw = []
|
||||
brand_matches = get_brand_ecu_matches(ecu_rx_addrs)
|
||||
|
||||
for brand in sorted(brand_matches, key=lambda b: len(brand_matches[b]), reverse=True):
|
||||
car_fw = get_fw_versions(logcan, sendcan, brand=brand, timeout=timeout, debug=debug, progress=progress)
|
||||
all_car_fw.extend(car_fw)
|
||||
matches = match_fw_to_car_exact(build_fw_dict(car_fw))
|
||||
if len(matches) == 1:
|
||||
break
|
||||
|
||||
return all_car_fw
|
||||
|
||||
|
||||
def get_fw_versions(logcan, sendcan, brand=None, extra=None, timeout=0.1, debug=False, progress=False):
|
||||
versions = get_interface_attr('FW_VERSIONS', ignore_none=True)
|
||||
if brand is not None:
|
||||
versions = {brand: versions[brand]}
|
||||
|
||||
if extra is not None:
|
||||
versions.update(extra)
|
||||
|
||||
# Extract ECU addresses to query from fingerprints
|
||||
# ECUs using a subaddress need be queried one by one, the rest can be done in parallel
|
||||
addrs = []
|
||||
parallel_addrs = []
|
||||
|
||||
versions = get_interface_attr('FW_VERSIONS', ignore_none=True)
|
||||
if extra is not None:
|
||||
versions.update(extra)
|
||||
ecu_types = {}
|
||||
|
||||
for brand, brand_versions in versions.items():
|
||||
for c in brand_versions.values():
|
||||
for ecu_type, addr, sub_addr in c.keys():
|
||||
a = (brand, addr, sub_addr)
|
||||
if a not in ecu_types:
|
||||
ecu_types[(addr, sub_addr)] = ecu_type
|
||||
ecu_types[a] = ecu_type
|
||||
|
||||
if sub_addr is None:
|
||||
if a not in parallel_addrs:
|
||||
@@ -390,17 +434,17 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr
|
||||
addrs.insert(0, parallel_addrs)
|
||||
|
||||
fw_versions = {}
|
||||
for i, addr in enumerate(tqdm(addrs, disable=not progress)):
|
||||
requests = [r for r in REQUESTS if brand is None or r.brand == brand]
|
||||
for addr in tqdm(addrs, disable=not progress):
|
||||
for addr_chunk in chunks(addr):
|
||||
for r in REQUESTS:
|
||||
for r in requests:
|
||||
try:
|
||||
addrs = [(a, s) for (b, a, s) in addr_chunk if b in (r.brand, 'any') and
|
||||
(len(r.whitelist_ecus) == 0 or ecu_types[(a, s)] in r.whitelist_ecus)]
|
||||
(len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)]
|
||||
|
||||
if addrs:
|
||||
query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug)
|
||||
t = 2 * timeout if i == 0 else timeout
|
||||
fw_versions.update({(r.brand, addr): (version, r) for addr, version in query.get_data(t).items()})
|
||||
fw_versions.update({(r.brand, addr): (version, r) for addr, version in query.get_data(timeout).items()})
|
||||
except Exception:
|
||||
cloudlog.warning(f"FW query exception: {traceback.format_exc()}")
|
||||
|
||||
@@ -409,7 +453,7 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr
|
||||
for (brand, addr), (version, request) in fw_versions.items():
|
||||
f = car.CarParams.CarFw.new_message()
|
||||
|
||||
f.ecu = ecu_types[addr]
|
||||
f.ecu = ecu_types[(brand, addr[0], addr[1])]
|
||||
f.fwVersion = version
|
||||
f.address = addr[0]
|
||||
f.responseAddress = uds.get_rx_addr_for_tx_addr(addr[0], request.rx_offset)
|
||||
@@ -433,6 +477,7 @@ if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='Get firmware version of ECUs')
|
||||
parser.add_argument('--scan', action='store_true')
|
||||
parser.add_argument('--debug', action='store_true')
|
||||
parser.add_argument('--brand', help='Only query addresses/with requests for this brand')
|
||||
args = parser.parse_args()
|
||||
|
||||
logcan = messaging.sub_sock('can')
|
||||
@@ -458,7 +503,7 @@ if __name__ == "__main__":
|
||||
print()
|
||||
|
||||
t = time.time()
|
||||
fw_vers = get_fw_versions(logcan, sendcan, extra=extra, debug=args.debug, progress=True)
|
||||
fw_vers = get_fw_versions(logcan, sendcan, brand=args.brand, extra=extra, debug=args.debug, progress=True)
|
||||
_, candidates = match_fw_to_car(fw_vers)
|
||||
|
||||
print()
|
||||
|
||||
Reference in New Issue
Block a user