Files
onepilot/system/hardware/tici/modem.py
T
2026-06-11 20:00:23 +08:00

752 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
import fcntl
import json
import logging
import os
import serial
import signal
import subprocess
import tempfile
import time
from ipaddress import IPv4Address, AddressValueError
from enum import Enum
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d %(levelname)-7s modem: %(message)s",
datefmt="%H:%M:%S",
)
AT_PORT = "/dev/modem_at0"
PPP_PORT = "/dev/modem_at1"
STATE_PATH = "/dev/shm/modem"
AT_LOCK = "/dev/shm/modem.lock" # shared with LPA
AT_INIT = [
"ATE0", # disable command echo
"ATV1", # verbose result codes (CONNECT/BUSY/NO CARRIER, not numeric)
"AT+CMEE=1", # numeric +CME ERROR codes on failures (per 3GPP 27.007)
"ATX4", # extended result codes: busy + dial tone detection, line speed in CONNECT
"AT&C1", # DCD pin follows carrier state (V.250 default)
"AT+CREG=2", # registration URCs include location info
"AT+CGREG=2", # GPRS registration URCs include location info
]
CREG = {0: "not_registered", 1: "home", 2: "searching", 3: "denied", 4: "unknown", 5: "roaming"}
# 3GPP TS 27.007 +COPS <AcT> -> network type
NETWORK_TYPE = {0: "gsm", 1: "gsm", 3: "gsm", 8: "gsm",
2: "utran", 4: "utran", 5: "utran", 6: "utran",
7: "lte", 9: "lte", 10: "lte",
11: "nr", 12: "nr", 13: "nr"}
DIAL_CID = 1
WEBBING_ICCID_PREFIX = "8985235"
PPPD_CMD = [
"sudo", "pppd", PPP_PORT, "460800", "noauth", "nodetach", "noipdefault", "usepeerdns",
"nodefaultroute", "connect",
"/usr/sbin/chat -v ABORT 'NO CARRIER' ABORT 'NO DIALTONE' ABORT 'BUSY' " +
f"ABORT 'NO ANSWER' ABORT 'ERROR' TIMEOUT 5 '' AT OK ATD*99***{DIAL_CID}# CONNECT ''",
"lcp-echo-interval", "30", "lcp-echo-failure", "4", "mtu", "1500", "mru", "1500",
"novj", "novjccomp", "ipcp-accept-local", "ipcp-accept-remote", "nomagic",
"user", '""', "password", '""',
]
INITIAL_STATE = {
"seconds_since_boot": 0,
"state": "INITIALIZING",
"connected": False, "ip_address": "",
"iccid": "", "mcc_mnc": "", "imei": "", "modem_version": "",
"signal_strength": 0, "signal_quality": 0,
"network_type": "unknown", "operator": "", "band": "", "channel": 0,
"registration": "unknown", "temperatures": [], "extra": "",
"tx_bytes": 0, "rx_bytes": 0,
}
class State(Enum):
INITIALIZING = "INITIALIZING"
SEARCHING = "SEARCHING"
CONNECTING = "CONNECTING"
CONNECTED = "CONNECTED"
DISCONNECTING = "DISCONNECTING"
STATE_WAIT = 1.0 # seconds to wait after each state handler returns
class PPPSession:
"""Owns pppd lifecycle, fail tracking, and PPP routing."""
MAX_FAILS = 3
def __init__(self):
self._proc: subprocess.Popen | None = None
self._fails = 0
self._peer = ""
def start(self):
self._proc = subprocess.Popen(PPPD_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
self._peer = ""
logging.info(f"PPP dialing CID {DIAL_CID}")
def kill(self):
subprocess.run(["sudo", "killall", "-9", "pppd"], capture_output=True)
self._peer = ""
@staticmethod
def reset_data_port():
"""Drop DTR on PPP_PORT so the modem terminates any stuck PPP session."""
try:
with serial.Serial(PPP_PORT, 460800, timeout=1) as s:
s.dtr = False
time.sleep(0.2)
s.dtr = True
except Exception as e:
logging.warning(f"data port reset failed: {e}")
def has_exited(self) -> bool:
return self._proc is not None and self._proc.poll() is not None
def reset_fail_counter(self):
self._fails = 0
def record_fail(self) -> bool:
"""Bump fail counter; return True if at the give-up limit."""
self._fails += 1
return self._fails >= self.MAX_FAILS
@property
def fails(self) -> int:
return self._fails
def maybe_install_routes(self, ip: str, peer: str) -> bool:
"""Install routes if peer changed; kill the session on failure so the state machine reconnects."""
if not peer or peer == self._peer:
return False
try:
IPv4Address(ip)
IPv4Address(peer)
except AddressValueError:
logging.warning(f"refusing route install with non-IPv4 ip={ip!r} peer={peer!r}")
self.kill()
return False
self.cleanup_routes()
cmds = [
["sudo", "ip", "route", "add", "default", "via", peer, "dev", "ppp0", "metric", "1000"],
["sudo", "ip", "route", "add", "default", "via", peer, "dev", "ppp0", "table", "1000"],
["sudo", "ip", "rule", "add", "from", ip, "table", "1000"],
]
for cmd in cmds:
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode != 0:
logging.warning(f"route install failed ({' '.join(cmd[1:])}): {r.stderr.strip()}")
self.cleanup_routes()
self.kill()
return False
logging.info(f"route set up for {ip} via {peer}")
self._peer = peer
return True
def maybe_install_dns(self, dns_servers: list[str]) -> bool:
"""Register DNS servers with systemd-resolved; kill the session on failure to force a retry."""
if not dns_servers:
return False
for cmd in (["sudo", "resolvectl", "dns", "ppp0", *dns_servers],
["sudo", "resolvectl", "default-route", "ppp0", "yes"]):
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode != 0:
logging.warning(f"resolvectl failed ({' '.join(cmd[1:])}): {r.stderr.strip()}")
self.kill()
return False
logging.info(f"resolvectl: ppp0 DNS = {dns_servers}")
return True
@staticmethod
def cleanup_routes():
subprocess.run(["sudo", "ip", "route", "del", "default", "dev", "ppp0"], capture_output=True)
subprocess.run(["sudo", "ip", "route", "flush", "table", "1000"], capture_output=True)
# rules don't have a flush; delete until none remain
while subprocess.run(["sudo", "ip", "rule", "del", "table", "1000"], capture_output=True).returncode == 0:
pass
subprocess.run(["sudo", "resolvectl", "revert", "ppp0"], capture_output=True)
# ============================================================================
# dragonpilot: QMI data path for the comma3 (tici) EG25
#
# The comma3 EG25 runs in QMI mode (AT+QCFG="usbnet"==0) and does NOT support
# *99# PPP data: pppd negotiates LCP/PAP then the modem drops at IPCP, so the
# PPP-only upstream daemon never gets an IP. QMISession is a drop-in for
# PPPSession (same interface the Modem state machine uses) that brings data up
# over QMI on wwan0 via qmicli. It is selected automatically in _do_initializing
# when the modem reports QMI mode; everything else (AT init, registration,
# polling, /dev/shm/modem) is unchanged. Kept self-contained to ease upstream
# modem.py merges.
# ============================================================================
QMI_DEV = "/dev/cdc-wdm0"
QMI_IFACE = "wwan0"
QMI_POLL_INTERVAL = 10.0 # s; how often to actually query the modem while connected (drop detection latency)
class QMISession:
"""Drop-in for PPPSession that connects over QMI/wwan0 instead of *99# PPP."""
MAX_FAILS = 3
IFACE = QMI_IFACE
def __init__(self):
self._fails = 0
self._ip = ""
self._last_status_check = 0.0
@staticmethod
def available() -> bool:
return os.path.exists(QMI_DEV)
@staticmethod
def _param(key):
try:
with open(f"/data/params/d/{key}") as f:
return f.read().strip()
except FileNotFoundError:
return ""
@staticmethod
def _qmicli(*args, timeout=30):
return subprocess.run(["sudo", "qmicli", "-d", QMI_DEV, "-p", *args],
capture_output=True, text=True, timeout=timeout)
def start(self):
apn = self._param("GsmApn")
# qmi_wwan needs raw-ip mode for IPv4; set it while the iface is down
subprocess.run(["sudo", "ip", "link", "set", QMI_IFACE, "down"], capture_output=True)
subprocess.run(["sudo", "sh", "-c", f"echo Y > /sys/class/net/{QMI_IFACE}/qmi/raw_ip"], capture_output=True)
subprocess.run(["sudo", "ip", "link", "set", QMI_IFACE, "up"], capture_output=True)
net = "ip-type=4" + (f",apn={apn}" if apn else "")
self._qmicli(f"--wds-start-network={net}", "--client-no-release-cid")
self._ip = ""
self._last_status_check = time.monotonic() # give the bearer an interval before the first drop check
logging.info(f"QMI start-network on {QMI_IFACE} (apn={apn or '(network-provided)'})")
def kill(self):
self.cleanup_routes()
subprocess.run(["sudo", "ip", "addr", "flush", "dev", QMI_IFACE], capture_output=True)
subprocess.run(["sudo", "ip", "link", "set", QMI_IFACE, "down"], capture_output=True)
self._ip = ""
@staticmethod
def reset_data_port():
pass # no DTR/serial data port for QMI
def has_exited(self) -> bool:
# Throttle: querying the modem every loop (1s) is wasteful and the link is stable.
# Between checks assume still up -> a drop is detected within QMI_POLL_INTERVAL.
now = time.monotonic()
if now - self._last_status_check < QMI_POLL_INTERVAL:
return False
self._last_status_check = now
r = self._qmicli("--wds-get-packet-service-status", timeout=10)
return "disconnected" in r.stdout.lower()
def reset_fail_counter(self):
self._fails = 0
def record_fail(self) -> bool:
self._fails += 1
return self._fails >= self.MAX_FAILS
@property
def fails(self) -> int:
return self._fails
def _settings(self):
r = self._qmicli("--wds-get-current-settings", timeout=10)
def grab(label):
for line in r.stdout.splitlines():
if label in line:
return line.split(":", 1)[1].strip()
return ""
dns = [d for d in (grab("IPv4 primary DNS"), grab("IPv4 secondary DNS")) if d]
return grab("IPv4 address"), grab("IPv4 gateway address"), grab("IPv4 subnet mask"), dns
@staticmethod
def cleanup_routes():
subprocess.run(["sudo", "ip", "route", "del", "default", "dev", QMI_IFACE], capture_output=True)
subprocess.run(["sudo", "ip", "route", "flush", "table", "1000"], capture_output=True)
while subprocess.run(["sudo", "ip", "rule", "del", "table", "1000"], capture_output=True).returncode == 0:
pass
subprocess.run(["sudo", "resolvectl", "revert", QMI_IFACE], capture_output=True)
def poll_iface(self) -> dict:
# The IP is stable for the whole session, so configure wwan0 once and then
# serve the cached value -- no qmicli per loop. _ip is cleared on (re)start.
if self._ip:
return {"ip_address": self._ip, "connected": True}
ip, gw, mask, dns = self._settings()
if not (ip and gw):
return {"connected": False, "ip_address": ""}
try:
IPv4Address(ip)
IPv4Address(gw)
except AddressValueError:
logging.warning(f"refusing route install with non-IPv4 ip={ip!r} gw={gw!r}")
return {}
prefix = sum(bin(int(o)).count("1") for o in mask.split(".")) if mask.count(".") == 3 else 32
self.cleanup_routes()
subprocess.run(["sudo", "ip", "addr", "flush", "dev", QMI_IFACE], capture_output=True)
cmds = [
["sudo", "ip", "addr", "add", f"{ip}/{prefix}", "dev", QMI_IFACE],
["sudo", "ip", "link", "set", QMI_IFACE, "up"],
["sudo", "ip", "route", "add", "default", "via", gw, "dev", QMI_IFACE, "metric", "1000"],
["sudo", "ip", "route", "add", "default", "via", gw, "dev", QMI_IFACE, "table", "1000"],
["sudo", "ip", "rule", "add", "from", ip, "table", "1000"],
]
for cmd in cmds:
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode != 0:
logging.warning(f"qmi route install failed ({' '.join(cmd[1:])}): {r.stderr.strip()}")
self.cleanup_routes()
return {}
if dns:
subprocess.run(["sudo", "resolvectl", "dns", QMI_IFACE, *dns], capture_output=True)
subprocess.run(["sudo", "resolvectl", "default-route", QMI_IFACE, "yes"], capture_output=True)
self._ip = ip
logging.info(f"QMI {ip}/{prefix} via {gw} on {QMI_IFACE}, dns={dns}")
return {"ip_address": ip, "connected": True}
class Modem:
def __init__(self):
self._ppp = PPPSession()
self._sim_change = False
self._apn = "" # blank = network-provided via PCO
self._roaming_allowed = True
self.running = True
self.S = INITIAL_STATE.copy()
@staticmethod
def _read_param(key):
try:
with open(f"/data/params/d/{key}") as f:
return f.read().strip()
except FileNotFoundError:
return ""
@staticmethod
def _parse_reg(v: str) -> str:
try:
return CREG.get(int(v.split(",")[1].strip('"')), "unknown")
except (ValueError, IndexError):
return "unknown"
@staticmethod
def _has_modem_manager() -> bool:
return os.path.isfile("/lib/systemd/system/ModemManager.service")
def _is_roaming_allowed(self) -> bool:
if self.S["iccid"].startswith(WEBBING_ICCID_PREFIX):
return True
return self._read_param("GsmRoaming") == "1"
def _publish_state(self, **kwargs):
self.S.update(kwargs)
self.S["seconds_since_boot"] = time.monotonic()
with tempfile.NamedTemporaryFile(mode="w", dir="/dev/shm", delete=False) as f:
json.dump(self.S, f, indent=2)
os.chmod(f.name, 0o644)
os.replace(f.name, STATE_PATH)
def _at(self, cmd):
"""Send AT command, return response lines. [] on error or if LPA holds port."""
fd = os.open(AT_LOCK, os.O_CREAT | os.O_RDWR, 0o666)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
os.close(fd)
return []
try:
with serial.Serial(AT_PORT, 9600, timeout=5) as ser:
ser.reset_input_buffer()
ser.write((cmd + "\r").encode())
lines = []
while True:
raw = ser.readline()
if not raw:
raise TimeoutError("AT timeout")
line = raw.decode(errors="ignore").strip()
if not line:
continue
if line == "OK":
break
if line == "ERROR" or line.startswith("+CME ERROR"):
raise RuntimeError(line)
lines.append(line)
return lines
except (RuntimeError, TimeoutError, OSError) as e:
logging.info(f"AT {cmd} failed: {e}")
return []
finally:
fcntl.flock(fd, fcntl.LOCK_UN)
os.close(fd)
def _atv(self, cmd, pfx):
for line in self._at(cmd):
if pfx in line and ":" in line:
return line.split(":", 1)[1].strip()
return None
def _init_at_channel(self) -> bool:
"""Run AT_INIT and confirm ATE0 took effect. Returns False if echo is still on."""
for c in AT_INIT:
self._at(c)
r = self._at("AT+CGMI")
return bool(r) and not r[0].startswith("AT")
def _configure_modem(self, modem_version: str):
if not modem_version.startswith("EG25"):
return
cmds = [
# clear initial EPS bearer APN (some carriers reject the default)
'AT+CGDCONT=0,"IP",""',
]
# SIM hot swap: skip on the comma3 (TICI_DOS) - matches openpilot v0.10.0, which
# only sent these on tizi (C3X). Enabling SIM-detect on the C3 can cause
# spurious SIM-removed events and drop the connection.
if "TICI_DOS" not in os.environ:
cmds += [
'AT+QSIMDET=1,0',
'AT+QSIMSTAT=1',
]
cmds += [
# configure modem as data-centric
'AT+QNVW=5280,0,"0102000000000000"',
'AT+QNVFW="/nv/item_files/ims/IMS_enable",00',
'AT+QNVFW="/nv/item_files/modem/mmode/ue_usage_setting",01',
]
for c in cmds:
self._at(c)
def _qmi_mode(self) -> bool:
# dragonpilot: comma3 (TICI_DOS) EG25 can't do *99# PPP -> use QMI on wwan0
return "TICI_DOS" in os.environ and QMISession.available()
def _do_initializing(self):
if not os.path.exists(AT_PORT):
return State.INITIALIZING
logging.info("port found, initializing")
self._ppp.kill()
self._ppp.cleanup_routes()
if not self._init_at_channel():
logging.warning("AT echo still on, retrying")
return State.INITIALIZING
identity = self._read_identity()
if not identity["iccid"] or not identity["imei"]:
logging.warning(f"identity read incomplete: {identity}, retrying")
return State.INITIALIZING
self._configure_modem(identity["modem_version"])
# dragonpilot: switch the data session to QMI once if the modem is QMI-mode (comma3 EG25)
if not isinstance(self._ppp, QMISession) and self._qmi_mode():
self._ppp.kill()
self._ppp.cleanup_routes()
self._ppp = QMISession()
logging.info("using QMI data path (wwan0)")
self.S.update(identity)
self._apn = self._read_param("GsmApn")
self._roaming_allowed = self._is_roaming_allowed()
# blank APN lets the carrier supply one via PCO
self._at(f'AT+CGDCONT={DIAL_CID},"IP","{self._apn}"')
logging.info(f"APN '{self._apn or '(network-provided)'}' written to CID {DIAL_CID}, roaming={'on' if self._roaming_allowed else 'off'}")
self._sim_change = False # clear since we just re-read identity with the new SIM
self._publish_state(**identity)
return State.SEARCHING
def _read_identity(self):
def first_line(cmd):
r = self._at(cmd)
return r[0].strip() if r else ""
imei = first_line("AT+CGSN")
if not (imei.isdigit() and 14 <= len(imei) <= 17): # 3GPP TS 23.003
imei = ""
iccid = (self._atv("AT+QCCID", "+QCCID:") or "").rstrip("F")
if not iccid.isdigit():
iccid = ""
imsi = first_line("AT+CIMI")
mcc_mnc = imsi[:6] if imsi.isdigit() and len(imsi) >= 6 else ""
modem_version = first_line("AT+GMR")
logging.info(f"imei={imei} iccid={iccid} mcc_mnc={mcc_mnc} ver={modem_version}")
return {"imei": imei, "iccid": iccid, "mcc_mnc": mcc_mnc, "modem_version": modem_version}
def _do_searching(self):
new_roaming = self._is_roaming_allowed()
if new_roaming != self._roaming_allowed:
logging.info(f"roaming changed: {self._roaming_allowed} -> {new_roaming}")
self._roaming_allowed = new_roaming
v = self._atv("AT+CREG?", "+CREG:")
if not v:
return self._searching_idle()
reg = self._parse_reg(v)
greg = self._parse_reg(self._atv("AT+CGREG?", "+CGREG:") or "")
logging.debug(f"creg={reg} cgreg={greg} roaming_allowed={self._roaming_allowed}")
if reg == "roaming" and not self._roaming_allowed:
self._publish_state(registration=reg)
return State.SEARCHING
if reg in ("home", "roaming") and greg in ("home", "roaming"):
self._publish_state(registration=reg)
return State.CONNECTING
if reg != self.S.get("registration"):
self._publish_state(registration=reg)
return self._searching_idle()
def _searching_idle(self):
if self._sim_change or not os.path.exists(AT_PORT):
logging.info(f"-> reconnecting (sim_change={self._sim_change} port={os.path.exists(AT_PORT)})")
return State.DISCONNECTING
return State.SEARCHING
def _do_connecting(self):
logging.info("starting pppd")
self._ppp.reset_fail_counter()
self._sim_change = False
self._ppp.start()
return State.CONNECTED
def _handle_pppd_exit(self):
if self._sim_change or not os.path.exists(AT_PORT):
return State.DISCONNECTING
give_up = self._ppp.record_fail()
if give_up:
logging.warning(f"PPP fail {self._ppp.fails}/{self._ppp.MAX_FAILS}, reconnecting")
return State.DISCONNECTING
logging.warning(f"PPP fail {self._ppp.fails}/{self._ppp.MAX_FAILS}, retrying")
self._ppp.reset_data_port()
if not os.path.exists(AT_PORT):
return State.DISCONNECTING
self._ppp.start()
return State.CONNECTED
def _params_changed(self) -> bool:
new_apn = self._read_param("GsmApn")
if new_apn != self._apn:
logging.info(f"GsmApn changed: '{self._apn}' -> '{new_apn}'")
return True
new_roaming = self._is_roaming_allowed()
if new_roaming != self._roaming_allowed:
logging.info(f"roaming changed: {self._roaming_allowed} -> {new_roaming}")
return True
return False
def _check_iccid(self, state):
if state in (State.INITIALIZING, State.DISCONNECTING) or not self.S["iccid"]:
return
iccid = (self._atv("AT+QCCID", "+QCCID:") or "").rstrip("F")
if iccid and iccid != self.S["iccid"]:
logging.warning(f"iccid changed: {self.S['iccid']} -> {iccid}")
self._sim_change = True
def _do_connected(self):
if self._ppp.has_exited():
return self._handle_pppd_exit()
if self._sim_change or not os.path.exists(AT_PORT) or self._params_changed():
return State.DISCONNECTING
self._poll()
return State.CONNECTED
def _do_disconnecting(self):
logging.warning("reconnecting")
self._publish_state(**INITIAL_STATE)
self._ppp.kill()
self._ppp.cleanup_routes()
self._ppp.reset_data_port()
self._sim_change = False
return State.INITIALIZING
def _poll_signal(self) -> dict:
v = self._atv("AT+CSQ", "+CSQ:")
if not v:
return {}
try:
rssi = int(v.split(",")[0])
if rssi == 99:
return {}
return {"signal_strength": rssi, "signal_quality": min(100, int(rssi / 31 * 100))}
except (ValueError, IndexError):
return {}
def _poll_operator(self) -> dict:
v = self._atv("AT+COPS?", "+COPS:")
if not v:
return {}
p = v.split(",")
out: dict = {}
try:
if len(p) >= 3:
out["operator"] = p[2].strip('"')
if len(p) >= 4:
out["network_type"] = NETWORK_TYPE.get(int(p[3]), "unknown")
except (ValueError, IndexError):
pass
return out
def _poll_band(self) -> dict:
v = self._atv("AT+QNWINFO", "+QNWINFO:")
if not v:
return {}
info = v.replace('"', '').split(",")
try:
if len(info) >= 4:
return {"band": info[2], "channel": int(info[3])}
except ValueError:
pass
return {}
def _poll_extra(self) -> dict:
v = self._atv('AT+QENG="servingcell"', "+QENG:")
return {"extra": v.replace('"', '')} if v else {}
def _poll_temps(self) -> dict:
v = self._atv("AT+QTEMP", "+QTEMP:")
if not v:
return {}
try:
return {"temperatures": [t for t in (int(x) for x in v.split(",") if x.strip()) if t != 255]}
except (ValueError, IndexError):
return {}
def _poll_iface(self) -> dict:
if isinstance(self._ppp, QMISession): # dragonpilot: QMI handles its own iface/routes/dns
return self._ppp.poll_iface()
try:
r = subprocess.run(["ip", "-4", "addr", "show", "ppp0"], capture_output=True, text=True, timeout=2)
ip, peer = "", ""
for line in r.stdout.splitlines():
# `inet 10.x.x.x peer 10.64.64.64/32 ...`
parts = line.strip().split()
if "inet" in parts:
i = parts.index("inet")
ip = parts[i + 1].split("/")[0]
if "peer" in parts:
peer = parts[parts.index("peer") + 1].split("/")[0]
break
if ip:
if self._ppp.maybe_install_routes(ip, peer):
self._ppp.maybe_install_dns(self._read_cellular_dns())
return {"ip_address": ip, "connected": True}
if self.S["connected"]:
return {"connected": False, "ip_address": ""}
except Exception:
pass
return {}
def _read_cellular_dns(self) -> list[str]:
v = self._atv(f"AT+CGCONTRDP={DIAL_CID}", "+CGCONTRDP:")
if not v:
return []
# +CGCONTRDP: <cid>,<bearer_id>,<apn>,<local_addr>,<gw_addr>,<dns_prim>,<dns_sec>,...
fields = [f.strip().strip('"') for f in v.split(",")]
dns_servers = []
for d in fields[5:7]:
try:
dns_servers.append(str(IPv4Address(d)))
except (AddressValueError, ValueError):
pass
if not dns_servers:
logging.warning(f"no cellular DNS servers reported by modem: {v!r}")
return dns_servers
def _poll_byte_counters(self) -> dict:
iface = getattr(self._ppp, "IFACE", "ppp0") # dragonpilot: wwan0 under QMI, else ppp0
try:
with open(f"/sys/class/net/{iface}/statistics/tx_bytes") as f:
tx = int(f.read().strip())
with open(f"/sys/class/net/{iface}/statistics/rx_bytes") as f:
rx = int(f.read().strip())
except Exception:
return {}
return {"tx_bytes": tx, "rx_bytes": rx}
def _poll(self):
s: dict = {}
for fn in (self._poll_signal, self._poll_operator, self._poll_band,
self._poll_extra, self._poll_temps, self._poll_iface,
self._poll_byte_counters):
s.update(fn())
if s:
self._publish_state(**s)
def run(self):
logging.info("starting")
self._publish_state(state=State.INITIALIZING.value)
if self._has_modem_manager():
subprocess.run(["sudo", "systemctl", "mask", "--runtime", "ModemManager"], capture_output=True)
subprocess.run(["sudo", "systemctl", "stop", "ModemManager"], capture_output=True)
self._ppp.kill()
state = State.INITIALIZING
handlers = {
State.INITIALIZING: self._do_initializing,
State.SEARCHING: self._do_searching,
State.CONNECTING: self._do_connecting,
State.CONNECTED: self._do_connected,
State.DISCONNECTING: self._do_disconnecting,
}
while self.running:
try:
self._check_iccid(state)
prev = state
state = handlers[state]()
if state != prev:
self._publish_state(state=state.value)
logging.info(f"{prev.value} -> {state.value}")
except Exception:
logging.exception(f"error in {state.value}")
state = State.DISCONNECTING
time.sleep(STATE_WAIT)
def stop(self):
self.running = False
self._ppp.kill()
self._ppp.cleanup_routes()
try:
os.remove(STATE_PATH)
except FileNotFoundError:
pass
if self._has_modem_manager():
subprocess.run(["sudo", "systemctl", "unmask", "--runtime", "ModemManager"], capture_output=True)
subprocess.run(["sudo", "systemctl", "start", "ModemManager"], capture_output=True)
def main():
m = Modem()
def _sig(*_):
m.running = False
signal.signal(signal.SIGINT, _sig)
signal.signal(signal.SIGTERM, _sig)
m.run()
m.stop()
if __name__ == "__main__":
main()