Files
StarPilot/system/ui/lib/wifi_manager.py
T
Shane Smiskol a3f2452fa7 WifiManager: single source for known connections (#37262)
* temp

* rev

* reproduce race condition where connection removed signal takes a while to remove, then update networks keep is_saved true

* fix

* Revert "reproduce race condition where connection removed signal takes a while to remove, then update networks keep is_saved true"

This reverts commit cf7044ee955777db16434ab81c520bbe798c9164.

* not anymore

* more clear

* safe guards

nl
2026-02-19 00:49:35 -08:00

955 lines
36 KiB
Python

import atexit
import threading
import time
import uuid
import subprocess
from collections.abc import Callable
from dataclasses import dataclass, replace
from enum import IntEnum
from typing import Any
from jeepney import DBusAddress, new_method_call
from jeepney.bus_messages import MatchRule, message_bus
from jeepney.io.blocking import DBusConnection, open_dbus_connection as open_dbus_connection_blocking
from jeepney.io.threading import DBusRouter, open_dbus_connection as open_dbus_connection_threading
from jeepney.low_level import MessageType
from jeepney.wrappers import Properties
from openpilot.common.swaglog import cloudlog
from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_802_11_AP_SEC_PAIR_WEP40,
NM_802_11_AP_SEC_PAIR_WEP104, NM_802_11_AP_SEC_GROUP_WEP40,
NM_802_11_AP_SEC_GROUP_WEP104, NM_802_11_AP_SEC_KEY_MGMT_PSK,
NM_802_11_AP_SEC_KEY_MGMT_802_1X, NM_802_11_AP_FLAGS_NONE,
NM_802_11_AP_FLAGS_PRIVACY, NM_802_11_AP_FLAGS_WPS,
NM_PATH, NM_IFACE, NM_ACCESS_POINT_IFACE, NM_SETTINGS_PATH,
NM_SETTINGS_IFACE, NM_CONNECTION_IFACE, NM_DEVICE_IFACE,
NM_DEVICE_TYPE_WIFI, NM_DEVICE_TYPE_MODEM, NM_ACTIVE_CONNECTION_IFACE,
NM_IP4_CONFIG_IFACE, NM_PROPERTIES_IFACE, NMDeviceState, NMDeviceStateReason)
try:
from openpilot.common.params import Params
except Exception:
Params = None
TETHERING_IP_ADDRESS = "192.168.43.1"
DEFAULT_TETHERING_PASSWORD = "swagswagcomma"
SIGNAL_QUEUE_SIZE = 10
SCAN_PERIOD_SECONDS = 5
DEBUG = False
_dbus_call_idx = 0
def _wrap_router(router):
def _wrap(orig):
def wrapper(msg, **kw):
global _dbus_call_idx
_dbus_call_idx += 1
if DEBUG:
h = msg.header.fields
print(f"[DBUS #{_dbus_call_idx}] {h.get(6, '?')} {h.get(3, '?')} {msg.body}")
return orig(msg, **kw)
return wrapper
router.send_and_get_reply = _wrap(router.send_and_get_reply)
router.send = _wrap(router.send)
class SecurityType(IntEnum):
OPEN = 0
WPA = 1
WPA2 = 2
WPA3 = 3
UNSUPPORTED = 4
class MeteredType(IntEnum):
UNKNOWN = 0
YES = 1
NO = 2
def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType:
wpa_props = wpa_flags | rsn_flags
# obtained by looking at flags of networks in the office as reported by an Android phone
supports_wpa = (NM_802_11_AP_SEC_PAIR_WEP40 | NM_802_11_AP_SEC_PAIR_WEP104 | NM_802_11_AP_SEC_GROUP_WEP40 |
NM_802_11_AP_SEC_GROUP_WEP104 | NM_802_11_AP_SEC_KEY_MGMT_PSK)
if (flags == NM_802_11_AP_FLAGS_NONE) or ((flags & NM_802_11_AP_FLAGS_WPS) and not (wpa_props & supports_wpa)):
return SecurityType.OPEN
elif (flags & NM_802_11_AP_FLAGS_PRIVACY) and (wpa_props & supports_wpa) and not (wpa_props & NM_802_11_AP_SEC_KEY_MGMT_802_1X):
return SecurityType.WPA
else:
cloudlog.warning(f"Unsupported network! flags: {flags}, wpa_flags: {wpa_flags}, rsn_flags: {rsn_flags}")
return SecurityType.UNSUPPORTED
@dataclass(frozen=True)
class Network:
ssid: str
strength: int
security_type: SecurityType
@classmethod
def from_dbus(cls, ssid: str, aps: list["AccessPoint"]) -> "Network":
# we only want to show the strongest AP for each Network/SSID
strongest_ap = max(aps, key=lambda ap: ap.strength)
security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags)
return cls(
ssid=ssid,
strength=strongest_ap.strength,
security_type=security_type,
)
@dataclass(frozen=True)
class AccessPoint:
ssid: str
bssid: str
strength: int
flags: int
wpa_flags: int
rsn_flags: int
ap_path: str
@classmethod
def from_dbus(cls, ap_props: dict[str, tuple[str, Any]], ap_path: str) -> "AccessPoint":
ssid = bytes(ap_props['Ssid'][1]).decode("utf-8", "replace")
bssid = str(ap_props['HwAddress'][1])
strength = int(ap_props['Strength'][1])
flags = int(ap_props['Flags'][1])
wpa_flags = int(ap_props['WpaFlags'][1])
rsn_flags = int(ap_props['RsnFlags'][1])
return cls(
ssid=ssid,
bssid=bssid,
strength=strength,
flags=flags,
wpa_flags=wpa_flags,
rsn_flags=rsn_flags,
ap_path=ap_path,
)
class ConnectStatus(IntEnum):
DISCONNECTED = 0
CONNECTING = 1
CONNECTED = 2
@dataclass
class WifiState:
ssid: str | None = None
status: ConnectStatus = ConnectStatus.DISCONNECTED
class WifiManager:
def __init__(self):
self._networks: list[Network] = [] # a network can be comprised of multiple APs
self._active = True # used to not run when not in settings
self._exit = False
# DBus connections
try:
self._router_main = DBusRouter(open_dbus_connection_threading(bus="SYSTEM")) # used by scanner / general method calls
_wrap_router(self._router_main)
self._conn_monitor = open_dbus_connection_blocking(bus="SYSTEM") # used by state monitor thread
self._nm = DBusAddress(NM_PATH, bus_name=NM, interface=NM_IFACE)
except FileNotFoundError:
cloudlog.exception("Failed to connect to system D-Bus")
self._router_main = None
self._conn_monitor = None
self._exit = True
# Store wifi device path
self._wifi_device: str | None = None
# State
self._connections: dict[str, str] = {} # ssid -> connection path, updated via NM signals
self._wifi_state: WifiState = WifiState()
self._ipv4_address: str = ""
self._current_network_metered: MeteredType = MeteredType.UNKNOWN
self._tethering_password: str = ""
self._ipv4_forward = False
self._last_network_scan: float = 0.0
self._callback_queue: list[Callable] = []
self._tethering_ssid = "weedle"
if Params is not None:
dongle_id = Params().get("DongleId")
if dongle_id:
self._tethering_ssid += "-" + dongle_id[:4]
# Callbacks
self._need_auth: list[Callable[[str], None]] = []
self._activated: list[Callable[[], None]] = []
self._forgotten: list[Callable[[str | None], None]] = []
self._networks_updated: list[Callable[[list[Network]], None]] = []
self._disconnected: list[Callable[[], None]] = []
self._lock = threading.Lock()
self._scan_thread = threading.Thread(target=self._network_scanner, daemon=True)
self._state_thread = threading.Thread(target=self._monitor_state, daemon=True)
self._initialize()
atexit.register(self.stop)
def _initialize(self):
def worker():
self._wait_for_wifi_device()
self._init_connections()
if Params is not None and self._tethering_ssid not in self._connections:
self._add_tethering_connection()
self._init_wifi_state()
self._scan_thread.start()
self._state_thread.start()
self._tethering_password = self._get_tethering_password()
cloudlog.debug("WifiManager initialized")
threading.Thread(target=worker, daemon=True).start()
def _init_wifi_state(self, block: bool = True):
def worker():
dev_addr = DBusAddress(self._wifi_device, bus_name=NM, interface=NM_DEVICE_IFACE)
dev_state = self._router_main.send_and_get_reply(Properties(dev_addr).get('State')).body[0][1]
wifi_state = WifiState()
if NMDeviceState.PREPARE <= dev_state <= NMDeviceState.SECONDARIES and dev_state != NMDeviceState.NEED_AUTH:
wifi_state.status = ConnectStatus.CONNECTING
elif dev_state == NMDeviceState.ACTIVATED:
wifi_state.status = ConnectStatus.CONNECTED
conn_path, _ = self._get_active_wifi_connection()
if conn_path:
wifi_state.ssid = next((s for s, p in self._connections.items() if p == conn_path), None)
self._wifi_state = wifi_state
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def add_callbacks(self, need_auth: Callable[[str], None] | None = None,
activated: Callable[[], None] | None = None,
forgotten: Callable[[str], None] | None = None,
networks_updated: Callable[[list[Network]], None] | None = None,
disconnected: Callable[[], None] | None = None):
if need_auth is not None:
self._need_auth.append(need_auth)
if activated is not None:
self._activated.append(activated)
if forgotten is not None:
self._forgotten.append(forgotten)
if networks_updated is not None:
self._networks_updated.append(networks_updated)
if disconnected is not None:
self._disconnected.append(disconnected)
@property
def networks(self) -> list[Network]:
return self._networks
@property
def wifi_state(self) -> WifiState:
return self._wifi_state
@property
def ipv4_address(self) -> str:
return self._ipv4_address
@property
def current_network_metered(self) -> MeteredType:
return self._current_network_metered
@property
def connecting_to_ssid(self) -> str | None:
return self._wifi_state.ssid if self._wifi_state.status == ConnectStatus.CONNECTING else None
@property
def connected_ssid(self) -> str | None:
return self._wifi_state.ssid if self._wifi_state.status == ConnectStatus.CONNECTED else None
@property
def tethering_password(self) -> str:
return self._tethering_password
def _set_connecting(self, ssid: str | None):
self._wifi_state = WifiState(ssid=ssid, status=ConnectStatus.DISCONNECTED if ssid is None else ConnectStatus.CONNECTING)
def _enqueue_callbacks(self, cbs: list[Callable], *args):
for cb in cbs:
self._callback_queue.append(lambda _cb=cb: _cb(*args))
def process_callbacks(self):
# Call from UI thread to run any pending callbacks
to_run, self._callback_queue = self._callback_queue, []
for cb in to_run:
cb()
def set_active(self, active: bool):
self._active = active
# Update networks and WiFi state (to self-heal) immediately when activating for UI
if active:
self._init_wifi_state(block=False)
self._update_networks(block=False)
def _monitor_state(self):
# Filter for signals
rules = (
MatchRule(
type="signal",
interface=NM_DEVICE_IFACE,
member="StateChanged",
path=self._wifi_device,
),
MatchRule(
type="signal",
interface=NM_SETTINGS_IFACE,
member="NewConnection",
path=NM_SETTINGS_PATH,
),
MatchRule(
type="signal",
interface=NM_SETTINGS_IFACE,
member="ConnectionRemoved",
path=NM_SETTINGS_PATH,
),
MatchRule(
type="signal",
interface=NM_PROPERTIES_IFACE,
member="PropertiesChanged",
path=self._wifi_device,
),
)
for rule in rules:
self._conn_monitor.send_and_get_reply(message_bus.AddMatch(rule))
with (self._conn_monitor.filter(rules[0], bufsize=SIGNAL_QUEUE_SIZE) as state_q,
self._conn_monitor.filter(rules[1], bufsize=SIGNAL_QUEUE_SIZE) as new_conn_q,
self._conn_monitor.filter(rules[2], bufsize=SIGNAL_QUEUE_SIZE) as removed_conn_q,
self._conn_monitor.filter(rules[3], bufsize=SIGNAL_QUEUE_SIZE) as props_q):
while not self._exit:
try:
self._conn_monitor.recv_messages(timeout=1)
except TimeoutError:
continue
# Connection added/removed
while len(removed_conn_q):
conn_path = removed_conn_q.popleft().body[0]
self._connection_removed(conn_path)
while len(new_conn_q):
conn_path = new_conn_q.popleft().body[0]
self._new_connection(conn_path)
# PropertiesChanged on wifi device (LastScan = scan complete)
while len(props_q):
iface, changed, _ = props_q.popleft().body
if iface == NM_WIRELESS_IFACE and 'LastScan' in changed:
self._update_networks()
# Device state changes
# TODO: known race conditions when switching networks (e.g. forget A, connect to B):
# 1. DEACTIVATING/DISCONNECTED + CONNECTION_REMOVED: fires before NewConnection for B
# arrives, so _set_connecting(None) clears B's CONNECTING state causing UI flicker.
# DEACTIVATING(CONNECTION_REMOVED): wifi_state (B, CONNECTING) -> (None, DISCONNECTED)
# Fix: make DEACTIVATING a no-op, and guard DISCONNECTED with
# `if wifi_state.ssid not in _connections` (NewConnection arrives between the two).
# 2. PREPARE/CONFIG ssid lookup: DBus may return stale A's conn_path, overwriting B.
# PREPARE(0): wifi_state (B, CONNECTING) -> (A, CONNECTING)
# Fix: only do DBus lookup when wifi_state.ssid is None (auto-connections);
# user-initiated connections already have ssid set via _set_connecting.
while len(state_q):
new_state, previous_state, change_reason = state_q.popleft().body
if new_state == NMDeviceState.DISCONNECTED:
if change_reason != NMDeviceStateReason.NEW_ACTIVATION:
# catches CONNECTION_REMOVED reason when connection is forgotten
self._set_connecting(None)
elif new_state in (NMDeviceState.PREPARE, NMDeviceState.CONFIG):
# Set connecting status when NetworkManager connects to known networks on its own
wifi_state = replace(self._wifi_state, status=ConnectStatus.CONNECTING)
conn_path, _ = self._get_active_wifi_connection(self._conn_monitor)
if conn_path is None:
cloudlog.warning("Failed to get active wifi connection during PREPARE/CONFIG state")
else:
wifi_state.ssid = next((s for s, p in self._connections.items() if p == conn_path), None)
self._wifi_state = wifi_state
# BAD PASSWORD
# - strong network rejects with NEED_AUTH+SUPPLICANT_DISCONNECT
# - weak/gone network fails with FAILED+NO_SECRETS
elif ((new_state == NMDeviceState.NEED_AUTH and change_reason == NMDeviceStateReason.SUPPLICANT_DISCONNECT) or
(new_state == NMDeviceState.FAILED and change_reason == NMDeviceStateReason.NO_SECRETS)):
if self._wifi_state.ssid:
self._enqueue_callbacks(self._need_auth, self._wifi_state.ssid)
self._set_connecting(None)
elif new_state in (NMDeviceState.NEED_AUTH, NMDeviceState.IP_CONFIG, NMDeviceState.IP_CHECK,
NMDeviceState.SECONDARIES, NMDeviceState.FAILED):
pass
elif new_state == NMDeviceState.ACTIVATED:
# Note that IP address from Ip4Config may not be propagated immediately and could take until the next scan results
self._update_networks()
wifi_state = replace(self._wifi_state, status=ConnectStatus.CONNECTED)
conn_path, _ = self._get_active_wifi_connection(self._conn_monitor)
if conn_path is None:
cloudlog.warning("Failed to get active wifi connection during ACTIVATED state")
self._wifi_state = wifi_state
self._enqueue_callbacks(self._activated)
else:
wifi_state.ssid = next((s for s, p in self._connections.items() if p == conn_path), None)
self._wifi_state = wifi_state
self._enqueue_callbacks(self._activated)
# Persist volatile connections (created by AddAndActivateConnection2) to disk
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
save_reply = self._conn_monitor.send_and_get_reply(new_method_call(conn_addr, 'Save'))
if save_reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to persist connection to disk: {save_reply}")
elif new_state == NMDeviceState.DEACTIVATING:
if change_reason == NMDeviceStateReason.CONNECTION_REMOVED:
# When connection is forgotten
self._set_connecting(None)
def _network_scanner(self):
while not self._exit:
if self._active:
if time.monotonic() - self._last_network_scan > SCAN_PERIOD_SECONDS:
self._request_scan()
self._last_network_scan = time.monotonic()
time.sleep(1 / 2.)
def _wait_for_wifi_device(self):
while not self._exit:
device_path = self._get_adapter(NM_DEVICE_TYPE_WIFI)
if device_path is not None:
self._wifi_device = device_path
break
time.sleep(1)
def _get_adapter(self, adapter_type: int) -> str | None:
# Return the first NetworkManager device path matching adapter_type
try:
device_paths = self._router_main.send_and_get_reply(new_method_call(self._nm, 'GetDevices')).body[0]
for device_path in device_paths:
dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE)
dev_type = self._router_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0][1]
if dev_type == adapter_type:
return str(device_path)
except Exception as e:
cloudlog.exception(f"Error getting adapter type {adapter_type}: {e}")
return None
def _init_connections(self) -> None:
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
known_connections = self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0]
conns: dict[str, str] = {}
for conn_path in known_connections:
settings = self._get_connection_settings(conn_path)
if len(settings) == 0:
cloudlog.warning(f'Failed to get connection settings for {conn_path}')
continue
if "802-11-wireless" in settings:
ssid = settings['802-11-wireless']['ssid'][1].decode("utf-8", "replace")
if ssid != "":
conns[ssid] = conn_path
self._connections = conns
def _new_connection(self, conn_path: str):
settings = self._get_connection_settings(conn_path)
if "802-11-wireless" in settings:
ssid = settings['802-11-wireless']['ssid'][1].decode("utf-8", "replace")
if ssid != "":
self._connections[ssid] = conn_path
def _connection_removed(self, conn_path: str):
self._connections = {ssid: path for ssid, path in self._connections.items() if path != conn_path}
def _get_active_connections(self, router: DBusConnection | DBusRouter | None = None):
# Returns list of ActiveConnection
if router is None:
router = self._router_main
return router.send_and_get_reply(Properties(self._nm).get('ActiveConnections')).body[0][1]
def _get_active_wifi_connection(self, router: DBusConnection | DBusRouter | None = None) -> tuple[str | None, dict | None]:
# Returns first Connection settings path and ActiveConnection props from ActiveConnections with Type 802-11-wireless
if router is None:
router = self._router_main
for active_conn in self._get_active_connections(router):
conn_addr = DBusAddress(active_conn, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE)
reply = router.send_and_get_reply(Properties(conn_addr).get_all())
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get active connection properties for {active_conn}: {reply}")
continue
props = reply.body[0]
conn_path = props.get('Connection', ('o', '/'))[1]
if props.get('Type', ('s', ''))[1] == '802-11-wireless' and conn_path != '/':
return conn_path, props
return None, None
def _get_connection_settings(self, conn_path: str) -> dict:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'GetSettings'))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to get connection settings: {reply}')
return {}
return dict(reply.body[0])
def _add_tethering_connection(self):
connection = {
'connection': {
'type': ('s', '802-11-wireless'),
'uuid': ('s', str(uuid.uuid4())),
'id': ('s', 'Hotspot'),
'autoconnect-retries': ('i', 0),
'interface-name': ('s', 'wlan0'),
'autoconnect': ('b', False),
},
'802-11-wireless': {
'band': ('s', 'bg'),
'mode': ('s', 'ap'),
'ssid': ('ay', self._tethering_ssid.encode("utf-8")),
},
'802-11-wireless-security': {
'group': ('as', ['ccmp']),
'key-mgmt': ('s', 'wpa-psk'),
'pairwise': ('as', ['ccmp']),
'proto': ('as', ['rsn']),
'psk': ('s', DEFAULT_TETHERING_PASSWORD),
},
'ipv4': {
'method': ('s', 'shared'),
'address-data': ('aa{sv}', [[
('address', ('s', TETHERING_IP_ADDRESS)),
('prefix', ('u', 24)),
]]),
'gateway': ('s', TETHERING_IP_ADDRESS),
'never-default': ('b', True),
},
'ipv6': {'method': ('s', 'ignore')},
}
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
self._router_main.send_and_get_reply(new_method_call(settings_addr, 'AddConnection', 'a{sa{sv}}', (connection,)))
def connect_to_network(self, ssid: str, password: str, hidden: bool = False):
self._set_connecting(ssid)
def worker():
# Clear all connections that may already exist to the network we are connecting to
self.forget_connection(ssid, block=True)
connection = {
'connection': {
'type': ('s', '802-11-wireless'),
'uuid': ('s', str(uuid.uuid4())),
'id': ('s', f'openpilot connection {ssid}'),
'autoconnect-retries': ('i', 0),
},
'802-11-wireless': {
'ssid': ('ay', ssid.encode("utf-8")),
'hidden': ('b', hidden),
'mode': ('s', 'infrastructure'),
},
'ipv4': {
'method': ('s', 'auto'),
'dns-priority': ('i', 600),
},
'ipv6': {'method': ('s', 'ignore')},
}
if password:
connection['802-11-wireless-security'] = {
'key-mgmt': ('s', 'wpa-psk'),
'auth-alg': ('s', 'open'),
'psk': ('s', password),
}
# Volatile connection auto-deletes on disconnect (wrong password, user switches networks)
# Persisted to disk on ACTIVATED via Save()
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
self._set_connecting(None)
return
reply = self._router_main.send_and_get_reply(new_method_call(self._nm, 'AddAndActivateConnection2', 'a{sa{sv}}ooa{sv}',
(connection, self._wifi_device, "/", {'persist': ('s', 'volatile')})))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to add and activate connection for {ssid}: {reply}")
self._set_connecting(None)
threading.Thread(target=worker, daemon=True).start()
def forget_connection(self, ssid: str, block: bool = False):
def worker():
conn_path = self._connections.get(ssid, None)
if conn_path is None:
cloudlog.warning(f"Trying to forget unknown connection: {ssid}")
else:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Delete'))
self._update_networks()
self._enqueue_callbacks(self._forgotten, ssid)
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def activate_connection(self, ssid: str, block: bool = False):
self._set_connecting(ssid)
def worker():
conn_path = self._connections.get(ssid, None)
if conn_path is None or self._wifi_device is None:
cloudlog.warning(f"Failed to activate connection for {ssid}: conn_path={conn_path}, wifi_device={self._wifi_device}")
self._set_connecting(None)
return
reply = self._router_main.send_and_get_reply(new_method_call(self._nm, 'ActivateConnection', 'ooo',
(conn_path, self._wifi_device, "/")))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to activate connection for {ssid}: {reply}")
self._set_connecting(None)
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def _deactivate_connection(self, ssid: str):
for active_conn in self._get_active_connections():
conn_addr = DBusAddress(active_conn, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE)
specific_obj_path = self._router_main.send_and_get_reply(Properties(conn_addr).get('SpecificObject')).body[0][1]
if specific_obj_path != "/":
ap_addr = DBusAddress(specific_obj_path, bus_name=NM, interface=NM_ACCESS_POINT_IFACE)
ap_ssid = bytes(self._router_main.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace")
if ap_ssid == ssid:
self._router_main.send_and_get_reply(new_method_call(self._nm, 'DeactivateConnection', 'o', (active_conn,)))
return
def is_tethering_active(self) -> bool:
# Check ssid, not connected_ssid, to also catch connecting state
return self._wifi_state.ssid == self._tethering_ssid
def is_connection_saved(self, ssid: str) -> bool:
return ssid in self._connections
def set_tethering_password(self, password: str):
def worker():
conn_path = self._connections.get(self._tethering_ssid, None)
if conn_path is None:
cloudlog.warning('No tethering connection found')
return
settings = self._get_connection_settings(conn_path)
if len(settings) == 0:
cloudlog.warning(f'Failed to get tethering settings for {conn_path}')
return
settings['802-11-wireless-security']['psk'] = ('s', password)
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Update', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to update tethering settings: {reply}')
return
self._tethering_password = password
if self.is_tethering_active():
self.activate_connection(self._tethering_ssid, block=True)
threading.Thread(target=worker, daemon=True).start()
def _get_tethering_password(self) -> str:
conn_path = self._connections.get(self._tethering_ssid, None)
if conn_path is None:
cloudlog.warning('No tethering connection found')
return ''
reply = self._router_main.send_and_get_reply(new_method_call(
DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE),
'GetSecrets', 's', ('802-11-wireless-security',)
))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to get tethering password: {reply}')
return ''
secrets = reply.body[0]
if '802-11-wireless-security' not in secrets:
return ''
return str(secrets['802-11-wireless-security'].get('psk', ('s', ''))[1])
def set_ipv4_forward(self, enabled: bool):
self._ipv4_forward = enabled
def set_tethering_active(self, active: bool):
def worker():
if active:
self.activate_connection(self._tethering_ssid, block=True)
if not self._ipv4_forward:
time.sleep(5)
cloudlog.warning("net.ipv4.ip_forward = 0")
subprocess.run(["sudo", "sysctl", "net.ipv4.ip_forward=0"], check=False)
else:
self._deactivate_connection(self._tethering_ssid)
threading.Thread(target=worker, daemon=True).start()
def set_current_network_metered(self, metered: MeteredType):
def worker():
if self.is_tethering_active():
return
conn_path, _ = self._get_active_wifi_connection()
if conn_path is None:
cloudlog.warning('No active WiFi connection found')
return
settings = self._get_connection_settings(conn_path)
if len(settings) == 0:
cloudlog.warning(f'Failed to get connection settings for {conn_path}')
return
settings['connection']['metered'] = ('i', int(metered))
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Update', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f'Failed to update metered settings: {reply}')
threading.Thread(target=worker, daemon=True).start()
def _request_scan(self):
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
wifi_addr = DBusAddress(self._wifi_device, bus_name=NM, interface=NM_WIRELESS_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(wifi_addr, 'RequestScan', 'a{sv}', ({},)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to request scan: {reply}")
def _update_networks(self, block: bool = True):
if not self._active:
return
def worker():
with self._lock:
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
# NOTE: AccessPoints property may exclude hidden APs (use GetAllAccessPoints method if needed)
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE)
wifi_props = self._router_main.send_and_get_reply(Properties(wifi_addr).get_all()).body[0]
ap_paths = wifi_props.get('AccessPoints', ('ao', []))[1]
aps: dict[str, list[AccessPoint]] = {}
for ap_path in ap_paths:
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ap_props = self._router_main.send_and_get_reply(Properties(ap_addr).get_all())
# some APs have been seen dropping off during iteration
if ap_props.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get AP properties for {ap_path}")
continue
try:
ap = AccessPoint.from_dbus(ap_props.body[0], ap_path)
if ap.ssid == "":
continue
if ap.ssid not in aps:
aps[ap.ssid] = []
aps[ap.ssid].append(ap)
except Exception:
# catch all for parsing errors
cloudlog.exception(f"Failed to parse AP properties for {ap_path}")
networks = [Network.from_dbus(ssid, ap_list) for ssid, ap_list in aps.items()]
networks.sort(key=lambda n: (n.ssid != self._wifi_state.ssid, not self.is_connection_saved(n.ssid), -n.strength, n.ssid.lower()))
self._networks = networks
self._update_active_connection_info()
self._enqueue_callbacks(self._networks_updated, self._networks)
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def _update_active_connection_info(self):
ipv4_address = ""
metered = MeteredType.UNKNOWN
conn_path, props = self._get_active_wifi_connection()
if conn_path is not None and props is not None:
# IPv4 address
ip4config_path = props.get('Ip4Config', ('o', '/'))[1]
if ip4config_path != "/":
ip4config_addr = DBusAddress(ip4config_path, bus_name=NM, interface=NM_IP4_CONFIG_IFACE)
address_data = self._router_main.send_and_get_reply(Properties(ip4config_addr).get('AddressData')).body[0][1]
for entry in address_data:
if 'address' in entry:
ipv4_address = entry['address'][1]
break
# Metered status
settings = self._get_connection_settings(conn_path)
if len(settings) > 0:
metered_prop = settings['connection'].get('metered', ('i', 0))[1]
if metered_prop == MeteredType.YES:
metered = MeteredType.YES
elif metered_prop == MeteredType.NO:
metered = MeteredType.NO
self._ipv4_address = ipv4_address
self._current_network_metered = metered
def __del__(self):
self.stop()
def update_gsm_settings(self, roaming: bool, apn: str, metered: bool):
"""Update GSM settings for cellular connection"""
def worker():
try:
lte_connection_path = self._get_lte_connection_path()
if not lte_connection_path:
cloudlog.warning("No LTE connection found")
return
settings = self._get_connection_settings(lte_connection_path)
if len(settings) == 0:
cloudlog.warning(f"Failed to get connection settings for {lte_connection_path}")
return
# Ensure dicts exist
if 'gsm' not in settings:
settings['gsm'] = {}
if 'connection' not in settings:
settings['connection'] = {}
changes = False
auto_config = apn == ""
if settings['gsm'].get('auto-config', ('b', False))[1] != auto_config:
cloudlog.warning(f'Changing gsm.auto-config to {auto_config}')
settings['gsm']['auto-config'] = ('b', auto_config)
changes = True
if settings['gsm'].get('apn', ('s', ''))[1] != apn:
cloudlog.warning(f'Changing gsm.apn to {apn}')
settings['gsm']['apn'] = ('s', apn)
changes = True
if settings['gsm'].get('home-only', ('b', False))[1] == roaming:
cloudlog.warning(f'Changing gsm.home-only to {not roaming}')
settings['gsm']['home-only'] = ('b', not roaming)
changes = True
# Unknown means NetworkManager decides
metered_int = int(MeteredType.UNKNOWN if metered else MeteredType.NO)
if settings['connection'].get('metered', ('i', 0))[1] != metered_int:
cloudlog.warning(f'Changing connection.metered to {metered_int}')
settings['connection']['metered'] = ('i', metered_int)
changes = True
if changes:
# Update the connection settings (temporary update)
conn_addr = DBusAddress(lte_connection_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'UpdateUnsaved', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to update GSM settings: {reply}")
return
self._activate_modem_connection(lte_connection_path)
except Exception as e:
cloudlog.exception(f"Error updating GSM settings: {e}")
threading.Thread(target=worker, daemon=True).start()
def _get_lte_connection_path(self) -> str | None:
try:
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
known_connections = self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0]
for conn_path in known_connections:
settings = self._get_connection_settings(conn_path)
if settings and settings.get('connection', {}).get('id', ('s', ''))[1] == 'lte':
return str(conn_path)
except Exception as e:
cloudlog.exception(f"Error finding LTE connection: {e}")
return None
def _activate_modem_connection(self, connection_path: str):
try:
modem_device = self._get_adapter(NM_DEVICE_TYPE_MODEM)
if modem_device and connection_path:
self._router_main.send_and_get_reply(new_method_call(self._nm, 'ActivateConnection', 'ooo', (connection_path, modem_device, "/")))
except Exception as e:
cloudlog.exception(f"Error activating modem connection: {e}")
def stop(self):
if not self._exit:
self._exit = True
if self._scan_thread.is_alive():
self._scan_thread.join()
if self._state_thread.is_alive():
self._state_thread.join()
if self._router_main is not None:
self._router_main.close()
self._router_main.conn.close()
if self._conn_monitor is not None:
self._conn_monitor.close()