esim: implement download profile (#37806)
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
import atexit
|
||||
import base64
|
||||
import fcntl
|
||||
import hashlib
|
||||
import math
|
||||
import os
|
||||
import requests
|
||||
@@ -43,15 +44,24 @@ DEBUG = os.environ.get("DEBUG") == "1"
|
||||
# TLV Tags
|
||||
TAG_ICCID = 0x5A
|
||||
TAG_STATUS = 0x80
|
||||
TAG_PROFILE_INFO_LIST = 0xBF2D
|
||||
TAG_SET_NICKNAME = 0xBF29
|
||||
TAG_ENABLE_PROFILE = 0xBF31
|
||||
TAG_DELETE_PROFILE = 0xBF33
|
||||
TAG_EUICC_INFO = 0xBF20
|
||||
TAG_PREPARE_DOWNLOAD = 0xBF21
|
||||
TAG_BPP_COMMAND = 0xBF23
|
||||
TAG_PROFILE_METADATA = 0xBF25
|
||||
TAG_INSTALL_RESULT_DATA = 0xBF27
|
||||
TAG_LIST_NOTIFICATION = 0xBF28
|
||||
TAG_SET_NICKNAME = 0xBF29
|
||||
TAG_RETRIEVE_NOTIFICATION = 0xBF2B
|
||||
TAG_PROFILE_INFO_LIST = 0xBF2D
|
||||
TAG_EUICC_CHALLENGE = 0xBF2E
|
||||
TAG_NOTIFICATION_METADATA = 0xBF2F
|
||||
TAG_NOTIFICATION_SENT = 0xBF30
|
||||
TAG_ENABLE_PROFILE = 0xBF31
|
||||
TAG_DELETE_PROFILE = 0xBF33
|
||||
TAG_BPP = 0xBF36
|
||||
TAG_PROFILE_INSTALL_RESULT = 0xBF37
|
||||
TAG_AUTH_SERVER = 0xBF38
|
||||
TAG_CANCEL_SESSION = 0xBF41
|
||||
TAG_OK = 0xA0
|
||||
|
||||
PROFILE_OK = 0x00
|
||||
@@ -63,6 +73,29 @@ PROFILE_ERROR_CODES = {
|
||||
0x03: "disallowedByPolicy", 0x04: "wrongProfileReenabling",
|
||||
PROFILE_CAT_BUSY: "catBusy", 0x06: "undefinedError",
|
||||
}
|
||||
AUTH_SERVER_ERROR_CODES = {
|
||||
0x01: "eUICCVerificationFailed", 0x02: "eUICCCertificateExpired",
|
||||
0x03: "eUICCCertificateRevoked", 0x05: "invalidServerSignature",
|
||||
0x06: "euiccCiPKUnknown", 0x0A: "matchingIdRefused",
|
||||
0x10: "insufficientMemory",
|
||||
}
|
||||
BPP_COMMAND_NAMES = {
|
||||
0: "initialiseSecureChannel", 1: "configureISDP", 2: "storeMetadata",
|
||||
3: "storeMetadata2", 4: "replaceSessionKeys", 5: "loadProfileElements",
|
||||
}
|
||||
BPP_ERROR_REASONS = {
|
||||
1: "incorrectInputValues", 2: "invalidSignature", 3: "invalidTransactionId",
|
||||
4: "unsupportedCrtValues", 5: "unsupportedRemoteOperationType",
|
||||
6: "unsupportedProfileClass", 7: "scp03tStructureError", 8: "scp03tSecurityError",
|
||||
9: "iccidAlreadyExistsOnEuicc", 10: "insufficientMemoryForProfile",
|
||||
11: "installInterrupted", 12: "peProcessingError", 13: "dataMismatch",
|
||||
14: "invalidNAA",
|
||||
}
|
||||
BPP_ERROR_MESSAGES = {
|
||||
9: "This eSIM profile is already installed on this device.",
|
||||
10: "Not enough memory on the eUICC to install this profile.",
|
||||
12: "Profile installation failed. The QR code may have already been used.",
|
||||
}
|
||||
|
||||
# SGP.22 §5.2.6 — SM-DP+ reason/subject codes mapped to user-friendly messages
|
||||
ES9P_ERROR_MESSAGES: dict[tuple[str, str], str] = {
|
||||
@@ -459,6 +492,222 @@ def process_notifications(client: AtClient) -> None:
|
||||
print(f"notification {seq_number} failed: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
# --- Authentication & Download ---
|
||||
|
||||
def get_challenge_and_info(client: AtClient) -> tuple[bytes, bytes]:
|
||||
challenge_resp = es10x_command(client, encode_tlv(TAG_EUICC_CHALLENGE, b""))
|
||||
challenge = require_tag(require_tag(challenge_resp, TAG_EUICC_CHALLENGE, "GetEuiccDataResponse"),
|
||||
TAG_STATUS, "challenge in response")
|
||||
info_resp = es10x_command(client, encode_tlv(TAG_EUICC_INFO, b""))
|
||||
require_tag(info_resp, TAG_EUICC_INFO, "GetEuiccInfo1Response")
|
||||
return challenge, info_resp
|
||||
|
||||
|
||||
def authenticate_server(client: AtClient, b64_signed1: str, b64_sig1: str, b64_pk_id: str, b64_cert: str, matching_id: str) -> str:
|
||||
tac = bytes([0x35, 0x29, 0x06, 0x11])
|
||||
device_info = encode_tlv(TAG_STATUS, tac) + encode_tlv(0xA1, b"")
|
||||
ctx_inner = encode_tlv(TAG_STATUS, matching_id.encode("utf-8")) + encode_tlv(0xA1, device_info)
|
||||
content = b64d(b64_signed1) + b64d(b64_sig1) + b64d(b64_pk_id) + b64d(b64_cert) + encode_tlv(0xA0, ctx_inner)
|
||||
response = es10x_command(client, encode_tlv(TAG_AUTH_SERVER, content))
|
||||
root = require_tag(response, TAG_AUTH_SERVER, "AuthenticateServerResponse")
|
||||
error_tag = find_tag(root, 0xA1)
|
||||
if error_tag is not None:
|
||||
code = int.from_bytes(error_tag, "big") if error_tag else 0
|
||||
raise RuntimeError(f"AuthenticateServer rejected by eUICC: {AUTH_SERVER_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})")
|
||||
return b64e(response)
|
||||
|
||||
|
||||
def prepare_download(client: AtClient, b64_signed2: str, b64_sig2: str, b64_cert: str, cc: str | None = None) -> str:
|
||||
smdp_signed2 = b64d(b64_signed2)
|
||||
smdp_signature2 = b64d(b64_sig2)
|
||||
smdp_certificate = b64d(b64_cert)
|
||||
smdp_signed2_root = find_tag(smdp_signed2, 0x30)
|
||||
if smdp_signed2_root is None:
|
||||
raise RuntimeError("Invalid smdpSigned2")
|
||||
transaction_id = find_tag(smdp_signed2_root, TAG_STATUS)
|
||||
cc_required_flag = find_tag(smdp_signed2_root, 0x01)
|
||||
if transaction_id is None or cc_required_flag is None:
|
||||
raise RuntimeError("Invalid smdpSigned2")
|
||||
content = smdp_signed2 + smdp_signature2
|
||||
if int.from_bytes(cc_required_flag, "big") != 0:
|
||||
if not cc:
|
||||
raise RuntimeError("Confirmation code required but not provided")
|
||||
content += encode_tlv(0x04, hashlib.sha256(hashlib.sha256(cc.encode("utf-8")).digest() + transaction_id).digest())
|
||||
content += smdp_certificate
|
||||
response = es10x_command(client, encode_tlv(TAG_PREPARE_DOWNLOAD, content))
|
||||
require_tag(response, TAG_PREPARE_DOWNLOAD, "PrepareDownloadResponse")
|
||||
return b64e(response)
|
||||
|
||||
|
||||
def _parse_tlv_header_len(data: bytes) -> int:
|
||||
tag_len = 2 if data[0] & 0x1F == 0x1F else 1
|
||||
length_byte = data[tag_len]
|
||||
return tag_len + (1 + (length_byte & 0x7F) if length_byte & 0x80 else 1)
|
||||
|
||||
|
||||
def _split_bpp(bpp: bytes) -> list[bytes]:
|
||||
"""Split a BoundProfilePackage into APDU chunks per SGP.22 §5.7.6."""
|
||||
root_value = None
|
||||
for tag, value, start, end in iter_tlv(bpp, with_positions=True):
|
||||
if tag == TAG_BPP:
|
||||
root_value = value
|
||||
val_start = start + _parse_tlv_header_len(bpp[start:end])
|
||||
break
|
||||
if root_value is None:
|
||||
raise RuntimeError("Invalid BoundProfilePackage")
|
||||
|
||||
chunks: list[bytes] = []
|
||||
for tag, value, start, end in iter_tlv(root_value, with_positions=True):
|
||||
if tag == TAG_BPP_COMMAND:
|
||||
chunks.append(bpp[0 : val_start + end])
|
||||
elif tag in (0xA0, 0xA2):
|
||||
chunks.append(bpp[val_start + start : val_start + end])
|
||||
elif tag in (0xA1, 0xA3):
|
||||
hdr_len = _parse_tlv_header_len(root_value[start:end])
|
||||
chunks.append(bpp[val_start + start : val_start + start + hdr_len])
|
||||
for _, _, cs, ce in iter_tlv(value, with_positions=True):
|
||||
chunks.append(value[cs:ce])
|
||||
return chunks
|
||||
|
||||
|
||||
def _parse_install_result(response: bytes) -> dict[str, Any] | None:
|
||||
"""Parse a ProfileInstallResult from an APDU response, or None if not present."""
|
||||
root = find_tag(response, TAG_PROFILE_INSTALL_RESULT)
|
||||
if not root:
|
||||
return None
|
||||
result_data = find_tag(root, TAG_INSTALL_RESULT_DATA)
|
||||
if not result_data:
|
||||
return None
|
||||
result: dict[str, Any] = {"seqNumber": 0, "success": False, "bppCommandId": None, "errorReason": None}
|
||||
notif_meta = find_tag(result_data, TAG_NOTIFICATION_METADATA)
|
||||
if notif_meta:
|
||||
seq_num = find_tag(notif_meta, TAG_STATUS)
|
||||
if seq_num:
|
||||
result["seqNumber"] = int.from_bytes(seq_num, "big")
|
||||
final_result = find_tag(result_data, 0xA2)
|
||||
if final_result:
|
||||
for tag, value in iter_tlv(final_result):
|
||||
if tag == 0xA0:
|
||||
result["success"] = True
|
||||
elif tag == 0xA1:
|
||||
bpp_cmd = find_tag(value, TAG_STATUS)
|
||||
if bpp_cmd:
|
||||
result["bppCommandId"] = int.from_bytes(bpp_cmd, "big")
|
||||
err = find_tag(value, 0x81)
|
||||
if err:
|
||||
result["errorReason"] = int.from_bytes(err, "big")
|
||||
return result
|
||||
|
||||
|
||||
def load_bpp(client: AtClient, b64_bpp: str) -> dict:
|
||||
bpp = b64d(b64_bpp)
|
||||
result = None
|
||||
for chunk in _split_bpp(bpp):
|
||||
response = es10x_command(client, chunk)
|
||||
if response:
|
||||
result = _parse_install_result(response) or result
|
||||
|
||||
if result is None:
|
||||
raise RuntimeError("Profile installation failed: no result from eUICC")
|
||||
if not result["success"] and result["errorReason"] is not None:
|
||||
msg = BPP_ERROR_MESSAGES.get(result["errorReason"])
|
||||
if not msg:
|
||||
cmd_name = BPP_COMMAND_NAMES.get(result["bppCommandId"], f"unknown({result['bppCommandId']})")
|
||||
err_name = BPP_ERROR_REASONS.get(result["errorReason"], f"unknown({result['errorReason']})")
|
||||
msg = f"Profile installation failed at {cmd_name}: {err_name}"
|
||||
raise RuntimeError(msg)
|
||||
if not result["success"]:
|
||||
raise RuntimeError("Profile installation failed: no result from eUICC")
|
||||
return result
|
||||
|
||||
|
||||
def parse_metadata(b64_metadata: str) -> dict:
|
||||
root = find_tag(b64d(b64_metadata), TAG_PROFILE_METADATA)
|
||||
if root is None:
|
||||
raise RuntimeError("Invalid profileMetadata")
|
||||
return decode_struct(root, PROFILE)
|
||||
|
||||
|
||||
def cancel_session(client: AtClient, transaction_id: bytes, reason: int = 127) -> str:
|
||||
content = encode_tlv(0x80, transaction_id) + encode_tlv(0x81, bytes([reason]))
|
||||
response = es10x_command(client, encode_tlv(TAG_CANCEL_SESSION, content))
|
||||
return b64e(response)
|
||||
|
||||
|
||||
def parse_lpa_activation_code(activation_code: str) -> tuple[str, str]:
|
||||
"""Parse 'LPA:1$smdp.example.com$MATCHING-ID' into (smdp_address, matching_id)."""
|
||||
if not activation_code.startswith("LPA:"):
|
||||
raise ValueError("Invalid activation code format")
|
||||
parts = activation_code[4:].split("$")
|
||||
if len(parts) != 3:
|
||||
raise ValueError("Invalid activation code format")
|
||||
return parts[1], parts[2]
|
||||
|
||||
|
||||
def _b64_field(data: dict, key: str) -> str:
|
||||
return base64_trim(data[key])
|
||||
|
||||
|
||||
def _cancel_session_safe(client: AtClient, smdp: str, tx_id: str, session: requests.Session) -> None:
|
||||
b64_cancel = ""
|
||||
try:
|
||||
b64_cancel = cancel_session(client, b64d(tx_id))
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
es9p_request(smdp, "cancelSession", {"transactionId": tx_id, "cancelSessionResponse": b64_cancel}, "CancelSession", session=session)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def download_profile(client: AtClient, activation_code: str) -> str:
|
||||
"""Download and install an eSIM profile. Returns the ICCID of the installed profile."""
|
||||
if not system_time_valid():
|
||||
raise RuntimeError("System time is not set; TLS certificate validation requires a valid clock")
|
||||
smdp, matching_id = parse_lpa_activation_code(activation_code)
|
||||
challenge, euicc_info = get_challenge_and_info(client)
|
||||
session = requests.Session()
|
||||
tx_id = None
|
||||
|
||||
try:
|
||||
# step 1: initiate authentication
|
||||
auth = es9p_request(smdp, "initiateAuthentication", {
|
||||
"smdpAddress": smdp, "euiccChallenge": b64e(challenge),
|
||||
"euiccInfo1": b64e(euicc_info), "matchingId": matching_id,
|
||||
}, "Authentication", session=session)
|
||||
tx_id = _b64_field(auth, "transactionId")
|
||||
|
||||
# step 2: authenticate server
|
||||
b64_auth = authenticate_server(client,
|
||||
_b64_field(auth, "serverSigned1"), _b64_field(auth, "serverSignature1"),
|
||||
_b64_field(auth, "euiccCiPKIdToBeUsed"), _b64_field(auth, "serverCertificate"),
|
||||
matching_id)
|
||||
|
||||
# step 3: authenticate client + get metadata
|
||||
cli = es9p_request(smdp, "authenticateClient", {
|
||||
"transactionId": tx_id, "authenticateServerResponse": b64_auth,
|
||||
}, "Authentication", session=session)
|
||||
iccid = parse_metadata(_b64_field(cli, "profileMetadata"))["iccid"]
|
||||
|
||||
# step 4: prepare download
|
||||
b64_prep = prepare_download(client,
|
||||
_b64_field(cli, "smdpSigned2"), _b64_field(cli, "smdpSignature2"),
|
||||
_b64_field(cli, "smdpCertificate"))
|
||||
|
||||
# step 5: get and install bound profile package
|
||||
bpp = es9p_request(smdp, "getBoundProfilePackage", {
|
||||
"transactionId": tx_id, "prepareDownloadResponse": b64_prep,
|
||||
}, "GetBoundProfilePackage", session=session)
|
||||
load_bpp(client, _b64_field(bpp, "boundProfilePackage"))
|
||||
return iccid
|
||||
except Exception:
|
||||
if tx_id:
|
||||
_cancel_session_safe(client, smdp, tx_id, session)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
class TiciLPA(LPABase):
|
||||
def __init__(self):
|
||||
if hasattr(self, '_client'):
|
||||
@@ -515,7 +764,10 @@ class TiciLPA(LPABase):
|
||||
raise LPAError(f"DeleteProfile failed: {PROFILE_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})")
|
||||
|
||||
def download_profile(self, qr: str, nickname: str | None = None) -> None:
|
||||
return None
|
||||
with self._acquire_channel():
|
||||
iccid = download_profile(self._client, qr)
|
||||
if nickname and iccid:
|
||||
set_profile_nickname(self._client, iccid, nickname)
|
||||
|
||||
def nickname_profile(self, iccid: str, nickname: str) -> None:
|
||||
with self._acquire_channel():
|
||||
|
||||
Reference in New Issue
Block a user