TSK Manager v0.11.0

This commit is contained in:
Calvin Park
2025-11-15 18:11:00 -08:00
parent 8069b6611d
commit 209db8f320
44 changed files with 3768 additions and 0 deletions
View File
+53
View File
@@ -0,0 +1,53 @@
# tsk/common/env.py
import os
import time
def is_agnos():
return os.path.exists("/AGNOS")
COMMA_DATA_DIR = "/data" if is_agnos() else f"{os.path.expanduser('~')}/comma_data"
CONTINUE_FILE = f"{COMMA_DATA_DIR}/continue.sh"
OPENPILOT_DIR = f"{COMMA_DATA_DIR}/openpilot"
PAYLOAD_PATH = "/data/openpilot/tsk/common/payload.bin"
RECOMMENDED_OP_USER = "commaai"
RECOMMENDED_OP_BRANCH = "nightly-dev"
RECOMMENDED_OP_DIR = f"{COMMA_DATA_DIR}/tsk-recommended"
ALTERNATE_OP_USER = "sunnypilot"
ALTERNATE_OP_BRANCH = "staging"
ALTERNATE_OP_DIR = f"{COMMA_DATA_DIR}/tsk-alternate"
def is_calvins_comma() -> bool:
try:
with open("/persist/comma/dongle_id") as f:
content = f.read()
if "2decf199" in content or "eecdfcc" in content:
return True
except:
pass
return False
def is_cache_dir_new() -> bool:
try:
cache_dir = "/cache/params"
mod_time = os.path.getmtime(cache_dir)
age = time.time() - mod_time
day = 60 * 60 * 24
return age < day
except:
pass
return False
def is_in_car() -> bool:
return False
+315
View File
@@ -0,0 +1,315 @@
#!/usr/bin/env python3
import struct
import time
from subprocess import check_output, CalledProcessError
from Crypto.Cipher import AES
from tqdm import tqdm
from opendbc.car.isotp import isotp_send
from opendbc.car.structs import CarParams
from opendbc.car.uds import UdsClient, ACCESS_TYPE, SESSION_TYPE, DATA_IDENTIFIER_TYPE, SERVICE_TYPE, \
ROUTINE_CONTROL_TYPE, InvalidServiceIdError, MessageTimeoutError, NegativeResponseError
from panda import Panda
from tsk.common.env import is_agnos, PAYLOAD_PATH
class NotAGNOSError(Exception):
def __str__(self) -> str:
return "Can't run TSK Extractor outside of a comma device."
class BoarddNotRunningError(Exception):
pass
class RetryError(Exception):
def __init__(self, message: str):
self.message: str = message
def __str__(self) -> str:
return f"{self.message}\n\nTry again. If the problem persists, turn off the car, put it back into 'Not Ready to Drive' mode, and then try again."
def format_version_for_error_display(version1, version2=None, length=8):
version_str = ""
version1_str = str(version1)
if version1_str.startswith("b'"):
version1_str = version1_str[2:]
version_str = version1_str[:length]
if version2 and version1 != version2:
version2_str = str(version2)
if version2_str.startswith("b'"):
version2_str = version2_str[2:]
version_str += ", " + version2_str[:length]
return version_str
class TSKExtractor:
ADDR = 0x7a1
DEBUG = False
BUS = 0
SEED_KEY_SECRET = b'\xf0\x5f\x36\xb7\xd7\x8c\x03\xe2\x4a\xb4\xfa\xef\x2a\x57\xd0\x44'
# These are the key and IV used to encrypt the payload in build_payload.py
DID_201_KEY = b'\x00' * 16
DID_202_IV = b'\x00' * 16
# Confirmed working on the following versions
APPLICATION_VERSIONS = {
b'\x018965B4209000\x00\x00\x00\x00': b'\x01!!!!!!!!!!!!!!!!', # 2021 RAV4 Prime
b'\x018965B4233100\x00\x00\x00\x00': b'\x01!!!!!!!!!!!!!!!!', # 2023 RAV4 Prime
b'\x018965B4509100\x00\x00\x00\x00': b'\x01!!!!!!!!!!!!!!!!', # 2021 Sienna
}
KEY_STRUCT_SIZE = 0x20
CHECKSUM_OFFSET = 0x1d
SECOC_KEY_SIZE = 0x10
SECOC_KEY_OFFSET = 0x0c
@classmethod
def _get_key_struct(cls, data, key_no):
return data[key_no * cls.KEY_STRUCT_SIZE: (key_no + 1) * cls.KEY_STRUCT_SIZE]
@classmethod
def _verify_checksum(cls, key_struct):
checksum = sum(key_struct[:cls.CHECKSUM_OFFSET])
checksum = ~checksum & 0xff
return checksum == key_struct[cls.CHECKSUM_OFFSET]
@classmethod
def _get_secoc_key(cls, key_struct):
return key_struct[cls.SECOC_KEY_OFFSET:cls.SECOC_KEY_OFFSET + cls.SECOC_KEY_SIZE]
@classmethod
def hack(cls):
"""Initializes the ECU connection and checks if boardd is running."""
if not is_agnos():
raise NotAGNOSError
try:
check_output(["pidof", "boardd"])
# This shouldn't happen since we never started boardd
raise BoarddNotRunningError("boardd is running, kill openpilot and run again")
except CalledProcessError as e:
if e.returncode != 1: # 1 == no process found (boardd not running)
raise e
except FileNotFoundError:
pass
panda = Panda()
panda.flash() # no-op if firmware is already up to date; required because TSKM kills pandad before it can flash
panda.set_safety_mode(CarParams.SafetyModel.elm327)
uds_client = UdsClient(panda, cls.ADDR, cls.ADDR + 8, cls.BUS, timeout=0.1, response_pending_timeout=0.1)
print("Getting application versions...")
try:
app_version = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
print(f" - APPLICATION_SOFTWARE_IDENTIFICATION (application): {str(app_version)}")
except (AssertionError, InvalidServiceIdError, MessageTimeoutError, NegativeResponseError):
raise RetryError("Car not detected")
if app_version not in cls.APPLICATION_VERSIONS:
print(f"Unexpected application version (ignored): {str(app_version)}")
# Mandatory flow of diagnostic sessions
try:
uds_client.diagnostic_session_control(SESSION_TYPE.DEFAULT)
uds_client.diagnostic_session_control(SESSION_TYPE.EXTENDED_DIAGNOSTIC)
uds_client.diagnostic_session_control(SESSION_TYPE.PROGRAMMING)
uds_client.diagnostic_session_control(SESSION_TYPE.DEFAULT)
uds_client.diagnostic_session_control(SESSION_TYPE.EXTENDED_DIAGNOSTIC)
except (InvalidServiceIdError, MessageTimeoutError, NegativeResponseError):
raise RetryError("Car not in 'Not Ready To Drive' mode")
# Get bootloader version
try:
bl_version = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
except (AssertionError, InvalidServiceIdError, MessageTimeoutError, NegativeResponseError):
raise RetryError(f"Can't read bootloader version ({format_version_for_error_display(app_version)})")
print(f" - APPLICATION_SOFTWARE_IDENTIFICATION (bootloader) {str(bl_version)}")
try:
if bl_version != cls.APPLICATION_VERSIONS[app_version]:
print(f"Unexpected bootloader version (ignored): {str(bl_version)}")
except KeyError as e: # In case app_version is not found at all
print(f"Unexpected bootloader version (ignored): {str(e)}")
# Go back to programming session
try:
uds_client.diagnostic_session_control(SESSION_TYPE.PROGRAMMING)
except (InvalidServiceIdError, MessageTimeoutError, NegativeResponseError):
raise RetryError("Can't enter programming session for reading bootloader version")
# Security Access - Request Seed
try:
seed_payload = b"\x00" * 16
seed = uds_client.security_access(ACCESS_TYPE.REQUEST_SEED, data_record=seed_payload)
key = AES.new(cls.SEED_KEY_SECRET, AES.MODE_ECB).decrypt(seed_payload)
key = AES.new(key, AES.MODE_ECB).encrypt(seed)
print("\nSecurity Access...")
print(" - SEED:", seed.hex())
print(" - KEY:", key.hex())
# Security Access - Send Key
uds_client.security_access(ACCESS_TYPE.SEND_KEY, key)
print(" - Key OK!")
except (InvalidServiceIdError, MessageTimeoutError, NegativeResponseError):
raise RetryError("Security Access failed")
# Security Access - Send Key
print("\nPreparing to upload payload...")
try:
# Write something to DID 203, not sure why but needed for state machine
uds_client.write_data_by_identifier(0x203, b"\x00" * 5)
# Write KEY and IV to DID 201/202, prerequisite for request download
print(" - Write data by identifier 0x201", cls.DID_201_KEY.hex())
uds_client.write_data_by_identifier(0x201, cls.DID_201_KEY)
print(" - Write data by identifier 0x202", cls.DID_202_IV.hex())
uds_client.write_data_by_identifier(0x202, cls.DID_202_IV)
# Request download to RAM
data = b"\x01" # [1] Format
data += b"\x46" # [2] 4 size bytes, 6 address bytes
data += b"\x01" # [3] memoryIdentifier
data += b"\x00" # [4]
data += struct.pack('!I', 0xfebf0000) # [5] Address
data += struct.pack('!I', 0x1000) # [9] Size
print("\nUpload payload...")
print(" - Request download")
resp = uds_client._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, data=data)
# Upload payload
payload = open(PAYLOAD_PATH, "rb").read()
assert len(payload) == 0x1000
chunk_size = 0x400
for i in range(len(payload) // chunk_size):
print(f" - Transfer data {i}")
uds_client.transfer_data(i + 1, payload[i * chunk_size:(i + 1) * chunk_size])
uds_client.request_transfer_exit()
print("\nVerify payload...")
# Routine control 0x10f0
# [0] 0x31 (routine control)
# [1] 0x01 (start)
# [2] 0x10f0 (routine identifier)
# [4] 0x45 (format, 4 size bytes, 5 address bytes)
# [5] 0x0
# [6] mem addr
# [10] mem addr
data = b"\x45\x00"
data += struct.pack('!I', 0xfebf0000)
data += struct.pack('!I', 0x1000)
uds_client.routine_control(ROUTINE_CONTROL_TYPE.START, 0x10f0, data)
print(" - Routine control 0x10f0 OK!")
except (InvalidServiceIdError, MessageTimeoutError, NegativeResponseError):
raise RetryError("Payload upload failed")
print("\nTrigger payload...")
# Now we trigger the payload by trying to erase
# [0] 0x31 (routine control)
# [1] 0x01 (start)
# [2] 0xff00 (routine identifier)
# [4] 0x45 (format, 4 size bytes, 5 address bytes)
# [5] 0x0
# [6] mem addr
# [10] mem addr
data = b"\x45\x00"
data += struct.pack('!I', 0xe0000)
data += struct.pack('!I', 0x8000)
# Manually send so we don't get stuck waiting for the response
erase = b"\x31\x01\xff\x00" + data
isotp_send(panda, erase, cls.ADDR, bus=cls.BUS)
print("\nDumping keys...")
start = 0xfebe6e34
end = 0xfebe6ff4
start_time = time.time()
timeout = 30
extracted = b""
with open(f'data_{start:08x}_{end:08x}.bin', 'wb') as f:
with tqdm(total=end - start) as pbar:
while start < end:
current_time = time.time()
if current_time - start_time > timeout:
raise RetryError("Key dumping timed out")
for addr, *_, data, bus in panda.can_recv():
if bus != cls.BUS:
continue
if data == b"\x03\x7f\x31\x78\x00\x00\x00\x00": # Skip response pending
continue
if addr != cls.ADDR + 8:
continue
if cls.DEBUG:
print(f"{data.hex()}")
ptr = struct.unpack("<I", data[:4])[0]
assert (ptr >> 8) == start & 0xffffff # Check lower 24 bits of address
extracted += data[4:]
f.write(data[4:])
f.flush()
start += 4
pbar.update(4)
start_time = time.time()
key_1_ok = cls._verify_checksum(cls._get_key_struct(extracted, 1))
key_4_ok = cls._verify_checksum(cls._get_key_struct(extracted, 4))
if not key_1_ok or not key_4_ok:
raise RetryError(f"SecOC key checksum verification failed ({format_version_for_error_display(app_version, bl_version)})")
key_1 = cls._get_secoc_key(cls._get_key_struct(extracted, 1))
key_4 = cls._get_secoc_key(cls._get_key_struct(extracted, 4))
print("\nECU_MASTER_KEY ", key_1.hex())
print("SecOC Key (KEY_4)", key_4.hex())
return key_4.hex()
@classmethod
def run(cls):
try:
secoc_key = cls.hack()
except (BoarddNotRunningError, RetryError):
raise
except Exception as e:
e.add_note("\n\n!!!! Unexpected error. Please take a photo, post it on #toyota-security, and ping @calvinspark\n")
raise
print("SecOC key extracted successfully")
print("!!!! Take a photo of this screen")
return secoc_key
+137
View File
@@ -0,0 +1,137 @@
# tsk/common/key_file_manager.py
import os
import re
import threading
import time
from tsk.c3.ui.layout import Theme
from tsk.common.env import is_agnos
class KeyFileManager:
DATA_PARAMS_D_SECOCKEY_PATH = "/data/params/d/SecOCKey"
CACHE_PARAMS_SECOCKEY_PATH = "/cache/params/SecOCKey"
HOME_SECOCKEY_PATH = os.path.expanduser("~/SecOCKey")
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.installed_key = cls._read_key_from_files()
threading.Thread(target=cls._instance._update_key_status_loop, daemon=True).start()
return cls._instance
@staticmethod
def _is_key_valid(key: str) -> bool:
"""Checks if the key is a valid 32-character lowercase hexadecimal string."""
if not isinstance(key, str):
return False
if len(key) != 32:
return False
pattern = r"^[0-9a-f]{32}$"
return bool(re.match(pattern, key))
@staticmethod
def _read_key_from_file(file_path: str) -> str | None:
"""
Reads and validates a key from the given file path.
If the key is invalid, the file is deleted.
Returns:
The key if it's valid, None otherwise.
"""
if not os.path.exists(file_path):
return None
try:
with open(file_path, "r") as f:
key = f.read().strip()
if KeyFileManager._is_key_valid(key):
return key
else:
# Key is invalid, delete the file
try:
os.remove(file_path)
print(f"Deleted invalid key file: {file_path} which contained {key}")
except Exception as e:
print(f"Error deleting invalid key file {file_path}: {e}")
return None
except Exception as e:
print(f"Error reading key file {file_path}: {e}")
return None # Return None on any error
@staticmethod
def _read_key_from_files() -> str | None:
"""Reads the key from the appropriate file(s) based on the AGNOS environment."""
if not is_agnos():
return KeyFileManager._read_key_from_file(KeyFileManager.HOME_SECOCKEY_PATH)
data_params_d_secockey = KeyFileManager._read_key_from_file(KeyFileManager.DATA_PARAMS_D_SECOCKEY_PATH)
cache_params_secockey = KeyFileManager._read_key_from_file(KeyFileManager.CACHE_PARAMS_SECOCKEY_PATH)
existing_key = cache_params_secockey or data_params_d_secockey
if not existing_key:
return None
# Write the existing key to missing files
if data_params_d_secockey != existing_key:
KeyFileManager._write_key_to_file(KeyFileManager.DATA_PARAMS_D_SECOCKEY_PATH, existing_key)
if cache_params_secockey != existing_key:
KeyFileManager._write_key_to_file(KeyFileManager.CACHE_PARAMS_SECOCKEY_PATH, existing_key)
return existing_key
@staticmethod
def _write_key_to_file(file_path: str, key: str) -> None:
"""Writes the key to the specified file path."""
print(f"Writing key to file: {key} {file_path}")
try:
with open(file_path, "w") as f:
f.write(key)
except Exception as e:
print(f"Error writing key to file {file_path}: {e}")
def _update_key_status_loop(self) -> None:
"""Periodically updates the key status."""
while True:
self.installed_key = KeyFileManager._read_key_from_files()
time.sleep(Theme.status_update_interval) # Check every x second
def install_key(self, key: str) -> None:
"""Installs the key by writing it to the appropriate file(s) based on the AGNOS environment."""
if not KeyFileManager._is_key_valid(key):
print("Invalid key format. Installation aborted.")
return
if not is_agnos():
KeyFileManager._write_key_to_file(KeyFileManager.HOME_SECOCKEY_PATH, key)
KeyFileManager._installed_key = KeyFileManager._read_key_from_files()
return
KeyFileManager._write_key_to_file(KeyFileManager.DATA_PARAMS_D_SECOCKEY_PATH, key)
KeyFileManager._write_key_to_file(KeyFileManager.CACHE_PARAMS_SECOCKEY_PATH, key)
self.installed_key = KeyFileManager._read_key_from_files()
def uninstall_key(self) -> None:
"""Deletes the key from the appropriate file(s) based on the AGNOS environment."""
def _delete_file(file_path: str):
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"Deleted key file: {file_path}")
except Exception as e:
print(f"Error deleting key file {file_path}: {e}")
if not is_agnos():
_delete_file(KeyFileManager.HOME_SECOCKEY_PATH)
else:
_delete_file(KeyFileManager.DATA_PARAMS_D_SECOCKEY_PATH)
_delete_file(KeyFileManager.CACHE_PARAMS_SECOCKEY_PATH)
self.installed_key = KeyFileManager._read_key_from_files()
Binary file not shown.
+60
View File
@@ -0,0 +1,60 @@
# tsk/common/widget.py
"""
TSK Widget abstraction layer.
This module provides TSK-specific widgets that extend openpilot's Widget system.
By creating this abstraction layer, we insulate the TSK application from changes
in the underlying openpilot UI library.
"""
from openpilot.system.ui.widgets import Widget
from openpilot.system.ui.widgets.scroller import _Scroller
__all__ = ['TSKWidget', 'Scroller']
# ============================================================================
# Scroller factory function
# ============================================================================
# This replaces the direct import of openpilot's Scroller widget.
#
# WHY: openpilot's Scroller API changes frequently between releases.
# - In v0.10.x: Scroller(items, horizontal=True, pad_start=10, pad_end=10)
# - In v0.11.x: Scroller split into Scroller (wrapper) and _Scroller (impl).
# The wrapper no longer accepts items in the constructor, removed pad_start/
# pad_end (now just pad), and doesn't expose scroll_panel or
# set_scrolling_enabled.
#
# This function wraps _Scroller (the actual implementation) so that:
# 1. TSK code can call Scroller(items, ...) like a normal constructor
# 2. The returned object has scroll_panel, set_scrolling_enabled, etc.
# 3. When comma changes the API again, we only fix this one place
#
# USAGE: from tsk.common.widget import Scroller
# scroller = Scroller([widget1, widget2], horizontal=True, pad=10)
# scroller.scroll_panel.get_offset()
# scroller.set_scrolling_enabled(True)
# ============================================================================
def Scroller(items, **kwargs):
return _Scroller(items, **kwargs)
class TSKWidget(Widget):
"""
Base class for all TSK widgets.
This extends openpilot's Widget class and provides TSK-specific
functionality and customization. All TSK UI components should
extend this class instead of using OpenPilotWidget directly.
Benefits:
- Isolation from openpilot UI changes
- TSK-specific features can be added here
- Consistent behavior across all TSK widgets
- Easier testing and mocking
"""
def __init__(self):
super().__init__()
# TSK-specific initialization can go here
# For example: logging, analytics, custom state management