UmberHubManager/fiwi/ieee80211_dev.py

769 lines
25 KiB
Python

"""
IEEE 802.11 device identity on the host: sysfs (class net / phy80211) plus optional lspci / iw.
Module name ``ieee80211_dev`` mirrors kernel-style ``80211_dev`` naming; a leading digit is invalid in Python imports.
Types:
WirelessInterface — static PCI/USB identity + dynamic sysfs/iw/ethtool via @property
WirelessSnapshot — full host scan (interfaces + primary + timestamp)
Dynamic properties (cached per instance until ``refresh_runtime()``): ``mac_address``, ``operstate``,
``is_up``, ``carrier_up``, ``bssid``, ``iface_mode`` (AP/STA/…), ``is_ap`` / ``is_sta``,
``firmware_version``, ``driver_version`` (ethtool), ``channel``, ``center_freq_mhz``,
``channel_width_mhz``, ``center1_mhz``, ``chanspec``. The sysfs ``driver`` module name remains a field.
Serializes to fiber_map ``fiber_ports[].wlan``; remote capture via ``fiwi.py wlan-info-json``.
"""
from __future__ import annotations
import os
import re
import subprocess
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
SYS_NET = "/sys/class/net"
# Keys stored under WirelessInterface runtime blob (JSON round-trip + live discovery)
_RUNTIME_JSON_KEYS = frozenset(
{
"mac_address",
"operstate",
"is_up",
"carrier_up",
"bssid",
"iface_mode",
"firmware_version",
"driver_version",
"channel",
"center_freq_mhz",
"channel_width_mhz",
"center1_mhz",
"chanspec",
}
)
def _run_capture(cmd: List[str], timeout: float = 12.0) -> str:
try:
p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
except (OSError, subprocess.TimeoutExpired):
return ""
if p.returncode != 0:
return ""
return p.stdout or ""
def _is_wireless_iface(net_name: str) -> bool:
base = os.path.join(SYS_NET, net_name)
if os.path.isdir(os.path.join(base, "wireless")):
return True
if os.path.isdir(os.path.join(base, "phy80211")):
return True
uevent = os.path.join(base, "device", "uevent")
if os.path.isfile(uevent):
try:
with open(uevent, encoding="utf-8") as f:
if "DEVTYPE=wlan" in f.read():
return True
except OSError:
pass
return False
def _read_link_basename(path: str) -> str:
try:
target = os.readlink(path)
except OSError:
return ""
return os.path.basename(os.path.normpath(target))
_BDF_DIR = re.compile(r"^([0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7])$", re.I)
def _pci_endpoint_from_device(device_realpath: str) -> Tuple[str, str, str]:
cur = device_realpath
pci_addr, vendor_id, device_id = "", "", ""
for _ in range(14):
base = os.path.basename(cur)
if _BDF_DIR.match(base) and os.path.isfile(os.path.join(cur, "vendor")):
pci_addr = base.lower()
vendor_id, device_id = _read_pci_vendor_device(cur)
break
parent = os.path.dirname(cur)
if parent == cur:
break
cur = parent
return pci_addr, vendor_id, device_id
def _connection_type(realpath_device: str) -> str:
low = realpath_device.lower()
if re.search(r"/usb[0-9]*/", low) or "/usb/usb" in low:
return "USB"
if re.search(r"[0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]", low, re.I):
return "PCIe"
return "unknown"
def _read_pci_vendor_device(pci_dir: str) -> Tuple[str, str]:
vend, dev = "", ""
try:
with open(os.path.join(pci_dir, "vendor"), encoding="utf-8") as f:
vend = f.read().strip().lower()
if vend.startswith("0x"):
vend = vend[2:]
with open(os.path.join(pci_dir, "device"), encoding="utf-8") as f:
dev = f.read().strip().lower()
if dev.startswith("0x"):
dev = dev[2:]
except OSError:
pass
return vend, dev
def _usb_ids_from_uevent(device_dir: str) -> Tuple[str, str]:
ue = os.path.join(device_dir, "uevent")
vid, pid = "", ""
try:
with open(ue, encoding="utf-8") as f:
for ln in f:
if ln.startswith("PRODUCT="):
rest = ln.strip().split("=", 1)[1]
parts = rest.split("/")
if len(parts) >= 2:
vid = parts[0].lower()[-4:].rjust(4, "0")
pid = parts[1].lower()[-4:].rjust(4, "0")
break
except OSError:
pass
return vid, pid
def _lspci_bdf_key(segment: str) -> str:
s = segment.strip().lower()
if re.match(r"^[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]$", s, re.I):
return "0000:" + s
return s
def _parse_lspci_nn_wireless(stdout: str) -> Dict[str, Dict[str, str]]:
by_pci: Dict[str, Dict[str, str]] = {}
pci_start = re.compile(
r"^([0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]|[0-9a-f]{2}:[0-9a-f]{2}\.[0-7])\s+(.+)$",
re.I,
)
tail_ids = re.compile(r"\[([0-9a-f]{4}):([0-9a-f]{4})\]\s*$", re.I)
for line in stdout.splitlines():
line = line.strip()
m = pci_start.match(line)
if not m:
continue
bdf_n, rest = _lspci_bdf_key(m.group(1)), m.group(2)
lowrest = rest.lower()
if "ethernet controller" in lowrest or "[0200]" in rest:
continue
if "[0280]" not in rest and "[0282]" not in rest:
if not any(
x in lowrest
for x in ("wireless", "wi-fi", "wifi", "802.11", "wlan")
):
continue
elif "ethernet" in lowrest and not any(
x in lowrest for x in ("wireless", "802.11", "wifi", "wi-fi", "wlan")
):
continue
vendor, device = "", ""
tm = tail_ids.search(rest)
if tm:
vendor, device = tm.group(1).lower(), tm.group(2).lower()
by_pci[bdf_n] = {
"product": rest,
"lspci_vendor_id": vendor,
"lspci_device_id": device,
}
return by_pci
def _normalize_pci(addr: str) -> str:
if not addr:
return ""
a = addr.strip().lower()
if not a.startswith("0000:") and re.match(
r"^[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]$", a, re.I
):
a = "0000:" + a
return a
def _merge_lspci_into(
interfaces: Dict[str, WirelessInterface],
lspci_by_pci: Dict[str, Dict[str, str]],
sysfs_device_paths: Dict[str, str],
) -> None:
for name, wi in interfaces.items():
keys = [_normalize_pci(wi.pci_address)]
sd = sysfs_device_paths.get(name, "")
if sd:
ep, _, _ = _pci_endpoint_from_device(sd)
keys.append(_normalize_pci(ep))
for key in keys:
if not key or key not in lspci_by_pci:
continue
merge = lspci_by_pci[key]
if merge.get("product"):
wi.product = merge["product"]
if merge.get("lspci_vendor_id") and not wi.vendor_id:
wi.vendor_id = merge["lspci_vendor_id"]
if merge.get("lspci_device_id") and not wi.device_id:
wi.device_id = merge["lspci_device_id"]
break
def _iw_iface_phy_map() -> Dict[str, str]:
out = _run_capture(["iw", "dev"], timeout=8.0)
if not out.strip():
return {}
mapping: Dict[str, str] = {}
current_phy: Optional[str] = None
for raw in out.splitlines():
line = raw.rstrip()
if not line.startswith("\t"):
m = re.match(r"phy#(\d+)\b", line.strip())
if m:
current_phy = f"phy{m.group(1)}"
else:
current_phy = None
continue
inner = line.lstrip()
m = re.match(r"Interface\s+(\S+)", inner)
if m and current_phy:
mapping[m.group(1)] = current_phy
return mapping
def _bands_from_iw_phy(phyname: str) -> List[float]:
if not phyname:
return []
out = _run_capture(["iw", phyname, "info"], timeout=10.0)
bands: List[float] = []
for line in out.splitlines():
line = line.strip()
if line.startswith("Band "):
try:
n = int(line.split()[1])
except (ValueError, IndexError):
continue
if n == 1:
bands.append(2.4)
elif n == 2:
bands.append(5.0)
elif n == 3:
bands.append(6.0)
return sorted(set(bands))
def _attach_iw_to_interfaces(interfaces: Dict[str, WirelessInterface]) -> None:
iface_to_phy = _iw_iface_phy_map()
for name, wi in interfaces.items():
phy = iface_to_phy.get(name, "")
if not phy:
continue
wi.wiphy = phy
bs = _bands_from_iw_phy(phy)
if bs:
wi.bands_ghz = bs
def _read_sysfs_trim(path: str) -> str:
try:
with open(path, encoding="utf-8") as f:
return f.read().strip()
except OSError:
return ""
def _parse_iw_dev_info(text: str) -> Dict[str, Any]:
"""Parse `iw dev <if> info` for nl80211 type and current channel / width."""
r: Dict[str, Any] = {}
if not text.strip():
return r
for raw in text.splitlines():
line = raw.strip()
m = re.match(r"type\s+(\S+)", line, re.I)
if m:
t = m.group(1).lower()
if t == "ap":
r["iface_mode"] = "AP"
elif t == "managed":
r["iface_mode"] = "STA"
elif t == "monitor":
r["iface_mode"] = "monitor"
elif t == "mesh_point":
r["iface_mode"] = "mesh"
elif t == "P2P-client":
r["iface_mode"] = "P2P-client"
elif t == "P2P-GO":
r["iface_mode"] = "P2P-GO"
else:
r["iface_mode"] = m.group(1)
continue
m = re.search(r"channel\s+(\d+)\s+\((\d+)\s+MHz\)", line, re.I)
if m:
try:
r["channel"] = int(m.group(1))
r["center_freq_mhz"] = int(m.group(2))
except ValueError:
pass
m = re.search(r"width:\s*(\d+)\s*MHz", line, re.I)
if m:
try:
r["channel_width_mhz"] = int(m.group(1))
except ValueError:
pass
m = re.search(r"center1:\s*(\d+)\s*MHz", line, re.I)
if m:
try:
r["center1_mhz"] = int(m.group(1))
except ValueError:
pass
_fill_chanspec(r)
return r
def _fill_chanspec(r: Dict[str, Any]) -> None:
cf = r.get("center_freq_mhz")
w = r.get("channel_width_mhz")
c1 = r.get("center1_mhz")
if cf is not None and w is not None:
r["chanspec"] = f"{cf}@{w}MHz"
if c1 is not None:
r["chanspec"] += f" center1={c1}"
elif cf is not None:
r["chanspec"] = str(cf)
def _parse_iw_dev_link(text: str) -> Dict[str, Any]:
"""Parse `iw dev <if> link` (STA: BSSID, freq when associated)."""
r: Dict[str, Any] = {}
if not text.strip():
return r
for raw in text.splitlines():
line = raw.strip()
m = re.match(r"Connected to\s+([0-9a-f:]+)", line, re.I)
if m:
r["bssid"] = m.group(1).lower()
continue
m = re.match(r"freq:\s*(\d+)", line, re.I)
if m:
try:
r["center_freq_mhz"] = int(m.group(1))
except ValueError:
pass
_fill_chanspec(r)
return r
def _parse_ethtool_i(text: str) -> Dict[str, str]:
out: Dict[str, str] = {}
for raw in text.splitlines():
line = raw.strip()
if ":" not in line:
continue
key, _, val = line.partition(":")
key, val = key.strip().lower(), val.strip()
if key == "firmware-version":
out["firmware_version"] = val
elif key == "version":
out["driver_version"] = val
elif key == "driver":
out["ethtool_driver"] = val
elif key == "expansion-rom-version":
out["expansion_rom_version"] = val
return out
def _merge_runtime_dict(base: Dict[str, Any], extra: Dict[str, Any]) -> None:
for k, v in extra.items():
if v is None or v == "":
continue
if k not in base or base[k] in (None, "", 0):
base[k] = v
def _collect_iface_runtime(ifname: str) -> Dict[str, Any]:
"""Live sysfs + iw + ethtool; merged into WirelessInterface._runtime."""
r: Dict[str, Any] = {}
base = os.path.join(SYS_NET, ifname)
mac = _read_sysfs_trim(os.path.join(base, "address"))
if mac:
r["mac_address"] = mac.lower()
op = _read_sysfs_trim(os.path.join(base, "operstate")).lower()
if op:
r["operstate"] = op
r["is_up"] = op == "up"
car_raw = _read_sysfs_trim(os.path.join(base, "carrier"))
if car_raw in ("0", "1"):
r["carrier_up"] = car_raw == "1"
info_txt = _run_capture(["iw", "dev", ifname, "info"], timeout=10.0)
_merge_runtime_dict(r, _parse_iw_dev_info(info_txt))
link_txt = _run_capture(["iw", "dev", ifname, "link"], timeout=10.0)
link_d = _parse_iw_dev_link(link_txt)
if link_d.get("bssid"):
r["bssid"] = link_d["bssid"]
if "center_freq_mhz" in link_d:
r["center_freq_mhz"] = link_d["center_freq_mhz"]
_fill_chanspec(r)
eth_txt = _run_capture(["ethtool", "-i", ifname], timeout=8.0)
eth = _parse_ethtool_i(eth_txt)
if eth.get("firmware_version"):
r["firmware_version"] = eth["firmware_version"]
if eth.get("driver_version"):
r["driver_version"] = eth["driver_version"]
return r
def _coerce_runtime_json_types(rt: Dict[str, Any]) -> None:
for k in ("channel", "center_freq_mhz", "channel_width_mhz", "center1_mhz"):
if k not in rt:
continue
v = rt[k]
if isinstance(v, (int, float)):
rt[k] = int(v)
elif isinstance(v, str) and v.strip().isdigit():
rt[k] = int(v.strip())
if "is_up" in rt:
rt["is_up"] = bool(rt["is_up"])
if "carrier_up" in rt:
cv = rt["carrier_up"]
rt["carrier_up"] = None if cv is None else bool(cv)
@dataclass
class WirelessInterface:
"""One 802.11 netdev: static identity from sysfs/lspci; live link/chan/MAC via properties."""
name: str
driver: str = ""
pci_address: str = ""
connection_type: str = "unknown"
vendor_id: str = ""
device_id: str = ""
product: Optional[str] = None
chip_label: str = ""
wiphy: Optional[str] = None
bands_ghz: Optional[List[float]] = None
_runtime_loaded: bool = field(default=False, repr=False, compare=False)
_runtime: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False)
def _ensure_runtime(self) -> None:
if self._runtime_loaded:
return
self._runtime = _collect_iface_runtime(self.name)
self._runtime_loaded = True
def refresh_runtime(self) -> None:
"""Re-query sysfs / iw / ethtool (next property read uses fresh data)."""
self._runtime_loaded = False
self._runtime.clear()
@property
def mac_address(self) -> str:
self._ensure_runtime()
return str(self._runtime.get("mac_address", ""))
@property
def operstate(self) -> str:
self._ensure_runtime()
return str(self._runtime.get("operstate", ""))
@property
def is_up(self) -> bool:
self._ensure_runtime()
return bool(self._runtime.get("is_up", False))
@property
def carrier_up(self) -> Optional[bool]:
self._ensure_runtime()
if "carrier_up" not in self._runtime:
return None
return bool(self._runtime["carrier_up"])
@property
def bssid(self) -> str:
self._ensure_runtime()
return str(self._runtime.get("bssid", ""))
@property
def iface_mode(self) -> str:
"""nl80211 interface type: AP, STA, monitor, mesh, … (empty if unknown)."""
self._ensure_runtime()
return str(self._runtime.get("iface_mode", ""))
@property
def is_ap(self) -> bool:
return self.iface_mode == "AP"
@property
def is_sta(self) -> bool:
return self.iface_mode == "STA"
@property
def firmware_version(self) -> str:
self._ensure_runtime()
return str(self._runtime.get("firmware_version", ""))
@property
def driver_version(self) -> str:
"""Driver build/version string from ethtool when available."""
self._ensure_runtime()
return str(self._runtime.get("driver_version", ""))
@property
def channel(self) -> Optional[int]:
self._ensure_runtime()
v = self._runtime.get("channel")
return int(v) if isinstance(v, int) else None
@property
def center_freq_mhz(self) -> Optional[int]:
self._ensure_runtime()
v = self._runtime.get("center_freq_mhz")
return int(v) if isinstance(v, int) else None
@property
def channel_width_mhz(self) -> Optional[int]:
self._ensure_runtime()
v = self._runtime.get("channel_width_mhz")
return int(v) if isinstance(v, int) else None
@property
def center1_mhz(self) -> Optional[int]:
self._ensure_runtime()
v = self._runtime.get("center1_mhz")
return int(v) if isinstance(v, int) else None
@property
def chanspec(self) -> str:
"""Human-readable channel summary (freq / width / center1) when known."""
self._ensure_runtime()
return str(self._runtime.get("chanspec", ""))
@classmethod
def from_sysfs(cls, net_name: str) -> Optional[WirelessInterface]:
base = os.path.join(SYS_NET, net_name)
device_ln = os.path.join(base, "device")
if not os.path.isdir(device_ln):
return None
real_dev = os.path.realpath(device_ln)
driver = _read_link_basename(os.path.join(device_ln, "driver"))
if not driver:
driver = _read_link_basename(os.path.join(real_dev, "driver"))
conn = _connection_type(real_dev)
pci_addr, vendor_id, device_id = _pci_endpoint_from_device(real_dev)
if conn == "USB" and (not vendor_id or not device_id):
uv, up = _usb_ids_from_uevent(device_ln)
vendor_id, device_id = uv or vendor_id, up or device_id
return cls(
name=net_name,
driver=driver or "",
pci_address=pci_addr or "",
connection_type=conn,
vendor_id=vendor_id,
device_id=device_id,
)
def refresh_chip_label(self) -> None:
if self.product:
self.chip_label = self.product[:220]
elif self.vendor_id and self.device_id:
self.chip_label = f"{self.vendor_id}:{self.device_id}"
else:
self.chip_label = (self.driver or self.name)[:220]
def primary_score(self) -> Tuple[int, str]:
score = 0
if self.connection_type == "PCIe":
score += 10
elif self.connection_type == "USB":
score += 3
if self.name == "wlan0":
score += 5
if self.driver:
score += 1
return score, self.name
def to_map_dict(self) -> Dict[str, Any]:
self._ensure_runtime()
d: Dict[str, Any] = {
"interface": self.name,
"driver": self.driver,
"pci_address": self.pci_address,
"connection_type": self.connection_type,
"vendor_id": self.vendor_id,
"device_id": self.device_id,
"chip_label": self.chip_label,
}
if self.product:
d["product"] = self.product
if self.wiphy:
d["wiphy"] = self.wiphy
if self.bands_ghz:
d["bands_ghz"] = list(self.bands_ghz)
for k in sorted(_RUNTIME_JSON_KEYS):
if k in self._runtime:
d[k] = self._runtime[k]
return d
@classmethod
def from_map_dict(cls, d: Dict[str, Any]) -> WirelessInterface:
rt: Dict[str, Any] = {}
static: Dict[str, Any] = {}
for k, v in d.items():
if k in _RUNTIME_JSON_KEYS:
rt[k] = v
else:
static[k] = v
_coerce_runtime_json_types(rt)
bands = static.get("bands_ghz")
if bands is not None and not isinstance(bands, list):
bands = None
wi = cls(
name=str(static.get("interface") or ""),
driver=str(static.get("driver") or ""),
pci_address=str(static.get("pci_address") or ""),
connection_type=str(static.get("connection_type") or "unknown"),
vendor_id=str(static.get("vendor_id") or ""),
device_id=str(static.get("device_id") or ""),
product=static.get("product") if isinstance(static.get("product"), str) else None,
chip_label=str(static.get("chip_label") or ""),
wiphy=static.get("wiphy") if isinstance(static.get("wiphy"), str) else None,
bands_ghz=[float(x) for x in bands] if bands else None,
)
wi._runtime = rt
wi._runtime_loaded = bool(rt)
return wi
@dataclass
class WirelessSnapshot:
"""Wireless layout on one host at one time; serializes to fiber_ports[].wlan."""
wlan_scanned_at: str
interfaces: Dict[str, WirelessInterface] = field(default_factory=dict)
_primary_from_map: Optional[WirelessInterface] = field(
default=None, compare=False, repr=False
)
@property
def primary(self) -> Optional[WirelessInterface]:
if self._primary_from_map is not None:
return self._primary_from_map
if not self.interfaces:
return None
return max(self.interfaces.values(), key=lambda w: w.primary_score())
def to_map_dict(self) -> Dict[str, Any]:
prim = self.primary
return {
"wlan_scanned_at": self.wlan_scanned_at,
"interfaces": {k: v.to_map_dict() for k, v in sorted(self.interfaces.items())},
"primary": prim.to_map_dict() if prim else None,
}
def chip_and_interface(self) -> Tuple[str, str]:
prim = self.primary
if prim is None:
return "", ""
chip = (prim.chip_label or prim.product or "").strip()
return chip, prim.name.strip()
@classmethod
def discover(cls) -> WirelessSnapshot:
scanned = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
if not os.path.isdir(SYS_NET):
return cls(wlan_scanned_at=scanned, interfaces={})
names = sorted(
n
for n in os.listdir(SYS_NET)
if n != "lo" and os.path.isdir(os.path.join(SYS_NET, n))
)
interfaces: Dict[str, WirelessInterface] = {}
sysfs_paths: Dict[str, str] = {}
for n in names:
if not _is_wireless_iface(n):
continue
wi = WirelessInterface.from_sysfs(n)
if wi is None:
continue
sysfs_paths[n] = os.path.realpath(os.path.join(SYS_NET, n, "device"))
interfaces[n] = wi
lspci_out = _run_capture(["lspci", "-nn"], timeout=15.0)
if lspci_out:
_merge_lspci_into(interfaces, _parse_lspci_nn_wireless(lspci_out), sysfs_paths)
_attach_iw_to_interfaces(interfaces)
for wi in interfaces.values():
wi.refresh_chip_label()
return cls(wlan_scanned_at=scanned, interfaces=interfaces)
@classmethod
def from_map_dict(cls, d: Dict[str, Any]) -> Optional[WirelessSnapshot]:
if not isinstance(d, dict):
return None
raw_if = d.get("interfaces")
if raw_if is None:
raw_if = {}
if not isinstance(raw_if, dict):
return None
interfaces: Dict[str, WirelessInterface] = {}
for _key, val in raw_if.items():
if not isinstance(val, dict):
continue
wi = WirelessInterface.from_map_dict(val)
if wi.name:
interfaces[wi.name] = wi
ts = d.get("wlan_scanned_at")
pr = d.get("primary")
primary_override: Optional[WirelessInterface] = None
if isinstance(pr, dict) and pr.get("interface"):
primary_override = WirelessInterface.from_map_dict(pr)
return cls(
wlan_scanned_at=str(ts) if isinstance(ts, str) else "",
interfaces=interfaces,
_primary_from_map=primary_override,
)
def discover_wireless_for_map() -> Dict[str, Any]:
"""JSON-serializable dict for fiber_map / wlan-info-json (stable schema)."""
return WirelessSnapshot.discover().to_map_dict()
def wlan_chip_and_interface(wlan_doc: Optional[Dict[str, Any]]) -> Tuple[str, str]:
"""(chip_label, interface_name) from a wlan snapshot dict, or ('','')."""
if not isinstance(wlan_doc, dict):
return "", ""
snap = WirelessSnapshot.from_map_dict(wlan_doc)
if snap is None:
return "", ""
return snap.chip_and_interface()