Files
StarPilot/selfdrive/car/ecu_addrs.py
T
Shane Smiskol 2f35ca57b1 FPv2: log all present ECU addresses (#24916)
* eliminate brands based on ECUs that respond to tester present

* make it work

* Add type hint for can message

Use make_can_msg

* Only query for addresses in fingerprints, and account for different busses

* These need to be addresses, not response addresses

* We need to listen to response addresses, not query addresses

* add to files_common

* Unused Optional
Drain sock raw

* add logging

* only query essential ecus

comments

* simplify get_brand_candidates(), keep track of multiple request variants per make and request each subaddress

* fixes

make dat bytes

bus is src

Fix check

* (addr, subaddr, bus) can be common across brands, add a match to each brand

* fix length

* query subaddrs in sequence

* fix

* candidate if a platform is a subset of responding ecu addresses

comment

comment

* do logging for shadow mode

* log responses so we can calculate candidates offline

* get has_subaddress from response set

* one liner

* fix mypy

* set to default at top

* always log for now

* log to make sure it's taking exactly timeout time

* import time

* fix logging

* 0.1 timeout

* clean up

Co-authored-by: Greg Hogan <gregjhogan@gmail.com>
old-commit-hash: cccab50b166272d13ab51eab1eb10e21beca604e
2022-06-20 22:14:13 -07:00

91 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python3
import capnp
import time
import traceback
from typing import Optional, Set, Tuple
import cereal.messaging as messaging
from panda.python.uds import SERVICE_TYPE
from selfdrive.car import make_can_msg
from selfdrive.boardd.boardd import can_list_to_can_capnp
from system.swaglog import cloudlog
def make_tester_present_msg(addr, bus, subaddr=None):
  """Build an 8-byte tester present request addressed to `addr` on `bus`.

  The request bytes are [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0]. When
  `subaddr` is given it is placed in front of those bytes. The payload is then
  zero-padded up to 8 bytes and handed to make_can_msg().
  """
  # presumably 0x02 is the ISO-TP single-frame length byte and the trailing 0x0
  # the sub-function -- TODO confirm against the UDS/ISO-TP specs
  request = bytes([0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0])
  if subaddr is not None:
    # the sub-address byte goes first, ahead of the request bytes
    request = bytes([subaddr]) + request
  # right-pad with zeros so the frame is always 8 bytes long
  return make_can_msg(addr, request.ljust(8, b"\x00"), bus)
def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: Optional[int] = None) -> bool:
  """Return True if `msg` is a positive or negative tester present response.

  msg: CAN message whose `dat` field holds the raw frame bytes.
  subaddr: when not None, the frame is parsed one byte later, skipping the
    leading sub-address byte.
  """
  frame = msg.dat
  # a sub-address occupies the first byte and shifts everything else by one
  start = 0 if subaddr is None else 1

  # ISO-TP messages are always padded to 8 bytes, and a tester present
  # response is always a single frame (length byte between 1 and 7)
  if len(frame) != 8 or not 1 <= frame[start] <= 7:
    return False

  service_id = frame[start + 1]
  # success response: request service id + 0x40
  is_positive = service_id == (SERVICE_TYPE.TESTER_PRESENT + 0x40)
  # error response: 0x7F followed by the rejected service id
  is_negative = service_id == 0x7F and frame[start + 2] == SERVICE_TYPE.TESTER_PRESENT
  return is_positive or is_negative
def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[Tuple[int, Optional[int], int]]:
  """Send tester present to a fixed range of addresses on `bus` and return the responders.

  Queries 0x700..0x7ff and 0x18da00f1 + (i << 8) for i in 0..255, all without
  a sub-address, and delegates to get_ecu_addrs().

  Returns the set of (addr, subaddr, bus) tuples reported by get_ecu_addrs().
  """
  queries: Set[Tuple[int, Optional[int], int]] = set()
  for offset in range(256):
    queries.add((0x700 + offset, None, bus))
    queries.add((0x18da00f1 + (offset << 8), None, bus))

  # The query set doubles as the accepted response set.
  # NOTE(review): only replies whose address is itself one of the queried
  # addresses are accepted by get_ecu_addrs() -- confirm this is intended.
  return get_ecu_addrs(logcan, sendcan, queries, queries, timeout=timeout, debug=debug)
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[Tuple[int, Optional[int], int]],
                  responses: Set[Tuple[int, Optional[int], int]], timeout: float = 1, debug: bool = False) -> Set[Tuple[int, Optional[int], int]]:
  """Send tester present requests and collect the addresses that answer.

  logcan: socket the CAN traffic is read from.
  sendcan: socket the requests are published on.
  queries: (addr, subaddr, bus) tuples to send a tester present request to.
  responses: (addr, subaddr, bus) tuples that are accepted as valid responders.
  timeout: seconds to keep listening after the requests are sent. The loop only
    re-checks the clock after drain_sock() returns.
  debug: print every accepted response, and flag duplicates, to stdout.

  Returns the set of (addr, subaddr, bus) tuples from `responses` for which a
  tester present response was seen. Any exception is logged via cloudlog and
  whatever was collected so far is returned.
  """
  ecu_responses: Set[Tuple[int, Optional[int], int]] = set()  # set((addr, subaddr, bus),)
  try:
    msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries]

    # discard anything already queued so only traffic after the request is inspected
    messaging.drain_sock_raw(logcan)
    sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan'))
    start_time = time.monotonic()
    while time.monotonic() - start_time < timeout:
      can_packets = messaging.drain_sock(logcan, wait_for_one=True)
      for packet in can_packets:
        for msg in packet.can:
          # A zero-length frame can never be a tester present response, and
          # indexing msg.dat[0] below would raise IndexError and abort the
          # whole scan through the broad except. Skip it instead.
          if not len(msg.dat):
            continue

          # if the address is not expected without a sub-address, treat the
          # first data byte as the sub-address
          subaddr = None if (msg.address, None, msg.src) in responses else msg.dat[0]
          ecu_key = (msg.address, subaddr, msg.src)
          if ecu_key in responses and is_tester_present_response(msg, subaddr):
            if debug:
              print(f"CAN-RX: {hex(msg.address)} - 0x{bytes.hex(msg.dat)}")
              if ecu_key in ecu_responses:
                print(f"Duplicate ECU address: {hex(msg.address)}")
            ecu_responses.add(ecu_key)
  except Exception:
    # best effort: a failed scan must not take down the caller
    cloudlog.warning(f"ECU addr scan exception: {traceback.format_exc()}")
  return ecu_responses
# Command-line entry point: scan one CAN bus for ECUs that answer tester present
# and print the responding addresses.
if __name__ == "__main__":
  import argparse

  parser = argparse.ArgumentParser(description='Get addresses of all ECUs')
  parser.add_argument('--debug', action='store_true')
  # defaults match the previously hard-coded bus and get_all_ecu_addrs()'s timeout
  parser.add_argument('--bus', type=int, default=1)
  parser.add_argument('--timeout', type=float, default=1.0)
  args = parser.parse_args()

  logcan = messaging.sub_sock('can')
  sendcan = messaging.pub_sock('sendcan')

  # pause after creating the sockets, before the first send
  time.sleep(1.0)

  print("Getting ECU addresses ...")
  ecu_addrs = get_all_ecu_addrs(logcan, sendcan, args.bus, args.timeout, debug=args.debug)

  print()
  print("Found ECUs on addresses:")
  for addr, subaddr, _ in ecu_addrs:
    # hex() already includes the "0x" prefix, so it must not be added again
    msg = f" {hex(addr)}"
    if subaddr is not None:
      msg += f" (sub-address: {hex(subaddr)})"
    print(msg)