编辑c3_client.py
This commit is contained in:
@@ -539,59 +539,6 @@ async def execute_update_oneclick():
|
||||
return {"status": "error", "output": f"一键更新失败: {e}"}
|
||||
|
||||
|
||||
# ================= 截图 =================
|
||||
async def execute_snapshot():
|
||||
"""截取设备屏幕画面,返回 base64 编码的 PNG"""
|
||||
try:
|
||||
import base64, subprocess, os, json
|
||||
|
||||
# 设备 UI 使用 pyray (raylib) 渲染,用 take_screenshot 截图
|
||||
# raylib 截图需要渲染循环,不能离线使用
|
||||
# 直接读取 framebuffer 方案
|
||||
|
||||
# 方法1: 检查有没有已存在的 screenshot 文件(某些系统支持热键截图)
|
||||
exists = os.path.exists("/tmp/screenshot.png") or os.path.exists("/data/screenshot.png")
|
||||
if exists:
|
||||
path = "/tmp/screenshot.png" if os.path.exists("/tmp/screenshot.png") else "/data/screenshot.png"
|
||||
with open(path, "rb") as f:
|
||||
b64 = base64.b64encode(f.read()).decode()
|
||||
return {"status": "ok", "output": b64}
|
||||
|
||||
# 方法2: 执行 screencap(部分安卓设备)
|
||||
r = subprocess.run(["screencap", "-p"], capture_output=True, timeout=5)
|
||||
if r.returncode == 0 and len(r.stdout) > 100:
|
||||
return {"status": "ok", "output": base64.b64encode(r.stdout).decode()}
|
||||
|
||||
# 方法3: 检查 /dev/fb0 并尝试直接读取
|
||||
if os.path.exists("/dev/fb0"):
|
||||
# 读取帧缓冲
|
||||
with open("/sys/class/graphics/fb0/virtual_size") as f:
|
||||
size_str = f.read().strip()
|
||||
if size_str:
|
||||
w, h = map(int, size_str.split(","))
|
||||
with open("/dev/fb0", "rb") as fb:
|
||||
fb.seek(0, os.SEEK_SET)
|
||||
data = fb.read(w * h * 4) # ARGB8888
|
||||
if len(data) == w * h * 4:
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
arr = np.frombuffer(data, dtype=np.uint8).reshape((h, w, 4))
|
||||
arr = arr[:, :, [2, 1, 0, 3]] # BGRA -> RGBA
|
||||
img = Image.fromarray(arr[:, :, :3], "RGB")
|
||||
from io import BytesIO
|
||||
buf = BytesIO()
|
||||
img.save(buf, format="PNG")
|
||||
return {"status": "ok", "output": base64.b64encode(buf.getvalue()).decode()}
|
||||
|
||||
return {"status": "error", "output": "设备不支持截图"}
|
||||
except ImportError as e:
|
||||
return {"status": "error", "output": f"缺少依赖: {e}"}
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "output": "无可用截图方法"}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
return {"status": "error", "output": f"截图失败: {e}\\n{traceback.format_exc()}"}
|
||||
|
||||
# ================= 消息处理 =================
|
||||
async def handle_message(data, ws):
|
||||
msg_type = data.get("type")
|
||||
@@ -609,8 +556,6 @@ async def handle_message(data, ws):
|
||||
result = await execute_update_oneclick()
|
||||
elif msg_type == "msgq":
|
||||
result = await execute_messaging()
|
||||
elif msg_type == "snapshot":
|
||||
result = await execute_snapshot()
|
||||
|
||||
else:
|
||||
result = {"status": "error", "output": f"未知指令类型: {msg_type}"}
|
||||
|
||||
Reference in New Issue
Block a user