# FiWiManager/fiwi/cli.py
#
# 373 lines, 15 KiB, Python (web-viewer header captured with the source;
# "Raw / Blame / History" are viewer links, not part of the module).
#
# Viewer warning: this file contains ambiguous Unicode characters that might be
# confused with other characters. If that is intentional, the warning can be
# safely ignored.

"""Fi-Wi framework CLI: argv dispatch, --ssh, fiber-map forwarding."""
import getopt
import json
import os
import sys
from typing import NamedTuple
# Opaque argv tokens so getopt.gnu_getopt does not treat calibrate's --ssh as unknown.
_SSH_BLOCK_FMT = "__FIWI_SSH_BLOCK_{}__"
from fiwi.concentrator import FiWiConcentrator
from fiwi.brainstem_loader import load_brainstem
from fiwi.ssh_dispatch import dispatch_fiber_mapped_ssh_if_needed
from fiwi.ssh_node import SshNode
from fiwi import usb_probe as usb
class _GlobalCli(NamedTuple):
"""Result of GNU-style option scan (``getopt.gnu_getopt``)."""
want_help: bool
remote_host: str | None
positional: list[str]
def _shield_ssh_remote_pairs(argv: list[str]) -> tuple[list[str], list[tuple[str, str]]]:
    """Hide each ``--ssh`` / ``--remote`` flag and its value behind one placeholder operand.

    A flag that is followed by another token is removed together with that
    token and replaced by ``_SSH_BLOCK_FMT`` formatted with the pair's index,
    so getopt sees a plain operand rather than an unknown long option. A flag
    that is the very last token has nothing to pair with and is passed through
    unchanged.

    Returns:
        ``(shielded_argv, pairs)`` where ``pairs[i]`` is the ``(flag, value)``
        tuple that placeholder ``i`` stands for.
    """
    flags = ("--ssh", "--remote")
    no_value = object()  # sentinel: the flag was the final token
    shielded: list[str] = []
    captured: list[tuple[str, str]] = []
    tokens = iter(argv)
    for token in tokens:
        if token not in flags:
            shielded.append(token)
            continue
        # Consume the following token as the flag's value, whatever it is.
        value = next(tokens, no_value)
        if value is no_value:
            shielded.append(token)
            continue
        shielded.append(_SSH_BLOCK_FMT.format(len(captured)))
        captured.append((token, value))
    return shielded, captured
def _unshield_ssh_remote_pairs(args: list[str], pairs: list[tuple[str, str]]) -> list[str]:
    """Expand placeholder operands back into the flag/value pairs they stand for.

    A token of the form ``__FIWI_SSH_BLOCK_<n>__`` whose ``<n>`` parses as an
    integer index into *pairs* is replaced by the two strings of that pair.
    Every other token (including placeholders with a non-numeric or
    out-of-range index) is copied through unchanged.
    """
    prefix = "__FIWI_SSH_BLOCK_"
    suffix = "__"
    restored: list[str] = []
    for token in args:
        expansion = [token]  # default: pass the token through as-is
        if token.startswith(prefix) and token.endswith(suffix):
            try:
                slot = int(token[len(prefix) : -len(suffix)])
            except ValueError:
                slot = -1  # never a valid index, so the token passes through
            if 0 <= slot < len(pairs):
                flag, value = pairs[slot]
                expansion = [flag, value]
        restored.extend(expansion)
    return restored
def _parse_global_cli(argv: list[str]) -> _GlobalCli | None:
    """
    Scan *argv* for global options using ``getopt.gnu_getopt`` (options may be
    interleaved with operands).

    ``--ssh`` / ``--remote`` and their host value are shielded before the scan
    and restored afterwards, so ``panel calibrate … --ssh`` is not rejected as
    an unknown long option. ``--async`` sets ``FIWI_REMOTE_DEFER=1`` in the
    process environment. When the first remaining operand is ``--ssh`` or
    ``--remote``, the operand after it becomes ``remote_host`` and both are
    dropped from the positional list.

    Returns ``None`` (after printing a message to stderr) on an unknown option
    or a leading ``--ssh`` / ``--remote`` without a non-blank host.
    """
    shielded, ssh_pairs = _shield_ssh_remote_pairs(argv)
    try:
        opts, args = getopt.gnu_getopt(
            shielded,
            "h",
            ("help", "async"),
        )
    except getopt.GetoptError as exc:
        print(f"fiwi: {exc}", file=sys.stderr, flush=True)
        return None

    seen = {name for name, _value in opts}
    if "--async" in seen:
        os.environ["FIWI_REMOTE_DEFER"] = "1"
    want_help = "-h" in seen or "--help" in seen

    operands = _unshield_ssh_remote_pairs(args, ssh_pairs)
    if not operands or operands[0] not in ("--ssh", "--remote"):
        # No leading remote selector: everything left is the command line.
        return _GlobalCli(
            want_help=want_help,
            remote_host=None,
            positional=operands,
        )

    host = operands[1].strip() if len(operands) >= 2 else ""
    if not host:
        print(
            "fiwi: --ssh / --remote requires user@host\n"
            "Usage: fiwi.py --ssh user@host <command> [args...]",
            file=sys.stderr,
            flush=True,
        )
        return None
    return _GlobalCli(
        want_help=want_help,
        remote_host=host,
        positional=operands[2:],
    )
