Skip to content

Instantly share code, notes, and snippets.

@mirkobrombin
Created August 28, 2023 12:23
Show Gist options
  • Select an option

  • Save mirkobrombin/3cfa21a44c318e08db29fd27b6252f4a to your computer and use it in GitHub Desktop.

Select an option

Save mirkobrombin/3cfa21a44c318e08db29fd27b6252f4a to your computer and use it in GitHub Desktop.

Revisions

  1. Mirko Brombin created this gist Aug 28, 2023.
    492 changes: 492 additions & 0 deletions network.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,492 @@
    # network.py
    #
    # Copyright 2023 mirkobrombin
    # Copyright 2023 matbme
    #
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundationat version 3 of the License.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program. If not, see <http://www.gnu.org/licenses/>.

    import logging
    import time
    from gettext import gettext as _
    from operator import attrgetter
    from threading import Lock, Timer

    from gi.repository import NM, NMA4, Adw, Gtk, GLib

    from vanilla_installer.utils.run_async import RunAsync

    logger = logging.getLogger("VanillaInstaller::Network")

    # Dictionary mapping security types to a tuple containing
    # their pretty name and whether it is a secure protocol.
    # If security is None, it means that no padlock icon is shown.
    # If security is False, a warning symbol appears instead of a padlock.
    AP_SECURITY_TYPES = {
    "none": (None, None),
    "wep": (False, _("Insecure network (WEP)")),
    "wpa": (True, _("Secure network (WPA)")),
    "wpa2": (True, _("Secure network (WPA2)")),
    "sae": (True, _("Secure network (WPA3)")),
    "owe": (None, None),
    "owe_tm": (None, None),
    }

    # PyGObject-libnm doesn't seem to expose these values, so we have redefine them
    NM_802_11_AP_FLAGS_PRIVACY = 0x00000001
    NM_802_11_AP_SEC_NONE = 0x00000000

    NM_802_11_AP_SEC_KEY_MGMT_802_1X = 0x00000200
    NM_802_11_AP_SEC_KEY_MGMT_EAP_SUITE_B_192 = 0x00002000
    NM_802_11_AP_SEC_KEY_MGMT_OWE = 0x00000800
    NM_802_11_AP_SEC_KEY_MGMT_OWE_TM = 0x00001000
    NM_802_11_AP_SEC_KEY_MGMT_PSK = 0x00000100
    NM_802_11_AP_SEC_KEY_MGMT_SAE = 0x00000400


    @Gtk.Template(resource_path="/org/vanillaos/Installer/gtk/wireless-row.ui")
    class WirelessRow(Adw.ActionRow):
    __gtype_name__ = "WirelessRow"

    signal_icon = Gtk.Template.Child()
    secure_icon = Gtk.Template.Child()
    connected_label = Gtk.Template.Child()

    def __init__(self, window, client, device: NM.DeviceWifi, ap, **kwargs):
    super().__init__(**kwargs)
    self.__window = window
    self.client = client
    self.ap = ap
    self.device = device
    self.refresh_ui()

    self.set_activatable(True)
    self.connect("activated", self.__show_connect_dialog)

    @property
    def ssid(self):
    ssid = self.ap.get_ssid()
    if ssid is not None:
    ssid = ssid.get_data().decode("utf-8")
    else:
    ssid = ""
    return ssid

    @property
    def signal_strength(self):
    return self.ap.get_strength()

    @property
    def connected(self):
    active_connection = self.device.get_active_connection()
    if active_connection is not None:
    if active_connection.get_id() == self.ssid:
    return True
    return False

    def refresh_ui(self):
    # We use the same strength logic as gnome-control-center
    strength = self.signal_strength
    if strength < 20:
    icon_name = "network-wireless-signal-none-symbolic"
    elif strength < 40:
    icon_name = "network-wireless-signal-weak-symbolic"
    elif strength < 50:
    icon_name = "network-wireless-signal-ok-symbolic"
    elif strength < 80:
    icon_name = "network-wireless-signal-good-symbolic"
    else:
    icon_name = "network-wireless-signal-excellent-symbolic"

    self.set_title(self.ssid)
    self.signal_icon.set_from_icon_name(icon_name)
    secure, tooltip = self.__get_security()
    if secure is not None:
    if not secure:
    self.secure_icon.set_from_icon_name("warning-small-symbolic")
    else:
    self.secure_icon.set_from_icon_name(
    "network-wireless-encrypted-symbolic"
    )

    self.secure_icon.set_visible(secure is not None)
    if tooltip is not None:
    self.secure_icon.set_tooltip_text(tooltip)

    self.connected_label.set_visible(self.connected)

    def __get_security(self) -> tuple[bool | None, str | None]:
    flags = self.ap.get_flags()
    rsn_flags = self.ap.get_rsn_flags()
    wpa_flags = self.ap.get_wpa_flags()

    # Copying logic used in gnome-control-center because this is a mess
    if (
    not (flags & NM_802_11_AP_FLAGS_PRIVACY)
    and wpa_flags == NM_802_11_AP_SEC_NONE
    and rsn_flags == NM_802_11_AP_SEC_NONE
    ):
    return AP_SECURITY_TYPES["none"]
    elif (
    (flags & NM_802_11_AP_FLAGS_PRIVACY)
    and wpa_flags == NM_802_11_AP_SEC_NONE
    and rsn_flags == NM_802_11_AP_SEC_NONE
    ):
    return AP_SECURITY_TYPES["wep"]
    elif (
    (flags & NM_802_11_AP_FLAGS_PRIVACY)
    and wpa_flags != NM_802_11_AP_SEC_NONE
    and rsn_flags != NM_802_11_AP_SEC_NONE
    ):
    return AP_SECURITY_TYPES["wpa"]
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_SAE:
    return AP_SECURITY_TYPES["sae"]
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE:
    return AP_SECURITY_TYPES["owe"]
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE_TM:
    return AP_SECURITY_TYPES["owe_tm"]
    else:
    return AP_SECURITY_TYPES["wpa2"]

    @property
    def __key_mgmt(self):
    # Key management used for the connection. One of "none" (WEP or no
    # password protection), "ieee8021x" (Dynamic WEP), "owe" (Opportunistic
    # Wireless Encryption), "wpa-psk" (WPA2 + WPA3 personal), "sae" (WPA3
    # personal only), "wpa-eap" (WPA2 + WPA3 enterprise) or
    # "wpa-eap-suite-b-192" (WPA3 enterprise only).
    rsn_flags = self.ap.get_rsn_flags()
    wpa_flags = self.ap.get_wpa_flags()

    if wpa_flags == NM_802_11_AP_SEC_NONE and rsn_flags == NM_802_11_AP_SEC_NONE:
    return "none"
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_802_1X:
    return "ieee8021x"
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_EAP_SUITE_B_192:
    return "wpa-eap-suite-b-192"
    elif (
    rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE
    or rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE_TM
    ):
    return "owe"
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_PSK:
    return "wpa-psk"
    elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_SAE:
    return "sae"

    def __show_connect_dialog(self, data):
    dialog = NMA4.WifiDialog.new(
    self.client, self.__construct_connection(), self.device, self.ap, False
    )
    dialog.set_modal(True)
    dialog.set_transient_for(self.__window)

    dialog.connect("response", self.__on_dialog_response)

    dialog.show()

    def __on_dialog_response(self, dialog, response_id):
    def connect_cb(client, result, data):
    try:
    ac = client.add_and_activate_connection_finish(result)
    logger.debug("ActiveConnection {}".format(ac.get_path()))
    except Exception as e:
    logger.error("Error:", e)

    if response_id == -6:
    dialog.close()
    elif response_id == -5:
    conn, _, _ = dialog.get_connection()
    self.client.add_and_activate_connection_async(
    conn, self.device, self.ap.get_path(), None, connect_cb, None
    )
    dialog.close()

    def __construct_connection(self):
    connection = NM.SimpleConnection.new()
    s_con = NM.SettingConnection.new()
    s_con.set_property(NM.SETTING_CONNECTION_ID, self.ssid)
    s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-11-wireless")
    s_wifi = NM.SettingWireless.new()
    s_wifi.set_property(NM.SETTING_WIRELESS_SSID, self.ap.get_ssid())
    s_wifi.set_property(NM.SETTING_WIRELESS_MODE, "infrastructure")
    s_wsec = NM.SettingWirelessSecurity.new()
    s_wsec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, self.__key_mgmt)
    s_ip4 = NM.SettingIP4Config.new()
    s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
    s_ip6 = NM.SettingIP6Config.new()
    s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")

    connection.add_setting(s_con)
    connection.add_setting(s_wifi)
    connection.add_setting(s_wsec)
    connection.add_setting(s_ip4)
    connection.add_setting(s_ip6)

    return connection


    @Gtk.Template(resource_path="/org/vanillaos/Installer/gtk/default-network.ui")
    class VanillaDefaultNetwork(Adw.Bin):
    __gtype_name__ = "VanillaDefaultNetwork"

    wired_group = Gtk.Template.Child()
    wireless_group = Gtk.Template.Child()
    hidden_network_row = Gtk.Template.Child()
    proxy_settings_row = Gtk.Template.Child()
    advanced_group = Gtk.Template.Child()
    btn_next = Gtk.Template.Child()

    def __init__(self, window, distro_info, key, step, **kwargs):
    super().__init__(**kwargs)
    self.__window = window
    self.__distro_info = distro_info
    self.__key = key
    self.__step = step
    self.__nm_client = NM.Client.new()

    self.__devices = []
    self.__wired_children = []
    self.__wireless_children = {}

    self.__last_wifi_scan = 0

    # Prevent concurrency issues when re-scanning Wi-Fi devices.
    # Since we reload the list every time there's a state change,
    # there's a high change that it coincides with a periodic
    # refresh operation.
    self.__wifi_lock = Lock()

    # Since we have a dedicated page for checking connectivity,
    # we only need to make sure the user has some type of
    # connection set up, be it wired or wireless.
    self.has_eth_connection = False
    self.has_wifi_connection = False

    self.__get_network_devices()
    self.__start_auto_refresh()

    # TODO: Remove once implemented
    self.advanced_group.set_visible(False)

    self.__nm_client.connect("device-added", self.__add_new_device)
    self.__nm_client.connect("device-added", self.__remove_device)
    self.btn_next.connect("clicked", self.__window.next)
    self.connect("realize", self.__try_skip_page)

    def __try_skip_page(self, data):
    # Skip page if already connected to the internet
    if self.has_eth_connection or self.has_wifi_connection:
    self.__window.next()

    @property
    def step_id(self):
    return self.__key

    def get_finals(self):
    return {}

    def set_btn_next(self, state: bool):
    if state:
    if not self.btn_next.has_css_class("suggested-action"):
    self.btn_next.add_css_class("suggested-action")
    self.btn_next.set_sensitive(True)
    else:
    if self.btn_next.has_css_class("suggested-action"):
    self.btn_next.remove_css_class("suggested-action")
    self.btn_next.set_sensitive(False)

    def __get_network_devices(self):
    devices = self.__nm_client.get_devices()
    eth_devices = 0
    wifi_devices = 0
    for device in devices:
    if device.is_real():
    device_type = device.get_device_type()
    if device_type == NM.DeviceType.ETHERNET:
    self.__add_ethernet_connection(device)
    eth_devices += 1
    elif device_type == NM.DeviceType.WIFI:
    device.connect("state-changed", self.__on_state_changed)
    self.has_wifi_connection = (
    device.get_active_connection() is not None
    )
    self.__refresh_wifi_list(device)
    wifi_devices += 1
    else:
    continue

    self.__devices.append(device)

    self.wired_group.set_visible(eth_devices > 0)
    self.wireless_group.set_visible(wifi_devices > 0)

    def __add_new_device(self, client, device):
    self.__devices.append(device)

    def __remove_device(self, client, device):
    self.__devices.remove(device)

    def __on_state_changed(self, device, new_state, old_state, reason):
    self.has_wifi_connection = device.get_active_connection() is not None
    self.__refresh()

    def __refresh(self):
    for child in self.__wired_children:
    self.wired_group.remove(child)
    self.__wired_children = []

    for device in self.__devices:
    device_type = device.get_device_type()
    if device_type == NM.DeviceType.ETHERNET:
    self.__add_ethernet_connection(device)
    elif device_type == NM.DeviceType.WIFI:
    self.__scan_wifi(device)

    self.set_btn_next(self.has_eth_connection or self.has_wifi_connection)

    def __start_auto_refresh(self):
    def run_async():
    while True:
    GLib.idle_add(self.__refresh)
    time.sleep(10)

    RunAsync(run_async, None)

    def __device_status(self, conn: NM.Device):
    connected = False
    match conn.get_state():
    case NM.DeviceState.ACTIVATED:
    status = _("Connected")
    connected = True
    case NM.DeviceState.NEED_AUTH:
    status = _("Authentication required")
    case [
    NM.DeviceState.PREPARE,
    NM.DeviceState.CONFIG,
    NM.DeviceState.IP_CONFIG,
    NM.DeviceState.IP_CHECK,
    NM.DeviceState.SECONDARIES,
    ]:
    status = _("Connecting")
    case NM.DeviceState.DISCONNECTED:
    status = _("Disconnected")
    case NM.DeviceState.DEACTIVATING:
    status = _("Disconnecting")
    case NM.DeviceState.FAILED:
    status = _("Connection Failed")
    case NM.DeviceState.UNKNOWN:
    status = _("Status Unknown")
    case NM.DeviceState.UNMANAGED:
    status = _("Unmanaged")
    case NM.DeviceState.UNAVAILABLE:
    status = _("Unavailable")

    return status, connected

    def __add_ethernet_connection(self, conn: NM.DeviceEthernet):
    status, connected = self.__device_status(conn)
    if connected:
    status += f" - {conn.get_speed()} Mbps"
    self.has_eth_connection = True
    else:
    self.has_eth_connection = False

    # Wired devices with no cable plugged in are shown as unavailable
    if conn.get_state() == NM.DeviceState.UNAVAILABLE:
    status = _("Cable Unplugged")

    eth_conn = Adw.ActionRow(title=status)
    self.wired_group.add(eth_conn)
    self.__wired_children.append(eth_conn)

    def __refresh_wifi_list(self, conn: NM.DeviceWifi):
    while conn.get_last_scan() == self.__last_wifi_scan:
    time.sleep(0.25)

    networks = {}
    for ap in conn.get_access_points():
    ssid = ap.get_ssid()
    if ssid is None:
    continue

    ssid = ssid.get_data().decode("utf-8")
    if ssid in networks.keys():
    networks[ssid].append(ap)
    else:
    networks[ssid] = [ap]

    self.__wifi_lock.acquire()

    # Invalidate current list
    for ssid, (child, clean) in self.__wireless_children.items():
    self.__wireless_children[ssid] = (child, True)

    for ssid, aps in networks.items():
    max_strength = -1
    best_ap = None
    for ap in aps:
    ap_strength = ap.get_strength()
    if ap_strength > max_strength:
    max_strength = ap_strength
    best_ap = ap

    # Try to re-use entries with the same SSID
    if ssid in self.__wireless_children.keys():
    child = self.__wireless_children[ssid][0]
    child.ap = best_ap
    child.refresh_ui()
    self.__wireless_children[ssid] = (child, False)
    continue

    # Create new row if SSID is new
    wifi_network = WirelessRow(self.__window, self.__nm_client, conn, best_ap)
    self.wireless_group.add(wifi_network)
    self.__wireless_children[ssid] = (wifi_network, False)

    # Remove invalid rows
    invalid_ssids = []
    for ssid, (child, clean) in self.__wireless_children.items():
    self.wireless_group.remove(child)
    if clean:
    invalid_ssids.append(ssid)

    for ssid in invalid_ssids:
    del self.__wireless_children[ssid]

    for row in self.__sorted_wireless_children:
    self.wireless_group.add(row)

    self.__wifi_lock.release()

    def __scan_wifi(self, conn: NM.DeviceWifi):
    self.__last_wifi_scan = conn.get_last_scan()
    conn.request_scan_async()

    t = Timer(1.5, self.__refresh_wifi_list, [conn])
    t.start()

    @property
    def __sorted_wireless_children(self):
    def multisort(xs, specs):
    for key, reverse in reversed(specs):
    xs.sort(key=attrgetter(key), reverse=reverse)
    return xs

    # 1 - Is connected
    # 2 - Signal strength
    # 3 - Alphabetically
    return multisort(
    [it[0] for it in list(self.__wireless_children.values())],
    (("connected", True), ("signal_strength", True), ("ssid", True)),
    )