Files
onepilot/selfdrive/c3_client.py
T
2026-05-31 17:03:18 +08:00

667 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
C3 Client — 部署到 C3 设备上
与 C3 Director 服务端通信
支持远程指令执行、tmux 故障诊断、心跳保活
通过 openpilot 的 PythonProcess 自动启动:
PythonProcess("c3-client", "selfdrive.c3_client", always_run),
零外部依赖(内嵌纯 Python 版 WebSocket 客户端,纯标准库实现)
"""
import asyncio
import base64
import json
import os
import random
import struct
import subprocess
import sys
import traceback
import urllib.parse
from datetime import datetime
# openpilot 基础模块
from openpilot.system.hardware import HARDWARE, PC
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
# ================= 内嵌 WebSocket 客户端(纯标准库) =================
class _WebSocketError(Exception):
    """Error raised by the embedded WebSocket client.

    Used in this file for handshake failures, connect/handshake timeouts and
    connections that drop while a frame is being read.
    """
    pass
class _WebSocketClient:
    """Asynchronous WebSocket client (RFC 6455) built only on the standard library.

    Only the plain ``ws://`` scheme and text messages are supported. Binary
    messages are read and discarded. After :meth:`connect` succeeds, two
    background tasks run: one reads frames into an internal queue and one
    sends periodic pings.

    The client can be used as an async context manager and as an async
    iterator over incoming text messages.
    """

    def __init__(self, url, ping_interval=20, ping_timeout=15):
        """Parse ``url`` and prepare (but do not open) the connection.

        Args:
            url: ``ws://host[:port][/path][?query]`` endpoint.
            ping_interval: Seconds between keep-alive pings.
            ping_timeout: Seconds to wait for a pong before giving up.

        Raises:
            ValueError: If the URL scheme is not ``ws``.
        """
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in ("ws",):
            raise ValueError(f"不支持的协议: {parsed.scheme}")
        self.host = parsed.hostname
        self.port = parsed.port or 80
        self.path = parsed.path or "/"
        if parsed.query:
            self.path += "?" + parsed.query
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self._reader = None
        self._writer = None
        self._closed = False
        # Created by connect(); kept as None until then so close() is safe
        # to call on a client that never connected.
        self._recv_task = None
        self._ping_task = None
        # Holds ("text", str) tuples, plus a single None sentinel that is
        # queued when the connection closes so a blocked recv() wakes up.
        self._recv_queue = asyncio.Queue()
        self._pong_event = asyncio.Event()

    async def connect(self):
        """Open the TCP connection, do the HTTP upgrade and start the workers.

        Raises:
            _WebSocketError: On timeout, early disconnect or a non-101 reply.
            OSError: If the TCP connection cannot be established.
        """
        try:
            await self._handshake()
        except asyncio.TimeoutError as exc:
            self._discard_transport()
            raise _WebSocketError("连接或握手超时") from exc
        except BaseException:
            # Do not leak the socket when the handshake fails half-way.
            self._discard_transport()
            raise
        self._recv_task = asyncio.create_task(self._recv_loop())
        self._ping_task = asyncio.create_task(self._ping_loop())

    async def _handshake(self):
        """Connect the socket and perform the WebSocket opening handshake."""
        self._reader, self._writer = await asyncio.wait_for(
            asyncio.open_connection(self.host, self.port), timeout=10
        )
        # RFC 6455 asks for a randomly selected 16-byte nonce.
        ws_key = base64.b64encode(os.urandom(16)).decode()
        request = (
            f"GET {self.path} HTTP/1.1\r\n"
            f"Host: {self.host}:{self.port}\r\n"
            f"Upgrade: websocket\r\n"
            f"Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {ws_key}\r\n"
            f"Sec-WebSocket-Version: 13\r\n"
            f"\r\n"
        )
        self._writer.write(request.encode())
        await self._writer.drain()
        # readuntil() stops exactly at the end of the headers, so any frame
        # bytes the server sends right after them stay in the reader buffer.
        try:
            head = await asyncio.wait_for(
                self._reader.readuntil(b"\r\n\r\n"), timeout=10
            )
        except asyncio.IncompleteReadError as exc:
            raise _WebSocketError("连接关闭") from exc
        except asyncio.LimitOverrunError as exc:
            raise _WebSocketError("握手失败: 响应头过长") from exc
        status_line = head.decode("utf-8", errors="replace").split("\r\n", 1)[0]
        # Compare the status-code field itself instead of searching the
        # whole line for the substring "101".
        fields = status_line.split(None, 2)
        if len(fields) < 2 or fields[1] != "101":
            raise _WebSocketError(f"握手失败: {status_line}")

    def _discard_transport(self):
        """Close and forget the socket after a failed connection attempt."""
        writer, self._writer = self._writer, None
        self._reader = None
        if writer is not None:
            try:
                writer.close()
            except Exception:
                pass

    def _mark_closed(self):
        """Flag the connection as closed and wake any pending ``recv()``."""
        if not self._closed:
            self._closed = True
            self._recv_queue.put_nowait(None)

    async def _recv_loop(self):
        """Read frames until the connection ends, queueing complete text messages."""
        fragments = []
        message_opcode = None  # opcode of the data message being assembled
        try:
            while not self._closed:
                frame = await self._read_frame()
                if frame is None:
                    break
                first_byte, payload = frame
                fin = bool(first_byte & 0x80)
                opcode = first_byte & 0x0F
                if opcode in (0x1, 0x2):  # first frame of a text/binary message
                    message_opcode = opcode
                    fragments = [payload]
                elif opcode == 0x0:  # continuation frame
                    if message_opcode is None:
                        continue  # stray continuation without a start frame
                    fragments.append(payload)
                elif opcode == 0x8:  # close
                    await self._send_close()
                    break
                elif opcode == 0x9:  # ping -> reply with pong
                    await self._send_frame(0xA, payload)
                    continue
                elif opcode == 0xA:  # pong
                    self._pong_event.set()
                    continue
                else:
                    continue
                if fin:
                    if message_opcode == 0x1:
                        # Decode only after joining so a multi-byte character
                        # split across fragments is not corrupted.
                        text = b"".join(fragments).decode("utf-8", errors="replace")
                        await self._recv_queue.put(("text", text))
                    fragments = []
                    message_opcode = None
        except (asyncio.CancelledError, ConnectionError):
            pass
        except Exception as e:
            print(f"[C3] WebSocket 接收异常: {e}")
        finally:
            self._mark_closed()

    async def _read_frame(self):
        """Read one frame and return ``(first_byte, unmasked_payload)``."""
        header = await self._read_exact(2)
        if not header:
            return None
        first_byte = header[0]
        second_byte = header[1]
        length = second_byte & 0x7F
        if length == 126:
            ext = await self._read_exact(2)
            length = struct.unpack("!H", ext)[0]
        elif length == 127:
            ext = await self._read_exact(8)
            length = struct.unpack("!Q", ext)[0]
        mask_key = await self._read_exact(4) if (second_byte & 0x80) else None
        payload = await self._read_exact(length)
        if mask_key:
            payload = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload))
        return (first_byte, payload)

    async def _read_exact(self, n):
        """Read exactly ``n`` bytes.

        Raises:
            _WebSocketError: If the stream ends before ``n`` bytes arrive.
        """
        try:
            return await self._reader.readexactly(n)
        except asyncio.IncompleteReadError as exc:
            raise _WebSocketError("连接断开") from exc

    async def send(self, text):
        """Send one text message (``str`` is UTF-8 encoded, bytes are sent as-is).

        Raises:
            ConnectionError: If the connection is already marked closed.
        """
        if self._closed:
            raise ConnectionError("连接已关闭")
        payload = text.encode("utf-8") if isinstance(text, str) else text
        await self._send_frame(0x1, payload)

    async def _send_frame(self, opcode, payload):
        """Send a single unfragmented frame; client frames are always masked."""
        header = bytearray()
        header.append(0x80 | opcode)  # FIN + opcode
        length = len(payload)
        if length < 126:
            header.append(0x80 | length)
        elif length < 65536:
            header.append(0x80 | 126)
            header.extend(struct.pack("!H", length))
        else:
            header.append(0x80 | 127)
            header.extend(struct.pack("!Q", length))
        # The masking key must be unpredictable, so use the OS entropy source.
        mask_key = os.urandom(4)
        header.extend(mask_key)
        header.extend(bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload)))
        self._writer.write(bytes(header))
        await self._writer.drain()

    async def _send_close(self):
        """Best-effort send of a (masked) close frame; errors are ignored."""
        try:
            await self._send_frame(0x8, b"")
        except Exception:
            pass

    async def _ping_loop(self):
        """Ping every ``ping_interval`` seconds; close on a missing pong."""
        try:
            while not self._closed:
                await asyncio.sleep(self.ping_interval)
                self._pong_event.clear()
                await self._send_frame(0x9, b"")
                try:
                    await asyncio.wait_for(self._pong_event.wait(), timeout=self.ping_timeout)
                except asyncio.TimeoutError:
                    print("[C3] Ping 超时")
                    break
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"[C3] Ping 循环异常: {e}")
        finally:
            self._mark_closed()

    async def recv(self):
        """Return the next text message.

        Messages that were already queued are still delivered after the
        connection closes.

        Raises:
            ConnectionError: Once the connection is closed and the queue is drained.
        """
        while True:
            item = await self._recv_queue.get()
            if item is None:
                # Put the sentinel back so later or concurrent calls also
                # fail instead of waiting forever.
                self._recv_queue.put_nowait(None)
                raise ConnectionError("连接已关闭")
            msg_type, text = item
            if msg_type == "text":
                return text

    def __aiter__(self):
        return self._async_iterator()

    async def _async_iterator(self):
        """Yield incoming text messages until the connection is closed."""
        while True:
            try:
                yield await self.recv()
            except ConnectionError:
                break

    async def close(self):
        """Stop the background tasks and close the socket (safe to call twice)."""
        self._mark_closed()
        for task in (self._recv_task, self._ping_task):
            if task is not None:
                task.cancel()
        if self._writer:
            await self._send_close()
            try:
                self._writer.close()
                await self._writer.wait_closed()
            except Exception:
                pass

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, *args):
        await self.close()
# ================= Configuration =================
# WebSocket endpoint of the C3 Director server.
# NOTE(review): this is a plain ws:// URL (the embedded client accepts no
# other scheme) and messages of type "cmd" received from it are executed as
# shell commands by handle_message — confirm the transport is trusted.
SERVER_URL = "ws://1.15.136.221:8500"
# Seconds between heartbeat messages sent while connected.
HEARTBEAT_INTERVAL = 5
# Seconds to wait before reconnecting after a session ends.
RECONNECT_DELAY = 5
# ================= Device identity =================
# Shared Params handle used by get_dongle_id() and get_car_platform().
_params = Params()
def get_serial():
    """Return the hardware serial, or the host's node name if that lookup fails."""
    try:
        serial = HARDWARE.get_serial()
    except Exception:
        serial = os.uname().nodename
    return serial
