mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-28 01:52:06 +08:00
Ruff: b905 (strict zip) (#29336)
* added mutable default args * most of the Bs * add comment about lrucache * b905 old-commit-hash: e4ead4f1830618f5f61978491930cb0b8d36ab26
This commit is contained in:
+1
-1
@@ -198,7 +198,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
# https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml
|
||||
[tool.ruff]
|
||||
select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A", "B"]
|
||||
ignore = ["W292", "E741", "E402", "C408", "ISC003", "B027", "B024", "B905"]
|
||||
ignore = ["W292", "E741", "E402", "C408", "ISC003", "B027", "B024"]
|
||||
line-length = 160
|
||||
target-version="py311"
|
||||
exclude = [
|
||||
|
||||
@@ -25,7 +25,7 @@ def _create_radar_can_parser(car_fingerprint):
|
||||
messages = list(zip(RADAR_MSGS_C +
|
||||
RADAR_MSGS_D,
|
||||
[20] * msg_n + # 20Hz (0.05s)
|
||||
[20] * msg_n)) # 20Hz (0.05s)
|
||||
[20] * msg_n, strict=True)) # 20Hz (0.05s)
|
||||
|
||||
return CANParser(DBC[car_fingerprint]['radar'], messages, 1)
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ DELPHI_MRR_RADAR_MSG_COUNT = 64
|
||||
|
||||
def _create_delphi_esr_radar_can_parser(CP) -> CANParser:
|
||||
msg_n = len(DELPHI_ESR_RADAR_MSGS)
|
||||
messages = list(zip(DELPHI_ESR_RADAR_MSGS, [20] * msg_n))
|
||||
messages = list(zip(DELPHI_ESR_RADAR_MSGS, [20] * msg_n, strict=True))
|
||||
|
||||
return CANParser(RADAR.DELPHI_ESR, messages, CanBus(CP).radar)
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ def create_radar_can_parser(car_fingerprint):
|
||||
['TrkRange'] * NUM_SLOTS + ['TrkRangeRate'] * NUM_SLOTS +
|
||||
['TrkRangeAccel'] * NUM_SLOTS + ['TrkAzimuth'] * NUM_SLOTS +
|
||||
['TrkWidth'] * NUM_SLOTS + ['TrkObjectID'] * NUM_SLOTS,
|
||||
[RADAR_HEADER_MSG] * 7 + radar_targets * 6))
|
||||
[RADAR_HEADER_MSG] * 7 + radar_targets * 6, strict=True))
|
||||
|
||||
messages = list({(s[1], 14) for s in signals})
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ def _create_radar_can_parser(car_fingerprint):
|
||||
|
||||
msg_a_n = len(RADAR_A_MSGS)
|
||||
msg_b_n = len(RADAR_B_MSGS)
|
||||
messages = list(zip(RADAR_A_MSGS + RADAR_B_MSGS, [20] * (msg_a_n + msg_b_n)))
|
||||
messages = list(zip(RADAR_A_MSGS + RADAR_B_MSGS, [20] * (msg_a_n + msg_b_n), strict=True))
|
||||
|
||||
return CANParser(DBC[car_fingerprint]['radar'], messages, 1)
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ if __name__ == "__main__":
|
||||
logs = [args.route[0]]
|
||||
else:
|
||||
r = Route(args.route[0])
|
||||
logs = [q_log if r_log is None else r_log for (q_log, r_log) in zip(r.qlog_paths(), r.log_paths())]
|
||||
logs = [q_log if r_log is None else r_log for (q_log, r_log) in zip(r.qlog_paths(), r.log_paths(), strict=True)]
|
||||
|
||||
if len(args.route) == 2 and logs:
|
||||
n = int(args.route[1])
|
||||
|
||||
@@ -27,7 +27,7 @@ def make_pie(msgs, typ):
|
||||
sizes_large = [(k, sz) for (k, sz) in sizes if sz >= total * MIN_SIZE / 100]
|
||||
sizes_large += [('other', sum(sz for (_, sz) in sizes if sz < total * MIN_SIZE / 100))]
|
||||
|
||||
labels, sizes = zip(*sizes_large)
|
||||
labels, sizes = zip(*sizes_large, strict=True)
|
||||
|
||||
plt.figure()
|
||||
plt.title(f"{typ}")
|
||||
|
||||
@@ -191,9 +191,9 @@ class LocKalman():
|
||||
|
||||
# Observation matrix modifier
|
||||
H_mod_sym = sp.Matrix(np.zeros((dim_state, dim_state_err)))
|
||||
for p_idx, p_err_idx in zip(p_idxs, p_err_idxs):
|
||||
for p_idx, p_err_idx in zip(p_idxs, p_err_idxs, strict=True):
|
||||
H_mod_sym[p_idx[0]:p_idx[1], p_err_idx[0]:p_err_idx[1]] = np.eye(p_idx[1] - p_idx[0])
|
||||
for q_idx, q_err_idx in zip(q_idxs, q_err_idxs):
|
||||
for q_idx, q_err_idx in zip(q_idxs, q_err_idxs, strict=True):
|
||||
H_mod_sym[q_idx[0]:q_idx[1], q_err_idx[0]:q_err_idx[1]] = 0.5 * quat_matrix_r(state[q_idx[0]:q_idx[1]])[:, 1:]
|
||||
|
||||
# these error functions are defined so that say there
|
||||
@@ -205,17 +205,17 @@ class LocKalman():
|
||||
delta_x = sp.MatrixSymbol('delta_x', dim_state_err, 1)
|
||||
|
||||
err_function_sym = sp.Matrix(np.zeros((dim_state, 1)))
|
||||
for q_idx, q_err_idx in zip(q_idxs, q_err_idxs):
|
||||
for q_idx, q_err_idx in zip(q_idxs, q_err_idxs, strict=True):
|
||||
delta_quat = sp.Matrix(np.ones(4))
|
||||
delta_quat[1:, :] = sp.Matrix(0.5 * delta_x[q_err_idx[0]: q_err_idx[1], :])
|
||||
err_function_sym[q_idx[0]:q_idx[1], 0] = quat_matrix_r(nom_x[q_idx[0]:q_idx[1], 0]) * delta_quat
|
||||
for p_idx, p_err_idx in zip(p_idxs, p_err_idxs):
|
||||
for p_idx, p_err_idx in zip(p_idxs, p_err_idxs, strict=True):
|
||||
err_function_sym[p_idx[0]:p_idx[1], :] = sp.Matrix(nom_x[p_idx[0]:p_idx[1], :] + delta_x[p_err_idx[0]:p_err_idx[1], :])
|
||||
|
||||
inv_err_function_sym = sp.Matrix(np.zeros((dim_state_err, 1)))
|
||||
for p_idx, p_err_idx in zip(p_idxs, p_err_idxs):
|
||||
for p_idx, p_err_idx in zip(p_idxs, p_err_idxs, strict=True):
|
||||
inv_err_function_sym[p_err_idx[0]:p_err_idx[1], 0] = sp.Matrix(-nom_x[p_idx[0]:p_idx[1], 0] + true_x[p_idx[0]:p_idx[1], 0])
|
||||
for q_idx, q_err_idx in zip(q_idxs, q_err_idxs):
|
||||
for q_idx, q_err_idx in zip(q_idxs, q_err_idxs, strict=True):
|
||||
delta_quat = quat_matrix_r(nom_x[q_idx[0]:q_idx[1], 0]).T * true_x[q_idx[0]:q_idx[1], 0]
|
||||
inv_err_function_sym[q_err_idx[0]:q_err_idx[1], 0] = sp.Matrix(2 * delta_quat[1:])
|
||||
|
||||
|
||||
@@ -173,7 +173,7 @@ class TestLaikad(unittest.TestCase):
|
||||
self.assertTrue(kf_valid)
|
||||
|
||||
def test_laika_online_nav_only(self):
|
||||
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs]):
|
||||
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True):
|
||||
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.NAV, use_qcom=use_qcom)
|
||||
# Disable fetch_orbits to test NAV only
|
||||
correct_msgs = verify_messages(logs, laikad)
|
||||
@@ -215,7 +215,7 @@ class TestLaikad(unittest.TestCase):
|
||||
|
||||
def test_get_navs_in_process(self):
|
||||
for auto_fetch_navs in [True, False]:
|
||||
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs]):
|
||||
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True):
|
||||
laikad = Laikad(auto_update=False, use_qcom=use_qcom, auto_fetch_navs=auto_fetch_navs)
|
||||
has_navs = False
|
||||
has_fix = False
|
||||
@@ -248,7 +248,7 @@ class TestLaikad(unittest.TestCase):
|
||||
|
||||
def test_cache(self):
|
||||
use_qcom = True
|
||||
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs]):
|
||||
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True):
|
||||
Params().remove(EPHEMERIS_CACHE)
|
||||
laikad = Laikad(auto_update=True, save_ephemeris=True, use_qcom=use_qcom)
|
||||
def wait_for_cache():
|
||||
|
||||
@@ -63,7 +63,7 @@ class PointBuckets:
|
||||
def __init__(self, x_bounds, min_points, min_points_total):
|
||||
self.x_bounds = x_bounds
|
||||
self.buckets = {bounds: NPQueue(maxlen=POINTS_PER_BUCKET, rowsize=3) for bounds in x_bounds}
|
||||
self.buckets_min_points = dict(zip(x_bounds, min_points))
|
||||
self.buckets_min_points = dict(zip(x_bounds, min_points, strict=True))
|
||||
self.min_points_total = min_points_total
|
||||
|
||||
def bucket_lengths(self):
|
||||
@@ -73,7 +73,8 @@ class PointBuckets:
|
||||
return sum(self.bucket_lengths())
|
||||
|
||||
def is_valid(self):
|
||||
return all(len(v) >= min_pts for v, min_pts in zip(self.buckets.values(), self.buckets_min_points.values())) and (self.__len__() >= self.min_points_total)
|
||||
return all(len(v) >= min_pts for v, min_pts in zip(self.buckets.values(), self.buckets_min_points.values(), strict=True)) \
|
||||
and (self.__len__() >= self.min_points_total)
|
||||
|
||||
def add_point(self, x, y):
|
||||
for bound_min, bound_max in self.x_bounds:
|
||||
|
||||
@@ -36,16 +36,16 @@ def run_loop(m, tf8_input=False):
|
||||
|
||||
# run once to initialize CUDA provider
|
||||
if "CUDAExecutionProvider" in m.get_providers():
|
||||
m.run(None, dict(zip(keys, [np.zeros(shp, dtype=itp) for shp, itp in zip(ishapes, itypes)])))
|
||||
m.run(None, dict(zip(keys, [np.zeros(shp, dtype=itp) for shp, itp in zip(ishapes, itypes, strict=True)], strict=True)))
|
||||
|
||||
print("ready to run onnx model", keys, ishapes, file=sys.stderr)
|
||||
while 1:
|
||||
inputs = []
|
||||
for k, shp, itp in zip(keys, ishapes, itypes):
|
||||
for k, shp, itp in zip(keys, ishapes, itypes, strict=True):
|
||||
ts = np.product(shp)
|
||||
#print("reshaping %s with offset %d" % (str(shp), offset), file=sys.stderr)
|
||||
inputs.append(read(ts, (k=='input_img' and tf8_input)).reshape(shp).astype(itp))
|
||||
ret = m.run(None, dict(zip(keys, inputs)))
|
||||
ret = m.run(None, dict(zip(keys, inputs, strict=True)))
|
||||
#print(ret, file=sys.stderr)
|
||||
for r in ret:
|
||||
write(r.astype(np.float32))
|
||||
|
||||
@@ -70,7 +70,7 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non
|
||||
raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}\n\t\t{cnt1}\n\t\t{cnt2}")
|
||||
|
||||
diff = []
|
||||
for msg1, msg2 in zip(log1, log2):
|
||||
for msg1, msg2 in zip(log1, log2, strict=True):
|
||||
if msg1.which() != msg2.which():
|
||||
raise Exception("msgs not aligned between logs")
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ if __name__ == "__main__":
|
||||
failed = True
|
||||
diff += 'amount of frames not equal\n'
|
||||
|
||||
for i, (frame, cmp_frame) in enumerate(zip(frames, cmp_frames)):
|
||||
for i, (frame, cmp_frame) in enumerate(zip(frames, cmp_frames, strict=True)):
|
||||
for j in range(3):
|
||||
fr = frame[j]
|
||||
cmp_f = cmp_frame[j]
|
||||
@@ -159,7 +159,7 @@ if __name__ == "__main__":
|
||||
diff += f'different at a large amount of pixels ({diff_len})\n'
|
||||
else:
|
||||
diff += 'different at (frame, yuv, pixel, ref, HEAD):\n'
|
||||
for k in zip(*np.nonzero(frame_diff)):
|
||||
for k in zip(*np.nonzero(frame_diff), strict=True):
|
||||
diff += f'{i}, {yuv_i[j]}, {k}, {cmp_f[k]}, {fr[k]}\n'
|
||||
|
||||
if failed:
|
||||
|
||||
@@ -52,7 +52,7 @@ def sync_to_ci_public(route):
|
||||
return True
|
||||
|
||||
print(f"Uploading {route}")
|
||||
for (source_account, source_bucket), source_key in zip(SOURCES, source_keys):
|
||||
for (source_account, source_bucket), source_key in zip(SOURCES, source_keys, strict=True):
|
||||
print(f"Trying {source_account}/{source_bucket}")
|
||||
cmd = [
|
||||
"azcopy",
|
||||
|
||||
@@ -54,7 +54,7 @@ def get_tombstones():
|
||||
with os.scandir(folder) as d:
|
||||
|
||||
# Loop over first 1000 directory entries
|
||||
for _, f in zip(range(1000), d):
|
||||
for _, f in zip(range(1000), d, strict=False):
|
||||
if f.name.startswith("tombstone"):
|
||||
files.append((f.path, int(f.stat().st_ctime)))
|
||||
elif f.name.endswith(".crash") and f.stat().st_mode == 0o100640:
|
||||
|
||||
@@ -40,7 +40,7 @@ if __name__ == "__main__":
|
||||
# Get content-length for each chunk
|
||||
with multiprocessing.Pool() as pool:
|
||||
szs = list(tqdm(pool.imap(get_chunk_download_size, to), total=len(to)))
|
||||
chunk_sizes = {t.sha: sz for (t, sz) in zip(to, szs)}
|
||||
chunk_sizes = {t.sha: sz for (t, sz) in zip(to, szs, strict=True)}
|
||||
|
||||
sources: Dict[str, List[int]] = {
|
||||
'seed': [],
|
||||
|
||||
@@ -161,7 +161,7 @@ def init_plots(arr, name_to_arr_idx, plot_xlims, plot_ylims, plot_names, plot_co
|
||||
idxs.append(name_to_arr_idx[item])
|
||||
plot_select.append(i)
|
||||
axs[i].set_title(", ".join(f"{nm} ({cl})"
|
||||
for (nm, cl) in zip(pl_list, plot_colors[i])), fontsize=10)
|
||||
for (nm, cl) in zip(pl_list, plot_colors[i], strict=True)), fontsize=10)
|
||||
axs[i].tick_params(axis="x", colors="white")
|
||||
axs[i].tick_params(axis="y", colors="white")
|
||||
axs[i].title.set_color("white")
|
||||
@@ -205,11 +205,11 @@ def plot_model(m, img, calibration, top_down):
|
||||
px, py_bottom = to_topdown_pt(x - x_std, y)
|
||||
top_down[1][int(round(px - 4)):int(round(px + 4)), py_top:py_bottom] = find_color(top_down[0], YELLOW)
|
||||
|
||||
for path, prob, _ in zip(m.laneLines, m.laneLineProbs, m.laneLineStds):
|
||||
for path, prob, _ in zip(m.laneLines, m.laneLineProbs, m.laneLineStds, strict=True):
|
||||
color = (0, int(255 * prob), 0)
|
||||
draw_path(path, color, img, calibration, top_down, YELLOW)
|
||||
|
||||
for edge, std in zip(m.roadEdges, m.roadEdgeStds):
|
||||
for edge, std in zip(m.roadEdges, m.roadEdgeStds, strict=True):
|
||||
prob = max(1 - std, 0)
|
||||
color = (int(255 * prob), 0, 0)
|
||||
draw_path(edge, color, img, calibration, top_down, RED)
|
||||
|
||||
Reference in New Issue
Block a user