WifiManager: update networks on active (#37186)

* immediately

* only if active
This commit is contained in:
Shane Smiskol
2026-02-12 13:40:00 -08:00
committed by GitHub
parent 7665045fc6
commit 2b7f91d151
+44 -34
View File
@@ -232,6 +232,10 @@ class WifiManager:
def set_active(self, active: bool):
self._active = active
# Update networks immediately when activating for UI
if active:
self._update_networks(block=False)
def _monitor_state(self):
# Filter for signals
rules = (
@@ -607,53 +611,59 @@ class WifiManager:
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to request scan: {reply}")
def _update_networks(self):
def _update_networks(self, block: bool = True):
if not self._active:
return
with self._lock:
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
def worker():
with self._lock:
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
# NOTE: AccessPoints property may exclude hidden APs (use GetAllAccessPoints method if needed)
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE)
wifi_props = self._router_main.send_and_get_reply(Properties(wifi_addr).get_all()).body[0]
active_ap_path = wifi_props.get('ActiveAccessPoint', ('o', '/'))[1]
ap_paths = wifi_props.get('AccessPoints', ('ao', []))[1]
# NOTE: AccessPoints property may exclude hidden APs (use GetAllAccessPoints method if needed)
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE)
wifi_props = self._router_main.send_and_get_reply(Properties(wifi_addr).get_all()).body[0]
active_ap_path = wifi_props.get('ActiveAccessPoint', ('o', '/'))[1]
ap_paths = wifi_props.get('AccessPoints', ('ao', []))[1]
aps: dict[str, list[AccessPoint]] = {}
aps: dict[str, list[AccessPoint]] = {}
for ap_path in ap_paths:
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ap_props = self._router_main.send_and_get_reply(Properties(ap_addr).get_all())
for ap_path in ap_paths:
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ap_props = self._router_main.send_and_get_reply(Properties(ap_addr).get_all())
# some APs have been seen dropping off during iteration
if ap_props.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get AP properties for {ap_path}")
continue
try:
ap = AccessPoint.from_dbus(ap_props.body[0], ap_path, active_ap_path)
if ap.ssid == "":
# some APs have been seen dropping off during iteration
if ap_props.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get AP properties for {ap_path}")
continue
if ap.ssid not in aps:
aps[ap.ssid] = []
try:
ap = AccessPoint.from_dbus(ap_props.body[0], ap_path, active_ap_path)
if ap.ssid == "":
continue
aps[ap.ssid].append(ap)
except Exception:
# catch all for parsing errors
cloudlog.exception(f"Failed to parse AP properties for {ap_path}")
if ap.ssid not in aps:
aps[ap.ssid] = []
networks = [Network.from_dbus(ssid, ap_list, ssid in self._connections) for ssid, ap_list in aps.items()]
# sort with quantized strength to reduce jumping
networks.sort(key=lambda n: (-n.is_connected, -n.is_saved, -round(n.strength / 100 * 2), n.ssid.lower()))
self._networks = networks
aps[ap.ssid].append(ap)
except Exception:
# catch all for parsing errors
cloudlog.exception(f"Failed to parse AP properties for {ap_path}")
self._update_active_connection_info()
networks = [Network.from_dbus(ssid, ap_list, ssid in self._connections) for ssid, ap_list in aps.items()]
# sort with quantized strength to reduce jumping
networks.sort(key=lambda n: (-n.is_connected, -n.is_saved, -round(n.strength / 100 * 2), n.ssid.lower()))
self._networks = networks
self._enqueue_callbacks(self._networks_updated, self._networks)
self._update_active_connection_info()
self._enqueue_callbacks(self._networks_updated, self._networks)
if block:
worker()
else:
threading.Thread(target=worker, daemon=True).start()
def _update_active_connection_info(self):
self._ipv4_address = ""