def _print_cli_help() -> None:
    """Print the full CLI usage text to stdout.

    The text covers the local commands, the ``--ssh`` remote mode with its
    environment variables, the ``fiber_map.json`` ssh-routing keys, and hybrid
    calibrate.
    """
    print(
        "Fi-Wi test framework — CLI\n"
        "Usage: fiwi.py [--async] <command> [target]\n"
        " --async set FIWI_REMOTE_DEFER: deferred calls spawn ssh child processes immediately;\n"
        " panel calibrate overlaps them (join via handle.result(); no Python threads).\n"
        " Or set FIWI_REMOTE_DEFER=1 / remote_ssh.env / config/*.ini (FIWI_CONFIG=profile).\n"
        " discover — USB power-control hubs (serial, port count); no port I/O\n"
        " show_hostcards — same as discover, concentrator 'hostcards' label\n"
        " show_radioheads — all fiber_ports rows + power (same table as fiber status)\n"
        " status [target] — default command; target like all, 1.3, all.2\n"
        " fiber status — fiber_ports + power (local or per-entry ssh / host+user)\n"
        " fiber chip <id> [save] — local lsusb probe (SSH-mapped ports: wlan/pcie from calibrate, not fiber chip)\n"
        " wlan-info-json — machine-readable wireless NIC snapshot (sysfs + lspci/iw); used by panel calibrate over SSH\n"
        " power fiber-port <id> on|off — power by fiber key (ssh forward if map says so)\n"
        " panel status — rack positions 1…N (N = patch_panel.slots, default 24)\n"
        " panel calibrate … — set patch panel size first, then USB hub walk → fiber_map.json\n"
        " panel on|off|reboot|reboot-force <n>\n"
        " on|off [target] reboot|reboot-force [target] setup verify\n"
        "\n"
        "Remote (hubs on another host — no local brainstem needed):\n"
        " fiwi.py --ssh user@host discover\n"
        " remote_ssh.env next to fiwi.py (see remote_ssh.env.example) or env vars:\n"
        " FIWI_REMOTE_PYTHON remote interpreter (default python3)\n"
        " FIWI_REMOTE_SCRIPT remote script path (default /usr/local/bin/fiwi.py)\n"
        " FIWI_SSH_OPTS e.g. '-o BatchMode=yes'\n"
        " FIWI_CALIBRATE_REMOTES / FIWI_REMOTE_HUBS optional comma-separated user@host for panel calibrate\n"
        " (or [remote_hubs] hosts in config/*.ini)\n"
        " Pi: venv + requirements.txt; udev 24ff.\n"
        "\n"
        "fiber_map.json fiber_ports entries may set ssh routing (hubs on another machine):\n"
        ' "ssh": "user@host" or "remote": "" or "host": "ip", "user": "pi"\n'
        " On the SSH destination, the same fiber id should be local (omit ssh) so commands are not re-forwarded.\n"
        ' Optional "pcie": { bus, switch, slot, adapter_port, sfp_serial, board_serial, … } — calibrate can fill via 16+SFP.\n'
        "\n"
        "Hybrid calibrate: put {\"calibrate_remotes\": [\"pi@ip\"]} in fiber_map.json or pass --ssh per host;\n"
        # Possessive apostrophe restored ("remotes ports" was a typo in the help text).
        " order is all local downstream ports, then each remote's ports (see calibrate-ports-json on the Pi)."
    )
def _parse_panel_calibrate_argv(args):
"""
``panel calibrate [merge] [N] [--ssh user@host] …``
Returns ``(merge, limit, calibrate_ssh_hosts)``.
"""
merge = False
limit = None
hosts = []
i = 0
while i < len(args):
a = args[i]
low = a.lower()
if low == "merge":
merge = True
i += 1
continue
if low == "--ssh":
if i + 1 >= len(args):
print("panel calibrate: --ssh requires user@host", file=sys.stderr, flush=True)
sys.exit(2)
hosts.append(args[i + 1].strip())
i += 2
continue
if a.isdigit():
limit = int(a)
i += 1
continue
print(f"panel calibrate: unknown argument {a!r}", file=sys.stderr, flush=True)
sys.exit(2)
return merge, limit, hosts
def _panel_port_number(raw: str) -> int | None:
    """Convert a ``panel`` sub-command position operand to ``int``.

    Prints a one-line error to stderr and returns ``None`` when *raw* is not an
    integer, so the caller can return a usage status instead of a traceback.
    """
    try:
        return int(raw)
    except ValueError:
        print("panel_port must be an integer.", file=sys.stderr, flush=True)
        return None


def main() -> int:
    """Run the Fi-Wi CLI for ``sys.argv[1:]`` and return the process exit status.

    Order of operations:

    1. Parse global options; ``-h`` / ``--help`` prints the help text.
    2. With a leading ``--ssh`` / ``--remote`` host, forward the remaining
       command through ``SshNode`` and return its result.
    3. Give ``dispatch_fiber_mapped_ssh_if_needed`` a chance to handle the
       command; a non-``None`` result is returned as the status.
    4. Load brainstem, create a ``FiWiConcentrator`` and dispatch the command
       locally (default command ``status``, default target ``all``). The
       concentrator is always disconnected on the way out.

    Returns:
        ``0`` on success, ``1`` when brainstem fails to load or a top-level
        ``on`` / ``off`` reports failure, ``2`` on usage errors; in the remote
        cases, whatever the forwarding call returns.
    """
    ga = _parse_global_cli(sys.argv[1:])
    if ga is None:
        return 2
    if ga.want_help:
        _print_cli_help()
        return 0
    pos = ga.positional
    if ga.remote_host is not None:
        if not pos:
            print(
                "Usage: fiwi.py --ssh user@host <command> [args...]\n"
                " Example: fiwi.py --ssh rjmcmahon@192.168.1.39 discover\n"
                " If brainstem is in a Pi venv: copy remote_ssh.env.example → remote_ssh.env next to\n"
                " this script on the PC where you run --ssh (paths in the file are on the Pi).\n"
                " Or export FIWI_REMOTE_PYTHON / FIWI_REMOTE_SCRIPT.\n"
                " On the Pi: pip install -r requirements.txt in that venv; udev 24ff.",
                file=sys.stderr,
                flush=True,
            )
            return 2
        return SshNode.parse(ga.remote_host).invoke(pos, defer=False)
    rc_ssh_map = dispatch_fiber_mapped_ssh_if_needed(pos)
    if rc_ssh_map is not None:
        return rc_ssh_map
    # Written straight to fd 2, bypassing Python's stderr buffering.
    os.write(2, b"fiwi: start\n")
    try:
        load_brainstem()
    except Exception as exc:
        print(f"fiwi: failed to import brainstem: {exc}", file=sys.stderr, flush=True)
        if isinstance(exc, ImportError):
            print(
                " If this text came from `fiwi.py --ssh …`: the remote used system python3 by default.\n"
                " On your PC export FIWI_REMOTE_PYTHON to the Pi venv's python and\n"
                " FIWI_REMOTE_SCRIPT to that fiwi.py (absolute paths on the Pi).",
                file=sys.stderr,
                flush=True,
            )
        return 1
    concentrator = FiWiConcentrator()
    try:
        cmd = pos[0].lower() if pos else "status"
        target = pos[1] if len(pos) > 1 else "all"
        if cmd == "status":
            concentrator.status(target)
        elif cmd == "calibrate-ports-json":
            if not concentrator.hubs and not concentrator.connect():
                print("[]", flush=True)
            else:
                pairs = concentrator._ordered_downstream_ports()
                print(json.dumps([[h, p] for h, p in pairs]), flush=True)
        elif cmd == "lsusb-lines-json":
            print(json.dumps(usb.lsusb_lines()), flush=True)
        elif cmd == "wlan-info-json":
            from fiwi.ieee80211_dev import discover_wireless_for_map

            print(json.dumps(discover_wireless_for_map()), flush=True)
        elif cmd == "discover":
            concentrator.discover()
        elif cmd == "show_hostcards":
            concentrator.show_hostcards()
        elif cmd == "show_radioheads":
            concentrator.show_radioheads()
        elif cmd == "power":
            if len(pos) < 4 or pos[1].lower() != "fiber-port":
                print(
                    "Usage: fiwi.py power fiber-port <fiber_port_id> on|off\n"
                    " Uses fiber_map.json; per-entry ssh / host+user forwards to that host (see help).",
                    file=sys.stderr,
                    flush=True,
                )
                return 2
            try:
                fp_n = int(pos[2])
            except ValueError:
                print("fiber_port_id must be an integer.", file=sys.stderr, flush=True)
                return 2
            mode = pos[3].lower()
            if mode not in ("on", "off"):
                print("Last argument must be on or off.", file=sys.stderr, flush=True)
                return 2
            concentrator.fiber_power(mode, fp_n)
        elif cmd == "fiber":
            if len(pos) < 2:
                print(
                    "Usage: fiwi.py fiber status\n"
                    " fiwi.py fiber chip <fiber_port_id> [save]\n"
                    " status — hub.port, Route, power, and saved previews from fiber_map.json\n"
                    " chip — local lsusb diff only (not used for SSH-mapped / PCIe-fiber paths)",
                    file=sys.stderr,
                    flush=True,
                )
                return 2
            sub = pos[1].lower()
            if sub == "status":
                concentrator.fiber_map_status()
            elif sub == "chip":
                if len(pos) < 3:
                    print(
                        "Usage: fiwi.py fiber chip <fiber_port_id> [save]",
                        file=sys.stderr,
                        flush=True,
                    )
                    return 2
                try:
                    chip_fp = int(pos[2])
                except ValueError:
                    print("fiber_port_id must be an integer.", file=sys.stderr, flush=True)
                    return 2
                save_chip = len(pos) >= 4 and pos[3].lower() == "save"
                concentrator.fiber_chip(chip_fp, save=save_chip)
            else:
                print(f"Unknown fiber subcommand: {sub!r}", file=sys.stderr, flush=True)
                return 2
        elif cmd == "panel":
            if len(pos) < 2:
                print(
                    "Usage: fiwi.py panel status\n"
                    " fiwi.py panel on|off <panel_port>\n"
                    " fiwi.py panel reboot|reboot-force <panel_port>\n"
                    " fiwi.py panel calibrate [merge] [<N>] [--ssh user@host] …\n"
                    " calibrate: local hub ports first, then each --ssh host, calibrate_remotes in JSON, and/or\n"
                    " FIWI_CALIBRATE_REMOTES in remote_ssh.env (comma-separated) for one-command hybrid.\n"
                    " merge / N as before; remote steps set \"ssh\" on new fiber_ports entries.\n"
                    " Calibrate starts by setting patch_panel.slots (front-panel positions); panel <n> is 1…slots.\n"
                    " Use power fiber-port for arbitrary fiber ids beyond the panel if needed.\n"
                    # 8+8+4 ports give ids 1 through 20; the range separator had been lost ("120").
                    " Preset: fiber_map.rpi20.json → fiber_map.json for 8+8+4 → fiber ports 1-20.",
                    file=sys.stderr,
                    flush=True,
                )
                return 2
            sub = pos[1].lower()
            if sub == "status":
                concentrator.panel_status()
            elif sub == "calibrate":
                cal_args = pos[2:]
                merge, limit, cal_hosts = _parse_panel_calibrate_argv(cal_args)
                concentrator.panel_calibrate(merge=merge, limit=limit, calibrate_ssh_hosts=cal_hosts)
            elif sub in ("on", "off"):
                if len(pos) < 3:
                    print(
                        f"Usage: fiwi.py panel {sub} <1-N> (N = patch_panel.slots in fiber_map, default 24)",
                        file=sys.stderr,
                        flush=True,
                    )
                    return 2
                # Non-integer positions are a usage error, as for fiber_port_id above.
                panel_port = _panel_port_number(pos[2])
                if panel_port is None:
                    return 2
                concentrator.panel_power(sub, panel_port)
            elif sub in ("reboot", "reboot-force"):
                if len(pos) < 3:
                    print(
                        f"Usage: fiwi.py panel {sub} <1-N> (N from fiber_map patch_panel.slots)",
                        file=sys.stderr,
                        flush=True,
                    )
                    return 2
                panel_port = _panel_port_number(pos[2])
                if panel_port is None:
                    return 2
                # Plain "reboot" passes skip_empty=True; "reboot-force" passes False.
                concentrator.panel_reboot(panel_port, skip_empty=(sub == "reboot"))
            else:
                print(f"Unknown panel subcommand: {sub!r}", file=sys.stderr, flush=True)
                return 2
        elif cmd in ("on", "off"):
            if not concentrator.power(cmd, target):
                return 1
        elif cmd in ("reboot", "reboot-force"):
            concentrator.reboot(target, skip_empty=(cmd == "reboot"))
        elif cmd == "setup":
            concentrator.setup_udev()
        elif cmd == "verify":
            concentrator.verify()
        elif cmd == "help" or cmd == "--help":
            _print_cli_help()
        else:
            print(f"Unknown command: {cmd!r}", file=sys.stderr, flush=True)
            print(
                "Try: --ssh user@host … | discover | show_hostcards | show_radioheads | calibrate-ports-json | … | help",
                file=sys.stderr,
                flush=True,
            )
            return 2
    finally:
        # Runs for every exit path above, including the early returns.
        concentrator.disconnect()
    return 0