""" IEEE 802.11 device identity on the host: sysfs (class net / phy80211) plus optional lspci / iw. Module name ``ieee80211_dev`` mirrors kernel-style ``80211_dev`` naming; a leading digit is invalid in Python imports. Types: WirelessInterface — static PCI/USB identity + dynamic sysfs/iw/ethtool via @property WirelessSnapshot — full host scan (interfaces + primary + timestamp) Dynamic properties (cached per instance until ``refresh_runtime()``): ``mac_address``, ``operstate``, ``is_up``, ``carrier_up``, ``bssid``, ``iface_mode`` (AP/STA/…), ``is_ap`` / ``is_sta``, ``firmware_version``, ``driver_version`` (ethtool), ``channel``, ``center_freq_mhz``, ``channel_width_mhz``, ``center1_mhz``, ``chanspec``. The sysfs ``driver`` module name remains a field. Serializes to fiber_map ``fiber_ports[].wlan``; remote capture via ``fiwi.py wlan-info-json``. """ from __future__ import annotations import os import re import subprocess import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Tuple SYS_NET = "/sys/class/net" # Keys stored under WirelessInterface runtime blob (JSON round-trip + live discovery) _RUNTIME_JSON_KEYS = frozenset( { "mac_address", "operstate", "is_up", "carrier_up", "bssid", "iface_mode", "firmware_version", "driver_version", "channel", "center_freq_mhz", "channel_width_mhz", "center1_mhz", "chanspec", } ) def _run_capture(cmd: List[str], timeout: float = 12.0) -> str: try: p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) except (OSError, subprocess.TimeoutExpired): return "" if p.returncode != 0: return "" return p.stdout or "" def _is_wireless_iface(net_name: str) -> bool: base = os.path.join(SYS_NET, net_name) if os.path.isdir(os.path.join(base, "wireless")): return True if os.path.isdir(os.path.join(base, "phy80211")): return True uevent = os.path.join(base, "device", "uevent") if os.path.isfile(uevent): try: with open(uevent, encoding="utf-8") as f: if "DEVTYPE=wlan" in f.read(): return True except OSError: pass return False def _read_link_basename(path: str) -> str: try: target = os.readlink(path) except OSError: return "" return os.path.basename(os.path.normpath(target)) _BDF_DIR = re.compile(r"^([0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7])$", re.I) def _pci_endpoint_from_device(device_realpath: str) -> Tuple[str, str, str]: cur = device_realpath pci_addr, vendor_id, device_id = "", "", "" for _ in range(14): base = os.path.basename(cur) if _BDF_DIR.match(base) and os.path.isfile(os.path.join(cur, "vendor")): pci_addr = base.lower() vendor_id, device_id = _read_pci_vendor_device(cur) break parent = os.path.dirname(cur) if parent == cur: break cur = parent return pci_addr, vendor_id, device_id def _connection_type(realpath_device: str) -> str: low = realpath_device.lower() if re.search(r"/usb[0-9]*/", low) or "/usb/usb" in low: return "USB" if re.search(r"[0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]", low, re.I): return "PCIe" return "unknown" def _read_pci_vendor_device(pci_dir: str) -> Tuple[str, str]: vend, dev = "", "" try: with open(os.path.join(pci_dir, "vendor"), encoding="utf-8") as f: vend = f.read().strip().lower() if vend.startswith("0x"): vend = vend[2:] with open(os.path.join(pci_dir, "device"), encoding="utf-8") as f: dev = f.read().strip().lower() if dev.startswith("0x"): dev = dev[2:] except OSError: pass return vend, dev def _usb_ids_from_uevent(device_dir: str) -> Tuple[str, str]: ue = os.path.join(device_dir, "uevent") vid, pid = "", "" try: with open(ue, encoding="utf-8") as f: for ln in f: if ln.startswith("PRODUCT="): rest = ln.strip().split("=", 1)[1] parts = rest.split("/") if len(parts) >= 2: vid = parts[0].lower()[-4:].rjust(4, "0") pid = parts[1].lower()[-4:].rjust(4, "0") break except OSError: pass return vid, pid def _lspci_bdf_key(segment: str) -> str: s = segment.strip().lower() if re.match(r"^[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]$", s, re.I): return "0000:" + s return s def _parse_lspci_nn_wireless(stdout: str) -> Dict[str, Dict[str, str]]: by_pci: Dict[str, Dict[str, str]] = {} pci_start = re.compile( r"^([0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]|[0-9a-f]{2}:[0-9a-f]{2}\.[0-7])\s+(.+)$", re.I, ) tail_ids = re.compile(r"\[([0-9a-f]{4}):([0-9a-f]{4})\]\s*$", re.I) for line in stdout.splitlines(): line = line.strip() m = pci_start.match(line) if not m: continue bdf_n, rest = _lspci_bdf_key(m.group(1)), m.group(2) lowrest = rest.lower() if "ethernet controller" in lowrest or "[0200]" in rest: continue if "[0280]" not in rest and "[0282]" not in rest: if not any( x in lowrest for x in ("wireless", "wi-fi", "wifi", "802.11", "wlan") ): continue elif "ethernet" in lowrest and not any( x in lowrest for x in ("wireless", "802.11", "wifi", "wi-fi", "wlan") ): continue vendor, device = "", "" tm = tail_ids.search(rest) if tm: vendor, device = tm.group(1).lower(), tm.group(2).lower() by_pci[bdf_n] = { "product": rest, "lspci_vendor_id": vendor, "lspci_device_id": device, } return by_pci def _normalize_pci(addr: str) -> str: if not addr: return "" a = addr.strip().lower() if not a.startswith("0000:") and re.match( r"^[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]$", a, re.I ): a = "0000:" + a return a def _merge_lspci_into( interfaces: Dict[str, WirelessInterface], lspci_by_pci: Dict[str, Dict[str, str]], sysfs_device_paths: Dict[str, str], ) -> None: for name, wi in interfaces.items(): keys = [_normalize_pci(wi.pci_address)] sd = sysfs_device_paths.get(name, "") if sd: ep, _, _ = _pci_endpoint_from_device(sd) keys.append(_normalize_pci(ep)) for key in keys: if not key or key not in lspci_by_pci: continue merge = lspci_by_pci[key] if merge.get("product"): wi.product = merge["product"] if merge.get("lspci_vendor_id") and not wi.vendor_id: wi.vendor_id = merge["lspci_vendor_id"] if merge.get("lspci_device_id") and not wi.device_id: wi.device_id = merge["lspci_device_id"] break def _iw_iface_phy_map() -> Dict[str, str]: out = _run_capture(["iw", "dev"], timeout=8.0) if not out.strip(): return {} mapping: Dict[str, str] = {} current_phy: Optional[str] = None for raw in out.splitlines(): line = raw.rstrip() if not line.startswith("\t"): m = re.match(r"phy#(\d+)\b", line.strip()) if m: current_phy = f"phy{m.group(1)}" else: current_phy = None continue inner = line.lstrip() m = re.match(r"Interface\s+(\S+)", inner) if m and current_phy: mapping[m.group(1)] = current_phy return mapping def _bands_from_iw_phy(phyname: str) -> List[float]: if not phyname: return [] out = _run_capture(["iw", phyname, "info"], timeout=10.0) bands: List[float] = [] for line in out.splitlines(): line = line.strip() if line.startswith("Band "): try: n = int(line.split()[1]) except (ValueError, IndexError): continue if n == 1: bands.append(2.4) elif n == 2: bands.append(5.0) elif n == 3: bands.append(6.0) return sorted(set(bands)) def _attach_iw_to_interfaces(interfaces: Dict[str, WirelessInterface]) -> None: iface_to_phy = _iw_iface_phy_map() for name, wi in interfaces.items(): phy = iface_to_phy.get(name, "") if not phy: continue wi.wiphy = phy bs = _bands_from_iw_phy(phy) if bs: wi.bands_ghz = bs def _read_sysfs_trim(path: str) -> str: try: with open(path, encoding="utf-8") as f: return f.read().strip() except OSError: return "" def _parse_iw_dev_info(text: str) -> Dict[str, Any]: """Parse `iw dev info` for nl80211 type and current channel / width.""" r: Dict[str, Any] = {} if not text.strip(): return r for raw in text.splitlines(): line = raw.strip() m = re.match(r"type\s+(\S+)", line, re.I) if m: t = m.group(1).lower() if t == "ap": r["iface_mode"] = "AP" elif t == "managed": r["iface_mode"] = "STA" elif t == "monitor": r["iface_mode"] = "monitor" elif t == "mesh_point": r["iface_mode"] = "mesh" elif t == "P2P-client": r["iface_mode"] = "P2P-client" elif t == "P2P-GO": r["iface_mode"] = "P2P-GO" else: r["iface_mode"] = m.group(1) continue m = re.search(r"channel\s+(\d+)\s+\((\d+)\s+MHz\)", line, re.I) if m: try: r["channel"] = int(m.group(1)) r["center_freq_mhz"] = int(m.group(2)) except ValueError: pass m = re.search(r"width:\s*(\d+)\s*MHz", line, re.I) if m: try: r["channel_width_mhz"] = int(m.group(1)) except ValueError: pass m = re.search(r"center1:\s*(\d+)\s*MHz", line, re.I) if m: try: r["center1_mhz"] = int(m.group(1)) except ValueError: pass _fill_chanspec(r) return r def _fill_chanspec(r: Dict[str, Any]) -> None: cf = r.get("center_freq_mhz") w = r.get("channel_width_mhz") c1 = r.get("center1_mhz") if cf is not None and w is not None: r["chanspec"] = f"{cf}@{w}MHz" if c1 is not None: r["chanspec"] += f" center1={c1}" elif cf is not None: r["chanspec"] = str(cf) def _parse_iw_dev_link(text: str) -> Dict[str, Any]: """Parse `iw dev link` (STA: BSSID, freq when associated).""" r: Dict[str, Any] = {} if not text.strip(): return r for raw in text.splitlines(): line = raw.strip() m = re.match(r"Connected to\s+([0-9a-f:]+)", line, re.I) if m: r["bssid"] = m.group(1).lower() continue m = re.match(r"freq:\s*(\d+)", line, re.I) if m: try: r["center_freq_mhz"] = int(m.group(1)) except ValueError: pass _fill_chanspec(r) return r def _parse_ethtool_i(text: str) -> Dict[str, str]: out: Dict[str, str] = {} for raw in text.splitlines(): line = raw.strip() if ":" not in line: continue key, _, val = line.partition(":") key, val = key.strip().lower(), val.strip() if key == "firmware-version": out["firmware_version"] = val elif key == "version": out["driver_version"] = val elif key == "driver": out["ethtool_driver"] = val elif key == "expansion-rom-version": out["expansion_rom_version"] = val return out def _merge_runtime_dict(base: Dict[str, Any], extra: Dict[str, Any]) -> None: for k, v in extra.items(): if v is None or v == "": continue if k not in base or base[k] in (None, "", 0): base[k] = v def _collect_iface_runtime(ifname: str) -> Dict[str, Any]: """Live sysfs + iw + ethtool; merged into WirelessInterface._runtime.""" r: Dict[str, Any] = {} base = os.path.join(SYS_NET, ifname) mac = _read_sysfs_trim(os.path.join(base, "address")) if mac: r["mac_address"] = mac.lower() op = _read_sysfs_trim(os.path.join(base, "operstate")).lower() if op: r["operstate"] = op r["is_up"] = op == "up" car_raw = _read_sysfs_trim(os.path.join(base, "carrier")) if car_raw in ("0", "1"): r["carrier_up"] = car_raw == "1" info_txt = _run_capture(["iw", "dev", ifname, "info"], timeout=10.0) _merge_runtime_dict(r, _parse_iw_dev_info(info_txt)) link_txt = _run_capture(["iw", "dev", ifname, "link"], timeout=10.0) link_d = _parse_iw_dev_link(link_txt) if link_d.get("bssid"): r["bssid"] = link_d["bssid"] if "center_freq_mhz" in link_d: r["center_freq_mhz"] = link_d["center_freq_mhz"] _fill_chanspec(r) eth_txt = _run_capture(["ethtool", "-i", ifname], timeout=8.0) eth = _parse_ethtool_i(eth_txt) if eth.get("firmware_version"): r["firmware_version"] = eth["firmware_version"] if eth.get("driver_version"): r["driver_version"] = eth["driver_version"] return r def _coerce_runtime_json_types(rt: Dict[str, Any]) -> None: for k in ("channel", "center_freq_mhz", "channel_width_mhz", "center1_mhz"): if k not in rt: continue v = rt[k] if isinstance(v, (int, float)): rt[k] = int(v) elif isinstance(v, str) and v.strip().isdigit(): rt[k] = int(v.strip()) if "is_up" in rt: rt["is_up"] = bool(rt["is_up"]) if "carrier_up" in rt: cv = rt["carrier_up"] rt["carrier_up"] = None if cv is None else bool(cv) @dataclass class WirelessInterface: """One 802.11 netdev: static identity from sysfs/lspci; live link/chan/MAC via properties.""" name: str driver: str = "" pci_address: str = "" connection_type: str = "unknown" vendor_id: str = "" device_id: str = "" product: Optional[str] = None chip_label: str = "" wiphy: Optional[str] = None bands_ghz: Optional[List[float]] = None _runtime_loaded: bool = field(default=False, repr=False, compare=False) _runtime: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) def _ensure_runtime(self) -> None: if self._runtime_loaded: return self._runtime = _collect_iface_runtime(self.name) self._runtime_loaded = True def refresh_runtime(self) -> None: """Re-query sysfs / iw / ethtool (next property read uses fresh data).""" self._runtime_loaded = False self._runtime.clear() @property def mac_address(self) -> str: self._ensure_runtime() return str(self._runtime.get("mac_address", "")) @property def operstate(self) -> str: self._ensure_runtime() return str(self._runtime.get("operstate", "")) @property def is_up(self) -> bool: self._ensure_runtime() return bool(self._runtime.get("is_up", False)) @property def carrier_up(self) -> Optional[bool]: self._ensure_runtime() if "carrier_up" not in self._runtime: return None return bool(self._runtime["carrier_up"]) @property def bssid(self) -> str: self._ensure_runtime() return str(self._runtime.get("bssid", "")) @property def iface_mode(self) -> str: """nl80211 interface type: AP, STA, monitor, mesh, … (empty if unknown).""" self._ensure_runtime() return str(self._runtime.get("iface_mode", "")) @property def is_ap(self) -> bool: return self.iface_mode == "AP" @property def is_sta(self) -> bool: return self.iface_mode == "STA" @property def firmware_version(self) -> str: self._ensure_runtime() return str(self._runtime.get("firmware_version", "")) @property def driver_version(self) -> str: """Driver build/version string from ethtool when available.""" self._ensure_runtime() return str(self._runtime.get("driver_version", "")) @property def channel(self) -> Optional[int]: self._ensure_runtime() v = self._runtime.get("channel") return int(v) if isinstance(v, int) else None @property def center_freq_mhz(self) -> Optional[int]: self._ensure_runtime() v = self._runtime.get("center_freq_mhz") return int(v) if isinstance(v, int) else None @property def channel_width_mhz(self) -> Optional[int]: self._ensure_runtime() v = self._runtime.get("channel_width_mhz") return int(v) if isinstance(v, int) else None @property def center1_mhz(self) -> Optional[int]: self._ensure_runtime() v = self._runtime.get("center1_mhz") return int(v) if isinstance(v, int) else None @property def chanspec(self) -> str: """Human-readable channel summary (freq / width / center1) when known.""" self._ensure_runtime() return str(self._runtime.get("chanspec", "")) @classmethod def from_sysfs(cls, net_name: str) -> Optional[WirelessInterface]: base = os.path.join(SYS_NET, net_name) device_ln = os.path.join(base, "device") if not os.path.isdir(device_ln): return None real_dev = os.path.realpath(device_ln) driver = _read_link_basename(os.path.join(device_ln, "driver")) if not driver: driver = _read_link_basename(os.path.join(real_dev, "driver")) conn = _connection_type(real_dev) pci_addr, vendor_id, device_id = _pci_endpoint_from_device(real_dev) if conn == "USB" and (not vendor_id or not device_id): uv, up = _usb_ids_from_uevent(device_ln) vendor_id, device_id = uv or vendor_id, up or device_id return cls( name=net_name, driver=driver or "", pci_address=pci_addr or "", connection_type=conn, vendor_id=vendor_id, device_id=device_id, ) def refresh_chip_label(self) -> None: if self.product: self.chip_label = self.product[:220] elif self.vendor_id and self.device_id: self.chip_label = f"{self.vendor_id}:{self.device_id}" else: self.chip_label = (self.driver or self.name)[:220] def primary_score(self) -> Tuple[int, str]: score = 0 if self.connection_type == "PCIe": score += 10 elif self.connection_type == "USB": score += 3 if self.name == "wlan0": score += 5 if self.driver: score += 1 return score, self.name def to_map_dict(self) -> Dict[str, Any]: self._ensure_runtime() d: Dict[str, Any] = { "interface": self.name, "driver": self.driver, "pci_address": self.pci_address, "connection_type": self.connection_type, "vendor_id": self.vendor_id, "device_id": self.device_id, "chip_label": self.chip_label, } if self.product: d["product"] = self.product if self.wiphy: d["wiphy"] = self.wiphy if self.bands_ghz: d["bands_ghz"] = list(self.bands_ghz) for k in sorted(_RUNTIME_JSON_KEYS): if k in self._runtime: d[k] = self._runtime[k] return d @classmethod def from_map_dict(cls, d: Dict[str, Any]) -> WirelessInterface: rt: Dict[str, Any] = {} static: Dict[str, Any] = {} for k, v in d.items(): if k in _RUNTIME_JSON_KEYS: rt[k] = v else: static[k] = v _coerce_runtime_json_types(rt) bands = static.get("bands_ghz") if bands is not None and not isinstance(bands, list): bands = None wi = cls( name=str(static.get("interface") or ""), driver=str(static.get("driver") or ""), pci_address=str(static.get("pci_address") or ""), connection_type=str(static.get("connection_type") or "unknown"), vendor_id=str(static.get("vendor_id") or ""), device_id=str(static.get("device_id") or ""), product=static.get("product") if isinstance(static.get("product"), str) else None, chip_label=str(static.get("chip_label") or ""), wiphy=static.get("wiphy") if isinstance(static.get("wiphy"), str) else None, bands_ghz=[float(x) for x in bands] if bands else None, ) wi._runtime = rt wi._runtime_loaded = bool(rt) return wi @dataclass class WirelessSnapshot: """Wireless layout on one host at one time; serializes to fiber_ports[].wlan.""" wlan_scanned_at: str interfaces: Dict[str, WirelessInterface] = field(default_factory=dict) _primary_from_map: Optional[WirelessInterface] = field( default=None, compare=False, repr=False ) @property def primary(self) -> Optional[WirelessInterface]: if self._primary_from_map is not None: return self._primary_from_map if not self.interfaces: return None return max(self.interfaces.values(), key=lambda w: w.primary_score()) def to_map_dict(self) -> Dict[str, Any]: prim = self.primary return { "wlan_scanned_at": self.wlan_scanned_at, "interfaces": {k: v.to_map_dict() for k, v in sorted(self.interfaces.items())}, "primary": prim.to_map_dict() if prim else None, } def chip_and_interface(self) -> Tuple[str, str]: prim = self.primary if prim is None: return "", "" chip = (prim.chip_label or prim.product or "").strip() return chip, prim.name.strip() @classmethod def discover(cls) -> WirelessSnapshot: scanned = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if not os.path.isdir(SYS_NET): return cls(wlan_scanned_at=scanned, interfaces={}) names = sorted( n for n in os.listdir(SYS_NET) if n != "lo" and os.path.isdir(os.path.join(SYS_NET, n)) ) interfaces: Dict[str, WirelessInterface] = {} sysfs_paths: Dict[str, str] = {} for n in names: if not _is_wireless_iface(n): continue wi = WirelessInterface.from_sysfs(n) if wi is None: continue sysfs_paths[n] = os.path.realpath(os.path.join(SYS_NET, n, "device")) interfaces[n] = wi lspci_out = _run_capture(["lspci", "-nn"], timeout=15.0) if lspci_out: _merge_lspci_into(interfaces, _parse_lspci_nn_wireless(lspci_out), sysfs_paths) _attach_iw_to_interfaces(interfaces) for wi in interfaces.values(): wi.refresh_chip_label() return cls(wlan_scanned_at=scanned, interfaces=interfaces) @classmethod def from_map_dict(cls, d: Dict[str, Any]) -> Optional[WirelessSnapshot]: if not isinstance(d, dict): return None raw_if = d.get("interfaces") if raw_if is None: raw_if = {} if not isinstance(raw_if, dict): return None interfaces: Dict[str, WirelessInterface] = {} for _key, val in raw_if.items(): if not isinstance(val, dict): continue wi = WirelessInterface.from_map_dict(val) if wi.name: interfaces[wi.name] = wi ts = d.get("wlan_scanned_at") pr = d.get("primary") primary_override: Optional[WirelessInterface] = None if isinstance(pr, dict) and pr.get("interface"): primary_override = WirelessInterface.from_map_dict(pr) return cls( wlan_scanned_at=str(ts) if isinstance(ts, str) else "", interfaces=interfaces, _primary_from_map=primary_override, ) def discover_wireless_for_map() -> Dict[str, Any]: """JSON-serializable dict for fiber_map / wlan-info-json (stable schema).""" return WirelessSnapshot.discover().to_map_dict() def wlan_chip_and_interface(wlan_doc: Optional[Dict[str, Any]]) -> Tuple[str, str]: """(chip_label, interface_name) from a wlan snapshot dict, or ('','').""" if not isinstance(wlan_doc, dict): return "", "" snap = WirelessSnapshot.from_map_dict(wlan_doc) if snap is None: return "", "" return snap.chip_and_interface()