FiWiManager/tests/check_concentrator.py

1519 lines
52 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Smoke check: :class:`fiwi.concentrator.FiWiConcentrator` plus consolidated USB hub and per-port
metrics tables covering **this machine** and each configured **remote**. The consolidated hub table
has **Type** (BrainStem hub class), **Location** (``local`` or remote IP/hostname, plus **tty** when
known) and **USB** (Bus/Device, VID:PID, product from sysfs). The per-port table starts with **Panel**
(``fiber_ports``
key / rack position when ``hub``, ``port``, and optional ``ssh`` match this run), rows sorted by panel
number; a **Power(N): Total … W / … mA (per port follows)** line heads the section (``N`` = ports with
``power`` ON), then columns **mA**, **V**, **W**, and **Location** (no USB column).
Adnacom PCIe catalog last.
Requires BrainStem locally for hub enumeration on this host. Remotes use SSH ``show_hostcards``
and ``port-metrics-json``; the remote tree must include that command (same revision as here).
**Standalone**::
python tests/check_concentrator.py
python tests/check_concentrator.py --config uax24
python tests/check_concentrator.py --inrush 1 0
python tests/check_concentrator.py --inrush 1 0 --inrush-host-sample --inrush-json
python tests/check_concentrator.py --panel-calibrate
python tests/check_concentrator.py --panel-calibrate --calibrate-merge --calibrate-ssh pi@192.168.1.39
``--config`` matches ``FIWI_CONFIG`` (profile or absolute ``*.ini``).
``--powercycle`` runs a destructive self-test on **local** USB hubs and each host in
``FIWI_CALIBRATE_REMOTES`` / merged hub hosts: all ports OFF (verify), then all ON (verify),
then prints the port power table (snapshot after the test).
After the per-port power table, the script builds a :class:`fiwi.radiohead.RadioHead` for every
mapped ``fiber_ports`` row (panel slots first, then any other keys) and prints panel, saved chip
type, and live power / mA / V.
``--inrush HUB PORT`` (after the report, or after ``--powercycle``) runs the same USB inrush probe as
``tests/check_inrush.py`` on **local** hubs only (1-based hub index, 0-based port). Use
``--inrush-host-sample`` for host-side polling; default is on-hub Reflex scratchpad.
``--panel-calibrate`` runs the interactive **fiber map** workflow (patch panel size, USB port walk,
wlan + lspci snapshot per step → ``chip_type`` / ``wlan`` / ``hub``+``port`` in ``fiber_map.json``).
Same as ``python3 fiwi.py panel calibrate``. Use ``--calibrate-merge``, ``--calibrate-limit N``,
and repeatable ``--calibrate-ssh user@host``. Needs a TTY; cannot combine with ``--powercycle`` or
``--inrush``.
With pytest::
FIWI_CONFIG=uax24 pytest tests/check_concentrator.py
"""
from __future__ import annotations
import argparse
import json
import os
import re
import socket
import sys
import time
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fiwi.concentrator import FiWiConcentrator
# Repository root (parent of tests/); prepended to sys.path so `import fiwi`
# works when this file is run directly as a script.
_ROOT = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
# Width of the "=" / "-" rules printed between report sections.
_WIDTH = 62
# Set once by _ensure_paths_configured(); guards repeat fiwi.paths.configure() calls.
_paths_configured = False
# Column width of the hub "Type" cell in the consolidated table.
_HUB_TYPE_COL_W = 14
# Header line for the consolidated USB hub table.
_CONSOLIDATED_HUB_HDR = (
    f"{'#':<4} | {'Type':<{_HUB_TYPE_COL_W}} | {'Serial':<12} | {'Ports':<10} | "
    f"{'Location':<28} | USB (Bus / ID / product)"
)
# Header line for the per-port power / current / voltage table.
_PER_PORT_PWR_HDR = (
    f"{'Panel':<8} | {'Hub#':<4} | {'Serial':<12} | {'Pt':<3} | "
    f"{'mA':>10} | {'V':>8} | {'W':>10} | Location"
)
# Data row of the remote `show_hostcards` hub table: "<idx> | 0xSERIAL | <rest>".
_HUB_TABLE_ROW_RE = re.compile(
    r"^\s*\d+\s+\|\s+(0x[0-9A-Fa-f]+)\s+\|\s+(.+)\s*$"
)
@dataclass(frozen=True)
class ConsolidatedHubRow:
    """One row of the consolidated hub table (local or remote)."""
    #: 1-based position in the printed table (global across all hosts).
    idx: int
    #: Hub serial as displayed (``0x…`` hex string).
    serial: str
    #: ``Ports`` cell text; may carry a trailing ``*`` when the count was inferred.
    ports_display: str
    #: Parsed integer downstream-port count (0 when unknown / unparsable).
    n_ports: int
    #: ``local`` or the remote IP / hostname label.
    location: str
    #: SSH target string for remote hubs; ``None`` for hubs on this machine.
    ssh_target: str | None
    #: e.g. ``ttyUSB0`` or ``ttyUSB0+ttyACM0`` from sysfs (24ff hubs); empty if unknown.
    usb_tty_hint: str = ""
    #: sysfs / ``lsusb``-style line: Bus/Dev, ID ``24ff:…``, product string.
    usb_identity_hint: str = ""
    #: BrainStem stem class / ``defs.model_name`` (e.g. ``USBHub3p``); ``?`` if unknown.
    hub_type: str = "?"
def _rule(char: str = "-") -> str:
    """A horizontal rule of ``char`` repeated to the standard report width."""
    return _WIDTH * char
def _ensure_paths_configured() -> None:
    """Idempotently point :mod:`fiwi.paths` at the repository root (once per process)."""
    global _paths_configured
    if not _paths_configured:
        import fiwi.paths as paths_mod

        paths_mod.configure(_ROOT)
        _paths_configured = True
def _instantiate_concentrator() -> FiWiConcentrator:
    """Configure paths, then build a fresh :class:`FiWiConcentrator` with no hubs yet."""
    _ensure_paths_configured()
    from fiwi.concentrator import FiWiConcentrator

    conc = FiWiConcentrator()
    # A brand-new concentrator must start with an empty hub list.
    assert conc.hubs == [], "new concentrator should have no connected hubs yet"
    return conc
def _remote_location_label(ssh_target: str) -> str:
"""IPv4 for ``user@host`` when possible; otherwise the host part."""
if "@" not in ssh_target:
host = ssh_target.strip()
else:
host = ssh_target.split("@", 1)[1].strip()
if re.fullmatch(r"(?:\d{1,3}\.){3}\d{1,3}", host):
return host
try:
return socket.gethostbyname(host)
except OSError:
return host
def _local_hub_type_for_spec(c: FiWiConcentrator, spec: object) -> str:
    """Stem class name when connected, else ``defs.model_name(spec.model)``, else ``?``."""
    import fiwi.brainstem_loader as stemmod
    stemmod.load_brainstem()
    sn = getattr(spec, "serial_number", None)
    if isinstance(sn, int):
        # Prefer the live object's class: match the spec serial against connected hubs.
        for stem in c.hubs:
            r = stem.system.getSerialNumber()
            if r.error == c.SUCCESS and r.value == sn:
                return type(stem).__name__
    # Not connected: fall back to the BrainStem defs model-name lookup.
    defs_mod = getattr(stemmod.brainstem, "defs", None)
    model = getattr(spec, "model", None)
    if defs_mod is not None and model is not None:
        try:
            mn = defs_mod.model_name(int(model))
        except (TypeError, ValueError):
            mn = "Unknown"
        if mn != "Unknown":
            return mn
    return "?"
def _parse_serial_to_hub_type_from_hostcards_stdout(text: str) -> dict[str, str]:
    """
    From ``show_hostcards`` text, map normalized serial (``0x…``) → hub type name using
    ``serial=0x… model=N`` discovery lines (same stdout as the hub table).

    Serials whose model cannot be resolved map to ``"?"``.
    """
    import fiwi.brainstem_loader as stemmod
    stemmod.load_brainstem()
    defs_mod = getattr(stemmod.brainstem, "defs", None)
    out: dict[str, str] = {}
    for ln in text.splitlines():
        m = re.search(r"serial=(0x[0-9A-Fa-f]+)\s+model=(\S+)", ln, re.I)
        if not m:
            continue
        key = _norm_hub_serial(m.group(1))
        mod_s = m.group(2).strip()
        if mod_s == "?" or defs_mod is None:
            # Remote printed no model, or this host has no defs module to resolve it.
            out[key] = "?"
            continue
        try:
            mid = int(mod_s, 10)
        except ValueError:
            out[key] = "?"
            continue
        try:
            # model_name may raise for ids unknown to this BrainStem build.
            mn = defs_mod.model_name(mid)
        except Exception:
            mn = "Unknown"
        out[key] = mn if mn != "Unknown" else "?"
    return out
def _collect_local_hub_rows(c: FiWiConcentrator) -> list[tuple[str, str, str]]:
    """
    ``(serial_hex, ports_display, hub_type)`` per local USB hub, same rules as
    :meth:`fiwi.concentrator.FiWiConcentrator._print_usb_hub_summary_table`.
    """
    specs = c._enumerate_usb_specs()
    if not specs:
        return []
    if not c.hubs:
        # Connect on demand so opened-port counts below are populated.
        c._connect_specs(specs)
    # serial → number of downstream ports actually opened on that hub.
    by_sn = c._serial_to_opened_port_count()
    out: list[tuple[str, str, str]] = []
    for spec in specs:
        sn = spec.serial_number
        sn_s = f"0x{sn:08X}"
        if sn in by_sn:
            ports = str(by_sn[sn])
        else:
            # Hub not opened: infer the count from the spec; "*" marks inferred values.
            inf = c._inferred_downstream_ports_from_spec(spec)
            ports = str(inf) if inf is not None else "?"
            ports += " *"
        out.append((sn_s, ports, _local_hub_type_for_spec(c, spec)))
    return out
def _parse_hub_table_from_show_hostcards_stdout(text: str) -> list[tuple[str, str]]:
"""Extract ``(serial, ports)`` from ``show_hostcards`` printed table."""
lines = text.splitlines()
in_table = False
rows: list[tuple[str, str]] = []
for ln in lines:
if "Hub" in ln and "Serial" in ln and "Ports" in ln and "|" in ln:
in_table = True
continue
if not in_table:
continue
if re.match(r"^\s*-+\s*$", ln):
continue
if not ln.strip():
break
if ln.strip().startswith("*"):
break
m = _HUB_TABLE_ROW_RE.match(ln)
if m:
rows.append((m.group(1), m.group(2).strip()))
elif rows:
break
return rows
def _print_banner(config_label: str) -> None:
    """Print the framed report title, including the active config label."""
    bar = _rule("=")
    for text in (bar, f"Fi-Wi concentrator check [config: {config_label}]", bar):
        print(text, flush=True)
    print(flush=True)
def _print_ssh_and_hosts_summary() -> None:
    """Print the effective SSH / remote-hub settings resolved from paths + INI."""
    from fiwi.ssh import SshNodeConfig
    cfg = SshNodeConfig.load()
    raw = (cfg.calibrate_remotes or "").strip()
    print("SSH / remote hub routing (after paths + INI)", flush=True)
    print(_rule(), flush=True)
    print(f" FIWI_REMOTE_PYTHON → {cfg.python}", flush=True)
    print(f" FIWI_REMOTE_SCRIPT → {cfg.script}", flush=True)
    print(f" FIWI_SSH_BIN {cfg.ssh_bin}", flush=True)
    if cfg.ssh_extra_argv:
        # Extra ssh argv is optional; only show the line when options are configured.
        print(f" FIWI_SSH_OPTS {' '.join(cfg.ssh_extra_argv)}", flush=True)
    print(f" merged hub hosts {raw or '(none)'}", flush=True)
    print(_rule(), flush=True)
    print(flush=True)
def _parse_port_count(ports_str: str) -> int:
"""Integer downstream count from hub table ``Ports`` cell (strip trailing ``*``)."""
s = ports_str.strip().replace("*", "").strip()
if not s or s == "?":
return 0
try:
return max(0, int(s))
except ValueError:
return 0
def _norm_hub_serial(s: str) -> str:
return s.strip().upper()
def _hub_1based_on_host(hub_rows: list[ConsolidatedHubRow], row: ConsolidatedHubRow) -> int:
    """1-based position of ``row`` among hubs sharing its host; 0 when not found."""
    # Hubs on the same host share the same ssh_target (None means local).
    siblings = sorted(
        (r for r in hub_rows if r.ssh_target == row.ssh_target),
        key=lambda r: r.idx,
    )
    for pos, sibling in enumerate(siblings, start=1):
        if sibling.idx == row.idx:
            return pos
    return 0
def _fiber_map_panel_lookup() -> dict[tuple[str | None, int, int], str]:
    """
    Reverse map: (ssh target or ``None`` for local, hub 1-based on that host, port 0-based)
    → fiber_map ``fiber_ports`` key (panel / fiber id string).
    SSH strings must match ``fiber_map.json`` (``ssh`` / ``remote`` / ``host``+``user``) exactly
    where used; local entries must omit those fields.
    """
    from fiwi import fiber_map_io as fm
    doc = fm.load_fiber_map_document()
    if not doc:
        return {}
    ports = doc.get("fiber_ports")
    if not isinstance(ports, dict):
        return {}
    out: dict[tuple[str | None, int, int], str] = {}
    for map_key, ent in ports.items():
        if not isinstance(ent, dict):
            continue
        hp = fm.fiber_entry_hub_port(ent)
        if hp is None:
            continue  # entry not mapped to a hub/port yet
        h1, p0 = hp
        ssh = fm.fiber_ssh_target(ent)
        # Normalize: blank / non-string ssh targets mean "local" (None key).
        sk: str | None = ssh.strip() if isinstance(ssh, str) and ssh.strip() else None
        key = (sk, h1, p0)
        prev = out.get(key)
        mk = str(map_key)
        if prev is not None and prev != mk:
            # Two distinct map keys share one physical port: show both, comma-joined.
            out[key] = f"{prev},{mk}"
        else:
            out[key] = mk
    return out
def _panel_cell(
    lookup: dict[tuple[str | None, int, int], str],
    hub_rows: list[ConsolidatedHubRow],
    row: ConsolidatedHubRow,
    port_0: int,
) -> str:
    """Panel label for (row, port) via the reverse fiber map; empty when unmapped."""
    hub_1 = _hub_1based_on_host(hub_rows, row)
    if hub_1 <= 0:
        return ""  # row not found among its host's hubs
    host_key: str | None = row.ssh_target.strip() if row.ssh_target else None
    return lookup.get((host_key, hub_1, port_0), "")
def _panel_sort_tuple(pnl: str, hub_table_idx: int, port_0: int) -> tuple:
"""
Sort per-port rows by patch panel label: numeric ``fiber_ports`` keys first (min if comma-list),
then other labels, unmapped (—) last; tie-break hub # and port.
"""
s = (pnl or "").strip()
if not s or s == "":
return (2, 0, hub_table_idx, port_0)
nums: list[int] = []
for part in re.split(r"[\s,]+", s):
if part.isdigit():
nums.append(int(part))
if nums:
return (0, min(nums), hub_table_idx, port_0)
return (1, s, hub_table_idx, port_0)
def _merged_hub_hosts() -> list[str]:
    """Unique remote hub hosts from the configured calibrate-remotes list, in order."""
    from fiwi.ssh import SshNodeConfig

    raw = (SshNodeConfig.load().calibrate_remotes or "").strip()
    hosts: list[str] = []
    for token in raw.split(","):
        host = token.strip()
        # Keep first occurrence only; preserve configuration order.
        if host and host not in hosts:
            hosts.append(host)
    return hosts
def _build_consolidated_hub_rows(c: FiWiConcentrator) -> tuple[list[ConsolidatedHubRow], int]:
    """
    Build consolidated hub rows (same order as the printed summary table).
    Returns ``(rows, rc)`` where ``rc`` is 0 if every remote ``show_hostcards`` succeeded.
    """
    from fiwi.ssh import SshNode
    rc = 0
    rows: list[ConsolidatedHubRow] = []
    def _push(
        serial: str,
        ports_disp: str,
        loc: str,
        ssh_target: str | None,
        hub_type: str = "?",
    ) -> None:
        # Append one row; idx is assigned from the running row count (1-based).
        rows.append(
            ConsolidatedHubRow(
                idx=len(rows) + 1,
                serial=serial,
                ports_display=ports_disp,
                n_ports=_parse_port_count(ports_disp),
                location=loc,
                ssh_target=ssh_target,
                hub_type=hub_type,
            )
        )
    # Local hubs first, in enumeration order.
    for serial, ports, ht in _collect_local_hub_rows(c):
        _push(serial, ports, "local", None, ht)
    hosts = _merged_hub_hosts()
    if not hosts:
        print(" (no remote hosts in config — only local hubs listed.)", flush=True)
        print(flush=True)
    for host in hosts:
        loc = _remote_location_label(host)
        try:
            node = SshNode.parse(host)
            # Run the remote hub discovery; stderr is forwarded, never parsed.
            code, out, err = node.invoke_capture(["show_hostcards"], timeout=90, defer=False)
            if err.strip():
                print(err.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote {host} ({loc}): show_hostcards exit {code}", flush=True)
                rc = 1
                continue
            parsed = _parse_hub_table_from_show_hostcards_stdout(out)
            if not parsed:
                print(
                    f" ! Remote {host} ({loc}): no hub table in output "
                    f"(empty discover / parse miss).",
                    flush=True,
                )
                rc = 1
                continue
            # The same stdout carries serial=…/model=… lines → hub type names.
            remote_types = _parse_serial_to_hub_type_from_hostcards_stdout(out)
            for serial, ports in parsed:
                ht = remote_types.get(_norm_hub_serial(serial), "?")
                _push(serial, ports, loc, host, ht)
        except Exception as exc:
            # One bad remote must not kill the whole report; flag it and continue.
            print(f" ! Remote {host} ({loc}): {exc}", flush=True)
            rc = 1
    return rows, rc
def _local_usb_ent_by_serial() -> dict[str, dict[str, object]]:
    """Identity dicts from local sysfs probing, keyed by normalized hub serial."""
    from fiwi import usb_probe as usb_mod

    by_serial: dict[str, dict[str, object]] = {}
    for ent in usb_mod.usb_acroname_hub_identity_list():
        if not isinstance(ent, dict):
            continue  # tolerate junk entries
        serial = _norm_hub_serial(str(ent.get("serial", "")))
        if serial:
            by_serial[serial] = ent
    return by_serial
def _fetch_remote_usb_ent_by_serial(host: str) -> dict[str, dict[str, object]]:
    """USB identity dicts from a remote's ``usb-hub-tty-json``, keyed by normalized serial.

    Best-effort: SSH failure, malformed JSON, or a non-list payload yields an empty dict.
    """
    from fiwi.ssh import SshNode
    node = SshNode.parse(host)
    code, out, err = node.invoke_capture(["usb-hub-tty-json"], timeout=45, defer=False)
    if err.strip():
        print(err.rstrip(), file=sys.stderr, flush=True)
    if code != 0:
        return {}
    try:
        data = json.loads(out.strip() or "[]")
    except json.JSONDecodeError:
        return {}
    if not isinstance(data, list):
        return {}
    out_map: dict[str, dict[str, object]] = {}
    for raw in data:
        if not isinstance(raw, dict):
            continue  # skip non-dict JSON elements
        sn = _norm_hub_serial(str(raw.get("serial", "")))
        if sn:
            out_map[sn] = raw
    return out_map
def _usb_identity_display(ent: dict[str, object] | None, *, max_product: int = 42) -> str:
"""Human-readable USB row (matches short ``lsusb`` intent; values come from sysfs)."""
if not ent:
return ""
bus = ent.get("bus")
dev = ent.get("dev")
vp = str(ent.get("id") or "").strip()
prod = str(ent.get("product") or "").replace("\n", " ").strip()
if not prod:
prod = str(ent.get("manufacturer") or "").replace("\n", " ").strip()
if len(prod) > max_product:
prod = prod[: max_product - 3] + "..."
bits: list[str] = []
if isinstance(bus, int) and isinstance(dev, int):
bits.append(f"Bus {bus:03d} Dev {dev:03d}")
if vp:
bits.append(f"ID {vp}")
if prod:
bits.append(prod)
return " · ".join(bits)
def _enrich_hub_rows_usb(rows: list[ConsolidatedHubRow]) -> list[ConsolidatedHubRow]:
    """Fill ``usb_tty_hint`` and ``usb_identity_hint`` from local sysfs or ``usb-hub-tty-json`` over SSH."""
    local = _local_usb_ent_by_serial()
    # One fetch per remote host, cached for the duration of this call.
    remote_maps: dict[str, dict[str, dict[str, object]]] = {}
    out: list[ConsolidatedHubRow] = []
    for row in rows:
        ent: dict[str, object] | None = None
        if row.ssh_target is None:
            ent = local.get(_norm_hub_serial(row.serial))
        else:
            if row.ssh_target not in remote_maps:
                remote_maps[row.ssh_target] = _fetch_remote_usb_ent_by_serial(row.ssh_target)
            ent = remote_maps[row.ssh_target].get(_norm_hub_serial(row.serial))
        tty_hint = ""
        if ent:
            ttys = ent.get("tty")
            if isinstance(ttys, list):
                # Multiple ttys (e.g. ttyUSB0 + ttyACM0) are joined with "+".
                tty_hint = "+".join(str(x) for x in ttys if isinstance(x, str))
        usb_line = _usb_identity_display(ent)
        # Rows are frozen dataclasses: build enriched copies instead of mutating.
        out.append(
            replace(
                row,
                usb_tty_hint=tty_hint,
                usb_identity_hint=usb_line,
            )
        )
    return out
def _location_cell(row: ConsolidatedHubRow) -> str:
    """Location column text: host label, with the tty hint appended when known."""
    base = row.location
    return f"{base} · {row.usb_tty_hint}" if row.usb_tty_hint else base
def _usb_cell(row: ConsolidatedHubRow) -> str:
    """USB column text; empty string when no identity was found."""
    hint = row.usb_identity_hint
    if hint:
        return hint
    return ""
def _index_metrics_by_serial_port(items: list[dict[str, object]]) -> dict[str, dict[int, dict[str, object]]]:
out: dict[str, dict[int, dict[str, object]]] = {}
for raw in items:
if not isinstance(raw, dict):
continue
sn = _norm_hub_serial(str(raw.get("serial", "")))
try:
p = int(raw["port"])
except (KeyError, TypeError, ValueError):
continue
if p < 0:
continue
out.setdefault(sn, {})[p] = raw
return out
def _remote_fiwi_missing_port_metrics_json(stdout: str, stderr: str, exit_code: int) -> bool:
"""True when the remote ``fiwi.py`` is too old and does not implement ``port-metrics-json``."""
blob = (stdout + "\n" + stderr).lower()
return exit_code == 2 and "unknown command" in blob and "port-metrics-json" in blob
def _print_remote_fiwi_upgrade_help(host: str, loc: str) -> None:
    """Tell the user how to refresh the remote so ``port-metrics-json`` exists.

    ``host`` is the SSH target and ``loc`` the resolved location label; both are
    echoed verbatim. Reads the effective remote python/script paths from
    :class:`fiwi.ssh.SshNodeConfig` so the suggested fix matches this run.
    """
    from fiwi.ssh import SshNodeConfig
    cfg = SshNodeConfig.load()
    # Fixed grammar in the diagnostic: "that host's" (possessive), not "that hosts".
    print(
        f" ! Remote {host} ({loc}): that host's fiwi.py does not support port-metrics-json "
        "(repository on the remote is behind this one).",
        flush=True,
    )
    print(" Fix: deploy the same FiWiManager revision to the remote, then verify:", flush=True)
    print(f" • FIWI_REMOTE_SCRIPT → {cfg.script!r}", flush=True)
    print(f" • FIWI_REMOTE_PYTHON → {cfg.python!r}", flush=True)
    print(" • On the remote, from a shell:", flush=True)
    print(f" {cfg.python} {cfg.script} port-metrics-json", flush=True)
    print(" Expect a JSON array on stdout, not “Unknown command”.", flush=True)
def _fetch_remote_port_metrics(host: str) -> tuple[int, list[dict[str, object]], str, str]:
    """Run ``port-metrics-json`` over SSH. Returns ``(exit_code, rows, stdout, stderr)``."""
    from fiwi.ssh import SshNode
    node = SshNode.parse(host)
    code, out, err = node.invoke_capture(["port-metrics-json"], timeout=90, defer=False)
    if err.strip():
        print(err.rstrip(), file=sys.stderr, flush=True)
    if code != 0:
        # Raw stdout/stderr are returned so callers can diagnose (see upgrade help).
        return code, [], out, err
    try:
        data = json.loads(out.strip() or "[]")
    except json.JSONDecodeError:
        return 1, [], out, err
    if not isinstance(data, list):
        return 1, [], out, err
    # Keep only dict elements; other JSON values are silently dropped.
    typed: list[dict[str, object]] = [x for x in data if isinstance(x, dict)]
    return 0, typed, out, err
# Seconds to wait after switching port power before sampling metrics.
_POWERCYCLE_SETTLE_SEC = 0.35


def _set_all_local_ports(c: FiWiConcentrator, on: bool) -> None:
    """Enable or disable every downstream port on every connected local hub."""
    for stem in c.hubs:
        n = c._port_count(stem)
        for port in range(n):
            if on:
                stem.usb.setPortEnable(port)
            else:
                stem.usb.setPortDisable(port)
def _power_mismatches(
rows: list[dict[str, object]], want_on: bool, where: str
) -> list[str]:
"""Each row must have ``power`` of ON or OFF matching ``want_on``."""
want = "ON" if want_on else "OFF"
bad: list[str] = []
for row in rows:
pwr = row.get("power")
if pwr != want:
hub = row.get("hub", "?")
port = row.get("port", "?")
sn = row.get("serial", "?")
bad.append(
f"{where} hub {hub} port {port} serial {sn!r}: power={pwr!r} (expected {want})"
)
return bad
def _restore_all_ports_on(c: FiWiConcentrator, hosts: list[str]) -> None:
    """Best-effort: turn every local and configured-remote port on (cleanup after failed test)."""
    from fiwi.ssh import SshNode
    print(" (cleanup) Restoring all ports ON…", flush=True)
    try:
        if not c.hubs:
            c.connect()
        if c.hubs:
            _set_all_local_ports(c, True)
    except Exception as exc:
        # Cleanup must never raise; report and move on to the remotes.
        print(f" ! Local restore failed: {exc}", flush=True)
    for host in hosts:
        try:
            code, _o, err = SshNode.parse(host).invoke_capture(
                ["on", "all"], timeout=120, defer=False
            )
            if err.strip():
                print(err.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote {host}: on all restore exit {code}", flush=True)
        except Exception as exc:
            # Per-host best effort: a bad remote must not stop the remaining restores.
            print(f" ! Remote {host}: on all restore: {exc}", flush=True)
def _run_powercycle_test(c: FiWiConcentrator) -> int:
    """
    Turn all ports off, confirm via metrics; then on, confirm.
    Covers local hubs (BrainStem) and each merged SSH hub host (``off``/``on all``).

    Returns 0 on success, 1 on any failure; on failure the ``finally`` block
    restores all ports ON (best effort) before returning.
    """
    from fiwi.ssh import SshNode
    hosts = _merged_hub_hosts()
    if not c.hubs:
        c.connect()
    has_local = bool(c.hubs)
    if not has_local and not hosts:
        print("FAIL: no local USB hubs and no remote hub hosts configured.", flush=True)
        return 1
    print("Power-cycle self-test (disrupts USB power on local + remote hub hosts)", flush=True)
    print(_rule(), flush=True)
    if has_local:
        print(f" Local: {len(c.hubs)} hub(s) on {socket.gethostname()}", flush=True)
    else:
        print(" Local: (no USB power hubs connected here)", flush=True)
    if hosts:
        print(f" Remote hub hosts: {', '.join(hosts)}", flush=True)
    print(flush=True)
    # completed stays False on any early return; finally-block then restores power.
    completed = False
    settle = _POWERCYCLE_SETTLE_SEC
    try:
        # --- Phase 1: everything OFF, verified via metrics. ---
        if has_local:
            print(" Step 1a — local: OFF all downstream ports…", flush=True)
            _set_all_local_ports(c, False)
            time.sleep(settle)  # let power state settle before sampling
            snap = c.port_metrics_snapshot()
            if not snap:
                print(" ! Local: no port-metrics after OFF (empty snapshot).", flush=True)
                return 1
            bad = _power_mismatches(snap, False, "local")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(" Step 1a — OK (all local ports report OFF).", flush=True)
        for host in hosts:
            loc = _remote_location_label(host)
            print(f" Step 1b — remote {host} ({loc}): OFF all…", flush=True)
            node = SshNode.parse(host)
            code, _out, err = node.invoke_capture(["off", "all"], timeout=120, defer=False)
            if err.strip():
                print(err.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote off all exit {code}", flush=True)
                return 1
            time.sleep(settle)
            code_m, rows, _o2, _e2 = _fetch_remote_port_metrics(host)
            if code_m != 0:
                print(f" ! Remote port-metrics-json exit {code_m} after OFF", flush=True)
                return 1
            if not rows:
                print(f" ! Remote {host}: empty port-metrics after OFF.", flush=True)
                return 1
            bad = _power_mismatches(rows, False, f"remote {host}")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(f" Step 1b — OK ({host}: all OFF).", flush=True)
        # --- Phase 2: everything ON, verified via metrics. ---
        if has_local:
            print(" Step 2a — local: ON all downstream ports…", flush=True)
            _set_all_local_ports(c, True)
            time.sleep(settle)
            snap = c.port_metrics_snapshot()
            if not snap:
                print(" ! Local: no port-metrics after ON.", flush=True)
                return 1
            bad = _power_mismatches(snap, True, "local")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(" Step 2a — OK (all local ports report ON).", flush=True)
        for host in hosts:
            loc = _remote_location_label(host)
            print(f" Step 2b — remote {host} ({loc}): ON all…", flush=True)
            node = SshNode.parse(host)
            code, _out, err = node.invoke_capture(["on", "all"], timeout=120, defer=False)
            if err.strip():
                print(err.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote on all exit {code}", flush=True)
                return 1
            time.sleep(settle)
            code_m, rows, _o2, _e2 = _fetch_remote_port_metrics(host)
            if code_m != 0:
                print(f" ! Remote port-metrics-json exit {code_m} after ON", flush=True)
                return 1
            if not rows:
                print(f" ! Remote {host}: empty port-metrics after ON.", flush=True)
                return 1
            bad = _power_mismatches(rows, True, f"remote {host}")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(f" Step 2b — OK ({host}: all ON).", flush=True)
        completed = True
        print(flush=True)
        print("Power-cycle self-test passed.", flush=True)
        return 0
    finally:
        # Any early return above leaves completed False → restore power best-effort.
        if not completed:
            _restore_all_ports_on(c, hosts)
def _print_per_port_power_table(
    c: FiWiConcentrator,
    hub_rows: list[ConsolidatedHubRow],
    base_rc: int,
) -> int:
    """
    ``Power(N): Total …`` line (``N`` = count of ports reporting power ON), then per-port **mA**, **V**,
    **W**, and Location (sorted by panel).
    Returns ``base_rc`` ORd with failure if any ``port-metrics-json`` remote call fails.
    """
    rc = base_rc
    if not hub_rows:
        print(" (no hubs — skipping port power table.)", flush=True)
        print(flush=True)
        return rc
    panel_lookup = _fiber_map_panel_lookup()
    # Local metrics fetched once, only when at least one local hub row exists.
    local_by_sn: dict[str, dict[int, dict[str, object]]] = {}
    if any(r.ssh_target is None for r in hub_rows):
        local_by_sn = _index_metrics_by_serial_port(c.port_metrics_snapshot())
    remote_cache: dict[str, dict[str, dict[int, dict[str, object]]]] = {}
    remote_fail: set[str] = set()
    remote_fail_detail: dict[str, tuple[int, str, str]] = {}
    def _remote_index(ssh_target: str) -> dict[str, dict[int, dict[str, object]]]:
        # Fetch each remote's metrics at most once; failures are remembered too.
        if ssh_target not in remote_cache:
            code, payload, out, err = _fetch_remote_port_metrics(ssh_target)
            if code == 0:
                remote_cache[ssh_target] = _index_metrics_by_serial_port(payload)
            else:
                remote_fail.add(ssh_target)
                remote_fail_detail[ssh_target] = (code, out, err)
                remote_cache[ssh_target] = {}
        return remote_cache[ssh_target]
    # Collect rows; sum current (mA) and power (W) where metrics allow.
    lines_out: list[tuple[tuple, str]] = []
    total_ma = 0.0
    n_ma = 0
    total_power_w = 0.0
    n_power = 0
    n_on = 0
    for row in hub_rows:
        sn_key = _norm_hub_serial(row.serial)
        if row.ssh_target is None:
            by_port = local_by_sn.get(sn_key, {})
        else:
            by_port = _remote_index(row.ssh_target).get(sn_key, {})
        for port in range(row.n_ports):
            m = by_port.get(port)
            cur_f: float | None = None
            v_f: float | None = None
            w_s = ""
            if m is None:
                # No metrics for this port → blank cells.
                ma_s, v_s = "", ""
            else:
                if m.get("power") == "ON":
                    n_on += 1
                cur = m.get("current_ma")
                if cur is None:
                    ma_s = ""
                else:
                    try:
                        cur_f = float(cur)
                        ma_s = f"{cur_f:.2f}"
                        total_ma += cur_f
                        n_ma += 1
                    except (TypeError, ValueError):
                        ma_s = ""
                vv = m.get("voltage_v")
                if vv is None:
                    v_s = ""
                else:
                    try:
                        v_f = float(vv)
                        v_s = f"{v_f:.3f}"
                    except (TypeError, ValueError):
                        v_s = ""
                # Watts only when both current and voltage parsed cleanly.
                if cur_f is not None and v_f is not None:
                    total_power_w += v_f * (cur_f / 1000.0)
                    n_power += 1
                    w_s = f"{v_f * cur_f / 1000.0:.3f}"
            pnl = _panel_cell(panel_lookup, hub_rows, row, port)
            st = _panel_sort_tuple(pnl, row.idx, port)
            line = (
                f"{pnl:<8} | {row.idx:<4} | {row.serial:<12} | {port:<3} | "
                f"{ma_s:>10} | {v_s:>8} | {w_s:>10} | {_location_cell(row)}"
            )
            lines_out.append((st, line))
    if n_ma and n_power:
        # Literal "120V/220V=…A/…A" suffix (labels, not f-string slash parsing).
        summary = (
            "Power({n}): Total {tw:.3f} W / {tm:.2f} mA (120V/220V={a120:.2f}A/{a220:.2f}A)"
        ).format(
            n=n_on,
            tw=total_power_w,
            tm=total_ma,
            a120=total_power_w / 120.0,
            a220=total_power_w / 220.0,
        )
    elif n_ma:
        # Current known but wattage never computable (voltage never parsed).
        summary = (
            f"Power({n_on}): Total — W / {total_ma:.2f} mA (per port follows)"
        )
    else:
        summary = f"Power({n_on}): — (per port follows)"
    print(summary, flush=True)
    print("-" * len(_PER_PORT_PWR_HDR), flush=True)
    print(_PER_PORT_PWR_HDR, flush=True)
    print("-" * len(_PER_PORT_PWR_HDR), flush=True)
    lines_out.sort(key=lambda x: x[0])  # panel order (see _panel_sort_tuple)
    for _sk, line in lines_out:
        print(line, flush=True)
    print(flush=True)
    # Report remote failures after the table, with upgrade help when recognized.
    for host in sorted(remote_fail):
        loc = _remote_location_label(host)
        code, out, err = remote_fail_detail[host]
        if _remote_fiwi_missing_port_metrics_json(out, err, code):
            _print_remote_fiwi_upgrade_help(host, loc)
        else:
            print(
                f" ! Remote {host} ({loc}): port-metrics-json failed (exit {code}).",
                flush=True,
            )
        rc = 1
    return rc
# Header for the radio-heads table (panel, mapped chip type, live power stats).
_RADIO_HEADS_HDR = (
    f"{'Panel':<8} | {'chip_type (map)':<28} | {'Pwr':<5} | {'mA':>10} | {'V':>8} | {'Hub.Pt':<8}"
)
def _print_radio_heads_section(c: FiWiConcentrator) -> None:
    """
    Walk every mapped radio head: panel-bound via :meth:`~fiwi.concentrator.FiWiConcentrator.patch_panel`
    ``.heads()``, then any other mapped ``fiber_ports`` rows (off-panel fiber ids).
    Uses :class:`fiwi.radiohead.RadioHead` ``power`` / ``current`` / ``voltage`` (BrainStem or SSH
    ``port-metrics-json`` per head).
    """
    from fiwi import fiber_map_io as fm
    from fiwi.radiohead import RadioHead, RadioHeadEntry
    print("Radio heads (all mapped fiber_ports → RadioHead)", flush=True)
    print("-" * len(_RADIO_HEADS_HDR), flush=True)
    doc = fm.load_fiber_map_document()
    if not doc:
        print(" (no fiber_map.json — skip.)", flush=True)
        print(flush=True)
        return
    try:
        from fiwi.site_setup import read_fiwi_site
        site = read_fiwi_site(doc)
        if site:
            cn = site.get("concentrator_name") or ""
            cl = site.get("concentrator_location") or ""
            print(
                f" Fi-Wi concentrator: {cn} | Location: {cl}",
                flush=True,
            )
        bound = c.patch_panel(doc)
        pp = bound.panel
        name_s = pp.label.strip() if pp.label.strip() else ""
        loc_s = pp.location.strip() if pp.location.strip() else ""
        print(
            f" Patch panel name: {name_s} | Location: {loc_s} | Ports: {pp.slots}",
            flush=True,
        )
        print(flush=True)
        # Panel-bound heads first; then append any other mapped fiber_ports rows
        # not already covered (off-panel fiber ids), deduped by map key.
        heads: list[RadioHead] = list(bound.heads())
        seen_map_keys = {rh.map_entry.map_key for rh in heads}
        for ent in RadioHeadEntry.each_from_document(doc):
            if not ent.is_mapped() or ent.map_key in seen_map_keys:
                continue
            heads.append(RadioHead(ent, c))
            seen_map_keys.add(ent.map_key)
        def _rh_sort_key(rh: RadioHead) -> tuple:
            # Panel slots first (by slot number), then numeric map keys, then others.
            p = rh.patch_panel_port
            if p is not None:
                return (0, p, rh.map_entry.map_key)
            mk = rh.map_entry.map_key
            if mk.isdigit():
                return (1, int(mk), mk)
            return (2, mk, mk)
        heads.sort(key=_rh_sort_key)
    except (OSError, ValueError) as exc:
        print(f" ! Could not load patch panel / map: {exc}", flush=True)
        print(flush=True)
        return
    if not heads:
        print(" (no mapped fiber_ports entries.)", flush=True)
        print(flush=True)
        return
    print(_RADIO_HEADS_HDR, flush=True)
    print("-" * len(_RADIO_HEADS_HDR), flush=True)
    for rh in heads:
        panel_s = str(rh.patch_panel_port) if rh.patch_panel_port is not None else ""
        chip = rh.chip_type or ""
        if len(chip) > 28:
            chip = chip[:25] + "..."  # keep the 28-char column width
        hp = rh.map_entry.hub_port()
        hub_pt = f"{hp[0]}.{hp[1]}" if hp else ""
        rh.refresh_live()  # pull fresh power / current / voltage for this head
        pwr_b = rh.power
        if pwr_b is True:
            pwr_s = "ON"
        elif pwr_b is False:
            pwr_s = "OFF"
        else:
            pwr_s = "?"
        cur = rh.current
        ma_s = f"{cur:.2f}" if isinstance(cur, (int, float)) else ""
        vv = rh.voltage
        v_s = f"{vv:.3f}" if isinstance(vv, (int, float)) else ""
        print(
            f"{panel_s:<8} | {chip:<28} | {pwr_s:<5} | {ma_s:>10} | {v_s:>8} | {hub_pt:<8}",
            flush=True,
        )
    print(flush=True)
def _print_usb_hub_tables(c: FiWiConcentrator, hub_rows: list[ConsolidatedHubRow], base_rc: int) -> int:
    """Print consolidated hub table and port power / current / voltage table. Returns updated rc."""
    rc = base_rc
    print("USB power-control hubs (consolidated)", flush=True)
    print("-" * len(_CONSOLIDATED_HUB_HDR), flush=True)
    hn = socket.gethostname()
    print(f" Local host: {hn} | Remote resolution: IPv4 when possible, else hostname", flush=True)
    print(flush=True)
    if not hub_rows:
        print(
            " (no hubs) — none on this machine and none parsed from remotes.",
            flush=True,
        )
        print(flush=True)
        return rc
    print(_CONSOLIDATED_HUB_HDR, flush=True)
    print("-" * len(_CONSOLIDATED_HUB_HDR), flush=True)
    for row in hub_rows:
        # Truncate long type names so the columns stay aligned.
        ht = row.hub_type[:_HUB_TYPE_COL_W] if len(row.hub_type) > _HUB_TYPE_COL_W else row.hub_type
        print(
            f"{row.idx:<4} | {ht:<{_HUB_TYPE_COL_W}} | {row.serial:<12} | {row.ports_display:<10} | "
            f"{_location_cell(row):<28} | {_usb_cell(row)}",
            flush=True,
        )
    print(flush=True)
    # The per-port table may flip rc to 1 when a remote metrics fetch fails.
    rc = _print_per_port_power_table(c, hub_rows, rc)
    return rc
def _print_pcie_catalog_section() -> None:
    """Print the static Adnacom PCIe host-card reference table (informational only)."""
    from fiwi.adnacom_pcie_catalog import print_adnacom_host_card_table

    print("--- PCIe host-card catalog (reference) ---", flush=True)
    print(flush=True)
    print_adnacom_host_card_table()
    print(flush=True)
def _prepend_tests_dir_to_syspath() -> None:
    """Make ``tests/`` importable so ``check_inrush`` can be loaded as a module."""
    tests_dir = os.path.join(_ROOT, "tests")
    if tests_dir not in sys.path:
        sys.path.insert(0, tests_dir)
def _run_inrush_probe(c: FiWiConcentrator, args: argparse.Namespace) -> int:
    """Run :mod:`check_inrush` measurement after the hub report; **local** hubs only.

    ``args.inrush`` is ``[HUB, PORT]`` — hub 1-based, downstream port 0-based.
    Delegates to ``tests/check_inrush.py``: host-sample mode polls getPortCurrent
    from this host; on-hub mode reads the Reflex scratchpad via a pointer.
    Returns 0 on success, 1 on hub/measurement failure, 2 on bad arguments.
    """
    assert args.inrush is not None
    hub_1, port_0 = int(args.inrush[0]), int(args.inrush[1])
    # Usage errors → rc 2.
    if hub_1 < 1:
        print("check_concentrator --inrush: hub must be >= 1", file=sys.stderr, flush=True)
        return 2
    if port_0 < 0:
        print("check_concentrator --inrush: port must be >= 0", file=sys.stderr, flush=True)
        return 2
    # check_inrush lives in <repo>/tests; make it importable before the lazy import.
    _prepend_tests_dir_to_syspath()
    import check_inrush as ci  # noqa: E402
    # Connect lazily: the caller may hand us a concentrator that never connected.
    if not c.hubs:
        if not c.connect():
            print(
                "check_concentrator --inrush: no local USB power-control hubs connected.",
                file=sys.stderr,
                flush=True,
            )
            return 1
    hi = hub_1 - 1  # 0-based index into c.hubs (hi < 0 already excluded above)
    if hi < 0 or hi >= len(c.hubs):
        print(
            f"check_concentrator --inrush: hub {hub_1} invalid (have {len(c.hubs)} hub(s)).",
            file=sys.stderr,
            flush=True,
        )
        return 1
    stem = c.hubs[hi]
    n = c._port_count(stem)  # downstream port count for this hub model
    if port_0 < 0 or port_0 >= n:
        print(
            f"check_concentrator --inrush: port {port_0} out of range for hub {hub_1} (0..{n - 1}).",
            file=sys.stderr,
            flush=True,
        )
        return 1
    if not args.inrush_host_sample and port_0 != 0:
        # NOTE(review): warning only — there is no `return` here, so the on-hub
        # measurement still proceeds for a port Reflex does not monitor; confirm
        # this fall-through is intentional (the message suggests it should abort).
        print(
            "check_concentrator --inrush: on-hub reflex/inrush.reflex monitors port 0 only; "
            f"you asked for port {port_0}. Recompile Reflex or use --inrush-host-sample.",
            file=sys.stderr,
            flush=True,
        )
    # Convert CLI milliseconds to seconds and clamp to sane floors.
    off_s = max(args.inrush_off_ms / 1000.0, 0.0)
    sample_s = max(args.inrush_sample_ms / 1000.0, 0.01)
    interval_s = max(args.inrush_interval_ms / 1000.0, 0.0005)
    power_cycle = not args.inrush_no_power_cycle
    if args.inrush_host_sample:
        out = ci._measure_inrush_host(
            c,
            hi,
            port_0,
            off_s=off_s,
            sample_s=sample_s,
            interval_s=interval_s,
            threshold_ma=args.inrush_threshold_ma,
            power_cycle=power_cycle,
        )
    else:
        out = ci._measure_inrush_on_hub(
            c,
            hi,
            port_0,
            off_s=off_s,
            sample_s=sample_s,
            power_cycle=power_cycle,
            store_index=args.inrush_map_store_index,
            map_slot=args.inrush_map_slot,
            pointer_index=args.inrush_pointer_index,
            rearm_map=not args.inrush_no_rearm_map,
        )
    # Known on-hub failure modes are surfaced by check_inrush via an 'error' key.
    if out.get("error") == "rearm_map_failed":
        print(
            "check_concentrator --inrush: could not re-arm map (store slotDisable/Enable failed). "
            "Load reflex/inrush.map into --inrush-map-store-index / --inrush-map-slot, "
            "or try --inrush-no-rearm-map.",
            file=sys.stderr,
            flush=True,
        )
        return 1
    if out.get("error") == "scratchpad_read_failed":
        print(
            "check_concentrator --inrush: scratchpad read failed (pointer offset / hub type). "
            "Confirm inrush.map is loaded, mapEnable ran, and pointer index matches Reflex.",
            file=sys.stderr,
            flush=True,
        )
        return 1
    print(flush=True)
    print("--- USB inrush (--inrush) ---", flush=True)
    print(flush=True)
    if args.inrush_json:
        # Machine-readable: dump the raw measurement dict as a single JSON object.
        print(json.dumps(out), flush=True)
    else:
        ptr_idx = args.inrush_pointer_index
        if out.get("mode") == "on-hub":
            print(
                f"Hub {out['hub']} port {out['port']} serial {out['serial']}",
                flush=True,
            )
            print(f" mode: on-hub reflex (scratchpad via pointer {ptr_idx})", flush=True)
            print(
                f" peak current: {out['peak_ma']} mA (peak_ua={out['peak_ua']})",
                flush=True,
            )
            print(
                f" hub ticks: {out['ticks']} @ {out['sample_period_us']} µs → "
                f"~{out['observation_us']} µs on timer grid",
                flush=True,
            )
            print(
                f" > {ci._REFLEX_THRESHOLD_MA} mA (reflex): ~{out['above_threshold_us']} µs "
                f"({out['above_threshold_ticks']} ticks)",
                flush=True,
            )
            print(
                f" host wait: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']} "
                f"store[{out['map_store_index']}] slot {out['map_slot']}",
                flush=True,
            )
            # Both zero means the Reflex map never ticked: diagnose loudly (rc stays 0).
            if out["ticks"] == 0 and out.get("sample_period_us", 0) == 0:
                print(
                    " ! ticks and sample_period_us are 0 — map likely not running or wrong pointer.",
                    file=sys.stderr,
                    flush=True,
                )
        else:
            # Host-sample mode: first_above_threshold_ms may be None (never crossed).
            fat = out["first_above_threshold_ms"]
            fat_s = f"{fat} ms" if fat is not None else "n/a"
            print(
                f"Hub {out['hub']} port {out['port']} serial {out['serial']}",
                flush=True,
            )
            print(" mode: host-sample (getPortCurrent loop)", flush=True)
            print(
                f" peak current: {out['peak_ma']} mA "
                f"(when max first rose: {out['peak_time_ms']} ms)",
                flush=True,
            )
            print(
                f" first > {out['above_threshold_ma']} mA: {fat_s}",
                flush=True,
            )
            print(
                f" > {out['above_threshold_ma']} mA for ~{out['above_threshold_ms']} ms "
                f"({out['sample_count']} samples @ {out['interval_ms']} ms; wall {out['elapsed_ms']} ms)",
                flush=True,
            )
            print(
                f" nominal window: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']}",
                flush=True,
            )
    print(flush=True)
    return 0
def _print_consolidated_report(c: FiWiConcentrator) -> int:
    """Default report: SSH/hosts summary, hub + port tables, radio heads, PCIe catalog.

    Returns the nonzero rc from remote hub queries, or 0 when all succeeded.
    """
    _print_ssh_and_hosts_summary()
    rows, rc = _build_consolidated_hub_rows(c)
    rows = _enrich_hub_rows_usb(rows)
    rc = _print_usb_hub_tables(c, rows, rc)
    _print_radio_heads_section(c)
    _print_pcie_catalog_section()
    return rc
def test_concentrator() -> None:
    """Pytest entry: constructing and tearing down the concentrator must not raise."""
    concentrator = _instantiate_concentrator()
    concentrator.disconnect()
def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description=(
"FiWiConcentrator check: consolidated USB hub table, optional inrush probe, "
"or interactive panel calibrate (fiber_map.json — hub/port, chip_type, patch panel)."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"PROFILE selects <repo>/config/<PROFILE>.ini (e.g. uax24, uax4, default).\n"
"Same as environment variable FIWI_CONFIG."
),
)
p.add_argument(
"-c",
"--config",
metavar="PROFILE_OR_INI",
help="INI profile or absolute path (sets FIWI_CONFIG for this run)",
)
p.add_argument(
"--powercycle",
action="store_true",
help=(
"Self-test: all downstream ports OFF then ON on local hubs and each merged "
"remote hub host; verifies via port metrics; then print port power table "
"(disrupts USB power)."
),
)
p.add_argument(
"--inrush",
nargs=2,
type=int,
metavar=("HUB", "PORT"),
help=(
"After the report (or after --powercycle), run USB inrush on this **local** hub "
"(1-based) and downstream port (0-based). Same behavior as tests/check_inrush.py."
),
)
ir = p.add_argument_group("inrush options (only with --inrush)")
ir.add_argument(
"--inrush-host-sample",
action="store_true",
help="Poll getPortCurrent on the host instead of on-hub Reflex scratchpad",
)
ir.add_argument(
"--inrush-json",
action="store_true",
help="Print one JSON object for the inrush result",
)
ir.add_argument(
"--inrush-off-ms",
type=float,
default=250.0,
help="ms downstream port off before re-enable when power-cycling (default 250)",
)
ir.add_argument(
"--inrush-sample-ms",
type=float,
default=500.0,
help="Host wait after re-arm / power-on while hub samples (default 500)",
)
ir.add_argument(
"--inrush-interval-ms",
type=float,
default=1.0,
help="(host-sample) ms between getPortCurrent reads (default 1)",
)
ir.add_argument(
"--inrush-threshold-ma",
type=float,
default=50.0,
help="(host-sample) threshold mA for above-threshold tally (default 50)",
)
ir.add_argument(
"--inrush-no-power-cycle",
action="store_true",
help="Do not disable/enable the downstream port before measure",
)
ir.add_argument(
"--inrush-map-store-index",
type=int,
default=1,
help="stem.store index for slotDisable/Enable (default 1)",
)
ir.add_argument(
"--inrush-map-slot",
type=int,
default=0,
help="Store slot where inrush.map is loaded (default 0)",
)
ir.add_argument(
"--inrush-pointer-index",
type=int,
default=0,
help="stem.pointer index for Reflex scratchpad (default 0)",
)
ir.add_argument(
"--inrush-no-rearm-map",
action="store_true",
help="Do not slotDisable/slotEnable before measure (on-hub mode)",
)
p.add_argument(
"--panel-calibrate",
action="store_true",
help=(
"Interactive fiber_map.json build: patch panel slots, walk each USB downstream port "
"(one powered at a time), capture wlan+lspci for chip_type, map hub.port → panel id. "
"Same as 'python3 fiwi.py panel calibrate'. Requires a TTY; not with --powercycle/--inrush."
),
)
cal = p.add_argument_group("panel calibrate (only with --panel-calibrate)")
cal.add_argument(
"--calibrate-merge",
action="store_true",
help="Merge into existing fiber_map.json instead of clearing fiber_ports",
)
cal.add_argument(
"--calibrate-limit",
type=int,
default=None,
metavar="N",
help="Stop after the first N USB calibrate steps (hub.port walks)",
)
cal.add_argument(
"--calibrate-ssh",
action="append",
default=None,
metavar="USER@HOST",
help="Remote hub host for calibrate steps (repeat for several); also uses calibrate_remotes / env",
)
return p.parse_args()
def _calibrate_flags_without_panel(args: argparse.Namespace) -> bool:
if args.panel_calibrate:
return False
if args.calibrate_merge:
return True
if args.calibrate_limit is not None:
return True
if args.calibrate_ssh:
return True
return False
def _main_panel_calibrate(args: argparse.Namespace, label: str) -> int:
    """Run :meth:`FiWiConcentrator.panel_calibrate` (writes ``fiber_map.json``).

    Rejects combination with --powercycle/--inrush and non-TTY stdin (rc 2);
    returns 1 when calibration raises, 0 otherwise.
    """
    if args.powercycle or args.inrush is not None:
        print(
            "check_concentrator: --panel-calibrate cannot be combined with --powercycle or --inrush.",
            file=sys.stderr,
            flush=True,
        )
        return 2
    if not sys.stdin.isatty():
        print(
            "check_concentrator: --panel-calibrate needs an interactive TTY (e.g. ssh -t host).",
            file=sys.stderr,
            flush=True,
        )
        return 2
    _ensure_paths_configured()
    from fiwi.brainstem_loader import load_brainstem

    print(_rule("="), flush=True)
    print(f"Fi-Wi panel calibrate → fiber_map.json [config: {label}]", flush=True)
    print(_rule("="), flush=True)
    print(flush=True)
    # Echo the equivalent fiwi.py command line so the run is reproducible later.
    equivalent_cmd = (
        "Equivalent to: python3 fiwi.py panel calibrate "
        + ("merge " if args.calibrate_merge else "")
        + (f"{args.calibrate_limit} " if args.calibrate_limit is not None else "")
        + " ".join(f"--ssh {h}" for h in (args.calibrate_ssh or []))
    )
    print(
        "Maps each USB hub downstream port to a patch-panel fiber id; records wlan snapshot "
        "(sysfs + lspci/iw) as chip_type / wlan, optional PCIe metadata.\n"
        "Keys: s / skip / . = skip port · q / quit / exit = stop · Ctrl-C = save fiber_map.json & exit.\n"
        + equivalent_cmd
        + "\n",
        flush=True,
    )
    concentrator = None
    try:
        load_brainstem()
        concentrator = _instantiate_concentrator()
        concentrator.panel_calibrate(
            merge=args.calibrate_merge,
            limit=args.calibrate_limit,
            calibrate_ssh_hosts=list(args.calibrate_ssh or []),
        )
    except Exception as exc:
        print(f"FAIL: {exc}", file=sys.stderr, flush=True)
        return 1
    finally:
        # Best-effort teardown even when calibration failed mid-walk.
        if concentrator is not None:
            try:
                concentrator.disconnect()
            except Exception:
                pass
    print(_rule("="), flush=True)
    print(f"Panel calibrate finished [config: {label}]", flush=True)
    print(_rule("="), flush=True)
    return 0
def main() -> int:
    """CLI entry point: default report, --powercycle self-test, optional --inrush probe.

    Exit status: 0 on success, 1 on setup/measurement failure, 2 for argument
    errors; otherwise the first nonzero rc from remote hub queries.
    """
    # Resolve all relative paths (config/, fiber_map.json, tests/) from the repo root.
    try:
        os.chdir(_ROOT)
    except OSError as exc:
        print(f"FAIL: cannot chdir to repo root {_ROOT!r}: {exc}", file=sys.stderr)
        return 1
    args = _parse_args()
    if args.config:
        # --config overrides FIWI_CONFIG for this process (child SSH calls inherit it).
        os.environ["FIWI_CONFIG"] = args.config.strip()
    label = os.environ.get("FIWI_CONFIG", "default (config/default.ini if present)")
    # Usage guard: the --calibrate-* flags are meaningless without --panel-calibrate.
    if _calibrate_flags_without_panel(args):
        print(
            "check_concentrator: --calibrate-merge / --calibrate-limit / --calibrate-ssh "
            "require --panel-calibrate.",
            file=sys.stderr,
            flush=True,
        )
        return 2
    if args.panel_calibrate:
        return _main_panel_calibrate(args, label)
    _print_banner(label)
    c = None
    remote_fail = 0
    try:
        c = _instantiate_concentrator()
        if args.powercycle:
            # Destructive path: self-test first, then a snapshot port power table.
            remote_fail = _run_powercycle_test(c)
            print(flush=True)
            hub_rows, brc = _build_consolidated_hub_rows(c)
            remote_fail = remote_fail or brc  # keep the first nonzero rc
            hub_rows = _enrich_hub_rows_usb(hub_rows)
            remote_fail = _print_per_port_power_table(c, hub_rows, remote_fail)
            _print_radio_heads_section(c)
        else:
            remote_fail = _print_consolidated_report(c)
        if args.inrush is not None:
            # NOTE(review): `or` short-circuits — when remote_fail is already nonzero
            # the inrush probe never runs; confirm that skipping it is intentional.
            remote_fail = remote_fail or _run_inrush_probe(c, args)
    except AssertionError as exc:
        print(f"FAIL: {exc}", file=sys.stderr)
        return 1
    except Exception as exc:
        print(f"FAIL: {exc}", file=sys.stderr)
        return 1
    finally:
        # Always release hub handles, even on the failure paths above.
        if c is not None:
            try:
                c.disconnect()
            except Exception:
                pass
    print(_rule("="), flush=True)
    if args.powercycle:
        tag = "Power-cycle self-test OK" if remote_fail == 0 else "Power-cycle self-test FAILED"
        print(f"{tag} [config: {label}]", flush=True)
    else:
        print(f"FiWiConcentrator() OK [config: {label}]", flush=True)
    if remote_fail != 0:
        print(" (one or more remote hub queries had errors — see above)", flush=True)
    print(_rule("="), flush=True)
    return remote_fail
if __name__ == "__main__":
    # Standalone execution: propagate main()'s rc as the process exit status.
    raise SystemExit(main())