FiWiManager/fiwi/site_setup.py

621 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
**Primary operator entry** for Fi-Wi on a rig: site metadata, fiber map layout, optional
USB power baseline, optional hub relay bootstrap, optional panel calibrate — usually as::
site_setup.py -c uax24
Optional ``--relay`` / ``--calibrate`` only **trigger** those steps; relay **hosts** come from
``config/*.ini`` (``[remote_hubs] hosts``) and env. ``--relay`` runs
:class:`~fiwi.fiwi_relay.relay_app.FiWiRelayBootstrapApp` in-process (same as ``python -m fiwi.fiwi_relay``).
``--calibrate`` runs :func:`fiwi.cli.run_panel_calibrate` in-process.
``--ssh`` is only for **extra** calibrate targets beyond ``calibrate_remotes`` in the map / profile.
Writes the fiber map (default ``maps/fiber_map.json`` under the install root) keys:
* ``fiwi_site`` — ``concentrator_name``, ``concentrator_location``
* ``patch_panel`` — ``slots``, ``label`` (panel name), ``location``
* ``calibrate_remotes`` — ``user@host`` list for hybrid panel calibrate
Interactive setup can optionally **power down** local downstream USB ports and run ``fiwi off all``
on configured SSH hub hosts **after** saving the map, only when you are **not** continuing to panel
calibrate (``panel calibrate`` repeats that baseline itself).
``calibrate_remotes`` defaults from the active INI profile (e.g. ``[remote_hubs] hosts`` in
``config/uax24.ini``) when you use ``site_setup.py -c uax24``.
Per-strand labels use optional ``fiber_ports[<id>].name`` and ``.location`` (see
:class:`~fiwi.radiohead.RadioHeadEntry`).
"""
from __future__ import annotations
import argparse
import os
import shutil
import socket
import sys
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from fiwi import fiber_map_io as fm
from fiwi.paths import fiber_map_path
from fiwi.patch_panel import BoundPatchPanel, PatchPanel, default_panel_ports
if TYPE_CHECKING:
from fiwi.concentrator import FiWiConcentrator
from fiwi.radiohead import RadioHead
FIWI_SITE_KEY = "fiwi_site"
def _fiwi_repo_root() -> str:
"""Directory containing ``fiwi.py`` (parent of package ``fiwi``)."""
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def merge_calibrate_remotes(doc: Dict[str, Any], hosts: List[str]) -> Dict[str, Any]:
"""Set ``calibrate_remotes`` to deduped ``user@host`` list, or remove key if empty."""
doc = fm.ensure_fiber_map_document(doc)
seen: List[str] = []
for h in hosts:
s = (h or "").strip()
if s and "@" in s and s not in seen:
seen.append(s)
if seen:
doc["calibrate_remotes"] = seen
elif "calibrate_remotes" in doc:
del doc["calibrate_remotes"]
return doc
def read_fiwi_site(doc: Dict[str, Any]) -> Dict[str, str]:
"""Return ``fiwi_site`` object from a map document, or empty dict."""
raw = doc.get(FIWI_SITE_KEY)
if not isinstance(raw, dict):
return {}
out: Dict[str, str] = {}
for k in ("concentrator_name", "concentrator_location"):
v = raw.get(k)
if isinstance(v, str) and v.strip():
out[k] = v.strip()
return out
def merge_fiwi_site(
doc: Dict[str, Any],
*,
concentrator_name: str,
concentrator_location: str,
) -> Dict[str, Any]:
"""Attach ``fiwi_site``; empty strings omit that field (existing key removed if both empty)."""
doc = fm.ensure_fiber_map_document(doc)
site: Dict[str, str] = {}
if concentrator_name.strip():
site["concentrator_name"] = concentrator_name.strip()
if concentrator_location.strip():
site["concentrator_location"] = concentrator_location.strip()
if site:
doc[FIWI_SITE_KEY] = site
elif FIWI_SITE_KEY in doc:
del doc[FIWI_SITE_KEY]
return doc
def merge_patch_panel_meta(
doc: Dict[str, Any],
*,
slots: int,
label: str,
location: str,
) -> Dict[str, Any]:
"""Merge ``patch_panel`` slots / name / location; preserves unrelated keys in the blob."""
doc = fm.ensure_fiber_map_document(doc)
if slots < 1 or slots > 256:
raise ValueError("patch panel slots must be 1256")
pp = PatchPanel(slots=slots, label=label.strip(), location=location.strip())
blob: Dict[str, Any] = {}
prev = doc.get("patch_panel")
if isinstance(prev, dict):
blob.update(prev)
blob.update(pp.to_map_blob())
doc["patch_panel"] = blob
return doc
def open_fiwi_stack() -> Tuple["FiWiConcentrator", BoundPatchPanel, List["RadioHead"]]:
"""
Load ``fiber_map.json``, open a :class:`~fiwi.concentrator.FiWiConcentrator`, bound patch panel,
and all mapped :class:`~fiwi.radiohead.RadioHead` instances (panel order, then off-panel fiber ids).
Raises:
FileNotFoundError: if ``fiber_map.json`` is missing.
"""
from fiwi.concentrator import FiWiConcentrator
from fiwi.radiohead import RadioHead, RadioHeadEntry
doc = fm.load_fiber_map_document()
if doc is None:
raise FileNotFoundError(
f"Missing {fiber_map_path()}. Run site_setup.py or copy fiber_map.example.json."
)
c = FiWiConcentrator()
bound = c.patch_panel(doc)
heads: List[RadioHead] = list(bound.heads())
seen = {h.map_entry.map_key for h in heads}
for ent in RadioHeadEntry.each_from_document(doc):
if ent.is_mapped() and ent.map_key not in seen:
heads.append(RadioHead(ent, c))
seen.add(ent.map_key)
def _sort_key(h: RadioHead) -> tuple:
p = h.patch_panel_port
if p is not None:
return (0, p, h.map_entry.map_key)
mk = h.map_entry.map_key
if mk.isdigit():
return (1, int(mk), mk)
return (2, mk, mk)
heads.sort(key=_sort_key)
return c, bound, heads
def _prompt(default: str, label: str) -> str:
try:
raw = input(f" {label} [{default}]: ").strip()
except EOFError:
raw = ""
return raw if raw else default
def _prompt_usb_power_down(*, remotes: List[str], has_local_hubs: bool) -> bool:
bits: List[str] = []
if has_local_hubs:
bits.append("every downstream port on connected local USB hubs")
if remotes:
bits.append("`fiwi off all` on " + ", ".join(remotes))
print(
"\n--- USB power baseline ---\n\n"
"Turn OFF " + " and ".join(bits) + "\n"
" (You are not running panel calibrate this session; calibrate would do this baseline for you.)\n",
flush=True,
)
try:
raw = input(" Power down now? [Y/n]: ").strip().lower()
except EOFError:
return True
return raw not in ("n", "no")
def _run_early_usb_power_down_if_requested(doc: Dict[str, Any], args: argparse.Namespace) -> None:
if args.no_power_down:
return
from fiwi.concentrator import FiWiConcentrator
remotes = list(fm.resolve_calibrate_ssh_targets(doc, extra_cli_hosts=()).hosts)
c = FiWiConcentrator()
try:
if not c.hubs:
c.connect(quiet=True)
has_local = bool(c.hubs)
if not has_local and not remotes:
print(
"\n(USB power baseline skipped: no local hubs opened and no SSH hub hosts in profile/map.)\n",
flush=True,
)
return
if not _prompt_usb_power_down(remotes=remotes, has_local_hubs=has_local):
print(" Skipping USB power-down.\n", flush=True)
return
c.power_down_all_downstream_ports(remotes, context="Site setup")
print(" USB baseline done.\n", flush=True)
finally:
c.disconnect()
def _timestamped_fiber_map_backup_path(original: str) -> str:
"""``fiber_map.json`` → ``fiber_map.json.20260403T143022`` (suffix if collision)."""
d = os.path.dirname(os.path.abspath(original))
base = os.path.basename(original)
stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
candidate = os.path.join(d, f"{base}.{stamp}")
if not os.path.exists(candidate):
return candidate
for i in range(2, 10_000):
alt = os.path.join(d, f"{base}.{stamp}.{i}")
if not os.path.exists(alt):
return alt
raise OSError("could not allocate a unique backup filename")
def _prompt_existing_map_action(path: str) -> str:
"""
Ask what to do when ``fiber_map.json`` already exists.
Returns:
``\"merge\"`` — load and update metadata only.
``\"delete\"`` — move file to a timestamped backup and start ``{\"fiber_ports\": {}}``.
``\"exit\"`` — caller should exit without writing.
"""
print(
f"\n{path} already exists.\n\n"
" [m] Merge — keep fiber_ports and everything else; update only\n"
" fiwi_site, patch_panel, and calibrate_remotes\n"
" [d] New map — move this file to a timestamped backup, then start fresh\n"
" [q] Quit — exit without changing anything\n",
flush=True,
)
while True:
try:
raw = input(" Continue? [m/d/q] (default: m): ").strip().lower()
except EOFError:
return "exit"
if not raw or raw in ("m", "merge"):
return "merge"
if raw in ("d", "delete", "fresh"):
return "delete"
if raw in ("q", "quit", "exit", "x"):
return "exit"
print(" Enter m, d, or q.", flush=True)
def _run_interactive(args: argparse.Namespace) -> Dict[str, Any]:
from fiwi.config import resolved_config_path
from fiwi import paths as paths_mod
from fiwi.ssh import SshNodeConfig
ini_path = resolved_config_path(paths_mod.base_dir())
prof = (os.environ.get("FIWI_CONFIG") or "").strip()
if ini_path:
print(
f"Active profile: {ini_path} (FIWI_CONFIG={prof or 'default'})\n",
flush=True,
)
elif prof:
print(
f"FIWI_CONFIG={prof!r} — no matching config/*.ini; using env / code defaults only.\n",
flush=True,
)
path = fiber_map_path()
exists = os.path.isfile(path)
if exists:
action = _prompt_existing_map_action(path)
if action == "exit":
raise SystemExit(0)
if action == "delete":
backup = _timestamped_fiber_map_backup_path(path)
try:
shutil.move(path, backup)
except OSError as exc:
print(f"Could not move {path}{backup}: {exc}", file=sys.stderr, flush=True)
raise SystemExit(1) from exc
exists = False
print(f"Moved {path}{backup}\nStarting a new map.\n", flush=True)
doc = fm.load_fiber_map_document() if exists else {"fiber_ports": {}}
doc = fm.ensure_fiber_map_document(doc)
site = read_fiwi_site(doc)
pp = PatchPanel.from_map_blob(doc.get("patch_panel"))
def_cn = site.get("concentrator_name", args.concentrator_name or "") or socket.gethostname()
def_cl = site.get("concentrator_location", args.concentrator_location or "")
def_slots = pp.slots if pp is not None else default_panel_ports()
def_plab = (pp.label if pp else "") or args.panel_name or ""
def_ploc = (pp.location if pp else "") or args.panel_location or ""
print("\n--- Fi-Wi site (concentrator / this machine) ---\n", flush=True)
cn = _prompt(def_cn, "Concentrator name (e.g. rack or hostname label)")
cl = _prompt(def_cl, "Concentrator location (e.g. lab, datacenter row)")
print("\n--- Patch panel ---\n", flush=True)
slots_s = _prompt(str(def_slots), "Number of front-panel positions (slots)")
try:
slots = int(slots_s)
except ValueError:
print(" Invalid slot count.", file=sys.stderr, flush=True)
raise SystemExit(2) from None
plab = _prompt(def_plab, "Patch panel name / label")
ploc = _prompt(def_ploc, "Patch panel location (e.g. bay, room)")
print("\n--- Hub hosts (hybrid panel calibrate) ---\n", flush=True)
ssh_cfg = SshNodeConfig.load()
from_profile: List[str] = []
if ssh_cfg.calibrate_remotes:
from_profile = [
x.strip()
for x in ssh_cfg.calibrate_remotes.split(",")
if x.strip() and "@" in x
]
if from_profile and ini_path:
print(
f" Discovered from profile: {', '.join(from_profile)}\n",
flush=True,
)
elif from_profile:
print(
f" Discovered from environment: {', '.join(from_profile)}\n",
flush=True,
)
existing_cr = fm.calibrate_remotes_hosts(doc)
if existing_cr:
def_rel = ",".join(existing_cr)
elif from_profile:
def_rel = ",".join(from_profile)
else:
def_rel = ""
rel_raw = _prompt(
def_rel,
"Hub SSH targets (comma-separated user@host); Enter=default; -=omit",
)
if rel_raw.strip() == "-":
relay_hosts: List[str] = []
elif rel_raw.strip():
relay_hosts = [
x.strip()
for x in rel_raw.split(",")
if x.strip() and "@" in x
]
else:
relay_hosts = [
x.strip()
for x in def_rel.split(",")
if x.strip() and "@" in x
]
doc = merge_fiwi_site(doc, concentrator_name=cn, concentrator_location=cl)
doc = merge_patch_panel_meta(doc, slots=slots, label=plab, location=ploc)
doc = merge_calibrate_remotes(doc, relay_hosts)
return doc
def _run_batch(args: argparse.Namespace) -> Dict[str, Any]:
path = fiber_map_path()
exists = os.path.isfile(path)
if exists and not args.merge:
print(
f"{path} exists; use --merge for batch updates.",
file=sys.stderr,
flush=True,
)
raise SystemExit(2)
doc = fm.load_fiber_map_document() if exists else {"fiber_ports": {}}
doc = fm.ensure_fiber_map_document(doc)
if args.slots is None or args.slots < 1:
print("--batch requires --slots N (>= 1).", file=sys.stderr, flush=True)
raise SystemExit(2)
doc = merge_fiwi_site(
doc,
concentrator_name=args.concentrator_name or "",
concentrator_location=args.concentrator_location or "",
)
doc = merge_patch_panel_meta(
doc,
slots=int(args.slots),
label=args.panel_name or "",
location=args.panel_location or "",
)
return doc
def _profile_argv_prefix() -> List[str]:
prof = (os.environ.get("FIWI_CONFIG") or "").strip()
return ["-c", prof] if prof else []
def build_site_setup_argument_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Fi-Wi site setup — map metadata, optional chained relay + calibrate. "
"Relay SSH targets are read from the INI (e.g. [remote_hubs] hosts); "
"--relay/--calibrate only choose whether to run those steps after saving the map. "
"Run from the install directory next to fiwi.py; use -c PROFILE."
),
)
p.add_argument(
"-c",
"--config",
metavar="PROFILE",
help=(
"INI profile or path (sets FIWI_CONFIG before loading paths — e.g. uax24 → "
"patch_panel defaults, remote_hubs hosts for calibrate_remotes prompt)"
),
)
p.add_argument(
"--merge",
action="store_true",
help="Required with --batch when fiber_map.json already exists (interactive uses m/d/q prompt).",
)
p.add_argument(
"--relay",
action="store_true",
help=(
"After saving the map, run fiwi relay bootstrap in-process (FiWiRelayBootstrapApp; "
"same -c profile). Uses [remote_hubs] / env from config — this flag does not set hosts."
),
)
p.add_argument(
"--calibrate",
action="store_true",
help=(
"After saving the map (and after --relay if given), run panel calibrate merge in-process "
"(fiwi.cli.run_panel_calibrate) plus any --ssh. "
"Interactive runs also ask [Y/n] to calibrate unless --no-calibrate-prompt."
),
)
p.add_argument(
"--batch",
action="store_true",
help="Non-interactive: supply --slots and optional name/location strings",
)
p.add_argument("--concentrator-name", default="", metavar="STR")
p.add_argument("--concentrator-location", default="", metavar="STR")
p.add_argument("--panel-name", default="", metavar="STR")
p.add_argument("--panel-location", default="", metavar="STR")
p.add_argument("--slots", type=int, default=None, metavar="N")
p.add_argument(
"--ssh",
action="append",
default=[],
metavar="user@host",
help="Only with --calibrate: extra --ssh user@host for panel calibrate (repeatable)",
)
p.add_argument(
"--no-power-down",
action="store_true",
help=(
"Skip the optional USB power-down prompt when not calibrating (interactive only; "
"redundant with panel calibrates baseline if you use --calibrate)"
),
)
p.add_argument(
"--no-calibrate-prompt",
action="store_true",
help=(
"After saving the map, do not ask to start panel calibrate (interactive only); "
"use --calibrate to run it without a prompt."
),
)
return p
class SiteSetupApp:
"""Orchestrates fiber map metadata, optional USB power-down, relay bootstrap, panel calibrate."""
def __init__(self, args: argparse.Namespace) -> None:
self._args = args
def run(self) -> int:
import fiwi.paths as paths_mod
args = self._args
paths_mod.configure(_fiwi_repo_root())
if args.batch:
doc = _run_batch(args)
else:
if not sys.stdin.isatty():
print(
"site_setup: need a TTY for prompts, or use --batch with --slots.",
file=sys.stderr,
flush=True,
)
return 2
doc = _run_interactive(args)
out_path = fiber_map_path()
fm.write_fiber_map_document(doc)
print(f"\nWrote {out_path}", flush=True)
site = read_fiwi_site(doc)
pp = PatchPanel.from_map_blob(doc.get("patch_panel"))
if site:
print(
f" fiwi_site: {site.get('concentrator_name', '')!r} @ "
f"{site.get('concentrator_location', '')!r}",
flush=True,
)
if pp:
print(
f" patch_panel: {pp.slots} ports, name={pp.label or ''!r}, "
f"location={pp.location or ''!r}",
flush=True,
)
cr = fm.calibrate_remotes_hosts(doc)
if cr:
print(f" calibrate_remotes: {', '.join(cr)}", flush=True)
map_p = fiber_map_path()
prof = (os.environ.get("FIWI_CONFIG") or "").strip()
ex_prof = prof if prof else "uax24"
do_calibrate = bool(args.calibrate)
if (
not args.batch
and sys.stdin.isatty()
and not do_calibrate
and not args.no_calibrate_prompt
):
try:
ans = input(
"\nStart panel calibrate now (merge from map)? [Y/n]: "
).strip().lower()
except EOFError:
ans = "n"
if not ans or ans in ("y", "yes"):
do_calibrate = True
if not do_calibrate:
if not args.batch and sys.stdin.isatty():
_run_early_usb_power_down_if_requested(doc, args)
print(
"\nStopped after saving the map (no calibrate this run).\n"
"Optional chained steps — relay hosts come from config/ INI, not the CLI:\n"
f" site_setup.py -c {ex_prof} --relay\n"
f" site_setup.py -c {ex_prof} --calibrate\n"
f" site_setup.py -c {ex_prof} --relay --calibrate [--ssh user@host …]\n"
f"If calibrate skips the Pi: check FIWI_FIBER_MAP vs {map_p}; "
f"bootstrap with --relay if needed.\n"
"Per-strand labels: fiber_ports[\"<id>\"].name / .location\n",
flush=True,
)
else:
print(
"\nContinuing to panel calibrate…\n"
"(With `merge`, fiwi asks once to confirm patch panel from the map — Enter to keep, "
"or `e` to change slots/label/location.)\n",
flush=True,
)
root = _fiwi_repo_root()
if args.relay:
if not sys.stdin.isatty():
print("site_setup: --relay needs a TTY.", file=sys.stderr, flush=True)
return 2
from fiwi.fiwi_relay.relay_app import FiWiRelayBootstrapApp
relay_argv = _profile_argv_prefix()
print(
"\nRunning fiwi relay bootstrap (in-process"
+ (f"; {' '.join(relay_argv)}" if relay_argv else "")
+ ")…\n",
flush=True,
)
relay_rc = FiWiRelayBootstrapApp(argv=relay_argv, install_root=root).run()
if relay_rc != 0:
return relay_rc
if do_calibrate:
if not sys.stdin.isatty():
print("site_setup: panel calibrate needs a TTY.", file=sys.stderr, flush=True)
return 2
from fiwi.cli import run_panel_calibrate
extra_ssh = [str(h).strip() for h in (args.ssh or []) if str(h).strip()]
print(
"\nRunning panel calibrate (in-process, merge=True)"
+ (f"; extra --ssh: {', '.join(extra_ssh)}" if extra_ssh else "")
+ ".\n",
flush=True,
)
return run_panel_calibrate(
merge=True,
limit=None,
calibrate_ssh_hosts=extra_ssh,
emit_start_line=True,
)
return 0
def main(argv: List[str] | None = None) -> int:
args = build_site_setup_argument_parser().parse_args(argv)
if args.config:
os.environ["FIWI_CONFIG"] = args.config.strip()
return SiteSetupApp(args).run()
if __name__ == "__main__":
raise SystemExit(main())