def get_dongle_id():
    """Return the stored ``DongleId`` param, or ``""`` when unset or unreadable."""
    try:
        return _params.get("DongleId") or ""
    except Exception:
        return ""
def get_device_type():
    """Return ``HARDWARE.get_device_type()`` if available, otherwise ``""``."""
    try:
        reader = getattr(HARDWARE, "get_device_type", None)
        if reader is None:
            return ""
        return reader() or ""
    except Exception:
        return ""
def get_git_branch():
    """Return the current git branch of the parent directory of this file's folder.

    Any failure (git missing, non-zero exit, timeout) yields ``""``.
    """
    try:
        here = os.path.abspath(__file__)
        repo_dir = os.path.dirname(os.path.dirname(here))
        proc = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True,
            text=True,
            timeout=5,
            cwd=repo_dir,
        )
        if proc.returncode != 0:
            return ""
        return proc.stdout.strip()
    except Exception:
        return ""
def get_car_platform():
    """Return the car platform name, or ``""`` when it cannot be determined.

    The ``CarPlatformBundle`` param (a dict or a JSON document with a
    ``name`` key) takes precedence. When it is empty, ``CarParamsPersistent``
    is parsed as JSON and its ``carName`` / ``car_platform`` key is used.
    """
    try:
        bundle = _params.get("CarPlatformBundle")
        if bundle:
            parsed = bundle if isinstance(bundle, dict) else json.loads(bundle)
            return parsed.get("name", "")
    except Exception:
        return ""

    # No platform was forced: fall back to the persisted car params.
    try:
        raw = _params.get("CarParamsPersistent")
        if not raw:
            return ""
        text = raw if isinstance(raw, str) else raw.decode()
        info = json.loads(text)
        return info.get("carName", "") or info.get("car_platform", "") or ""
    except Exception:
        return ""
# ================= 指令执行 =================
async def execute_cmd(command, timeout=30):
    """Run ``command`` through the shell and capture its combined output.

    The timeout covers the whole run of the command, not just process
    creation, and a command that exceeds it is killed.

    Args:
        command: Shell command line to execute.
        timeout: Maximum number of seconds to wait for the command to finish.

    Returns:
        ``{"status": "ok", "output": str, "returncode": int}`` when the
        command finished, otherwise ``{"status": "error", "output": str}``.
    """
    proc = None
    try:
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        return {
            "status": "ok",
            "output": stdout.decode(errors="replace"),
            "returncode": proc.returncode,
        }
    except asyncio.TimeoutError:
        if proc is not None and proc.returncode is None:
            # Only the shell itself is killed; children it forked may
            # outlive it.
            try:
                proc.kill()
                await asyncio.wait_for(proc.wait(), timeout=5)
            except (ProcessLookupError, asyncio.TimeoutError):
                pass
        return {"status": "error", "output": "命令执行超时"}
    except Exception as e:
        return {"status": "error", "output": str(e)}
    finally:
        # Covers cancellation and unexpected errors: never leave the child
        # running once this coroutine is done with it.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
async def execute_tmux():
    """Capture recent output of the first tmux session for diagnostics.

    Always returns ``{"status": "ok", "output": str}``; problems (no tmux
    process, sessions not listable, nothing captured) are reported inside
    the output text.
    """
    probe = await execute_cmd("ps aux | grep tmux | grep -v grep", timeout=3)
    if not probe.get("output", "").strip():
        return {"status": "ok", "output": "⚠️ 设备上未检测到 tmux 进程运行"}

    listing = await execute_cmd("tmux list-sessions 2>&1", timeout=3)
    if "error" in listing.get("output", ""):
        return {"status": "ok", "output": f"⚠️ 无法获取 tmux 会话\n{listing['output']}"}

    # Dump the last 200 lines of the first listed session's pane.
    capture = await execute_cmd(
        "tmux capture-pane -t $(tmux list-sessions -F '#{session_name}' 2>/dev/null | head -1) -p -S -200 2>/dev/null",
        timeout=5
    )
    captured = capture["status"] == "ok" and capture.get("output", "").strip()
    if not captured:
        return {"status": "ok", "output": f"⚠️ 无法捕获 tmux 输出\n{listing.get('output', '')}"}

    refreshed = await execute_cmd("tmux list-sessions 2>&1", timeout=3)
    report = f"--- tmux sessions ---\n{refreshed.get('output', '')}\n\n--- capture output ---\n{capture['output']}"
    return {"status": "ok", "output": report}
# ================= 错误查询 =================
# ================= Messaging 数据采集 =================
# Script run in a separate Python process by execute_messaging(). It
# subscribes to each topic on its own, builds a JSON snapshot and prints it.
_MESSAGING_SCRIPT = r'''import sys, json
sys.path.insert(0, "/data/openpilot")
from cereal.messaging import SubMaster, pub_sock, recv_one_or_none
from cereal import log
import time
# 分别订阅每个 topic 并单独等待,避免一次 update 等不全
topics = {"managerState": None, "deviceState": None, "carState": None, "controlsState": None}
for t in topics:
    sm = SubMaster([t])
    for _ in range(5): # 最多尝试 5 轮,每轮 1 秒
        sm.update(1000)
        if sm.updated[t]:
            topics[t] = sm[t]
            break
ms = topics["managerState"]
ds = topics["deviceState"]
cs = topics["carState"]
cts = topics["controlsState"]
# 辅助函数:从 capnp 对象安全取值
def _get(obj, attr, default=0):
    return getattr(obj, attr, default) if obj is not None else default
def _get_str(obj, attr, default="--"):
    return str(getattr(obj, attr, default)) if obj is not None else default
def _round(obj, attr, precision=2):
    return round(getattr(obj, attr, 0), precision) if obj is not None else 0
def _list(obj, attr):
    return list(getattr(obj, attr, [])) if obj is not None else []
# 1. 进程状态
key_names = ["selfdrived","modeld","modeld_v2","updated","ui","sensord","camerad",
             "boardd","pandad","athenad","c3_client","mapd_nav"]
processes = []
for p in _get(ms, "processes", []):
    if p.name in key_names:
        processes.append({"name": p.name, "running": p.running, "shouldBeRunning": p.shouldBeRunning, "pid": p.pid})
# 2. 设备状态
device_state = {
    "deviceType": _get_str(ds, "deviceType"),
    "started": _get(ds, "started", False),
    "thermalStatus": _get_str(ds, "thermalStatus"),
    "networkType": _get_str(ds, "networkType"),
    "networkStrength": _get_str(ds, "networkStrength"),
    "cpuUsagePercent": _list(ds, "cpuUsagePercent"),
    "gpuUsagePercent": _get(ds, "gpuUsagePercent"),
    "memoryUsagePercent": _get(ds, "memoryUsagePercent"),
    "freeSpacePercent": _round(ds, "freeSpacePercent", 1),
    "powerDrawW": _round(ds, "powerDrawW", 1),
    "fanSpeedPercentDesired": _get(ds, "fanSpeedPercentDesired"),
    "screenBrightnessPercent": _get(ds, "screenBrightnessPercent"),
    "carBatteryCapacityUwh": _get(ds, "carBatteryCapacityUwh"),
    "cpuTempC": _list(ds, "cpuTempC"),
    "gpuTempC": _list(ds, "gpuTempC"),
    "memoryTempC": _round(ds, "memoryTempC", 1),
    "maxTempC": _round(ds, "maxTempC", 1),
    "dspTempC": _round(ds, "dspTempC", 1) if _get(ds, "dspTempC") else 0,
}
# 3. 车辆状态
cs_obj = _get(cs, "cruiseState")
ws_obj = _get(cs, "wheelSpeeds")
car_state = {
    "vEgo": _round(cs, "vEgo"),
    "vEgoRaw": _round(cs, "vEgoRaw"),
    "vCruise": _round(cs, "vCruise"),
    "vCruiseCluster": _round(cs, "vCruiseCluster"),
    "steeringAngleDeg": _round(cs, "steeringAngleDeg", 1),
    "steeringRateDeg": _round(cs, "steeringRateDeg", 1),
    "steeringTorque": _get(cs, "steeringTorque"),
    "steeringTorqueEps": _round(cs, "steeringTorqueEps", 1),
    "steeringPressed": _get(cs, "steeringPressed", False),
    "steerFaultPermanent": _get(cs, "steerFaultPermanent", False),
    "steerFaultTemporary": _get(cs, "steerFaultTemporary", False),
    "gasPressed": _get(cs, "gasPressed", False),
    "brakePressed": _get(cs, "brakePressed", False),
    "brakeHoldActive": _get(cs, "brakeHoldActive", False),
    "standstill": _get(cs, "standstill", False),
    "seatbeltUnlatched": _get(cs, "seatbeltUnlatched", False),
    "doorOpen": _get(cs, "doorOpen", False),
    "parkingBrake": _get(cs, "parkingBrake", False),
    "gearShifter": _get_str(cs, "gearShifter"),
    "leftBlinker": _get(cs, "leftBlinker", False),
    "rightBlinker": _get(cs, "rightBlinker", False),
    "leftBlindspot": _get(cs, "leftBlindspot", False),
    "rightBlindspot": _get(cs, "rightBlindspot", False),
    "canValid": _get(cs, "canValid", False),
    "canTimeout": _get(cs, "canTimeout", False),
    "accFaulted": _get(cs, "accFaulted", False),
    "aEgo": _round(cs, "aEgo", 4),
    "yawRate": _round(cs, "yawRate", 4),
    "cruiseState": {
        "enabled": _get(cs_obj, "enabled", False),
        "available": _get(cs_obj, "available", False),
        "speed": _round(cs_obj, "speed"),
        "speedCluster": _round(cs_obj, "speedCluster"),
    },
    "wheelSpeeds": {
        "fl": _round(ws_obj, "fl"),
        "fr": _round(ws_obj, "fr"),
        "rl": _round(ws_obj, "rl"),
        "rr": _round(ws_obj, "rr"),
    },
}
# 4. 控制状态
controls_state = {
    "longControlState": _get_str(cts, "longControlState"),
    "lateralControlState": _get_str(cts, "lateralControlState"),
    "curvature": _round(cts, "curvature", 6),
    "desiredCurvature": _round(cts, "desiredCurvature", 6),
    "ufAccelCmd": _round(cts, "ufAccelCmd", 4),
    "uiAccelCmd": _round(cts, "uiAccelCmd", 4),
    "upAccelCmd": _round(cts, "upAccelCmd", 4),
    "forceDecel": _get(cts, "forceDecel", False),
}
print(json.dumps({
    "processes": processes,
    "deviceState": device_state,
    "carState": car_state,
    "controlsState": controls_state,
}))
'''


async def execute_messaging(timeout=15):
    """Collect a snapshot of live messaging data as a JSON string.

    The snapshot covers process, device, car and controls state. The
    synchronous SubMaster polling runs in a separate ``python3`` process so
    it cannot block the event loop. The timeout bounds the whole child run;
    a child that exceeds it is killed.

    Args:
        timeout: Maximum number of seconds to wait for the child process.

    Returns:
        ``{"status": "ok", "output": <JSON text>}`` on success, otherwise
        ``{"status": "error", "output": <description>}``.
    """
    output = ""
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "python3", "-c", _MESSAGING_SCRIPT,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        if proc.returncode != 0:
            return {"status": "error", "output": f"子进程错误: {stderr.decode(errors='replace')[:500]}"}
        output = stdout.decode(errors="replace").strip()
        # Validate that the child really printed JSON before forwarding it.
        json.loads(output)
        return {"status": "ok", "output": output}
    except asyncio.TimeoutError:
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
                await asyncio.wait_for(proc.wait(), timeout=5)
            except (ProcessLookupError, asyncio.TimeoutError):
                pass
        return {"status": "error", "output": f"获取 messaging 数据超时({timeout}s)"}
    except json.JSONDecodeError:
        return {"status": "error", "output": f"数据解析失败: {output[:300]}"}
    except Exception as e:
        return {"status": "error", "output": f"获取 messaging 数据失败: {e}\n{traceback.format_exc()}"}
    finally:
        # Covers cancellation: never leave the child running.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
# ================= 更新辅助 =================
async def execute_update_oneclick():
    """Trigger a one-click update by sending SIGHUP to the ``updated`` process.

    Only SIGHUP is sent. According to the original author's note, the
    updater then checks for an update and fetches it in one step, and
    sending SIGUSR1 first would race with it.

    ``pgrep`` and ``pkill`` are executed directly (no shell) so the pattern
    cannot match a wrapping shell's own command line, and asynchronously so
    the event loop is not blocked.

    Returns:
        ``{"status": "ok", "output": str}`` when the signal was delivered,
        otherwise ``{"status": "error", "output": str}``.
    """
    pattern = "system.updated.updated"

    async def _run(argv, timeout):
        """Run ``argv`` and return ``(returncode, combined_output)``."""
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        try:
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        finally:
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
        return proc.returncode, out.decode(errors="replace")

    try:
        # Check that the updated process is running at all.
        _, found = await _run(["pgrep", "-f", pattern], 3)
        if not found.strip():
            return {"status": "error", "output": "设备不在 offroad 状态,updated 进程未运行"}
        # SIGHUP alone is enough (check and download happen in one step).
        code, detail = await _run(
            ["sudo", "-u", "comma", "pkill", "-SIGHUP", "-f", pattern], 5
        )
        if code != 0:
            return {"status": "error", "output": f"一键更新失败: pkill 返回 {code} {detail.strip()}"}
        return {"status": "ok", "output": "一键更新已触发"}
    except asyncio.TimeoutError:
        return {"status": "error", "output": "一键更新失败: 命令执行超时"}
    except Exception as e:
        return {"status": "error", "output": f"一键更新失败: {e}"}
# ================= 消息处理 =================
async def handle_message(data, ws):
    """Execute one server request and send its result back over ``ws``.

    ``data`` is the decoded request; its ``type`` selects the action
    (``cmd``, ``tmux``, ``ping``, ``update``, ``msgq``). The reply repeats
    the request ``id`` and carries ``status`` and ``output``. Failures while
    sending the reply are printed, not raised.
    """
    kind = data.get("type")
    request_id = data.get("id")
    body = data.get("content", "")
    limit = data.get("timeout", 15)

    if kind == "ping":
        outcome = {"status": "ok", "output": "pong"}
    elif kind == "cmd":
        outcome = await execute_cmd(body, limit)
    elif kind == "tmux":
        outcome = await execute_tmux()
    elif kind == "update":
        outcome = await execute_update_oneclick()
    elif kind == "msgq":
        outcome = await execute_messaging()
    else:
        outcome = {"status": "error", "output": f"未知指令类型: {kind}"}

    reply = {
        "type": "result",
        "id": request_id,
        "status": outcome["status"],
        "output": outcome.get("output", ""),
    }
    try:
        encoded = json.dumps(reply)
        await ws.send(encoded)
    except Exception as e:
        print(f"[C3] 发送响应失败: {e}")
# ================= 主循环 =================
# Upper bound, in seconds, on a single connected session before run()
# reconnects. NOTE(review): the original code hard-coded 65 here, which looks
# like a watchdog against a stalled connection — confirm before changing.
_SESSION_TIMEOUT = 65


def _detect_offroad():
    """Return True unless a ``modeld_tinygrad`` process is found (blocking).

    Any failure while probing the process list is treated as offroad.
    """
    try:
        result = subprocess.run(
            "ps aux | grep -v grep | grep -q modeld_tinygrad && echo 0 || echo 1",
            shell=True, capture_output=True, text=True, timeout=5
        )
    except Exception:
        return True
    if result.returncode == 0:
        return result.stdout.strip() == "1"
    return True


async def _heartbeat_loop(ws):
    """Send a heartbeat every ``HEARTBEAT_INTERVAL`` seconds until sending fails."""
    loop = asyncio.get_running_loop()
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        try:
            # The process probe blocks for up to 5 s, so run it in a worker
            # thread instead of stalling the event loop.
            offroad = await loop.run_in_executor(None, _detect_offroad)
            await ws.send(json.dumps({"type": "heartbeat", "offroad": offroad, "car_platform": get_car_platform()}))
        except Exception:
            break


async def _receive_loop(ws):
    """Decode and dispatch incoming messages until the connection closes."""
    async for message in ws:
        try:
            data = json.loads(message)
            await handle_message(data, ws)
        except json.JSONDecodeError:
            pass
        except Exception as e:
            print(f"[C3] 处理消息出错: {e}")
            traceback.print_exc()


async def run():
    """Connect to the server, register this device and serve requests forever.

    Each session registers the device and then runs the heartbeat and
    receive loops until one of them ends or ``_SESSION_TIMEOUT`` elapses.
    After any session end or error the client waits ``RECONNECT_DELAY``
    seconds and reconnects. This coroutine returns only if it is cancelled.
    """
    serial = get_serial()
    dongle_id = get_dongle_id()
    git_branch = get_git_branch()
    car_platform = get_car_platform()
    device_type = get_device_type()
    print(f"[C3 Director Client] 启动 serial={serial} branch={git_branch} platform={car_platform}")
    while True:
        try:
            async with _WebSocketClient(
                SERVER_URL,
                ping_interval=20,
                ping_timeout=15
            ) as ws:
                print(f"[C3] ✅ 已连接服务器")
                register_msg = json.dumps({
                    "type": "register",
                    "serial": serial,
                    "dongle_id": dongle_id,
                    "git_branch": git_branch,
                    "car_platform": car_platform,
                    "device_type": device_type
                })
                await ws.send(register_msg)
                print(f"[C3] 已注册: {serial} branch={git_branch} type={device_type}")
                workers = [
                    asyncio.create_task(_heartbeat_loop(ws)),
                    asyncio.create_task(_receive_loop(ws)),
                ]
                try:
                    # Either loop ending means the session is over, so stop
                    # waiting as soon as the first one finishes.
                    done, _ = await asyncio.wait(
                        workers,
                        timeout=_SESSION_TIMEOUT,
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                finally:
                    for worker in workers:
                        worker.cancel()
                    await asyncio.gather(*workers, return_exceptions=True)
                if not done:
                    raise asyncio.TimeoutError
                for worker in done:
                    worker.result()  # surface an unexpected failure
        # Most specific handlers first: ConnectionRefusedError and (on
        # Python 3.11+) TimeoutError are both OSError subclasses.
        except ConnectionRefusedError:
            print(f"[C3] 服务器拒绝连接")
        except asyncio.TimeoutError:
            print(f"[C3] 连接超时")
        except (ConnectionError, _WebSocketError):
            print(f"[C3] 连接断开")
        except OSError as e:
            print(f"[C3] 网络错误: {e}")
        except Exception as e:
            print(f"[C3] 异常: {e}")
            traceback.print_exc()
        print(f"[C3] {RECONNECT_DELAY} 秒后重连...")
        await asyncio.sleep(RECONNECT_DELAY)
# ================= 入口 =================
def main():
    """Process entry point: run the client until it is interrupted or fails.

    Ctrl-C stops the client quietly; any other error is printed together
    with its traceback.
    """
    try:
        asyncio.run(run())
    except Exception as exc:
        print(f"[C3] 致命错误: {exc}")
        traceback.print_exc()
    except KeyboardInterrupt:
        print("[C3] 客户端已停止")


if __name__ == "__main__":
    main()