check_concentrator: USB identity column, TTY in Location, powercycle test.
- usb_probe: sysfs scan for 24ff hubs (bus/dev, VID:PID, strings) plus ttyUSB/ttyACM; usb_acroname_hub_identity_list(); usb-hub-tty-json returns full records. - check_concentrator: Location stays host+TTY; new USB column (lsusb-style line); enrich via local sysfs and remote usb-hub-tty-json per SSH host. - Add --powercycle: all ports OFF/verify then ON/verify (local + remotes), restore on failure. Made-with: Cursor
This commit is contained in:
parent
64a3e5a604
commit
63bb205383
|
|
@ -115,6 +115,7 @@ def _print_cli_help() -> None:
|
|||
" discover — USB power-control hubs (serial, port count); no port I/O\n"
|
||||
" show_hostcards — same as discover, concentrator 'hostcards' label\n"
|
||||
" port-metrics-json — JSON list: per-port power, current (mA), voltage (V) on connected hubs\n"
|
||||
" usb-hub-tty-json — JSON: Acroname (24ff) hubs: serial, tty[], bus, dev, id, manufacturer, product\n"
|
||||
" show_radioheads — all fiber_ports rows + power (same table as fiber status)\n"
|
||||
" status [target] — default command; target like all, 1.3, all.2\n"
|
||||
" fiber status — fiber_ports + power (local or per-entry ssh / host+user)\n"
|
||||
|
|
@ -241,6 +242,8 @@ def main() -> int:
|
|||
print(json.dumps([[h, p] for h, p in pairs]), flush=True)
|
||||
elif cmd == "lsusb-lines-json":
|
||||
print(json.dumps(usb.lsusb_lines()), flush=True)
|
||||
elif cmd == "usb-hub-tty-json":
|
||||
print(json.dumps(usb.usb_acroname_hub_identity_list()), flush=True)
|
||||
elif cmd == "wlan-info-json":
|
||||
from fiwi.ieee80211_dev import discover_wireless_for_map
|
||||
|
||||
|
|
@ -370,7 +373,7 @@ def main() -> int:
|
|||
else:
|
||||
print(f"Unknown command: {cmd!r}", file=sys.stderr, flush=True)
|
||||
print(
|
||||
"Try: --ssh user@host … | discover | show_hostcards | port-metrics-json | show_radioheads | calibrate-ports-json | … | help",
|
||||
"Try: --ssh user@host … | discover | show_hostcards | port-metrics-json | usb-hub-tty-json | show_radioheads | calibrate-ports-json | … | help",
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -3,8 +3,14 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import glob
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
# Acroname / BrainStem USB power-control hubs (see udev examples in concentrator).
|
||||
_ACRONAME_USB_VID = 0x24FF
|
||||
|
||||
|
||||
def lsusb_lines():
|
||||
|
|
@ -84,3 +90,152 @@ def lsusb_acroname_lines():
|
|||
async def alsusb_acroname_lines() -> list[str]:
|
||||
lines = await alsusb_lines()
|
||||
return [ln for ln in lines if "24ff:" in ln.lower() or " acroname" in ln.lower()]
|
||||
|
||||
|
||||
def _parse_sysfs_usb_serial(text: str) -> int | None:
|
||||
t = text.strip()
|
||||
if not t:
|
||||
return None
|
||||
try:
|
||||
return int(t, 0)
|
||||
except ValueError:
|
||||
try:
|
||||
return int(t, 16)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _read_sysfs_int(path: Path) -> int | None:
|
||||
try:
|
||||
return int(path.read_text().strip(), 10)
|
||||
except (OSError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _read_sysfs_hex16(path: Path) -> int | None:
|
||||
try:
|
||||
return int(path.read_text().strip(), 16)
|
||||
except (OSError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _tty_nodes_by_acroname_serial() -> dict[int, list[str]]:
|
||||
"""Map hub serial (int) → ``ttyUSB*`` / ``ttyACM*`` names for 24ff devices."""
|
||||
by_serial: dict[int, set[str]] = {}
|
||||
for pattern in ("/sys/class/tty/ttyUSB*", "/sys/class/tty/ttyACM*"):
|
||||
for tty_class in sorted(glob.glob(pattern)):
|
||||
dev_link = Path(tty_class) / "device"
|
||||
if not dev_link.exists():
|
||||
continue
|
||||
usb_dev = _usb_device_dir_for_tty_class_device(dev_link)
|
||||
if usb_dev is None:
|
||||
continue
|
||||
try:
|
||||
vid = int((usb_dev / "idVendor").read_text().strip(), 16)
|
||||
except (ValueError, OSError):
|
||||
continue
|
||||
if vid != _ACRONAME_USB_VID:
|
||||
continue
|
||||
try:
|
||||
sn_raw = (usb_dev / "serial").read_text()
|
||||
except OSError:
|
||||
continue
|
||||
sn = _parse_sysfs_usb_serial(sn_raw)
|
||||
if sn is None:
|
||||
continue
|
||||
by_serial.setdefault(sn, set()).add(os.path.basename(tty_class))
|
||||
return {k: sorted(v) for k, v in by_serial.items()}
|
||||
|
||||
|
||||
def _usb_device_dir_for_tty_class_device(device_link: Path) -> Path | None:
|
||||
"""
|
||||
From ``/sys/class/tty/ttyUSBn/device``, walk up to the USB *device* directory
|
||||
(the one that has ``idVendor`` / ``serial``, not the ``*:1.0`` interface).
|
||||
"""
|
||||
try:
|
||||
p = device_link.resolve()
|
||||
except OSError:
|
||||
return None
|
||||
for _ in range(20):
|
||||
vendor = p / "idVendor"
|
||||
serial = p / "serial"
|
||||
if vendor.is_file() and serial.is_file():
|
||||
return p
|
||||
if p == p.parent:
|
||||
break
|
||||
p = p.parent
|
||||
return None
|
||||
|
||||
|
||||
def usb_acroname_hub_identity_list() -> list[dict[str, object]]:
|
||||
"""
|
||||
One record per Acroname (24ff) USB device with a ``serial`` sysfs file — same facts as a short
|
||||
``lsusb`` line (bus, device, ID, strings), plus matching ``ttyUSB*`` / ``ttyACM*`` names when
|
||||
the hub exposes a serial TTY.
|
||||
|
||||
Keys: ``serial``, ``tty`` (list), ``bus``, ``dev``, ``id`` (``24ff:xxxx``), ``manufacturer``,
|
||||
``product``.
|
||||
"""
|
||||
tty_by_sn = _tty_nodes_by_acroname_serial()
|
||||
out_map: dict[int, dict[str, object]] = {}
|
||||
base = Path("/sys/bus/usb/devices")
|
||||
try:
|
||||
entries = sorted(base.iterdir(), key=lambda p: p.name)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
for p in entries:
|
||||
if not p.is_dir():
|
||||
continue
|
||||
if ":" in p.name:
|
||||
continue
|
||||
vid_f, ser_f = p / "idVendor", p / "serial"
|
||||
if not vid_f.is_file() or not ser_f.is_file():
|
||||
continue
|
||||
vid = _read_sysfs_hex16(vid_f)
|
||||
if vid is None or vid != _ACRONAME_USB_VID:
|
||||
continue
|
||||
sn = _parse_sysfs_usb_serial(ser_f.read_text())
|
||||
if sn is None:
|
||||
continue
|
||||
pid = _read_sysfs_hex16(p / "idProduct")
|
||||
bus = _read_sysfs_int(p / "busnum")
|
||||
dev = _read_sysfs_int(p / "devnum")
|
||||
man = ""
|
||||
prod = ""
|
||||
try:
|
||||
mf = p / "manufacturer"
|
||||
if mf.is_file():
|
||||
man = mf.read_text().strip()
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
pf = p / "product"
|
||||
if pf.is_file():
|
||||
prod = pf.read_text().strip()
|
||||
except OSError:
|
||||
pass
|
||||
vp = f"{vid:04x}:{pid:04x}" if pid is not None else f"{vid:04x}:????"
|
||||
out_map[sn] = {
|
||||
"serial": f"0x{sn:08X}",
|
||||
"tty": list(tty_by_sn.get(sn, [])),
|
||||
"bus": bus,
|
||||
"dev": dev,
|
||||
"id": vp,
|
||||
"manufacturer": man,
|
||||
"product": prod,
|
||||
}
|
||||
|
||||
return [out_map[k] for k in sorted(out_map.keys())]
|
||||
|
||||
|
||||
def usb_acroname_hub_tty_map() -> list[dict[str, object]]:
|
||||
"""
|
||||
Subset of :func:`usb_acroname_hub_identity_list` — only ``serial`` and ``tty`` keys per hub.
|
||||
|
||||
``tty`` may be an empty list when the device has no ``ttyUSB`` / ``ttyACM`` node.
|
||||
"""
|
||||
return [
|
||||
{"serial": x["serial"], "tty": list(x.get("tty") or [])}
|
||||
for x in usb_acroname_hub_identity_list()
|
||||
]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@
|
|||
"""
|
||||
Smoke check: :class:`fiwi.concentrator.FiWiConcentrator` plus consolidated USB hub and per-port
|
||||
metrics tables covering **this machine** and each configured **remote** (serial, ports, Location =
|
||||
``local`` or remote IPv4 when resolvable). Adnacom PCIe catalog last.
|
||||
host plus **TTY** when known, and a **USB** column with Bus/Device, VID:PID, and product string from
|
||||
sysfs — the same facts as a short ``lsusb`` line). Adnacom PCIe catalog last.
|
||||
|
||||
Requires BrainStem locally for hub enumeration on this host. Remotes use SSH ``show_hostcards``
|
||||
and ``port-metrics-json``; the remote tree must include that command (same revision as here).
|
||||
|
|
@ -14,6 +15,9 @@ and ``port-metrics-json``; the remote tree must include that command (same revis
|
|||
|
||||
``--config`` matches ``FIWI_CONFIG`` (profile or absolute ``*.ini``).
|
||||
|
||||
``--powercycle`` runs a destructive self-test on **local** USB hubs and each host in
|
||||
``FIWI_CALIBRATE_REMOTES`` / merged hub hosts: all ports OFF (verify), then all ON (verify).
|
||||
|
||||
With pytest::
|
||||
|
||||
FIWI_CONFIG=uax24 pytest tests/check_concentrator.py
|
||||
|
|
@ -27,7 +31,8 @@ import os
|
|||
import re
|
||||
import socket
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
from dataclasses import dataclass, replace
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -55,6 +60,10 @@ class ConsolidatedHubRow:
|
|||
n_ports: int
|
||||
location: str
|
||||
ssh_target: str | None
|
||||
#: e.g. ``ttyUSB0`` or ``ttyUSB0+ttyACM0`` from sysfs (24ff hubs); empty if unknown.
|
||||
usb_tty_hint: str = ""
|
||||
#: sysfs / ``lsusb``-style line: Bus/Dev, ID ``24ff:…``, product string.
|
||||
usb_identity_hint: str = ""
|
||||
|
||||
|
||||
def _rule(char: str = "-") -> str:
|
||||
|
|
@ -254,6 +263,106 @@ def _build_consolidated_hub_rows(c: FiWiConcentrator) -> tuple[list[Consolidated
|
|||
return rows, rc
|
||||
|
||||
|
||||
def _local_usb_ent_by_serial() -> dict[str, dict[str, object]]:
|
||||
"""Normalized hub serial → identity dict from :func:`fiwi.usb_probe.usb_acroname_hub_identity_list`."""
|
||||
from fiwi import usb_probe as usb_mod
|
||||
|
||||
out: dict[str, dict[str, object]] = {}
|
||||
for ent in usb_mod.usb_acroname_hub_identity_list():
|
||||
if not isinstance(ent, dict):
|
||||
continue
|
||||
sn = _norm_hub_serial(str(ent.get("serial", "")))
|
||||
if sn:
|
||||
out[sn] = ent
|
||||
return out
|
||||
|
||||
|
||||
def _fetch_remote_usb_ent_by_serial(host: str) -> dict[str, dict[str, object]]:
|
||||
from fiwi.ssh import SshNode
|
||||
|
||||
node = SshNode.parse(host)
|
||||
code, out, err = node.invoke_capture(["usb-hub-tty-json"], timeout=45, defer=False)
|
||||
if err.strip():
|
||||
print(err.rstrip(), file=sys.stderr, flush=True)
|
||||
if code != 0:
|
||||
return {}
|
||||
try:
|
||||
data = json.loads(out.strip() or "[]")
|
||||
except json.JSONDecodeError:
|
||||
return {}
|
||||
if not isinstance(data, list):
|
||||
return {}
|
||||
out_map: dict[str, dict[str, object]] = {}
|
||||
for raw in data:
|
||||
if not isinstance(raw, dict):
|
||||
continue
|
||||
sn = _norm_hub_serial(str(raw.get("serial", "")))
|
||||
if sn:
|
||||
out_map[sn] = raw
|
||||
return out_map
|
||||
|
||||
|
||||
def _usb_identity_display(ent: dict[str, object] | None, *, max_product: int = 42) -> str:
|
||||
"""Human-readable USB row (matches short ``lsusb`` intent; values come from sysfs)."""
|
||||
if not ent:
|
||||
return ""
|
||||
bus = ent.get("bus")
|
||||
dev = ent.get("dev")
|
||||
vp = str(ent.get("id") or "").strip()
|
||||
prod = str(ent.get("product") or "").replace("\n", " ").strip()
|
||||
if not prod:
|
||||
prod = str(ent.get("manufacturer") or "").replace("\n", " ").strip()
|
||||
if len(prod) > max_product:
|
||||
prod = prod[: max_product - 3] + "..."
|
||||
bits: list[str] = []
|
||||
if isinstance(bus, int) and isinstance(dev, int):
|
||||
bits.append(f"Bus {bus:03d} Dev {dev:03d}")
|
||||
if vp:
|
||||
bits.append(f"ID {vp}")
|
||||
if prod:
|
||||
bits.append(prod)
|
||||
return " · ".join(bits)
|
||||
|
||||
|
||||
def _enrich_hub_rows_usb(rows: list[ConsolidatedHubRow]) -> list[ConsolidatedHubRow]:
|
||||
"""Fill ``usb_tty_hint`` and ``usb_identity_hint`` from local sysfs or ``usb-hub-tty-json`` over SSH."""
|
||||
local = _local_usb_ent_by_serial()
|
||||
remote_maps: dict[str, dict[str, dict[str, object]]] = {}
|
||||
out: list[ConsolidatedHubRow] = []
|
||||
for row in rows:
|
||||
ent: dict[str, object] | None = None
|
||||
if row.ssh_target is None:
|
||||
ent = local.get(_norm_hub_serial(row.serial))
|
||||
else:
|
||||
if row.ssh_target not in remote_maps:
|
||||
remote_maps[row.ssh_target] = _fetch_remote_usb_ent_by_serial(row.ssh_target)
|
||||
ent = remote_maps[row.ssh_target].get(_norm_hub_serial(row.serial))
|
||||
tty_hint = ""
|
||||
if ent:
|
||||
ttys = ent.get("tty")
|
||||
if isinstance(ttys, list):
|
||||
tty_hint = "+".join(str(x) for x in ttys if isinstance(x, str))
|
||||
usb_line = _usb_identity_display(ent)
|
||||
out.append(
|
||||
replace(
|
||||
row,
|
||||
usb_tty_hint=tty_hint,
|
||||
usb_identity_hint=usb_line,
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def _location_cell(row: ConsolidatedHubRow) -> str:
|
||||
if row.usb_tty_hint:
|
||||
return f"{row.location} · {row.usb_tty_hint}"
|
||||
return row.location
|
||||
|
||||
|
||||
def _usb_cell(row: ConsolidatedHubRow) -> str:
|
||||
return row.usb_identity_hint if row.usb_identity_hint else "—"
|
||||
|
||||
|
||||
def _index_metrics_by_serial_port(items: list[dict[str, object]]) -> dict[str, dict[int, dict[str, object]]]:
|
||||
out: dict[str, dict[int, dict[str, object]]] = {}
|
||||
for raw in items:
|
||||
|
|
@ -314,6 +423,180 @@ def _fetch_remote_port_metrics(host: str) -> tuple[int, list[dict[str, object]],
|
|||
return 0, typed, out, err
|
||||
|
||||
|
||||
_POWERCYCLE_SETTLE_SEC = 0.35
|
||||
|
||||
|
||||
def _set_all_local_ports(c: FiWiConcentrator, on: bool) -> None:
|
||||
"""Enable or disable every downstream port on every connected local hub."""
|
||||
for stem in c.hubs:
|
||||
n = c._port_count(stem)
|
||||
for port in range(n):
|
||||
if on:
|
||||
stem.usb.setPortEnable(port)
|
||||
else:
|
||||
stem.usb.setPortDisable(port)
|
||||
|
||||
|
||||
def _power_mismatches(
|
||||
rows: list[dict[str, object]], want_on: bool, where: str
|
||||
) -> list[str]:
|
||||
"""Each row must have ``power`` of ON or OFF matching ``want_on``."""
|
||||
want = "ON" if want_on else "OFF"
|
||||
bad: list[str] = []
|
||||
for row in rows:
|
||||
pwr = row.get("power")
|
||||
if pwr != want:
|
||||
hub = row.get("hub", "?")
|
||||
port = row.get("port", "?")
|
||||
sn = row.get("serial", "?")
|
||||
bad.append(
|
||||
f"{where} hub {hub} port {port} serial {sn!r}: power={pwr!r} (expected {want})"
|
||||
)
|
||||
return bad
|
||||
|
||||
|
||||
def _restore_all_ports_on(c: FiWiConcentrator, hosts: list[str]) -> None:
|
||||
"""Best-effort: turn every local and configured-remote port on (cleanup after failed test)."""
|
||||
from fiwi.ssh import SshNode
|
||||
|
||||
print(" (cleanup) Restoring all ports ON…", flush=True)
|
||||
try:
|
||||
if not c.hubs:
|
||||
c.connect()
|
||||
if c.hubs:
|
||||
_set_all_local_ports(c, True)
|
||||
except Exception as exc:
|
||||
print(f" ! Local restore failed: {exc}", flush=True)
|
||||
for host in hosts:
|
||||
try:
|
||||
code, _o, err = SshNode.parse(host).invoke_capture(
|
||||
["on", "all"], timeout=120, defer=False
|
||||
)
|
||||
if err.strip():
|
||||
print(err.rstrip(), file=sys.stderr, flush=True)
|
||||
if code != 0:
|
||||
print(f" ! Remote {host}: on all restore exit {code}", flush=True)
|
||||
except Exception as exc:
|
||||
print(f" ! Remote {host}: on all restore: {exc}", flush=True)
|
||||
|
||||
|
||||
def _run_powercycle_test(c: FiWiConcentrator) -> int:
|
||||
"""
|
||||
Turn all ports off, confirm via metrics; then on, confirm.
|
||||
Covers local hubs (BrainStem) and each merged SSH hub host (``off``/``on all``).
|
||||
"""
|
||||
from fiwi.ssh import SshNode
|
||||
|
||||
hosts = _merged_hub_hosts()
|
||||
if not c.hubs:
|
||||
c.connect()
|
||||
has_local = bool(c.hubs)
|
||||
if not has_local and not hosts:
|
||||
print("FAIL: no local USB hubs and no remote hub hosts configured.", flush=True)
|
||||
return 1
|
||||
|
||||
print("Power-cycle self-test (disrupts USB power on local + remote hub hosts)", flush=True)
|
||||
print(_rule(), flush=True)
|
||||
if has_local:
|
||||
print(f" Local: {len(c.hubs)} hub(s) on {socket.gethostname()}", flush=True)
|
||||
else:
|
||||
print(" Local: (no USB power hubs connected here)", flush=True)
|
||||
if hosts:
|
||||
print(f" Remote hub hosts: {', '.join(hosts)}", flush=True)
|
||||
print(flush=True)
|
||||
|
||||
completed = False
|
||||
settle = _POWERCYCLE_SETTLE_SEC
|
||||
try:
|
||||
if has_local:
|
||||
print(" Step 1a — local: OFF all downstream ports…", flush=True)
|
||||
_set_all_local_ports(c, False)
|
||||
time.sleep(settle)
|
||||
snap = c.port_metrics_snapshot()
|
||||
if not snap:
|
||||
print(" ! Local: no port-metrics after OFF (empty snapshot).", flush=True)
|
||||
return 1
|
||||
bad = _power_mismatches(snap, False, "local")
|
||||
if bad:
|
||||
for line in bad:
|
||||
print(f" ! {line}", flush=True)
|
||||
return 1
|
||||
print(" Step 1a — OK (all local ports report OFF).", flush=True)
|
||||
|
||||
for host in hosts:
|
||||
loc = _remote_location_label(host)
|
||||
print(f" Step 1b — remote {host} ({loc}): OFF all…", flush=True)
|
||||
node = SshNode.parse(host)
|
||||
code, _out, err = node.invoke_capture(["off", "all"], timeout=120, defer=False)
|
||||
if err.strip():
|
||||
print(err.rstrip(), file=sys.stderr, flush=True)
|
||||
if code != 0:
|
||||
print(f" ! Remote off all exit {code}", flush=True)
|
||||
return 1
|
||||
time.sleep(settle)
|
||||
code_m, rows, _o2, _e2 = _fetch_remote_port_metrics(host)
|
||||
if code_m != 0:
|
||||
print(f" ! Remote port-metrics-json exit {code_m} after OFF", flush=True)
|
||||
return 1
|
||||
if not rows:
|
||||
print(f" ! Remote {host}: empty port-metrics after OFF.", flush=True)
|
||||
return 1
|
||||
bad = _power_mismatches(rows, False, f"remote {host}")
|
||||
if bad:
|
||||
for line in bad:
|
||||
print(f" ! {line}", flush=True)
|
||||
return 1
|
||||
print(f" Step 1b — OK ({host}: all OFF).", flush=True)
|
||||
|
||||
if has_local:
|
||||
print(" Step 2a — local: ON all downstream ports…", flush=True)
|
||||
_set_all_local_ports(c, True)
|
||||
time.sleep(settle)
|
||||
snap = c.port_metrics_snapshot()
|
||||
if not snap:
|
||||
print(" ! Local: no port-metrics after ON.", flush=True)
|
||||
return 1
|
||||
bad = _power_mismatches(snap, True, "local")
|
||||
if bad:
|
||||
for line in bad:
|
||||
print(f" ! {line}", flush=True)
|
||||
return 1
|
||||
print(" Step 2a — OK (all local ports report ON).", flush=True)
|
||||
|
||||
for host in hosts:
|
||||
loc = _remote_location_label(host)
|
||||
print(f" Step 2b — remote {host} ({loc}): ON all…", flush=True)
|
||||
node = SshNode.parse(host)
|
||||
code, _out, err = node.invoke_capture(["on", "all"], timeout=120, defer=False)
|
||||
if err.strip():
|
||||
print(err.rstrip(), file=sys.stderr, flush=True)
|
||||
if code != 0:
|
||||
print(f" ! Remote on all exit {code}", flush=True)
|
||||
return 1
|
||||
time.sleep(settle)
|
||||
code_m, rows, _o2, _e2 = _fetch_remote_port_metrics(host)
|
||||
if code_m != 0:
|
||||
print(f" ! Remote port-metrics-json exit {code_m} after ON", flush=True)
|
||||
return 1
|
||||
if not rows:
|
||||
print(f" ! Remote {host}: empty port-metrics after ON.", flush=True)
|
||||
return 1
|
||||
bad = _power_mismatches(rows, True, f"remote {host}")
|
||||
if bad:
|
||||
for line in bad:
|
||||
print(f" ! {line}", flush=True)
|
||||
return 1
|
||||
print(f" Step 2b — OK ({host}: all ON).", flush=True)
|
||||
|
||||
completed = True
|
||||
print(flush=True)
|
||||
print("Power-cycle self-test passed.", flush=True)
|
||||
return 0
|
||||
finally:
|
||||
if not completed:
|
||||
_restore_all_ports_on(c, hosts)
|
||||
|
||||
|
||||
def _print_usb_hub_tables(c: FiWiConcentrator, hub_rows: list[ConsolidatedHubRow], base_rc: int) -> int:
|
||||
"""Print consolidated hub table and per-port power / current / voltage table. Returns updated rc."""
|
||||
rc = base_rc
|
||||
|
|
@ -331,10 +614,16 @@ def _print_usb_hub_tables(c: FiWiConcentrator, hub_rows: list[ConsolidatedHubRow
|
|||
print(flush=True)
|
||||
return rc
|
||||
|
||||
print(f"{'#':<4} | {'Serial':<12} | {'Ports':<10} | Location", flush=True)
|
||||
print("-" * 52, flush=True)
|
||||
print(
|
||||
f"{'#':<4} | {'Serial':<12} | {'Ports':<10} | {'Location (host / TTY)':<28} | USB (Bus / ID / product)",
|
||||
flush=True,
|
||||
)
|
||||
print("-" * 100, flush=True)
|
||||
for row in hub_rows:
|
||||
print(f"{row.idx:<4} | {row.serial:<12} | {row.ports_display:<10} | {row.location}", flush=True)
|
||||
print(
|
||||
f"{row.idx:<4} | {row.serial:<12} | {row.ports_display:<10} | {_location_cell(row):<28} | {_usb_cell(row)}",
|
||||
flush=True,
|
||||
)
|
||||
print(flush=True)
|
||||
|
||||
# Per-port metrics: same hub order and port counts as consolidated table.
|
||||
|
|
@ -360,10 +649,11 @@ def _print_usb_hub_tables(c: FiWiConcentrator, hub_rows: list[ConsolidatedHubRow
|
|||
print("Per-port power (consolidated hub order)", flush=True)
|
||||
print(_rule(), flush=True)
|
||||
print(
|
||||
f"{'Hub#':<4} | {'Serial':<12} | {'Pt':<3} | {'Pwr':<5} | {'mA':>8} | {'V':>8} | Location",
|
||||
f"{'Hub#':<4} | {'Serial':<12} | {'Pt':<3} | {'Pwr':<5} | {'mA':>8} | {'V':>8} | "
|
||||
f"{'Loc (host/TTY)':<22} | USB (Bus / ID / product)",
|
||||
flush=True,
|
||||
)
|
||||
print("-" * 62, flush=True)
|
||||
print("-" * 110, flush=True)
|
||||
|
||||
for row in hub_rows:
|
||||
sn_key = _norm_hub_serial(row.serial)
|
||||
|
|
@ -395,7 +685,8 @@ def _print_usb_hub_tables(c: FiWiConcentrator, hub_rows: list[ConsolidatedHubRow
|
|||
except (TypeError, ValueError):
|
||||
v_s = "—"
|
||||
print(
|
||||
f"{row.idx:<4} | {row.serial:<12} | {port:<3} | {pwr:<5} | {ma_s:>8} | {v_s:>8} | {row.location}",
|
||||
f"{row.idx:<4} | {row.serial:<12} | {port:<3} | {pwr:<5} | {ma_s:>8} | {v_s:>8} | "
|
||||
f"{_location_cell(row):<22} | {_usb_cell(row)}",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
|
|
@ -428,6 +719,7 @@ def _print_pcie_catalog_section() -> None:
|
|||
def _print_consolidated_report(c: FiWiConcentrator) -> int:
|
||||
_print_ssh_and_hosts_summary()
|
||||
hub_rows, remote_rc = _build_consolidated_hub_rows(c)
|
||||
hub_rows = _enrich_hub_rows_usb(hub_rows)
|
||||
remote_rc = _print_usb_hub_tables(c, hub_rows, remote_rc)
|
||||
_print_pcie_catalog_section()
|
||||
return remote_rc
|
||||
|
|
@ -453,6 +745,14 @@ def _parse_args() -> argparse.Namespace:
|
|||
metavar="PROFILE_OR_INI",
|
||||
help="INI profile or absolute path (sets FIWI_CONFIG for this run)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--powercycle",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Self-test only: all downstream ports OFF then ON on local hubs and each "
|
||||
"merged remote hub host; verifies via port metrics (disrupts USB power)."
|
||||
),
|
||||
)
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
|
|
@ -473,7 +773,10 @@ def main() -> int:
|
|||
remote_fail = 0
|
||||
try:
|
||||
c = _instantiate_concentrator()
|
||||
remote_fail = _print_consolidated_report(c)
|
||||
if args.powercycle:
|
||||
remote_fail = _run_powercycle_test(c)
|
||||
else:
|
||||
remote_fail = _print_consolidated_report(c)
|
||||
except AssertionError as exc:
|
||||
print(f"FAIL: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
|
@ -488,9 +791,13 @@ def main() -> int:
|
|||
pass
|
||||
|
||||
print(_rule("="), flush=True)
|
||||
print(f"FiWiConcentrator() OK [config: {label}]", flush=True)
|
||||
if remote_fail != 0:
|
||||
print(" (one or more remote hub queries had errors — see above)", flush=True)
|
||||
if args.powercycle:
|
||||
tag = "Power-cycle self-test OK" if remote_fail == 0 else "Power-cycle self-test FAILED"
|
||||
print(f"{tag} [config: {label}]", flush=True)
|
||||
else:
|
||||
print(f"FiWiConcentrator() OK [config: {label}]", flush=True)
|
||||
if remote_fail != 0:
|
||||
print(" (one or more remote hub queries had errors — see above)", flush=True)
|
||||
print(_rule("="), flush=True)
|
||||
return remote_fail
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue