769 lines
25 KiB
Python
769 lines
25 KiB
Python
"""
|
|
IEEE 802.11 device identity on the host: sysfs (class net / phy80211) plus optional lspci / iw.
|
|
|
|
Module name ``ieee80211_dev`` mirrors kernel-style ``80211_dev`` naming; a leading digit is invalid in Python imports.
|
|
|
|
Types:
|
|
WirelessInterface — static PCI/USB identity + dynamic sysfs/iw/ethtool via @property
|
|
WirelessSnapshot — full host scan (interfaces + primary + timestamp)
|
|
|
|
Dynamic properties (cached per instance until ``refresh_runtime()``): ``mac_address``, ``operstate``,
|
|
``is_up``, ``carrier_up``, ``bssid``, ``iface_mode`` (AP/STA/…), ``is_ap`` / ``is_sta``,
|
|
``firmware_version``, ``driver_version`` (ethtool), ``channel``, ``center_freq_mhz``,
|
|
``channel_width_mhz``, ``center1_mhz``, ``chanspec``. The sysfs ``driver`` module name remains a field.
|
|
|
|
Serializes to fiber_map ``fiber_ports[].wlan``; remote capture via ``fiwi.py wlan-info-json``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
SYS_NET = "/sys/class/net"
|
|
|
|
# Keys stored under WirelessInterface runtime blob (JSON round-trip + live discovery)
|
|
_RUNTIME_JSON_KEYS = frozenset(
|
|
{
|
|
"mac_address",
|
|
"operstate",
|
|
"is_up",
|
|
"carrier_up",
|
|
"bssid",
|
|
"iface_mode",
|
|
"firmware_version",
|
|
"driver_version",
|
|
"channel",
|
|
"center_freq_mhz",
|
|
"channel_width_mhz",
|
|
"center1_mhz",
|
|
"chanspec",
|
|
}
|
|
)
|
|
|
|
|
|
def _run_capture(cmd: List[str], timeout: float = 12.0) -> str:
|
|
try:
|
|
p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return ""
|
|
if p.returncode != 0:
|
|
return ""
|
|
return p.stdout or ""
|
|
|
|
|
|
def _is_wireless_iface(net_name: str) -> bool:
|
|
base = os.path.join(SYS_NET, net_name)
|
|
if os.path.isdir(os.path.join(base, "wireless")):
|
|
return True
|
|
if os.path.isdir(os.path.join(base, "phy80211")):
|
|
return True
|
|
uevent = os.path.join(base, "device", "uevent")
|
|
if os.path.isfile(uevent):
|
|
try:
|
|
with open(uevent, encoding="utf-8") as f:
|
|
if "DEVTYPE=wlan" in f.read():
|
|
return True
|
|
except OSError:
|
|
pass
|
|
return False
|
|
|
|
|
|
def _read_link_basename(path: str) -> str:
|
|
try:
|
|
target = os.readlink(path)
|
|
except OSError:
|
|
return ""
|
|
return os.path.basename(os.path.normpath(target))
|
|
|
|
|
|
_BDF_DIR = re.compile(r"^([0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7])$", re.I)
|
|
|
|
|
|
def _pci_endpoint_from_device(device_realpath: str) -> Tuple[str, str, str]:
|
|
cur = device_realpath
|
|
pci_addr, vendor_id, device_id = "", "", ""
|
|
for _ in range(14):
|
|
base = os.path.basename(cur)
|
|
if _BDF_DIR.match(base) and os.path.isfile(os.path.join(cur, "vendor")):
|
|
pci_addr = base.lower()
|
|
vendor_id, device_id = _read_pci_vendor_device(cur)
|
|
break
|
|
parent = os.path.dirname(cur)
|
|
if parent == cur:
|
|
break
|
|
cur = parent
|
|
return pci_addr, vendor_id, device_id
|
|
|
|
|
|
def _connection_type(realpath_device: str) -> str:
|
|
low = realpath_device.lower()
|
|
if re.search(r"/usb[0-9]*/", low) or "/usb/usb" in low:
|
|
return "USB"
|
|
if re.search(r"[0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]", low, re.I):
|
|
return "PCIe"
|
|
return "unknown"
|
|
|
|
|
|
def _read_pci_vendor_device(pci_dir: str) -> Tuple[str, str]:
|
|
vend, dev = "", ""
|
|
try:
|
|
with open(os.path.join(pci_dir, "vendor"), encoding="utf-8") as f:
|
|
vend = f.read().strip().lower()
|
|
if vend.startswith("0x"):
|
|
vend = vend[2:]
|
|
with open(os.path.join(pci_dir, "device"), encoding="utf-8") as f:
|
|
dev = f.read().strip().lower()
|
|
if dev.startswith("0x"):
|
|
dev = dev[2:]
|
|
except OSError:
|
|
pass
|
|
return vend, dev
|
|
|
|
|
|
def _usb_ids_from_uevent(device_dir: str) -> Tuple[str, str]:
|
|
ue = os.path.join(device_dir, "uevent")
|
|
vid, pid = "", ""
|
|
try:
|
|
with open(ue, encoding="utf-8") as f:
|
|
for ln in f:
|
|
if ln.startswith("PRODUCT="):
|
|
rest = ln.strip().split("=", 1)[1]
|
|
parts = rest.split("/")
|
|
if len(parts) >= 2:
|
|
vid = parts[0].lower()[-4:].rjust(4, "0")
|
|
pid = parts[1].lower()[-4:].rjust(4, "0")
|
|
break
|
|
except OSError:
|
|
pass
|
|
return vid, pid
|
|
|
|
|
|
def _lspci_bdf_key(segment: str) -> str:
|
|
s = segment.strip().lower()
|
|
if re.match(r"^[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]$", s, re.I):
|
|
return "0000:" + s
|
|
return s
|
|
|
|
|
|
def _parse_lspci_nn_wireless(stdout: str) -> Dict[str, Dict[str, str]]:
|
|
by_pci: Dict[str, Dict[str, str]] = {}
|
|
pci_start = re.compile(
|
|
r"^([0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]|[0-9a-f]{2}:[0-9a-f]{2}\.[0-7])\s+(.+)$",
|
|
re.I,
|
|
)
|
|
tail_ids = re.compile(r"\[([0-9a-f]{4}):([0-9a-f]{4})\]\s*$", re.I)
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
m = pci_start.match(line)
|
|
if not m:
|
|
continue
|
|
bdf_n, rest = _lspci_bdf_key(m.group(1)), m.group(2)
|
|
lowrest = rest.lower()
|
|
if "ethernet controller" in lowrest or "[0200]" in rest:
|
|
continue
|
|
if "[0280]" not in rest and "[0282]" not in rest:
|
|
if not any(
|
|
x in lowrest
|
|
for x in ("wireless", "wi-fi", "wifi", "802.11", "wlan")
|
|
):
|
|
continue
|
|
elif "ethernet" in lowrest and not any(
|
|
x in lowrest for x in ("wireless", "802.11", "wifi", "wi-fi", "wlan")
|
|
):
|
|
continue
|
|
|
|
vendor, device = "", ""
|
|
tm = tail_ids.search(rest)
|
|
if tm:
|
|
vendor, device = tm.group(1).lower(), tm.group(2).lower()
|
|
by_pci[bdf_n] = {
|
|
"product": rest,
|
|
"lspci_vendor_id": vendor,
|
|
"lspci_device_id": device,
|
|
}
|
|
return by_pci
|
|
|
|
|
|
def _normalize_pci(addr: str) -> str:
|
|
if not addr:
|
|
return ""
|
|
a = addr.strip().lower()
|
|
if not a.startswith("0000:") and re.match(
|
|
r"^[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]$", a, re.I
|
|
):
|
|
a = "0000:" + a
|
|
return a
|
|
|
|
|
|
def _merge_lspci_into(
|
|
interfaces: Dict[str, WirelessInterface],
|
|
lspci_by_pci: Dict[str, Dict[str, str]],
|
|
sysfs_device_paths: Dict[str, str],
|
|
) -> None:
|
|
for name, wi in interfaces.items():
|
|
keys = [_normalize_pci(wi.pci_address)]
|
|
sd = sysfs_device_paths.get(name, "")
|
|
if sd:
|
|
ep, _, _ = _pci_endpoint_from_device(sd)
|
|
keys.append(_normalize_pci(ep))
|
|
for key in keys:
|
|
if not key or key not in lspci_by_pci:
|
|
continue
|
|
merge = lspci_by_pci[key]
|
|
if merge.get("product"):
|
|
wi.product = merge["product"]
|
|
if merge.get("lspci_vendor_id") and not wi.vendor_id:
|
|
wi.vendor_id = merge["lspci_vendor_id"]
|
|
if merge.get("lspci_device_id") and not wi.device_id:
|
|
wi.device_id = merge["lspci_device_id"]
|
|
break
|
|
|
|
|
|
def _iw_iface_phy_map() -> Dict[str, str]:
|
|
out = _run_capture(["iw", "dev"], timeout=8.0)
|
|
if not out.strip():
|
|
return {}
|
|
mapping: Dict[str, str] = {}
|
|
current_phy: Optional[str] = None
|
|
for raw in out.splitlines():
|
|
line = raw.rstrip()
|
|
if not line.startswith("\t"):
|
|
m = re.match(r"phy#(\d+)\b", line.strip())
|
|
if m:
|
|
current_phy = f"phy{m.group(1)}"
|
|
else:
|
|
current_phy = None
|
|
continue
|
|
inner = line.lstrip()
|
|
m = re.match(r"Interface\s+(\S+)", inner)
|
|
if m and current_phy:
|
|
mapping[m.group(1)] = current_phy
|
|
return mapping
|
|
|
|
|
|
def _bands_from_iw_phy(phyname: str) -> List[float]:
|
|
if not phyname:
|
|
return []
|
|
out = _run_capture(["iw", phyname, "info"], timeout=10.0)
|
|
bands: List[float] = []
|
|
for line in out.splitlines():
|
|
line = line.strip()
|
|
if line.startswith("Band "):
|
|
try:
|
|
n = int(line.split()[1])
|
|
except (ValueError, IndexError):
|
|
continue
|
|
if n == 1:
|
|
bands.append(2.4)
|
|
elif n == 2:
|
|
bands.append(5.0)
|
|
elif n == 3:
|
|
bands.append(6.0)
|
|
return sorted(set(bands))
|
|
|
|
|
|
def _attach_iw_to_interfaces(interfaces: Dict[str, WirelessInterface]) -> None:
|
|
iface_to_phy = _iw_iface_phy_map()
|
|
for name, wi in interfaces.items():
|
|
phy = iface_to_phy.get(name, "")
|
|
if not phy:
|
|
continue
|
|
wi.wiphy = phy
|
|
bs = _bands_from_iw_phy(phy)
|
|
if bs:
|
|
wi.bands_ghz = bs
|
|
|
|
|
|
def _read_sysfs_trim(path: str) -> str:
|
|
try:
|
|
with open(path, encoding="utf-8") as f:
|
|
return f.read().strip()
|
|
except OSError:
|
|
return ""
|
|
|
|
|
|
def _parse_iw_dev_info(text: str) -> Dict[str, Any]:
|
|
"""Parse `iw dev <if> info` for nl80211 type and current channel / width."""
|
|
r: Dict[str, Any] = {}
|
|
if not text.strip():
|
|
return r
|
|
for raw in text.splitlines():
|
|
line = raw.strip()
|
|
m = re.match(r"type\s+(\S+)", line, re.I)
|
|
if m:
|
|
t = m.group(1).lower()
|
|
if t == "ap":
|
|
r["iface_mode"] = "AP"
|
|
elif t == "managed":
|
|
r["iface_mode"] = "STA"
|
|
elif t == "monitor":
|
|
r["iface_mode"] = "monitor"
|
|
elif t == "mesh_point":
|
|
r["iface_mode"] = "mesh"
|
|
elif t == "P2P-client":
|
|
r["iface_mode"] = "P2P-client"
|
|
elif t == "P2P-GO":
|
|
r["iface_mode"] = "P2P-GO"
|
|
else:
|
|
r["iface_mode"] = m.group(1)
|
|
continue
|
|
m = re.search(r"channel\s+(\d+)\s+\((\d+)\s+MHz\)", line, re.I)
|
|
if m:
|
|
try:
|
|
r["channel"] = int(m.group(1))
|
|
r["center_freq_mhz"] = int(m.group(2))
|
|
except ValueError:
|
|
pass
|
|
m = re.search(r"width:\s*(\d+)\s*MHz", line, re.I)
|
|
if m:
|
|
try:
|
|
r["channel_width_mhz"] = int(m.group(1))
|
|
except ValueError:
|
|
pass
|
|
m = re.search(r"center1:\s*(\d+)\s*MHz", line, re.I)
|
|
if m:
|
|
try:
|
|
r["center1_mhz"] = int(m.group(1))
|
|
except ValueError:
|
|
pass
|
|
_fill_chanspec(r)
|
|
return r
|
|
|
|
|
|
def _fill_chanspec(r: Dict[str, Any]) -> None:
|
|
cf = r.get("center_freq_mhz")
|
|
w = r.get("channel_width_mhz")
|
|
c1 = r.get("center1_mhz")
|
|
if cf is not None and w is not None:
|
|
r["chanspec"] = f"{cf}@{w}MHz"
|
|
if c1 is not None:
|
|
r["chanspec"] += f" center1={c1}"
|
|
elif cf is not None:
|
|
r["chanspec"] = str(cf)
|
|
|
|
|
|
def _parse_iw_dev_link(text: str) -> Dict[str, Any]:
|
|
"""Parse `iw dev <if> link` (STA: BSSID, freq when associated)."""
|
|
r: Dict[str, Any] = {}
|
|
if not text.strip():
|
|
return r
|
|
for raw in text.splitlines():
|
|
line = raw.strip()
|
|
m = re.match(r"Connected to\s+([0-9a-f:]+)", line, re.I)
|
|
if m:
|
|
r["bssid"] = m.group(1).lower()
|
|
continue
|
|
m = re.match(r"freq:\s*(\d+)", line, re.I)
|
|
if m:
|
|
try:
|
|
r["center_freq_mhz"] = int(m.group(1))
|
|
except ValueError:
|
|
pass
|
|
_fill_chanspec(r)
|
|
return r
|
|
|
|
|
|
def _parse_ethtool_i(text: str) -> Dict[str, str]:
|
|
out: Dict[str, str] = {}
|
|
for raw in text.splitlines():
|
|
line = raw.strip()
|
|
if ":" not in line:
|
|
continue
|
|
key, _, val = line.partition(":")
|
|
key, val = key.strip().lower(), val.strip()
|
|
if key == "firmware-version":
|
|
out["firmware_version"] = val
|
|
elif key == "version":
|
|
out["driver_version"] = val
|
|
elif key == "driver":
|
|
out["ethtool_driver"] = val
|
|
elif key == "expansion-rom-version":
|
|
out["expansion_rom_version"] = val
|
|
return out
|
|
|
|
|
|
def _merge_runtime_dict(base: Dict[str, Any], extra: Dict[str, Any]) -> None:
|
|
for k, v in extra.items():
|
|
if v is None or v == "":
|
|
continue
|
|
if k not in base or base[k] in (None, "", 0):
|
|
base[k] = v
|
|
|
|
|
|
def _collect_iface_runtime(ifname: str) -> Dict[str, Any]:
|
|
"""Live sysfs + iw + ethtool; merged into WirelessInterface._runtime."""
|
|
r: Dict[str, Any] = {}
|
|
base = os.path.join(SYS_NET, ifname)
|
|
|
|
mac = _read_sysfs_trim(os.path.join(base, "address"))
|
|
if mac:
|
|
r["mac_address"] = mac.lower()
|
|
|
|
op = _read_sysfs_trim(os.path.join(base, "operstate")).lower()
|
|
if op:
|
|
r["operstate"] = op
|
|
r["is_up"] = op == "up"
|
|
|
|
car_raw = _read_sysfs_trim(os.path.join(base, "carrier"))
|
|
if car_raw in ("0", "1"):
|
|
r["carrier_up"] = car_raw == "1"
|
|
|
|
info_txt = _run_capture(["iw", "dev", ifname, "info"], timeout=10.0)
|
|
_merge_runtime_dict(r, _parse_iw_dev_info(info_txt))
|
|
|
|
link_txt = _run_capture(["iw", "dev", ifname, "link"], timeout=10.0)
|
|
link_d = _parse_iw_dev_link(link_txt)
|
|
if link_d.get("bssid"):
|
|
r["bssid"] = link_d["bssid"]
|
|
if "center_freq_mhz" in link_d:
|
|
r["center_freq_mhz"] = link_d["center_freq_mhz"]
|
|
_fill_chanspec(r)
|
|
|
|
eth_txt = _run_capture(["ethtool", "-i", ifname], timeout=8.0)
|
|
eth = _parse_ethtool_i(eth_txt)
|
|
if eth.get("firmware_version"):
|
|
r["firmware_version"] = eth["firmware_version"]
|
|
if eth.get("driver_version"):
|
|
r["driver_version"] = eth["driver_version"]
|
|
|
|
return r
|
|
|
|
|
|
def _coerce_runtime_json_types(rt: Dict[str, Any]) -> None:
|
|
for k in ("channel", "center_freq_mhz", "channel_width_mhz", "center1_mhz"):
|
|
if k not in rt:
|
|
continue
|
|
v = rt[k]
|
|
if isinstance(v, (int, float)):
|
|
rt[k] = int(v)
|
|
elif isinstance(v, str) and v.strip().isdigit():
|
|
rt[k] = int(v.strip())
|
|
if "is_up" in rt:
|
|
rt["is_up"] = bool(rt["is_up"])
|
|
if "carrier_up" in rt:
|
|
cv = rt["carrier_up"]
|
|
rt["carrier_up"] = None if cv is None else bool(cv)
|
|
|
|
|
|
@dataclass
|
|
class WirelessInterface:
|
|
"""One 802.11 netdev: static identity from sysfs/lspci; live link/chan/MAC via properties."""
|
|
|
|
name: str
|
|
driver: str = ""
|
|
pci_address: str = ""
|
|
connection_type: str = "unknown"
|
|
vendor_id: str = ""
|
|
device_id: str = ""
|
|
product: Optional[str] = None
|
|
chip_label: str = ""
|
|
wiphy: Optional[str] = None
|
|
bands_ghz: Optional[List[float]] = None
|
|
_runtime_loaded: bool = field(default=False, repr=False, compare=False)
|
|
_runtime: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False)
|
|
|
|
def _ensure_runtime(self) -> None:
|
|
if self._runtime_loaded:
|
|
return
|
|
self._runtime = _collect_iface_runtime(self.name)
|
|
self._runtime_loaded = True
|
|
|
|
def refresh_runtime(self) -> None:
|
|
"""Re-query sysfs / iw / ethtool (next property read uses fresh data)."""
|
|
self._runtime_loaded = False
|
|
self._runtime.clear()
|
|
|
|
@property
|
|
def mac_address(self) -> str:
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("mac_address", ""))
|
|
|
|
@property
|
|
def operstate(self) -> str:
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("operstate", ""))
|
|
|
|
@property
|
|
def is_up(self) -> bool:
|
|
self._ensure_runtime()
|
|
return bool(self._runtime.get("is_up", False))
|
|
|
|
@property
|
|
def carrier_up(self) -> Optional[bool]:
|
|
self._ensure_runtime()
|
|
if "carrier_up" not in self._runtime:
|
|
return None
|
|
return bool(self._runtime["carrier_up"])
|
|
|
|
@property
|
|
def bssid(self) -> str:
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("bssid", ""))
|
|
|
|
@property
|
|
def iface_mode(self) -> str:
|
|
"""nl80211 interface type: AP, STA, monitor, mesh, … (empty if unknown)."""
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("iface_mode", ""))
|
|
|
|
@property
|
|
def is_ap(self) -> bool:
|
|
return self.iface_mode == "AP"
|
|
|
|
@property
|
|
def is_sta(self) -> bool:
|
|
return self.iface_mode == "STA"
|
|
|
|
@property
|
|
def firmware_version(self) -> str:
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("firmware_version", ""))
|
|
|
|
@property
|
|
def driver_version(self) -> str:
|
|
"""Driver build/version string from ethtool when available."""
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("driver_version", ""))
|
|
|
|
@property
|
|
def channel(self) -> Optional[int]:
|
|
self._ensure_runtime()
|
|
v = self._runtime.get("channel")
|
|
return int(v) if isinstance(v, int) else None
|
|
|
|
@property
|
|
def center_freq_mhz(self) -> Optional[int]:
|
|
self._ensure_runtime()
|
|
v = self._runtime.get("center_freq_mhz")
|
|
return int(v) if isinstance(v, int) else None
|
|
|
|
@property
|
|
def channel_width_mhz(self) -> Optional[int]:
|
|
self._ensure_runtime()
|
|
v = self._runtime.get("channel_width_mhz")
|
|
return int(v) if isinstance(v, int) else None
|
|
|
|
@property
|
|
def center1_mhz(self) -> Optional[int]:
|
|
self._ensure_runtime()
|
|
v = self._runtime.get("center1_mhz")
|
|
return int(v) if isinstance(v, int) else None
|
|
|
|
@property
|
|
def chanspec(self) -> str:
|
|
"""Human-readable channel summary (freq / width / center1) when known."""
|
|
self._ensure_runtime()
|
|
return str(self._runtime.get("chanspec", ""))
|
|
|
|
@classmethod
|
|
def from_sysfs(cls, net_name: str) -> Optional[WirelessInterface]:
|
|
base = os.path.join(SYS_NET, net_name)
|
|
device_ln = os.path.join(base, "device")
|
|
if not os.path.isdir(device_ln):
|
|
return None
|
|
real_dev = os.path.realpath(device_ln)
|
|
driver = _read_link_basename(os.path.join(device_ln, "driver"))
|
|
if not driver:
|
|
driver = _read_link_basename(os.path.join(real_dev, "driver"))
|
|
|
|
conn = _connection_type(real_dev)
|
|
pci_addr, vendor_id, device_id = _pci_endpoint_from_device(real_dev)
|
|
|
|
if conn == "USB" and (not vendor_id or not device_id):
|
|
uv, up = _usb_ids_from_uevent(device_ln)
|
|
vendor_id, device_id = uv or vendor_id, up or device_id
|
|
|
|
return cls(
|
|
name=net_name,
|
|
driver=driver or "",
|
|
pci_address=pci_addr or "",
|
|
connection_type=conn,
|
|
vendor_id=vendor_id,
|
|
device_id=device_id,
|
|
)
|
|
|
|
def refresh_chip_label(self) -> None:
|
|
if self.product:
|
|
self.chip_label = self.product[:220]
|
|
elif self.vendor_id and self.device_id:
|
|
self.chip_label = f"{self.vendor_id}:{self.device_id}"
|
|
else:
|
|
self.chip_label = (self.driver or self.name)[:220]
|
|
|
|
def primary_score(self) -> Tuple[int, str]:
|
|
score = 0
|
|
if self.connection_type == "PCIe":
|
|
score += 10
|
|
elif self.connection_type == "USB":
|
|
score += 3
|
|
if self.name == "wlan0":
|
|
score += 5
|
|
if self.driver:
|
|
score += 1
|
|
return score, self.name
|
|
|
|
def to_map_dict(self) -> Dict[str, Any]:
|
|
self._ensure_runtime()
|
|
d: Dict[str, Any] = {
|
|
"interface": self.name,
|
|
"driver": self.driver,
|
|
"pci_address": self.pci_address,
|
|
"connection_type": self.connection_type,
|
|
"vendor_id": self.vendor_id,
|
|
"device_id": self.device_id,
|
|
"chip_label": self.chip_label,
|
|
}
|
|
if self.product:
|
|
d["product"] = self.product
|
|
if self.wiphy:
|
|
d["wiphy"] = self.wiphy
|
|
if self.bands_ghz:
|
|
d["bands_ghz"] = list(self.bands_ghz)
|
|
for k in sorted(_RUNTIME_JSON_KEYS):
|
|
if k in self._runtime:
|
|
d[k] = self._runtime[k]
|
|
return d
|
|
|
|
@classmethod
|
|
def from_map_dict(cls, d: Dict[str, Any]) -> WirelessInterface:
|
|
rt: Dict[str, Any] = {}
|
|
static: Dict[str, Any] = {}
|
|
for k, v in d.items():
|
|
if k in _RUNTIME_JSON_KEYS:
|
|
rt[k] = v
|
|
else:
|
|
static[k] = v
|
|
_coerce_runtime_json_types(rt)
|
|
bands = static.get("bands_ghz")
|
|
if bands is not None and not isinstance(bands, list):
|
|
bands = None
|
|
wi = cls(
|
|
name=str(static.get("interface") or ""),
|
|
driver=str(static.get("driver") or ""),
|
|
pci_address=str(static.get("pci_address") or ""),
|
|
connection_type=str(static.get("connection_type") or "unknown"),
|
|
vendor_id=str(static.get("vendor_id") or ""),
|
|
device_id=str(static.get("device_id") or ""),
|
|
product=static.get("product") if isinstance(static.get("product"), str) else None,
|
|
chip_label=str(static.get("chip_label") or ""),
|
|
wiphy=static.get("wiphy") if isinstance(static.get("wiphy"), str) else None,
|
|
bands_ghz=[float(x) for x in bands] if bands else None,
|
|
)
|
|
wi._runtime = rt
|
|
wi._runtime_loaded = bool(rt)
|
|
return wi
|
|
|
|
|
|
@dataclass
|
|
class WirelessSnapshot:
|
|
"""Wireless layout on one host at one time; serializes to fiber_ports[].wlan."""
|
|
|
|
wlan_scanned_at: str
|
|
interfaces: Dict[str, WirelessInterface] = field(default_factory=dict)
|
|
_primary_from_map: Optional[WirelessInterface] = field(
|
|
default=None, compare=False, repr=False
|
|
)
|
|
|
|
@property
|
|
def primary(self) -> Optional[WirelessInterface]:
|
|
if self._primary_from_map is not None:
|
|
return self._primary_from_map
|
|
if not self.interfaces:
|
|
return None
|
|
return max(self.interfaces.values(), key=lambda w: w.primary_score())
|
|
|
|
def to_map_dict(self) -> Dict[str, Any]:
|
|
prim = self.primary
|
|
return {
|
|
"wlan_scanned_at": self.wlan_scanned_at,
|
|
"interfaces": {k: v.to_map_dict() for k, v in sorted(self.interfaces.items())},
|
|
"primary": prim.to_map_dict() if prim else None,
|
|
}
|
|
|
|
def chip_and_interface(self) -> Tuple[str, str]:
|
|
prim = self.primary
|
|
if prim is None:
|
|
return "", ""
|
|
chip = (prim.chip_label or prim.product or "").strip()
|
|
return chip, prim.name.strip()
|
|
|
|
@classmethod
|
|
def discover(cls) -> WirelessSnapshot:
|
|
scanned = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
|
if not os.path.isdir(SYS_NET):
|
|
return cls(wlan_scanned_at=scanned, interfaces={})
|
|
|
|
names = sorted(
|
|
n
|
|
for n in os.listdir(SYS_NET)
|
|
if n != "lo" and os.path.isdir(os.path.join(SYS_NET, n))
|
|
)
|
|
interfaces: Dict[str, WirelessInterface] = {}
|
|
sysfs_paths: Dict[str, str] = {}
|
|
for n in names:
|
|
if not _is_wireless_iface(n):
|
|
continue
|
|
wi = WirelessInterface.from_sysfs(n)
|
|
if wi is None:
|
|
continue
|
|
sysfs_paths[n] = os.path.realpath(os.path.join(SYS_NET, n, "device"))
|
|
interfaces[n] = wi
|
|
|
|
lspci_out = _run_capture(["lspci", "-nn"], timeout=15.0)
|
|
if lspci_out:
|
|
_merge_lspci_into(interfaces, _parse_lspci_nn_wireless(lspci_out), sysfs_paths)
|
|
|
|
_attach_iw_to_interfaces(interfaces)
|
|
|
|
for wi in interfaces.values():
|
|
wi.refresh_chip_label()
|
|
|
|
return cls(wlan_scanned_at=scanned, interfaces=interfaces)
|
|
|
|
@classmethod
|
|
def from_map_dict(cls, d: Dict[str, Any]) -> Optional[WirelessSnapshot]:
|
|
if not isinstance(d, dict):
|
|
return None
|
|
raw_if = d.get("interfaces")
|
|
if raw_if is None:
|
|
raw_if = {}
|
|
if not isinstance(raw_if, dict):
|
|
return None
|
|
interfaces: Dict[str, WirelessInterface] = {}
|
|
for _key, val in raw_if.items():
|
|
if not isinstance(val, dict):
|
|
continue
|
|
wi = WirelessInterface.from_map_dict(val)
|
|
if wi.name:
|
|
interfaces[wi.name] = wi
|
|
ts = d.get("wlan_scanned_at")
|
|
pr = d.get("primary")
|
|
primary_override: Optional[WirelessInterface] = None
|
|
if isinstance(pr, dict) and pr.get("interface"):
|
|
primary_override = WirelessInterface.from_map_dict(pr)
|
|
return cls(
|
|
wlan_scanned_at=str(ts) if isinstance(ts, str) else "",
|
|
interfaces=interfaces,
|
|
_primary_from_map=primary_override,
|
|
)
|
|
|
|
|
|
def discover_wireless_for_map() -> Dict[str, Any]:
|
|
"""JSON-serializable dict for fiber_map / wlan-info-json (stable schema)."""
|
|
return WirelessSnapshot.discover().to_map_dict()
|
|
|
|
|
|
def wlan_chip_and_interface(wlan_doc: Optional[Dict[str, Any]]) -> Tuple[str, str]:
|
|
"""(chip_label, interface_name) from a wlan snapshot dict, or ('','')."""
|
|
if not isinstance(wlan_doc, dict):
|
|
return "", ""
|
|
snap = WirelessSnapshot.from_map_dict(wlan_doc)
|
|
if snap is None:
|
|
return "", ""
|
|
return snap.chip_and_interface()
|