From ddf8abc14a272d5e95ffd2569d1ddfd2195c56f5 Mon Sep 17 00:00:00 2001 From: Trey Moen <50057480+greatgitsby@users.noreply.github.com> Date: Sun, 22 Feb 2026 10:34:37 -0700 Subject: [PATCH] Revert "feat(lpa): `at` client + list profiles (#37271)" (#37322) This reverts commit 8bca2ca7588bae62a860b1e90a835e1678367596. --- system/hardware/tici/lpa.py | 229 +----------------------------------- 1 file changed, 2 insertions(+), 227 deletions(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 4d649fda8..9bd9d8c7b 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -1,237 +1,12 @@ -# SGP.22 v2.3: https://www.gsma.com/solutions-and-impact/technologies/esim/wp-content/uploads/2021/07/SGP.22-v2.3.pdf - -import atexit -import base64 -import os -import serial -import sys - -from collections.abc import Generator - from openpilot.system.hardware.base import LPABase, Profile -DEFAULT_DEVICE = "/dev/ttyUSB2" -DEFAULT_BAUD = 9600 -DEFAULT_TIMEOUT = 5.0 -# https://euicc-manual.osmocom.org/docs/lpa/applet-id/ -ISDR_AID = "A0000005591010FFFFFFFF8900000100" -ES10X_MSS = 120 -DEBUG = os.environ.get("DEBUG") == "1" - -# TLV Tags -TAG_ICCID = 0x5A -TAG_PROFILE_INFO_LIST = 0xBF2D - -STATE_LABELS = {0: "disabled", 1: "enabled", 255: "unknown"} -ICON_LABELS = {0: "jpeg", 1: "png", 255: "unknown"} -CLASS_LABELS = {0: "test", 1: "provisioning", 2: "operational", 255: "unknown"} - - -def b64e(data: bytes) -> str: - return base64.b64encode(data).decode("ascii") - - -class AtClient: - def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None: - self.serial = serial.Serial(device, baudrate=baud, timeout=timeout) - self.debug = debug - self.channel: str | None = None - self.serial.reset_input_buffer() - - def close(self) -> None: - try: - if self.channel: - self.query(f"AT+CCHC={self.channel}") - self.channel = None - finally: - self.serial.close() - - def send(self, cmd: str) -> None: - if self.debug: - print(f">> {cmd}", file=sys.stderr) - self.serial.write((cmd + "\r").encode("ascii")) - - def expect(self) -> list[str]: - lines: list[str] = [] - while True: - raw = self.serial.readline() - if not raw: - raise TimeoutError("AT command timed out") - line = raw.decode(errors="ignore").strip() - if not line: - continue - if self.debug: - print(f"<< {line}", file=sys.stderr) - if line == "OK": - return lines - if line == "ERROR" or line.startswith("+CME ERROR"): - raise RuntimeError(f"AT command failed: {line}") - lines.append(line) - - def query(self, cmd: str) -> list[str]: - self.send(cmd) - return self.expect() - - def open_isdr(self) -> None: - # close any stale logical channel from a previous crashed session - try: - self.query("AT+CCHC=1") - except RuntimeError: - pass - for line in self.query(f'AT+CCHO="{ISDR_AID}"'): - if line.startswith("+CCHO:") and (ch := line.split(":", 1)[1].strip()): - self.channel = ch - return - raise RuntimeError("Failed to open ISD-R application") - - def send_apdu(self, apdu: bytes) -> tuple[bytes, int, int]: - if not self.channel: - raise RuntimeError("Logical channel is not open") - hex_payload = apdu.hex().upper() - for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): - if line.startswith("+CGLA:"): - parts = line.split(":", 1)[1].split(",", 1) - if len(parts) == 2: - data = bytes.fromhex(parts[1].strip().strip('"')) - if len(data) >= 2: - return data[:-2], data[-2], data[-1] - raise RuntimeError("Missing +CGLA response") - - -# --- TLV utilities --- - -def iter_tlv(data: bytes, with_positions: bool = False) -> Generator: - idx, length = 0, len(data) - while idx < length: - start_pos = idx - tag = data[idx] - idx += 1 - if tag & 0x1F == 0x1F: # Multi-byte tag - tag_value = tag - while idx < length: - next_byte = data[idx] - idx += 1 - tag_value = (tag_value << 8) | next_byte - if not (next_byte & 0x80): - break - else: - tag_value = tag - if idx >= length: - break - size = data[idx] - idx += 1 - if size & 0x80: # Multi-byte length - num_bytes = size & 0x7F - if idx + num_bytes > length: - break - size = int.from_bytes(data[idx : idx + num_bytes], "big") - idx += num_bytes - if idx + size > length: - break - value = data[idx : idx + size] - idx += size - yield (tag_value, value, start_pos, idx) if with_positions else (tag_value, value) - - -def find_tag(data: bytes, target: int) -> bytes | None: - return next((v for t, v in iter_tlv(data) if t == target), None) - - -def tbcd_to_string(raw: bytes) -> str: - return "".join(str(n) for b in raw for n in (b & 0x0F, b >> 4) if n <= 9) - - -# Profile field decoders: TLV tag -> (field_name, decoder) -_PROFILE_FIELDS = { - TAG_ICCID: ("iccid", tbcd_to_string), - 0x4F: ("isdpAid", lambda v: v.hex().upper()), - 0x9F70: ("profileState", lambda v: STATE_LABELS.get(v[0], "unknown")), - 0x90: ("profileNickname", lambda v: v.decode("utf-8", errors="ignore") or None), - 0x91: ("serviceProviderName", lambda v: v.decode("utf-8", errors="ignore") or None), - 0x92: ("profileName", lambda v: v.decode("utf-8", errors="ignore") or None), - 0x93: ("iconType", lambda v: ICON_LABELS.get(v[0], "unknown")), - 0x94: ("icon", b64e), - 0x95: ("profileClass", lambda v: CLASS_LABELS.get(v[0], "unknown")), -} - - -def _decode_profile_fields(data: bytes) -> dict: - """Parse known profile metadata TLV fields into a dict.""" - result = {} - for tag, value in iter_tlv(data): - if (field := _PROFILE_FIELDS.get(tag)): - result[field[0]] = field[1](value) - return result - - -# --- ES10x command transport --- - -def es10x_command(client: AtClient, data: bytes) -> bytes: - response = bytearray() - sequence = 0 - offset = 0 - while offset < len(data): - chunk = data[offset : offset + ES10X_MSS] - offset += len(chunk) - is_last = offset == len(data) - apdu = bytes([0x80, 0xE2, 0x91 if is_last else 0x11, sequence & 0xFF, len(chunk)]) + chunk - segment, sw1, sw2 = client.send_apdu(apdu) - response.extend(segment) - while True: - if sw1 == 0x61: # More data available - segment, sw1, sw2 = client.send_apdu(bytes([0x80, 0xC0, 0x00, 0x00, sw2 or 0])) - response.extend(segment) - continue - if (sw1 & 0xF0) == 0x90: - break - raise RuntimeError(f"APDU failed with SW={sw1:02X}{sw2:02X}") - sequence += 1 - return bytes(response) - - -# --- Profile operations --- - -def decode_profiles(blob: bytes) -> list[dict]: - root = find_tag(blob, TAG_PROFILE_INFO_LIST) - if root is None: - raise RuntimeError("Missing ProfileInfoList") - list_ok = find_tag(root, 0xA0) - if list_ok is None: - return [] - defaults = {name: None for name, _ in _PROFILE_FIELDS.values()} - return [{**defaults, **_decode_profile_fields(value)} for tag, value in iter_tlv(list_ok) if tag == 0xE3] - - -def list_profiles(client: AtClient) -> list[dict]: - return decode_profiles(es10x_command(client, TAG_PROFILE_INFO_LIST.to_bytes(2, "big") + b"\x00")) - - class TiciLPA(LPABase): - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - def __init__(self): - if hasattr(self, '_client'): - return - self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT, debug=DEBUG) - self._client.open_isdr() - atexit.register(self._client.close) + pass def list_profiles(self) -> list[Profile]: - return [ - Profile( - iccid=p.get("iccid", ""), - nickname=p.get("profileNickname") or "", - enabled=p.get("profileState") == "enabled", - provider=p.get("serviceProviderName") or "", - ) - for p in list_profiles(self._client) - ] + return [] def get_active_profile(self) -> Profile | None: return None