mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-05 21:42:05 +08:00
python ui: wifi manager (#34814)
* python wifi manager * fix ui * need auth callback * move to widgets * confirm forgot * add drag detection * improve keyboard & list * remove duplicate * typos * use gui_app render * refactor * cleanup * cleanup * shutdown * fix types * revert * scroll panel cleanup * reset is_dragging on mouse release * Revert "reset is_dragging on mouse release" This reverts commit ff5e51cf6f00848d93aa3ce0bab16602fea7a319. * unformat * cleanup * update state when connecting * forgotten callback * maybe fix? first tap didn't work * Revert "maybe fix? first tap didn't work" This reverts commit 739f0e3bd37323d10479b081a20d05c7fdff5495. * remove set_target_fps * Revert "remove set_target_fps" This reverts commit 96f74553ef6fa2ab2a4d1a07880175e7da628c9f. --------- Co-authored-by: Cameron Clough <cameronjclough@gmail.com>
This commit is contained in:
@@ -4,6 +4,7 @@ from enum import IntEnum
|
||||
MOUSE_WHEEL_SCROLL_SPEED = 30
|
||||
INERTIA_FRICTION = 0.95 # The rate at which the inertia slows down
|
||||
MIN_VELOCITY = 0.1 # Minimum velocity before stopping the inertia
|
||||
DRAG_THRESHOLD = 5 # Pixels of movement to consider it a drag, not a click
|
||||
|
||||
|
||||
class ScrollState(IntEnum):
|
||||
@@ -16,10 +17,12 @@ class GuiScrollPanel:
|
||||
def __init__(self, show_vertical_scroll_bar: bool = False):
|
||||
self._scroll_state: ScrollState = ScrollState.IDLE
|
||||
self._last_mouse_y: float = 0.0
|
||||
self._start_mouse_y: float = 0.0 # Track the initial mouse position for drag detection
|
||||
self._offset = rl.Vector2(0, 0)
|
||||
self._view = rl.Rectangle(0, 0, 0, 0)
|
||||
self._show_vertical_scroll_bar: bool = show_vertical_scroll_bar
|
||||
self._velocity_y = 0.0 # Velocity for inertia
|
||||
self._is_dragging = False
|
||||
|
||||
def handle_scroll(self, bounds: rl.Rectangle, content: rl.Rectangle) -> rl.Vector2:
|
||||
mouse_pos = rl.get_mouse_position()
|
||||
@@ -35,20 +38,27 @@ class GuiScrollPanel:
|
||||
self._scroll_state = ScrollState.DRAGGING_SCROLLBAR
|
||||
|
||||
self._last_mouse_y = mouse_pos.y
|
||||
self._start_mouse_y = mouse_pos.y # Record starting position
|
||||
self._velocity_y = 0.0 # Reset velocity when drag starts
|
||||
self._is_dragging = False # Reset dragging flag
|
||||
|
||||
if self._scroll_state != ScrollState.IDLE:
|
||||
if rl.is_mouse_button_down(rl.MouseButton.MOUSE_BUTTON_LEFT):
|
||||
delta_y = mouse_pos.y - self._last_mouse_y
|
||||
|
||||
# Check if movement exceeds the drag threshold
|
||||
total_drag = abs(mouse_pos.y - self._start_mouse_y)
|
||||
if total_drag > DRAG_THRESHOLD:
|
||||
self._is_dragging = True
|
||||
|
||||
if self._scroll_state == ScrollState.DRAGGING_CONTENT:
|
||||
self._offset.y += delta_y
|
||||
else:
|
||||
elif self._scroll_state == ScrollState.DRAGGING_SCROLLBAR:
|
||||
delta_y = -delta_y
|
||||
|
||||
self._last_mouse_y = mouse_pos.y
|
||||
self._velocity_y = delta_y # Update velocity during drag
|
||||
else:
|
||||
elif rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT):
|
||||
self._scroll_state = ScrollState.IDLE
|
||||
|
||||
# Handle mouse wheel scrolling
|
||||
@@ -73,3 +83,6 @@ class GuiScrollPanel:
|
||||
self._offset.y = max(min(self._offset.y, 0), -max_scroll_y)
|
||||
|
||||
return self._offset
|
||||
|
||||
def is_click_valid(self) -> bool:
|
||||
return self._scroll_state == ScrollState.IDLE and not self._is_dragging and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT)
|
||||
|
||||
@@ -0,0 +1,481 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
|
||||
from dbus_next.aio import MessageBus
|
||||
from dbus_next import BusType, Variant, Message
|
||||
from dbus_next.errors import DBusError
|
||||
from dbus_next.constants import MessageType
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
|
||||
# NetworkManager constants
|
||||
NM = "org.freedesktop.NetworkManager"
|
||||
NM_PATH = '/org/freedesktop/NetworkManager'
|
||||
NM_IFACE = 'org.freedesktop.NetworkManager'
|
||||
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings'
|
||||
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings'
|
||||
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection'
|
||||
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless'
|
||||
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties'
|
||||
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device"
|
||||
|
||||
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8
|
||||
|
||||
# NetworkManager device states
|
||||
class NMDeviceState(IntEnum):
|
||||
DISCONNECTED = 30
|
||||
PREPARE = 40
|
||||
NEED_AUTH = 60
|
||||
IP_CONFIG = 70
|
||||
ACTIVATED = 100
|
||||
|
||||
class SecurityType(IntEnum):
|
||||
OPEN = 0
|
||||
WPA = 1
|
||||
WPA2 = 2
|
||||
WPA3 = 3
|
||||
UNSUPPORTED = 4
|
||||
|
||||
@dataclass
|
||||
class NetworkInfo:
|
||||
ssid: str
|
||||
strength: int
|
||||
is_connected: bool
|
||||
security_type: SecurityType
|
||||
path: str
|
||||
bssid: str
|
||||
# saved_path: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class WifiManagerCallbacks:
|
||||
need_auth: Callable[[], None] | None = None
|
||||
activated: Callable[[], None] | None = None
|
||||
forgotten: Callable[[], None] | None = None
|
||||
|
||||
|
||||
class WifiManager:
|
||||
def __init__(self, callbacks):
|
||||
self.callbacks: WifiManagerCallbacks = callbacks
|
||||
self.networks: list[NetworkInfo] = []
|
||||
self.bus: MessageBus = None
|
||||
self.device_path: str = ""
|
||||
self.device_proxy = None
|
||||
self.saved_connections: dict[str, str] = {}
|
||||
self.active_ap_path: str = ""
|
||||
self.scan_task: asyncio.Task | None = None
|
||||
self.running: bool = True
|
||||
|
||||
async def connect(self) -> None:
|
||||
"""Connect to the DBus system bus."""
|
||||
try:
|
||||
self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
||||
if not await self._find_wifi_device():
|
||||
raise ValueError("No Wi-Fi device found")
|
||||
await self._setup_signals(self.device_path)
|
||||
|
||||
self.active_ap_path = await self.get_active_access_point()
|
||||
self.saved_connections = await self._get_saved_connections()
|
||||
self.scan_task = asyncio.create_task(self._periodic_scan())
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Failed to connect to DBus: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
cloudlog.error(f"Unexpected error during connect: {e}")
|
||||
raise
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
self.running = False
|
||||
if self.scan_task:
|
||||
self.scan_task.cancel()
|
||||
try:
|
||||
await self.scan_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
if self.bus:
|
||||
await self.bus.disconnect()
|
||||
|
||||
async def request_scan(self) -> None:
|
||||
try:
|
||||
interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
|
||||
await interface.call_request_scan({})
|
||||
except DBusError as e:
|
||||
cloudlog.warning(f"Scan request failed: {str(e)}")
|
||||
|
||||
async def get_active_access_point(self):
|
||||
try:
|
||||
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE)
|
||||
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint')
|
||||
return ap_path.value
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Error fetching active access point: {str(e)}")
|
||||
return ''
|
||||
|
||||
async def forget_connection(self, ssid: str) -> bool:
|
||||
path = self.saved_connections.get(ssid)
|
||||
if not path:
|
||||
return False
|
||||
|
||||
try:
|
||||
nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE)
|
||||
await nm_iface.call_delete()
|
||||
return True
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}")
|
||||
return False
|
||||
|
||||
async def activate_connection(self, ssid: str) -> bool:
|
||||
connection_path = self.saved_connections.get(ssid)
|
||||
if not connection_path:
|
||||
return False
|
||||
try:
|
||||
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
|
||||
await nm_iface.call_activate_connection(connection_path, self.device_path, "/")
|
||||
return True
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}")
|
||||
return False
|
||||
|
||||
async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None:
|
||||
"""Connect to a selected Wi-Fi network."""
|
||||
try:
|
||||
connection = {
|
||||
'connection': {
|
||||
'type': Variant('s', '802-11-wireless'),
|
||||
'uuid': Variant('s', str(uuid.uuid4())),
|
||||
'id': Variant('s', ssid),
|
||||
'autoconnect-retries': Variant('i', 0),
|
||||
},
|
||||
'802-11-wireless': {
|
||||
'ssid': Variant('ay', ssid.encode('utf-8')),
|
||||
'hidden': Variant('b', is_hidden),
|
||||
'mode': Variant('s', 'infrastructure'),
|
||||
},
|
||||
'ipv4': {'method': Variant('s', 'auto')},
|
||||
'ipv6': {'method': Variant('s', 'ignore')},
|
||||
}
|
||||
|
||||
if bssid:
|
||||
connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8'))
|
||||
|
||||
if password:
|
||||
connection['802-11-wireless-security'] = {
|
||||
'key-mgmt': Variant('s', 'wpa-psk'),
|
||||
'auth-alg': Variant('s', 'open'),
|
||||
'psk': Variant('s', password),
|
||||
}
|
||||
|
||||
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
|
||||
await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/")
|
||||
await self._update_connection_status()
|
||||
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Error connecting to network: {e}")
|
||||
|
||||
def is_saved(self, ssid: str) -> bool:
|
||||
return ssid in self.saved_connections
|
||||
|
||||
async def _find_wifi_device(self) -> bool:
|
||||
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
|
||||
devices = await nm_iface.get_devices()
|
||||
|
||||
for device_path in devices:
|
||||
device = await self.bus.introspect(NM, device_path)
|
||||
device_proxy = self.bus.get_proxy_object(NM, device_path, device)
|
||||
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE)
|
||||
device_type = await device_interface.get_device_type() # type: ignore[attr-defined]
|
||||
if device_type == 2: # Wi-Fi device
|
||||
self.device_path = device_path
|
||||
self.device_proxy = device_proxy
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
async def _periodic_scan(self):
|
||||
while self.running:
|
||||
try:
|
||||
await self.request_scan()
|
||||
await self._get_available_networks()
|
||||
await asyncio.sleep(30)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Scan failed: {e}")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def _setup_signals(self, device_path: str) -> None:
|
||||
rules = [
|
||||
f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'",
|
||||
f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'",
|
||||
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'",
|
||||
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'",
|
||||
]
|
||||
for rule in rules:
|
||||
await self._add_match_rule(rule)
|
||||
|
||||
# Set up signal handlers
|
||||
self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed)
|
||||
self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed)
|
||||
|
||||
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
|
||||
settings_iface.on_new_connection(self._on_new_connection)
|
||||
settings_iface.on_connection_removed(self._on_connection_removed)
|
||||
|
||||
def _on_properties_changed(self, interface: str, changed: dict, invalidated: list):
|
||||
# print("property changed", interface, changed, invalidated)
|
||||
if 'LastScan' in changed:
|
||||
asyncio.create_task(self._get_available_networks())
|
||||
elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed:
|
||||
self.active_ap_path = changed["ActiveAccessPoint"].value
|
||||
asyncio.create_task(self._get_available_networks())
|
||||
|
||||
def _on_state_changed(self, new_state: int, old_state: int, reason: int):
|
||||
print(f"State changed: {old_state} -> {new_state}, reason: {reason}")
|
||||
if new_state == NMDeviceState.ACTIVATED:
|
||||
if self.callbacks.activated:
|
||||
self.callbacks.activated()
|
||||
asyncio.create_task(self._update_connection_status())
|
||||
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH):
|
||||
for network in self.networks:
|
||||
network.is_connected = False
|
||||
if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth:
|
||||
self.callbacks.need_auth()
|
||||
|
||||
def _on_new_connection(self, path: str) -> None:
|
||||
"""Callback for NewConnection signal."""
|
||||
print(f"New connection added: {path}")
|
||||
asyncio.create_task(self._add_saved_connection(path))
|
||||
|
||||
def _on_connection_removed(self, path: str) -> None:
|
||||
"""Callback for ConnectionRemoved signal."""
|
||||
print(f"Connection removed: {path}")
|
||||
for ssid, p in list(self.saved_connections.items()):
|
||||
if path == p:
|
||||
del self.saved_connections[ssid]
|
||||
if self.callbacks.forgotten:
|
||||
self.callbacks.forgotten()
|
||||
break
|
||||
|
||||
async def _add_saved_connection(self, path: str) -> None:
|
||||
"""Add a new saved connection to the dictionary."""
|
||||
try:
|
||||
settings = await self._get_connection_settings(path)
|
||||
if ssid := self._extract_ssid(settings):
|
||||
self.saved_connections[ssid] = path
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Failed to add connection {path}: {e}")
|
||||
|
||||
def _extract_ssid(self, settings: dict) -> str | None:
|
||||
"""Extract SSID from connection settings."""
|
||||
ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value
|
||||
return ''.join(chr(b) for b in ssid_variant) if ssid_variant else None
|
||||
|
||||
async def _update_connection_status(self):
|
||||
self.active_ap_path = await self.get_active_access_point()
|
||||
await self._get_available_networks()
|
||||
|
||||
async def _add_match_rule(self, rule):
|
||||
"""Add a match rule on the bus."""
|
||||
reply = await self.bus.call(
|
||||
Message(
|
||||
message_type=MessageType.METHOD_CALL,
|
||||
destination='org.freedesktop.DBus',
|
||||
interface="org.freedesktop.DBus",
|
||||
path='/org/freedesktop/DBus',
|
||||
member='AddMatch',
|
||||
signature='s',
|
||||
body=[rule],
|
||||
)
|
||||
)
|
||||
|
||||
assert reply.message_type == MessageType.METHOD_RETURN
|
||||
return reply
|
||||
|
||||
async def _get_available_networks(self):
|
||||
"""Get a list of available networks via NetworkManager."""
|
||||
wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
|
||||
access_points = await wifi_iface.get_access_points()
|
||||
network_dict = {}
|
||||
for ap_path in access_points:
|
||||
try:
|
||||
props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE)
|
||||
properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint')
|
||||
ssid_variant = properties['Ssid'].value
|
||||
ssid = ''.join(chr(byte) for byte in ssid_variant)
|
||||
if not ssid:
|
||||
continue
|
||||
|
||||
bssid = properties.get('HwAddress', Variant('s', '')).value
|
||||
strength = properties['Strength'].value
|
||||
flags = properties['Flags'].value
|
||||
wpa_flags = properties['WpaFlags'].value
|
||||
rsn_flags = properties['RsnFlags'].value
|
||||
existing_network = network_dict.get(ssid)
|
||||
if not existing_network or ((not existing_network.bssid and bssid) or (existing_network.strength < strength)):
|
||||
network_dict[ssid] = NetworkInfo(
|
||||
ssid=ssid,
|
||||
strength=strength,
|
||||
security_type=self._get_security_type(flags, wpa_flags, rsn_flags),
|
||||
path=ap_path,
|
||||
bssid=bssid,
|
||||
is_connected=self.active_ap_path == ap_path,
|
||||
)
|
||||
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Error fetching networks: {e}")
|
||||
except Exception as e:
|
||||
cloudlog.error({e})
|
||||
|
||||
self.networks = sorted(
|
||||
network_dict.values(),
|
||||
key=lambda network: (
|
||||
not network.is_connected,
|
||||
-network.strength, # Higher signal strength first
|
||||
network.ssid.lower(),
|
||||
),
|
||||
)
|
||||
|
||||
async def _get_connection_settings(self, path):
|
||||
"""Fetch connection settings for a specific connection path."""
|
||||
try:
|
||||
connection_proxy = await self.bus.introspect(NM, path)
|
||||
connection = self.bus.get_proxy_object(NM, path, connection_proxy)
|
||||
settings = connection.get_interface(NM_CONNECTION_IFACE)
|
||||
return await settings.call_get_settings()
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Failed to get settings for {path}: {str(e)}")
|
||||
return {}
|
||||
|
||||
async def _process_chunk(self, paths_chunk):
|
||||
"""Process a chunk of connection paths."""
|
||||
tasks = [self._get_connection_settings(path) for path in paths_chunk]
|
||||
return await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
async def _get_saved_connections(self) -> dict[str, str]:
|
||||
try:
|
||||
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
|
||||
connection_paths = await settings_iface.call_list_connections()
|
||||
saved_ssids: dict[str, str] = {}
|
||||
batch_size = 20
|
||||
for i in range(0, len(connection_paths), batch_size):
|
||||
chunk = connection_paths[i : i + batch_size]
|
||||
results = await self._process_chunk(chunk)
|
||||
for path, config in zip(chunk, results, strict=True):
|
||||
if isinstance(config, dict) and '802-11-wireless' in config:
|
||||
if ssid := self._extract_ssid(config):
|
||||
saved_ssids[ssid] = path
|
||||
return saved_ssids
|
||||
except DBusError as e:
|
||||
cloudlog.error(f"Error fetching saved connections: {str(e)}")
|
||||
return {}
|
||||
|
||||
async def _get_interface(self, bus_name: str, path: str, name: str):
|
||||
introspection = await self.bus.introspect(bus_name, path)
|
||||
proxy = self.bus.get_proxy_object(bus_name, path, introspection)
|
||||
return proxy.get_interface(name)
|
||||
|
||||
def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType:
|
||||
"""Determine the security type based on flags."""
|
||||
if flags == 0 and not (wpa_flags or rsn_flags):
|
||||
return SecurityType.OPEN
|
||||
if rsn_flags & 0x200: # SAE (WPA3 Personal)
|
||||
return SecurityType.WPA3
|
||||
if rsn_flags: # RSN indicates WPA2 or higher
|
||||
return SecurityType.WPA2
|
||||
if wpa_flags: # WPA flags indicate WPA
|
||||
return SecurityType.WPA
|
||||
return SecurityType.UNSUPPORTED
|
||||
|
||||
|
||||
class WifiManagerWrapper:
|
||||
def __init__(self):
|
||||
self._manager: WifiManager | None = None
|
||||
self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks()
|
||||
|
||||
self._thread = threading.Thread(target=self._run, daemon=True)
|
||||
self._loop: asyncio.EventLoop | None = None
|
||||
self._running = False
|
||||
|
||||
def set_callbacks(self, callbacks: WifiManagerCallbacks):
|
||||
self._callbacks = callbacks
|
||||
|
||||
def start(self) -> None:
|
||||
if not self._running:
|
||||
self._thread.start()
|
||||
while self._thread is not None and not self._running:
|
||||
time.sleep(0.1)
|
||||
|
||||
def _run(self):
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
|
||||
try:
|
||||
self._manager = WifiManager(self._callbacks)
|
||||
self._running = True
|
||||
self._loop.run_forever()
|
||||
except Exception as e:
|
||||
cloudlog.error(f"Error in WifiManagerWrapper thread: {e}")
|
||||
finally:
|
||||
if self._loop.is_running():
|
||||
self._loop.stop()
|
||||
self._running = False
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if self._running:
|
||||
if self._manager is not None:
|
||||
self._run_coroutine(self._manager.shutdown())
|
||||
if self._loop and self._loop.is_running():
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
if self._thread and self._thread.is_alive():
|
||||
self._thread.join(timeout=2.0)
|
||||
self._running = False
|
||||
|
||||
@property
|
||||
def networks(self) -> list[NetworkInfo]:
|
||||
"""Get the current list of networks."""
|
||||
return self._manager.networks if self._manager else []
|
||||
|
||||
def is_saved(self, ssid: str) -> bool:
|
||||
"""Check if a network is saved."""
|
||||
return self._manager.is_saved(ssid) if self._manager else False
|
||||
|
||||
def connect(self):
|
||||
"""Connect to DBus and start Wi-Fi scanning."""
|
||||
if not self._manager:
|
||||
return
|
||||
self._run_coroutine(self._manager.connect())
|
||||
|
||||
def request_scan(self):
|
||||
"""Request a scan for Wi-Fi networks."""
|
||||
if not self._manager:
|
||||
return
|
||||
self._run_coroutine(self._manager.request_scan())
|
||||
|
||||
def forget_connection(self, ssid: str):
|
||||
"""Forget a saved Wi-Fi connection."""
|
||||
if not self._manager:
|
||||
return
|
||||
self._run_coroutine(self._manager.forget_connection(ssid))
|
||||
|
||||
def activate_connection(self, ssid: str):
|
||||
"""Activate an existing Wi-Fi connection."""
|
||||
if not self._manager:
|
||||
return
|
||||
self._run_coroutine(self._manager.activate_connection(ssid))
|
||||
|
||||
def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False):
|
||||
"""Connect to a Wi-Fi network."""
|
||||
if not self._manager:
|
||||
return
|
||||
self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden))
|
||||
|
||||
def _run_coroutine(self, coro):
|
||||
"""Run a coroutine in the async thread."""
|
||||
if not self._running or not self._loop:
|
||||
cloudlog.error("WifiManager thread is not running")
|
||||
return
|
||||
asyncio.run_coroutine_threadsafe(coro, self._loop)
|
||||
@@ -44,25 +44,28 @@ keyboard_layouts = {
|
||||
class Keyboard:
|
||||
def __init__(self, max_text_size: int = 255):
|
||||
self._layout = keyboard_layouts["lowercase"]
|
||||
self._input_text = ""
|
||||
self._max_text_size = max_text_size
|
||||
self._string_pointer = rl.ffi.new("char[]", max_text_size)
|
||||
self._input_text = ""
|
||||
self._clear()
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
return self._input_text
|
||||
|
||||
def clear(self):
|
||||
self._input_text = ""
|
||||
def text(self):
|
||||
result = rl.ffi.string(self._string_pointer).decode("utf-8")
|
||||
self._clear()
|
||||
return result
|
||||
|
||||
def render(self, rect, title, sub_title):
|
||||
gui_label(rl.Rectangle(rect.x, rect.y, rect.width, 95), title, 90)
|
||||
gui_label(rl.Rectangle(rect.x, rect.y + 95, rect.width, 60), sub_title, 55, rl.GRAY)
|
||||
if gui_button(rl.Rectangle(rect.x + rect.width - 300, rect.y, 300, 100), "Cancel"):
|
||||
return -1
|
||||
self._clear()
|
||||
return 0
|
||||
|
||||
# Text box for input
|
||||
rl.gui_text_box(rl.Rectangle(rect.x, rect.y + 160, rect.width, 100), self._input_text, self._max_text_size, True)
|
||||
|
||||
self._sync_string_pointer()
|
||||
rl.gui_text_box(rl.Rectangle(rect.x, rect.y + 160, rect.width, 100), self._string_pointer, self._max_text_size, True)
|
||||
self._input_text = rl.ffi.string(self._string_pointer).decode("utf-8")
|
||||
h_space, v_space = 15, 15
|
||||
row_y_start = rect.y + 300 # Starting Y position for the first row
|
||||
key_height = (rect.height - 300 - 3 * v_space) / 4
|
||||
@@ -87,7 +90,7 @@ class Keyboard:
|
||||
else:
|
||||
self.handle_key_press(key)
|
||||
|
||||
return 0
|
||||
return -1
|
||||
|
||||
def handle_key_press(self, key):
|
||||
if key in (SHIFT_DOWN_KEY, ABC_KEY):
|
||||
@@ -102,3 +105,14 @@ class Keyboard:
|
||||
self._input_text = self._input_text[:-1]
|
||||
elif key != BACKSPACE_KEY and len(self._input_text) < self._max_text_size:
|
||||
self._input_text += key
|
||||
|
||||
def _clear(self):
|
||||
self._input_text = ''
|
||||
self._string_pointer[0] = b'\0'
|
||||
|
||||
def _sync_string_pointer(self):
|
||||
"""Sync the C-string pointer with the internal Python string."""
|
||||
encoded = self._input_text.encode("utf-8")[:self._max_text_size - 1] # Leave room for the null terminator
|
||||
buffer = rl.ffi.buffer(self._string_pointer)
|
||||
buffer[:len(encoded)] = encoded
|
||||
self._string_pointer[len(encoded)] = b'\0' # Null terminator
|
||||
|
||||
@@ -0,0 +1,172 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
import pyray as rl
|
||||
from openpilot.system.ui.lib.wifi_manager import NetworkInfo, WifiManagerCallbacks, WifiManagerWrapper
|
||||
from openpilot.system.ui.lib.application import gui_app
|
||||
from openpilot.system.ui.lib.button import gui_button
|
||||
from openpilot.system.ui.lib.label import gui_label
|
||||
from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel
|
||||
from openpilot.system.ui.widgets.keyboard import Keyboard
|
||||
from openpilot.system.ui.widgets.confirm_dialog import confirm_dialog
|
||||
|
||||
NM_DEVICE_STATE_NEED_AUTH = 60
|
||||
ITEM_HEIGHT = 160
|
||||
|
||||
|
||||
@dataclass
|
||||
class StateIdle:
|
||||
action: Literal["idle"] = "idle"
|
||||
|
||||
@dataclass
|
||||
class StateConnecting:
|
||||
network: NetworkInfo
|
||||
action: Literal["connecting"] = "connecting"
|
||||
|
||||
@dataclass
|
||||
class StateNeedsAuth:
|
||||
network: NetworkInfo
|
||||
action: Literal["needs_auth"] = "needs_auth"
|
||||
|
||||
@dataclass
|
||||
class StateShowForgetConfirm:
|
||||
network: NetworkInfo
|
||||
action: Literal["show_forget_confirm"] = "show_forget_confirm"
|
||||
|
||||
@dataclass
|
||||
class StateForgetting:
|
||||
network: NetworkInfo
|
||||
action: Literal["forgetting"] = "forgetting"
|
||||
|
||||
UIState = StateIdle | StateConnecting | StateNeedsAuth | StateShowForgetConfirm | StateForgetting
|
||||
|
||||
|
||||
class WifiManagerUI:
|
||||
def __init__(self, wifi_manager: WifiManagerWrapper):
|
||||
self.state: UIState = StateIdle()
|
||||
self.btn_width = 200
|
||||
self.scroll_panel = GuiScrollPanel()
|
||||
self.keyboard = Keyboard()
|
||||
|
||||
self.wifi_manager = wifi_manager
|
||||
self.wifi_manager.set_callbacks(WifiManagerCallbacks(self._on_need_auth, self._on_activated, self._on_forgotten))
|
||||
self.wifi_manager.start()
|
||||
self.wifi_manager.connect()
|
||||
|
||||
def render(self, rect: rl.Rectangle):
|
||||
if not self.wifi_manager.networks:
|
||||
gui_label(rect, "Scanning Wi-Fi networks...", 72, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
|
||||
return
|
||||
|
||||
match self.state:
|
||||
case StateNeedsAuth(network):
|
||||
result = self.keyboard.render(rect, "Enter password", f"for {network.ssid}")
|
||||
if result == 1:
|
||||
self.connect_to_network(network, self.keyboard.text)
|
||||
elif result == 0:
|
||||
self.state = StateIdle()
|
||||
|
||||
case StateShowForgetConfirm(network):
|
||||
result = confirm_dialog(rect, f'Forget Wi-Fi Network "{network.ssid}"?', "Forget")
|
||||
if result == 1:
|
||||
self.forget_network(network)
|
||||
elif result == 0:
|
||||
self.state = StateIdle()
|
||||
|
||||
case _:
|
||||
self._draw_network_list(rect)
|
||||
|
||||
def _draw_network_list(self, rect: rl.Rectangle):
|
||||
content_rect = rl.Rectangle(rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * ITEM_HEIGHT)
|
||||
offset = self.scroll_panel.handle_scroll(rect, content_rect)
|
||||
clicked = self.scroll_panel.is_click_valid()
|
||||
|
||||
rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height))
|
||||
for i, network in enumerate(self.wifi_manager.networks):
|
||||
y_offset = rect.y + i * ITEM_HEIGHT + offset.y
|
||||
item_rect = rl.Rectangle(rect.x, y_offset, rect.width, ITEM_HEIGHT)
|
||||
if not rl.check_collision_recs(item_rect, rect):
|
||||
continue
|
||||
|
||||
self._draw_network_item(item_rect, network, clicked)
|
||||
if i < len(self.wifi_manager.networks) - 1:
|
||||
line_y = int(item_rect.y + item_rect.height - 1)
|
||||
rl.draw_line(int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY)
|
||||
|
||||
rl.end_scissor_mode()
|
||||
|
||||
def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool):
|
||||
label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, ITEM_HEIGHT)
|
||||
state_rect = rl.Rectangle(rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, ITEM_HEIGHT)
|
||||
|
||||
gui_label(label_rect, network.ssid, 55)
|
||||
|
||||
status_text = ""
|
||||
if network.is_connected:
|
||||
status_text = "Connected"
|
||||
match self.state:
|
||||
case StateConnecting(network=connecting):
|
||||
if connecting.ssid == network.ssid:
|
||||
status_text = "CONNECTING..."
|
||||
case StateForgetting(network=forgetting):
|
||||
if forgetting.ssid == network.ssid:
|
||||
status_text = "FORGETTING..."
|
||||
if status_text:
|
||||
rl.gui_label(state_rect, status_text)
|
||||
|
||||
# If the network is saved, show the "Forget" button
|
||||
if self.wifi_manager.is_saved(network.ssid):
|
||||
forget_btn_rect = rl.Rectangle(
|
||||
rect.x + rect.width - self.btn_width,
|
||||
rect.y + (ITEM_HEIGHT - 80) / 2,
|
||||
self.btn_width,
|
||||
80,
|
||||
)
|
||||
if isinstance(self.state, StateIdle) and gui_button(forget_btn_rect, "Forget") and clicked:
|
||||
self.state = StateShowForgetConfirm(network)
|
||||
|
||||
if isinstance(self.state, StateIdle) and rl.check_collision_point_rec(rl.get_mouse_position(), label_rect) and clicked:
|
||||
if not self.wifi_manager.is_saved(network.ssid):
|
||||
self.state = StateNeedsAuth(network)
|
||||
else:
|
||||
self.connect_to_network(network)
|
||||
|
||||
def connect_to_network(self, network: NetworkInfo, password=''):
|
||||
self.state = StateConnecting(network)
|
||||
if self.wifi_manager.is_saved(network.ssid) and not password:
|
||||
self.wifi_manager.activate_connection(network.ssid)
|
||||
else:
|
||||
self.wifi_manager.connect_to_network(network.ssid, password)
|
||||
|
||||
def forget_network(self, network: NetworkInfo):
|
||||
self.state = StateForgetting(network)
|
||||
self.wifi_manager.forget_connection(network.ssid)
|
||||
|
||||
def _on_need_auth(self):
|
||||
match self.state:
|
||||
case StateConnecting(network):
|
||||
self.state = StateNeedsAuth(network)
|
||||
|
||||
def _on_activated(self):
|
||||
if isinstance(self.state, StateConnecting):
|
||||
self.state = StateIdle()
|
||||
|
||||
def _on_forgotten(self):
|
||||
if isinstance(self.state, StateForgetting):
|
||||
self.state = StateIdle()
|
||||
|
||||
|
||||
def main():
|
||||
gui_app.init_window("Wi-Fi Manager")
|
||||
wifi_manager = WifiManagerWrapper()
|
||||
wifi_ui = WifiManagerUI(wifi_manager)
|
||||
|
||||
for _ in gui_app.render():
|
||||
wifi_ui.render(rl.Rectangle(50, 50, gui_app.width - 100, gui_app.height - 100))
|
||||
|
||||
wifi_manager.shutdown()
|
||||
gui_app.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user