FiWiManager/tests/check_concentrator.py

947 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Smoke check: :class:`fiwi.concentrator.FiWiConcentrator` plus consolidated USB hub and per-port
metrics tables covering **this machine** and each configured **remote**. The consolidated hub table
has **Location** (``local`` or remote IP/hostname, plus **tty** names when known) and **USB**
(Bus/Device, VID:PID, product from sysfs). The per-port table starts with **Panel** (``fiber_ports``
key / rack position when ``hub``, ``port``, and optional ``ssh`` match this run), rows sorted by panel
number; a **Power: Total … W / … mA (per port follows)** line heads the section, then the column header
and rows (no USB column). The Adnacom PCIe catalog is printed last.

Requires BrainStem locally for hub enumeration on this host. Remotes use SSH ``show_hostcards``
and ``port-metrics-json``; the remote tree must include that command (same revision as here).
**Standalone**::

    python tests/check_concentrator.py
    python tests/check_concentrator.py --config uax24

``--config`` matches ``FIWI_CONFIG`` (profile or absolute ``*.ini``).
``--powercycle`` runs a destructive self-test on **local** USB hubs and each host in
``FIWI_CALIBRATE_REMOTES`` / merged hub hosts: all ports OFF (verify), then all ON (verify),
then prints the port power table (snapshot after the test).
With pytest::

    FIWI_CONFIG=uax24 pytest tests/check_concentrator.py
