mirror of
https://github.com/infiniteCable2/panda.git
synced 2026-06-08 10:54:55 +08:00
examples: Make can_bit_transition.py support can_logger.py (#1198)
* Add Time field to can_logger.py
* Update can_bit_transition to support can_logger format
* Fix csv opening
Using `rb` to open csv file will throw the exception:
_csv.Error: iterator should return strings,
not bytes (the file should be opened in text mode)
Fix it with `r` and `newline=''`
This commit is contained in:
@@ -2,6 +2,22 @@
|
||||
import csv
|
||||
import sys
|
||||
|
||||
CSV_KEYS = {
|
||||
"logger": {
|
||||
"time": "Time",
|
||||
"message_id": "MessageID",
|
||||
"data": "Message",
|
||||
"bus": "Bus"
|
||||
},
|
||||
"cabana": {
|
||||
"time": "time",
|
||||
"message_id": "addr",
|
||||
"data": "data",
|
||||
"bus": "bus"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class Message():
|
||||
"""Details about a specific message ID."""
|
||||
|
||||
@@ -29,27 +45,34 @@ class Info():
|
||||
|
||||
def load(self, filename, start, end):
|
||||
"""Given a CSV file, adds information about message IDs and their values."""
|
||||
with open(filename, 'rb') as inp:
|
||||
reader = csv.reader(inp)
|
||||
next(reader, None) # skip the CSV header
|
||||
with open(filename, newline='') as inp:
|
||||
reader = csv.DictReader(inp)
|
||||
dtype = None
|
||||
for row in reader:
|
||||
if not len(row):
|
||||
continue
|
||||
time = float(row[0])
|
||||
bus = int(row[2])
|
||||
if dtype is None:
|
||||
dtype = "logger" if "Bus" in row else "cabana"
|
||||
|
||||
time = float(row[CSV_KEYS[dtype]["time"]])
|
||||
bus = int(row[CSV_KEYS[dtype]["bus"]])
|
||||
if time < start or bus > 127:
|
||||
continue
|
||||
elif time > end:
|
||||
break
|
||||
if row[1].startswith('0x'):
|
||||
message_id = row[1][2:] # remove leading '0x'
|
||||
|
||||
message_id = row[CSV_KEYS[dtype]["message_id"]]
|
||||
if message_id.startswith('0x'):
|
||||
message_id = message_id[2:] # remove leading '0x'
|
||||
else:
|
||||
message_id = hex(int(row[1]))[2:] # old message IDs are in decimal
|
||||
message_id = hex(int(message_id))[2:] # old message IDs are in decimal
|
||||
message_id = '%s:%s' % (bus, message_id)
|
||||
if row[3].startswith('0x'):
|
||||
data = row[3][2:] # remove leading '0x'
|
||||
|
||||
data = row[CSV_KEYS[dtype]["data"]]
|
||||
if data.startswith('0x'):
|
||||
data = data[2:] # remove leading '0x'
|
||||
else:
|
||||
data = row[3]
|
||||
data = data
|
||||
new_message = False
|
||||
if message_id not in self.messages:
|
||||
self.messages[message_id] = Message(message_id)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import csv
|
||||
import time
|
||||
from panda import Panda
|
||||
|
||||
def can_logger():
|
||||
@@ -10,18 +11,20 @@ def can_logger():
|
||||
outputfile = open('output.csv', 'w')
|
||||
csvwriter = csv.writer(outputfile)
|
||||
# Write Header
|
||||
csvwriter.writerow(['Bus', 'MessageID', 'Message', 'MessageLength'])
|
||||
csvwriter.writerow(['Bus', 'MessageID', 'Message', 'MessageLength', 'Time'])
|
||||
print("Writing csv file output.csv. Press Ctrl-C to exit...\n")
|
||||
|
||||
bus0_msg_cnt = 0
|
||||
bus1_msg_cnt = 0
|
||||
bus2_msg_cnt = 0
|
||||
|
||||
start_time = time.time()
|
||||
while True:
|
||||
can_recv = p.can_recv()
|
||||
|
||||
for address, _, dat, src in can_recv:
|
||||
csvwriter.writerow([str(src), str(hex(address)), f"0x{dat.hex()}", len(dat)])
|
||||
csvwriter.writerow(
|
||||
[str(src), str(hex(address)), f"0x{dat.hex()}", len(dat), str(time.time() - start_time)])
|
||||
|
||||
if src == 0:
|
||||
bus0_msg_cnt += 1
|
||||
|
||||
Reference in New Issue
Block a user