"""
from __future__ import annotations
import argparse
import json
import os
import re
import socket
import sys
import time
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fiwi.concentrator import FiWiConcentrator
# Repository root: the parent of the directory that contains this file.
_ROOT = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
# Put the repository root on sys.path (once) so the ``fiwi`` imports used
# throughout this file can presumably be resolved when it is run as a script.
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
# Number of characters in the separator produced by _rule().
_WIDTH = 62
# Flipped to True by _ensure_paths_configured() after fiwi.paths.configure() ran.
_paths_configured = False
# Column header of the consolidated hub table.
_CONSOLIDATED_HUB_HDR = (
    f"{'#':<4} | {'Serial':<12} | {'Ports':<10} | {'Location':<28} | USB (Bus / ID / product)"
)
# Column header of the per-port power table.
_PER_PORT_PWR_HDR = (
    f"{'Panel':<8} | {'Hub#':<4} | {'Serial':<12} | {'Pt':<3} | {'Pwr':<5} | "
    f"{'mA':>8} | {'V':>8} | Location"
)
# One data row of the ``show_hostcards`` hub table:
# "<index> | 0x<hex serial> | <rest of line>"; group 1 is the serial, group 2 the rest.
_HUB_TABLE_ROW_RE = re.compile(
    r"^\s*\d+\s+\|\s+(0x[0-9A-Fa-f]+)\s+\|\s+(.+)\s*$"
)
@dataclass(frozen=True)
class ConsolidatedHubRow:
    """One row of the consolidated hub table (local or remote)."""

    #: 1-based position of the row in the consolidated table.
    idx: int
    #: Hub serial as displayed (``0x…`` hex string).
    serial: str
    #: Text of the ``Ports`` cell; may carry a trailing ``*`` or be ``?``.
    ports_display: str
    #: Integer port count parsed from ``ports_display`` (0 when it cannot be parsed).
    n_ports: int
    #: ``local`` or the label computed for the remote host.
    location: str
    #: Remote host string as configured, or ``None`` for a hub on this machine.
    ssh_target: str | None
    #: e.g. ``ttyUSB0`` or ``ttyUSB0+ttyACM0`` from sysfs (24ff hubs); empty if unknown.
    usb_tty_hint: str = ""
    #: sysfs / ``lsusb``-style line: Bus/Dev, ID ``24ff:…``, product string.
    usb_identity_hint: str = ""
def _rule(char: str = "-") -> str:
    """Return a horizontal separator: ``char`` repeated ``_WIDTH`` times."""
    return _WIDTH * char
def _ensure_paths_configured() -> None:
    """Call ``fiwi.paths.configure(_ROOT)`` on the first invocation; later calls do nothing."""
    global _paths_configured
    if not _paths_configured:
        import fiwi.paths as paths_mod

        paths_mod.configure(_ROOT)
        # Only mark as done after configure() returned without raising.
        _paths_configured = True
def _instantiate_concentrator() -> FiWiConcentrator:
    """
    Configure paths, build a ``FiWiConcentrator`` and check that it starts with no hubs.

    Raises ``AssertionError`` when the fresh instance already reports connected hubs.
    """
    _ensure_paths_configured()
    # Imported here, after the paths have been configured.
    from fiwi.concentrator import FiWiConcentrator

    concentrator = FiWiConcentrator()
    assert concentrator.hubs == [], "new concentrator should have no connected hubs yet"
    return concentrator
def _remote_location_label(ssh_target: str) -> str:
"""IPv4 for ``user@host`` when possible; otherwise the host part."""
if "@" not in ssh_target:
host = ssh_target.strip()
else:
host = ssh_target.split("@", 1)[1].strip()
if re.fullmatch(r"(?:\d{1,3}\.){3}\d{1,3}", host):
return host
try:
return socket.gethostbyname(host)
except OSError:
return host
def _collect_local_hub_rows(c: FiWiConcentrator) -> list[tuple[str, str]]:
"""
``(serial_hex, ports_display)`` per local USB hub, same rules as
:meth:`fiwi.concentrator.FiWiConcentrator._print_usb_hub_summary_table`.
"""
specs = c._enumerate_usb_specs()
if not specs:
return []
if not c.hubs:
c._connect_specs(specs)
by_sn = c._serial_to_opened_port_count()
out: list[tuple[str, str]] = []
for spec in specs:
sn = spec.serial_number
sn_s = f"0x{sn:08X}"
if sn in by_sn:
ports = str(by_sn[sn])
else:
inf = c._inferred_downstream_ports_from_spec(spec)
ports = str(inf) if inf is not None else "?"
ports += " *"
out.append((sn_s, ports))
return out
def _parse_hub_table_from_show_hostcards_stdout(text: str) -> list[tuple[str, str]]:
    """
    Extract ``(serial, ports)`` pairs from the table printed by ``show_hostcards``.

    Parsing starts after a header line that contains ``Hub``, ``Serial``, ``Ports`` and
    ``|``. Dash-only separator lines are skipped. The table ends at a blank line, at a
    line starting with ``*``, or at the first non-matching line after at least one row.
    """
    parsed: list[tuple[str, str]] = []
    header_seen = False
    for line in text.splitlines():
        if all(token in line for token in ("Hub", "Serial", "Ports", "|")):
            header_seen = True
            continue
        if not header_seen or re.match(r"^\s*-+\s*$", line):
            continue
        stripped = line.strip()
        if not stripped or stripped.startswith("*"):
            break
        hit = _HUB_TABLE_ROW_RE.match(line)
        if hit is not None:
            parsed.append((hit.group(1), hit.group(2).strip()))
        elif parsed:
            break
    return parsed
def _print_banner(config_label: str) -> None:
    """Print the title framed by two ``=`` rules, followed by an empty line."""
    bar = _rule("=")
    for text in (bar, f"Fi-Wi concentrator check [config: {config_label}]", bar):
        print(text, flush=True)
    print(flush=True)
def _print_ssh_and_hosts_summary() -> None:
    """Print the SSH settings loaded by ``SshNodeConfig.load()`` and the merged hub hosts."""
    from fiwi.ssh import SshNodeConfig

    cfg = SshNodeConfig.load()
    merged = (cfg.calibrate_remotes or "").strip()
    report = [
        "SSH / remote hub routing (after paths + INI)",
        _rule(),
        f" FIWI_REMOTE_PYTHON → {cfg.python}",
        f" FIWI_REMOTE_SCRIPT → {cfg.script}",
        f" FIWI_SSH_BIN {cfg.ssh_bin}",
    ]
    # The extra-options line is only shown when options are configured.
    if cfg.ssh_extra_argv:
        report.append(f" FIWI_SSH_OPTS {' '.join(cfg.ssh_extra_argv)}")
    report.append(f" merged hub hosts {merged or '(none)'}")
    report.append(_rule())
    for text in report:
        print(text, flush=True)
    print(flush=True)
def _parse_port_count(ports_str: str) -> int:
"""Integer downstream count from hub table ``Ports`` cell (strip trailing ``*``)."""
s = ports_str.strip().replace("*", "").strip()
if not s or s == "?":
return 0
try:
return max(0, int(s))
except ValueError:
return 0
def _norm_hub_serial(s: str) -> str:
return s.strip().upper()
def _hub_1based_on_host(hub_rows: list[ConsolidatedHubRow], row: ConsolidatedHubRow) -> int:
"""Hub index (1-based) for ``row`` among hubs on the same host (local vs same ``ssh_target``)."""
same = [r for r in hub_rows if r.ssh_target == row.ssh_target]
same.sort(key=lambda r: r.idx)
for i, r in enumerate(same, start=1):
if r.idx == row.idx:
return i
return 0
def _fiber_map_panel_lookup() -> dict[tuple[str | None, int, int], str]:
    """
    Build the reverse map from a hub port to its ``fiber_ports`` key.

    Keys are ``(ssh target or None for local, hub 1-based on that host, port 0-based)``;
    values are the ``fiber_ports`` key as a string. When several keys map to the same
    port, they are joined with commas in iteration order.

    SSH strings must match ``fiber_map.json`` (``ssh`` / ``remote`` / ``host``+``user``)
    exactly where used; local entries must omit those fields.

    Returns an empty dict when no document is loaded or ``fiber_ports`` is not a dict.
    """
    from fiwi import fiber_map_io as fm

    doc = fm.load_fiber_map_document()
    fiber_ports = doc.get("fiber_ports") if doc else None
    if not isinstance(fiber_ports, dict):
        return {}

    lookup: dict[tuple[str | None, int, int], str] = {}
    for map_key, entry in fiber_ports.items():
        if not isinstance(entry, dict):
            continue
        hub_port = fm.fiber_entry_hub_port(entry)
        if hub_port is None:
            continue
        hub_1, port_0 = hub_port
        target = fm.fiber_ssh_target(entry)
        # A missing, non-string or blank SSH target means "local".
        host_key: str | None = None
        if isinstance(target, str) and target.strip():
            host_key = target.strip()
        slot = (host_key, hub_1, port_0)
        label = str(map_key)
        existing = lookup.get(slot)
        if existing is None or existing == label:
            lookup[slot] = label
        else:
            lookup[slot] = f"{existing},{label}"
    return lookup
def _panel_cell(
    lookup: dict[tuple[str | None, int, int], str],
    hub_rows: list[ConsolidatedHubRow],
    row: ConsolidatedHubRow,
    port_0: int,
) -> str:
    """
    Return the panel label for ``port_0`` of ``row``, or an empty string when unmapped.

    The lookup key is ``(stripped ssh_target or None, hub position on its host, port_0)``.
    """
    hub_1 = _hub_1based_on_host(hub_rows, row)
    if hub_1 > 0:
        host_key: str | None = row.ssh_target.strip() if row.ssh_target else None
        return lookup.get((host_key, hub_1, port_0), "")
    return ""
def _panel_sort_tuple(pnl: str, hub_table_idx: int, port_0: int) -> tuple:
"""
Sort per-port rows by patch panel label: numeric ``fiber_ports`` keys first (min if comma-list),
then other labels, unmapped (—) last; tie-break hub # and port.
"""
s = (pnl or "").strip()
if not s or s == "":
return (2, 0, hub_table_idx, port_0)
nums: list[int] = []
for part in re.split(r"[\s,]+", s):
if part.isdigit():
nums.append(int(part))
if nums:
return (0, min(nums), hub_table_idx, port_0)
return (1, s, hub_table_idx, port_0)
def _merged_hub_hosts() -> list[str]:
    """
    Return the remote hub hosts from ``SshNodeConfig.load().calibrate_remotes``.

    The comma-separated value is split, each entry is stripped, and empty entries and
    repeats are dropped while the first-seen order is kept.
    """
    from fiwi.ssh import SshNodeConfig

    configured = (SshNodeConfig.load().calibrate_remotes or "").strip()
    candidates = (piece.strip() for piece in configured.split(","))
    # dict.fromkeys() removes duplicates and preserves insertion order.
    return list(dict.fromkeys(host for host in candidates if host))
def _build_consolidated_hub_rows(c: FiWiConcentrator) -> tuple[list[ConsolidatedHubRow], int]:
    """
    Collect the consolidated hub rows: local hubs first, then each remote host in order.

    Remote hubs are read by running ``show_hostcards`` over SSH and parsing its table.
    Problems with a remote are printed and do not stop the remaining hosts.

    Returns ``(rows, rc)``; ``rc`` is 0 when every remote ``show_hostcards`` succeeded
    and produced a table, otherwise 1.
    """
    from fiwi.ssh import SshNode

    status = 0
    table: list[ConsolidatedHubRow] = []

    def _append_row(serial: str, ports_disp: str, loc: str, ssh_target: str | None) -> None:
        # idx is the 1-based position of the new row in the table.
        table.append(
            ConsolidatedHubRow(
                idx=len(table) + 1,
                serial=serial,
                ports_display=ports_disp,
                n_ports=_parse_port_count(ports_disp),
                location=loc,
                ssh_target=ssh_target,
            )
        )

    for serial, ports in _collect_local_hub_rows(c):
        _append_row(serial, ports, "local", None)

    remote_hosts = _merged_hub_hosts()
    if not remote_hosts:
        print(" (no remote hosts in config — only local hubs listed.)", flush=True)
        print(flush=True)

    for host in remote_hosts:
        loc = _remote_location_label(host)
        try:
            node = SshNode.parse(host)
            code, stdout_text, stderr_text = node.invoke_capture(
                ["show_hostcards"], timeout=90, defer=False
            )
            if stderr_text.strip():
                print(stderr_text.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote {host} ({loc}): show_hostcards exit {code}", flush=True)
                status = 1
            else:
                parsed = _parse_hub_table_from_show_hostcards_stdout(stdout_text)
                if parsed:
                    for serial, ports in parsed:
                        _append_row(serial, ports, loc, host)
                else:
                    print(
                        f" ! Remote {host} ({loc}): no hub table in output "
                        f"(empty discover / parse miss).",
                        flush=True,
                    )
                    status = 1
        except Exception as exc:
            # Best effort: one unreachable remote must not abort the whole report.
            print(f" ! Remote {host} ({loc}): {exc}", flush=True)
            status = 1
    return table, status
def _local_usb_ent_by_serial() -> dict[str, dict[str, object]]:
    """
    Map normalized hub serial → identity dict for the hubs on this machine.

    Entries come from :func:`fiwi.usb_probe.usb_acroname_hub_identity_list`; non-dict
    entries and entries without a serial are ignored. A later entry replaces an earlier
    one with the same serial.
    """
    from fiwi import usb_probe as usb_mod

    by_serial: dict[str, dict[str, object]] = {}
    for entry in usb_mod.usb_acroname_hub_identity_list():
        if isinstance(entry, dict):
            serial = _norm_hub_serial(str(entry.get("serial", "")))
            if serial:
                by_serial[serial] = entry
    return by_serial
def _fetch_remote_usb_ent_by_serial(host: str) -> dict[str, dict[str, object]]:
    """
    Run ``usb-hub-tty-json`` on ``host`` over SSH and index the result by hub serial.

    Remote stderr is echoed to this process's stderr. Returns an empty dict on a
    non-zero exit code, undecodable JSON, or a JSON value that is not a list.
    """
    from fiwi.ssh import SshNode

    node = SshNode.parse(host)
    code, stdout_text, stderr_text = node.invoke_capture(
        ["usb-hub-tty-json"], timeout=45, defer=False
    )
    if stderr_text.strip():
        print(stderr_text.rstrip(), file=sys.stderr, flush=True)
    if code != 0:
        return {}
    try:
        # Empty output is treated as an empty list.
        decoded = json.loads(stdout_text.strip() or "[]")
    except json.JSONDecodeError:
        return {}
    if not isinstance(decoded, list):
        return {}

    by_serial: dict[str, dict[str, object]] = {}
    for item in decoded:
        if isinstance(item, dict):
            serial = _norm_hub_serial(str(item.get("serial", "")))
            if serial:
                by_serial[serial] = item
    return by_serial
def _usb_identity_display(ent: dict[str, object] | None, *, max_product: int = 42) -> str:
"""Human-readable USB row (matches short ``lsusb`` intent; values come from sysfs)."""
if not ent:
return ""
bus = ent.get("bus")
dev = ent.get("dev")
vp = str(ent.get("id") or "").strip()
prod = str(ent.get("product") or "").replace("\n", " ").strip()
if not prod:
prod = str(ent.get("manufacturer") or "").replace("\n", " ").strip()
if len(prod) > max_product:
prod = prod[: max_product - 3] + "..."
bits: list[str] = []
if isinstance(bus, int) and isinstance(dev, int):
bits.append(f"Bus {bus:03d} Dev {dev:03d}")
if vp:
bits.append(f"ID {vp}")
if prod:
bits.append(prod)
return " · ".join(bits)
def _enrich_hub_rows_usb(rows: list[ConsolidatedHubRow]) -> list[ConsolidatedHubRow]:
    """
    Return copies of ``rows`` with ``usb_tty_hint`` and ``usb_identity_hint`` filled in.

    Local rows use the identities found on this machine; remote rows use the result of
    ``usb-hub-tty-json`` over SSH, fetched at most once per ``ssh_target``.
    """
    local_ents = _local_usb_ent_by_serial()
    per_host: dict[str, dict[str, dict[str, object]]] = {}
    enriched: list[ConsolidatedHubRow] = []
    for row in rows:
        if row.ssh_target is None:
            source = local_ents
        else:
            if row.ssh_target not in per_host:
                per_host[row.ssh_target] = _fetch_remote_usb_ent_by_serial(row.ssh_target)
            source = per_host[row.ssh_target]
        ent: dict[str, object] | None = source.get(_norm_hub_serial(row.serial))

        ttys = ent.get("tty") if ent else None
        if isinstance(ttys, list):
            # Only string entries contribute to the hint.
            tty_hint = "+".join(str(name) for name in ttys if isinstance(name, str))
        else:
            tty_hint = ""
        enriched.append(
            replace(
                row,
                usb_tty_hint=tty_hint,
                usb_identity_hint=_usb_identity_display(ent),
            )
        )
    return enriched
def _location_cell(row: ConsolidatedHubRow) -> str:
if row.usb_tty_hint:
return f"{row.location} · {row.usb_tty_hint}"
return row.location
def _usb_cell(row: ConsolidatedHubRow) -> str:
return row.usb_identity_hint if row.usb_identity_hint else ""
def _index_metrics_by_serial_port(items: list[dict[str, object]]) -> dict[str, dict[int, dict[str, object]]]:
    """
    Index port-metrics rows as ``{normalized serial: {port: row}}``.

    Rows that are not dicts, lack a ``port`` convertible to ``int``, or have a negative
    port are skipped. A later row replaces an earlier one for the same serial and port.
    """
    index: dict[str, dict[int, dict[str, object]]] = {}
    for entry in items:
        if not isinstance(entry, dict):
            continue
        serial = _norm_hub_serial(str(entry.get("serial", "")))
        try:
            port = int(entry["port"])
        except (KeyError, TypeError, ValueError):
            continue
        if port >= 0:
            index.setdefault(serial, {})[port] = entry
    return index
def _remote_fiwi_missing_port_metrics_json(stdout: str, stderr: str, exit_code: int) -> bool:
"""True when the remote ``fiwi.py`` is too old and does not implement ``port-metrics-json``."""
blob = (stdout + "\n" + stderr).lower()
return exit_code == 2 and "unknown command" in blob and "port-metrics-json" in blob
def _print_remote_fiwi_upgrade_help(host: str, loc: str) -> None:
    """
    Explain how to refresh the remote so that ``port-metrics-json`` exists.

    Args:
        host: Remote host string as configured.
        loc: Display label of that host (IP address or host name).

    Prints the configured remote script and interpreter (from ``SshNodeConfig.load()``)
    and the command to run on the remote to verify the fix.
    """
    from fiwi.ssh import SshNodeConfig

    cfg = SshNodeConfig.load()
    print(
        # "host's": the possessive apostrophe was missing from this message.
        f" ! Remote {host} ({loc}): that host's fiwi.py does not support port-metrics-json "
        "(repository on the remote is behind this one).",
        flush=True,
    )
    print(" Fix: deploy the same FiWiManager revision to the remote, then verify:", flush=True)
    print(f" • FIWI_REMOTE_SCRIPT → {cfg.script!r}", flush=True)
    print(f" • FIWI_REMOTE_PYTHON → {cfg.python!r}", flush=True)
    print(" • On the remote, from a shell:", flush=True)
    print(f" {cfg.python} {cfg.script} port-metrics-json", flush=True)
    print(" Expect a JSON array on stdout, not “Unknown command”.", flush=True)
def _fetch_remote_port_metrics(host: str) -> tuple[int, list[dict[str, object]], str, str]:
    """
    Run ``port-metrics-json`` on ``host`` over SSH.

    Returns ``(exit_code, rows, stdout, stderr)``. ``rows`` holds the dict items of the
    decoded JSON list. A non-zero remote exit code is passed through with no rows;
    undecodable JSON or a non-list JSON value is reported as exit code 1. Remote stderr
    is echoed to this process's stderr.
    """
    from fiwi.ssh import SshNode

    node = SshNode.parse(host)
    code, stdout_text, stderr_text = node.invoke_capture(
        ["port-metrics-json"], timeout=90, defer=False
    )
    if stderr_text.strip():
        print(stderr_text.rstrip(), file=sys.stderr, flush=True)
    if code != 0:
        return code, [], stdout_text, stderr_text
    try:
        decoded = json.loads(stdout_text.strip() or "[]")
    except json.JSONDecodeError:
        # Falls through to the "not a list" failure below.
        decoded = None
    if not isinstance(decoded, list):
        return 1, [], stdout_text, stderr_text
    rows: list[dict[str, object]] = [item for item in decoded if isinstance(item, dict)]
    return 0, rows, stdout_text, stderr_text
# Seconds the power-cycle test sleeps after switching ports before it reads metrics back.
_POWERCYCLE_SETTLE_SEC = 0.35
def _set_all_local_ports(c: FiWiConcentrator, on: bool) -> None:
"""Enable or disable every downstream port on every connected local hub."""
for stem in c.hubs:
n = c._port_count(stem)
for port in range(n):
if on:
stem.usb.setPortEnable(port)
else:
stem.usb.setPortDisable(port)
def _power_mismatches(
rows: list[dict[str, object]], want_on: bool, where: str
) -> list[str]:
"""Each row must have ``power`` of ON or OFF matching ``want_on``."""
want = "ON" if want_on else "OFF"
bad: list[str] = []
for row in rows:
pwr = row.get("power")
if pwr != want:
hub = row.get("hub", "?")
port = row.get("port", "?")
sn = row.get("serial", "?")
bad.append(
f"{where} hub {hub} port {port} serial {sn!r}: power={pwr!r} (expected {want})"
)
return bad
def _restore_all_ports_on(c: FiWiConcentrator, hosts: list[str]) -> None:
    """
    Best-effort cleanup after a failed test: switch every local and remote port on.

    Local hubs are connected first when none are open. Each remote in ``hosts`` gets
    ``on all`` over SSH. Failures are printed and never raised.
    """
    from fiwi.ssh import SshNode

    print(" (cleanup) Restoring all ports ON…", flush=True)
    try:
        if not c.hubs:
            c.connect()
        if c.hubs:
            _set_all_local_ports(c, True)
    except Exception as exc:
        # A local failure must not prevent restoring the remotes.
        print(f" ! Local restore failed: {exc}", flush=True)

    for host in hosts:
        try:
            node = SshNode.parse(host)
            code, _stdout_text, stderr_text = node.invoke_capture(
                ["on", "all"], timeout=120, defer=False
            )
            if stderr_text.strip():
                print(stderr_text.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote {host}: on all restore exit {code}", flush=True)
        except Exception as exc:
            print(f" ! Remote {host}: on all restore: {exc}", flush=True)
def _run_powercycle_test(c: FiWiConcentrator) -> int:
    """
    Turn all ports off, confirm via metrics; then on, confirm.

    Covers local hubs (BrainStem) and each merged SSH hub host (``off``/``on all``).
    The order is: local OFF (1a), every remote OFF (1b), local ON (2a), every remote
    ON (2b). After each switch the test sleeps ``_POWERCYCLE_SETTLE_SEC`` and then
    requires a non-empty metrics snapshot in which every row reports the wanted state.

    Returns 0 when every step passed, 1 on the first failure or when there is neither
    a local hub nor a configured remote. Whenever the sequence did not complete
    (early return or exception), all ports are switched back on as a best effort.
    """
    from fiwi.ssh import SshNode
    hosts = _merged_hub_hosts()
    # Connect local hubs on demand so has_local reflects what is actually reachable.
    if not c.hubs:
        c.connect()
    has_local = bool(c.hubs)
    if not has_local and not hosts:
        print("FAIL: no local USB hubs and no remote hub hosts configured.", flush=True)
        return 1
    print("Power-cycle self-test (disrupts USB power on local + remote hub hosts)", flush=True)
    print(_rule(), flush=True)
    if has_local:
        print(f" Local: {len(c.hubs)} hub(s) on {socket.gethostname()}", flush=True)
    else:
        print(" Local: (no USB power hubs connected here)", flush=True)
    if hosts:
        print(f" Remote hub hosts: {', '.join(hosts)}", flush=True)
    print(flush=True)
    # Stays False on every early return below, which triggers the cleanup in finally.
    completed = False
    settle = _POWERCYCLE_SETTLE_SEC
    try:
        # Step 1a: local hubs off, then verify.
        if has_local:
            print(" Step 1a — local: OFF all downstream ports…", flush=True)
            _set_all_local_ports(c, False)
            time.sleep(settle)
            snap = c.port_metrics_snapshot()
            # An empty snapshot cannot confirm anything, so it counts as a failure.
            if not snap:
                print(" ! Local: no port-metrics after OFF (empty snapshot).", flush=True)
                return 1
            bad = _power_mismatches(snap, False, "local")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(" Step 1a — OK (all local ports report OFF).", flush=True)
        # Step 1b: each remote host off, then verify.
        for host in hosts:
            loc = _remote_location_label(host)
            print(f" Step 1b — remote {host} ({loc}): OFF all…", flush=True)
            node = SshNode.parse(host)
            code, _out, err = node.invoke_capture(["off", "all"], timeout=120, defer=False)
            if err.strip():
                print(err.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote off all exit {code}", flush=True)
                return 1
            time.sleep(settle)
            code_m, rows, _o2, _e2 = _fetch_remote_port_metrics(host)
            if code_m != 0:
                print(f" ! Remote port-metrics-json exit {code_m} after OFF", flush=True)
                return 1
            if not rows:
                print(f" ! Remote {host}: empty port-metrics after OFF.", flush=True)
                return 1
            bad = _power_mismatches(rows, False, f"remote {host}")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(f" Step 1b — OK ({host}: all OFF).", flush=True)
        # Step 2a: local hubs back on, then verify.
        if has_local:
            print(" Step 2a — local: ON all downstream ports…", flush=True)
            _set_all_local_ports(c, True)
            time.sleep(settle)
            snap = c.port_metrics_snapshot()
            if not snap:
                print(" ! Local: no port-metrics after ON.", flush=True)
                return 1
            bad = _power_mismatches(snap, True, "local")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(" Step 2a — OK (all local ports report ON).", flush=True)
        # Step 2b: each remote host back on, then verify.
        for host in hosts:
            loc = _remote_location_label(host)
            print(f" Step 2b — remote {host} ({loc}): ON all…", flush=True)
            node = SshNode.parse(host)
            code, _out, err = node.invoke_capture(["on", "all"], timeout=120, defer=False)
            if err.strip():
                print(err.rstrip(), file=sys.stderr, flush=True)
            if code != 0:
                print(f" ! Remote on all exit {code}", flush=True)
                return 1
            time.sleep(settle)
            code_m, rows, _o2, _e2 = _fetch_remote_port_metrics(host)
            if code_m != 0:
                print(f" ! Remote port-metrics-json exit {code_m} after ON", flush=True)
                return 1
            if not rows:
                print(f" ! Remote {host}: empty port-metrics after ON.", flush=True)
                return 1
            bad = _power_mismatches(rows, True, f"remote {host}")
            if bad:
                for line in bad:
                    print(f" ! {line}", flush=True)
                return 1
            print(f" Step 2b — OK ({host}: all ON).", flush=True)
        completed = True
        print(flush=True)
        print("Power-cycle self-test passed.", flush=True)
        return 0
    finally:
        # Runs for early returns and exceptions alike: do not leave ports powered off.
        if not completed:
            _restore_all_ports_on(c, hosts)
def _print_per_port_power_table(
    c: FiWiConcentrator,
    hub_rows: list[ConsolidatedHubRow],
    base_rc: int,
) -> int:
    """
    ``Power: Total …`` line, then per-port current (mA), voltage (V), and Location (sorted by panel).

    Local metrics come from ``c.port_metrics_snapshot()``; remote metrics come from
    ``port-metrics-json`` over SSH, fetched once per ``ssh_target``. Ports without
    metrics are still listed, with ``?`` as power state and empty mA / V cells.

    Returns ``base_rc``, or 1 if any ``port-metrics-json`` remote call failed.
    """
    rc = base_rc
    if not hub_rows:
        print(" (no hubs — skipping port power table.)", flush=True)
        print(flush=True)
        return rc
    panel_lookup = _fiber_map_panel_lookup()
    local_by_sn: dict[str, dict[int, dict[str, object]]] = {}
    # Only take a local snapshot when at least one row is a local hub.
    if any(r.ssh_target is None for r in hub_rows):
        local_by_sn = _index_metrics_by_serial_port(c.port_metrics_snapshot())
    remote_cache: dict[str, dict[str, dict[int, dict[str, object]]]] = {}
    remote_fail: set[str] = set()
    remote_fail_detail: dict[str, tuple[int, str, str]] = {}

    def _remote_index(ssh_target: str) -> dict[str, dict[int, dict[str, object]]]:
        # Fetch each remote once; a failed remote is cached as empty and remembered
        # so it can be reported after the table.
        if ssh_target not in remote_cache:
            code, payload, out, err = _fetch_remote_port_metrics(ssh_target)
            if code == 0:
                remote_cache[ssh_target] = _index_metrics_by_serial_port(payload)
            else:
                remote_fail.add(ssh_target)
                remote_fail_detail[ssh_target] = (code, out, err)
                remote_cache[ssh_target] = {}
        return remote_cache[ssh_target]

    # Collect rows; sum current (mA) and power (W) where metrics allow.
    lines_out: list[tuple[tuple, str]] = []
    total_ma = 0.0
    n_ma = 0
    total_power_w = 0.0
    n_power = 0
    for row in hub_rows:
        sn_key = _norm_hub_serial(row.serial)
        if row.ssh_target is None:
            by_port = local_by_sn.get(sn_key, {})
        else:
            by_port = _remote_index(row.ssh_target).get(sn_key, {})
        for port in range(row.n_ports):
            m = by_port.get(port)
            cur_f: float | None = None
            v_f: float | None = None
            if m is None:
                pwr, ma_s, v_s = "?", "", ""
            else:
                pwr = str(m.get("power", "?"))
                cur = m.get("current_ma")
                if cur is None:
                    ma_s = ""
                else:
                    try:
                        cur_f = float(cur)
                        ma_s = f"{cur_f:.2f}"
                        total_ma += cur_f
                        n_ma += 1
                    except (TypeError, ValueError):
                        ma_s = ""
                vv = m.get("voltage_v")
                if vv is None:
                    v_s = ""
                else:
                    try:
                        v_f = float(vv)
                        v_s = f"{v_f:.3f}"
                    except (TypeError, ValueError):
                        v_s = ""
            # Power needs both readings: W = V * (mA / 1000).
            if cur_f is not None and v_f is not None:
                total_power_w += v_f * (cur_f / 1000.0)
                n_power += 1
            pnl = _panel_cell(panel_lookup, hub_rows, row, port)
            st = _panel_sort_tuple(pnl, row.idx, port)
            line = (
                f"{pnl:<8} | {row.idx:<4} | {row.serial:<12} | {port:<3} | {pwr:<5} | "
                f"{ma_s:>8} | {v_s:>8} | {_location_cell(row)}"
            )
            lines_out.append((st, line))
    # Summary line: watts are only shown when at least one port had both readings.
    if n_ma and n_power:
        summary = (
            f"Power: Total {total_power_w:.3f} W / {total_ma:.2f} mA (per port follows)"
        )
    elif n_ma:
        summary = f"Power: Total — W / {total_ma:.2f} mA (per port follows)"
    else:
        summary = "Power: — (per port follows)"
    print(summary, flush=True)
    print("-" * len(_PER_PORT_PWR_HDR), flush=True)
    print(_PER_PORT_PWR_HDR, flush=True)
    print("-" * len(_PER_PORT_PWR_HDR), flush=True)
    # Rows were collected hub by hub; print them in panel order.
    lines_out.sort(key=lambda x: x[0])
    for _sk, line in lines_out:
        print(line, flush=True)
    print(flush=True)
    # Report failed remotes after the table, with upgrade help when the remote
    # does not know the command.
    for host in sorted(remote_fail):
        loc = _remote_location_label(host)
        code, out, err = remote_fail_detail[host]
        if _remote_fiwi_missing_port_metrics_json(out, err, code):
            _print_remote_fiwi_upgrade_help(host, loc)
        else:
            print(
                f" ! Remote {host} ({loc}): port-metrics-json failed (exit {code}).",
                flush=True,
            )
        rc = 1
    return rc
def _print_usb_hub_tables(c: FiWiConcentrator, hub_rows: list[ConsolidatedHubRow], base_rc: int) -> int:
    """
    Print the consolidated hub table, then the per-port power table.

    Returns ``base_rc`` unchanged when there are no hubs, otherwise the code returned
    by the per-port table.
    """
    hdr_rule = "-" * len(_CONSOLIDATED_HUB_HDR)
    print("USB power-control hubs (consolidated)", flush=True)
    print(hdr_rule, flush=True)
    print(
        f" Local host: {socket.gethostname()} | Remote resolution: IPv4 when possible, else hostname",
        flush=True,
    )
    print(flush=True)
    if not hub_rows:
        print(" (no hubs) — none on this machine and none parsed from remotes.", flush=True)
        print(flush=True)
        return base_rc

    print(_CONSOLIDATED_HUB_HDR, flush=True)
    print(hdr_rule, flush=True)
    for row in hub_rows:
        cells = (
            f"{row.idx:<4}",
            f"{row.serial:<12}",
            f"{row.ports_display:<10}",
            f"{_location_cell(row):<28}",
            f"{_usb_cell(row)}",
        )
        print(" | ".join(cells), flush=True)
    print(flush=True)
    return _print_per_port_power_table(c, hub_rows, base_rc)
def _print_pcie_catalog_section() -> None:
    """Print the PCIe host-card catalog heading and the Adnacom host-card table."""
    from fiwi.adnacom_pcie_catalog import print_adnacom_host_card_table as print_catalog

    print("--- PCIe host-card catalog (reference) ---", flush=True)
    print(flush=True)
    print_catalog()
    print(flush=True)
def _print_consolidated_report(c: FiWiConcentrator) -> int:
    """
    Print the full report: SSH summary, hub and per-port tables, then the PCIe catalog.

    Returns the code accumulated by building and printing the hub tables.
    """
    _print_ssh_and_hosts_summary()
    rows, status = _build_consolidated_hub_rows(c)
    status = _print_usb_hub_tables(c, _enrich_hub_rows_usb(rows), status)
    _print_pcie_catalog_section()
    return status
def test_concentrator() -> None:
    """Pytest entry: build a concentrator (which asserts it has no hubs) and disconnect it."""
    _instantiate_concentrator().disconnect()
def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Check FiWiConcentrator: consolidated local + remote USB hub table.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"PROFILE selects <repo>/config/<PROFILE>.ini (e.g. uax24, uax4, default).\n"
"Same as environment variable FIWI_CONFIG."
),
)
p.add_argument(
"-c",
"--config",
metavar="PROFILE_OR_INI",
help="INI profile or absolute path (sets FIWI_CONFIG for this run)",
)
p.add_argument(
"--powercycle",
action="store_true",
help=(
"Self-test: all downstream ports OFF then ON on local hubs and each merged "
"remote hub host; verifies via port metrics; then print port power table "
"(disrupts USB power)."
),
)
return p.parse_args()
def main() -> int:
    """
    Command-line entry point.

    Changes to the repository root, applies ``--config`` to ``FIWI_CONFIG``, then runs
    either the power-cycle self-test followed by the per-port power table
    (``--powercycle``) or the consolidated report. The concentrator is always
    disconnected afterwards.

    Returns 0 on success, 1 when setup or the run raised, otherwise the accumulated
    failure code of the run.
    """
    try:
        os.chdir(_ROOT)
    except OSError as exc:
        print(f"FAIL: cannot chdir to repo root {_ROOT!r}: {exc}", file=sys.stderr)
        return 1

    args = _parse_args()
    if args.config:
        os.environ["FIWI_CONFIG"] = args.config.strip()
    label = os.environ.get("FIWI_CONFIG", "default (config/default.ini if present)")
    _print_banner(label)

    c = None
    remote_fail = 0
    try:
        c = _instantiate_concentrator()
        if not args.powercycle:
            remote_fail = _print_consolidated_report(c)
        else:
            remote_fail = _run_powercycle_test(c)
            print(flush=True)
            hub_rows, build_rc = _build_consolidated_hub_rows(c)
            remote_fail = remote_fail or build_rc
            hub_rows = _enrich_hub_rows_usb(hub_rows)
            remote_fail = _print_per_port_power_table(c, hub_rows, remote_fail)
    except Exception as exc:
        # Also covers AssertionError, which is reported the same way.
        print(f"FAIL: {exc}", file=sys.stderr)
        return 1
    finally:
        if c is not None:
            try:
                c.disconnect()
            except Exception:
                # Best effort: a failing disconnect must not mask the run's result.
                pass

    bar = _rule("=")
    print(bar, flush=True)
    if args.powercycle:
        tag = "Power-cycle self-test OK" if remote_fail == 0 else "Power-cycle self-test FAILED"
        print(f"{tag} [config: {label}]", flush=True)
    else:
        print(f"FiWiConcentrator() OK [config: {label}]", flush=True)
        # NOTE(review): the nesting of this hint could not be read from the source
        # (indentation was lost); it is placed under the non-powercycle branch — confirm.
        if remote_fail != 0:
            print(" (one or more remote hub queries had errors — see above)", flush=True)
    print(bar, flush=True)
    return remote_fail
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())