FiWiManager/fiwi/fiber_map_io.py

468 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Load/save fiber_map.json; parse hub/port bindings and chip metadata."""
from __future__ import annotations
import json
import os
import re
import sys
import time
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any
from fiwi.adnacom_pcie_catalog import (
ADNACOM_KNOWN_CARDS,
board_serial_tail,
pcie_from_card_and_lane,
print_catalog_menu,
short_bdf,
)
from fiwi.paths import base_dir, fiber_map_path
def calibrate_remotes_hosts(doc) -> list[str]:
"""
Normalize ``doc['calibrate_remotes']`` to ``user@host`` strings.
Accepts a JSON **array** of strings or a single comma-separated **string**
(panel calibrate and site setup both use this).
"""
if not isinstance(doc, dict):
return []
cr = doc.get("calibrate_remotes")
if isinstance(cr, list):
return [str(x).strip() for x in cr if str(x).strip() and "@" in str(x)]
if isinstance(cr, str) and cr.strip():
return [p.strip() for p in cr.split(",") if p.strip() and "@" in p]
return []
@dataclass(frozen=True)
class CalibrateSshTargetPlan:
"""
Resolved ``user@host`` hub targets for hybrid Fi-Wi (panel calibrate, ``off all``, relay).
Order is stable: explicit CLI hosts, then ``doc['calibrate_remotes']``, then
:attr:`~fiwi.ssh.SshNodeConfig.calibrate_remotes` (INI + ``FIWI_CALIBRATE_REMOTES`` /
``FIWI_REMOTE_HUBS``), each phase deduped.
"""
hosts: tuple[str, ...]
first_introduced_via_ssh_config: tuple[str, ...]
"""Hosts whose first appearance was only from config/env, not map or ``extra_cli_hosts``."""
def resolve_calibrate_ssh_targets(
doc: Mapping[str, Any] | None,
*,
extra_cli_hosts: Iterable[str] | None = None,
) -> CalibrateSshTargetPlan:
"""
Single place that merges calibrate SSH targets (map + profile + optional ``--ssh``).
Call after :func:`fiwi.paths.configure` / INI load so :class:`~fiwi.ssh.SshNodeConfig` resolves.
"""
from fiwi.ssh import SshNodeConfig
hosts: list[str] = []
seen: set[str] = set()
introduced_cfg: list[str] = []
for raw in extra_cli_hosts or ():
t = str(raw).strip()
if not t or "@" not in t or t in seen:
continue
seen.add(t)
hosts.append(t)
if isinstance(doc, dict):
for t in calibrate_remotes_hosts(doc):
if t in seen:
continue
seen.add(t)
hosts.append(t)
cfg_line = (SshNodeConfig.load().calibrate_remotes or "").strip()
if cfg_line:
for part in cfg_line.split(","):
t = part.strip()
if not t or "@" not in t or t in seen:
continue
seen.add(t)
hosts.append(t)
introduced_cfg.append(t)
return CalibrateSshTargetPlan(
hosts=tuple(hosts),
first_introduced_via_ssh_config=tuple(introduced_cfg),
)
def ensure_fiber_map_document(doc):
if not isinstance(doc, dict):
raise ValueError("map root must be a JSON object")
out = dict(doc)
fp = out.get("fiber_ports")
if fp is None:
out["fiber_ports"] = {}
elif not isinstance(fp, dict):
raise ValueError("fiber_ports must be a JSON object")
return out
def load_fiber_map_document():
"""
Load the configured fiber map if present; return None if missing.
If the configured path (usually ``maps/fiber_map.json``) is absent but a legacy
``fiber_map.json`` exists in the install root, that file is loaded so older
layouts keep working until you save (which writes the configured path).
"""
fpath = fiber_map_path()
if os.path.isfile(fpath):
with open(fpath, encoding="utf-8") as f:
return ensure_fiber_map_document(json.load(f))
leg = os.path.join(base_dir(), "fiber_map.json")
if (
os.path.normpath(os.path.abspath(leg))
!= os.path.normpath(os.path.abspath(fpath))
and os.path.isfile(leg)
):
with open(leg, encoding="utf-8") as f:
return ensure_fiber_map_document(json.load(f))
return None
def write_fiber_map_document(doc, *, path=None):
"""Write the fiber map (default :func:`~fiwi.paths.fiber_map_path`) from a document dict."""
p = path if path is not None else fiber_map_path()
out = ensure_fiber_map_document(doc)
parent = os.path.dirname(os.path.abspath(p))
if parent:
os.makedirs(parent, exist_ok=True)
with open(p, "w", encoding="utf-8") as f:
json.dump(out, f, indent=2)
f.write("\n")
def load_fiber_map_or_exit():
doc = load_fiber_map_document()
if doc is None:
print(
f"Missing {fiber_map_path()}.\n"
" Copy fiber_map.example.json → maps/fiber_map.json (or set FIWI_FIBER_MAP) "
"and set fiber_ports "
'(e.g. "5": {"hub": 1, "port": 2, "ssh": "rjmcmahon@192.168.1.39"}). '
"Use ssh / remote / host+user when USB hubs are reached via SSH.",
file=sys.stderr,
flush=True,
)
sys.exit(1)
return doc
def fiber_sort_key(key):
s = str(key).strip()
if s.isdigit():
return (0, int(s))
return (1, s)
def parse_hub_port_entry(entry):
"""Return (hub_1based, port_0based) or None if unmapped / invalid."""
if entry is None:
return None
if isinstance(entry, str):
entry = entry.strip()
if not entry or entry.lower() == "null":
return None
try:
h_s, p_s = entry.split(".", 1)
return int(h_s), int(p_s)
except ValueError:
return None
if isinstance(entry, dict):
try:
return int(entry["hub"]), int(entry["port"])
except (KeyError, TypeError, ValueError):
return None
return None
def fiber_entry_hub_port(entry):
return parse_hub_port_entry(entry)
def fiber_ssh_target(entry):
"""
Optional SSH destination for a fiber_ports entry (hubs live on another host).
- "ssh": "user@host" or "user@ip"
- or "remote": same
- or "host": "ip" with optional "user" (default root)
"""
if not isinstance(entry, dict):
return None
s = entry.get("ssh") or entry.get("remote")
if isinstance(s, str) and s.strip():
return s.strip()
host = entry.get("host")
if isinstance(host, str) and host.strip():
user = entry.get("user")
u = user.strip() if isinstance(user, str) and user.strip() else "root"
return f"{u}@{host.strip()}"
return None
def chip_fields_from_lsusb_lines(lines):
"""Build fiber_map.json fields from full lsusb lines (see fiber chip … save)."""
if not lines:
return {}
ids = []
descs = []
for line in lines:
m = re.search(r"ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s*(.*)", line, re.I)
if m:
ids.append(f"{m.group(1)}:{m.group(2)}".lower())
descs.append(m.group(3).strip())
out = {
"usb_lsusb_lines": list(lines),
"chip_profiled_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
if ids:
out["usb_id"] = ids[0]
out["chip_type"] = descs[0] if descs[0] else ids[0]
if len(ids) > 1:
out["usb_ids"] = ids
return out
def stored_chip_preview(entry, width=26):
"""Short label for status tables from saved probe metadata."""
if not isinstance(entry, dict):
return ""
def _take(v):
if not isinstance(v, str) or not v.strip():
return ""
s = v.strip().replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
for k in ("chip_type",):
got = _take(entry.get(k))
if got:
return got
wlan = entry.get("wlan")
if isinstance(wlan, dict):
prim = wlan.get("primary")
if isinstance(prim, dict):
for k in ("chip_label", "product"):
got = _take(prim.get(k))
if got:
return got
for k in ("usb_description", "usb_id"):
got = _take(entry.get(k))
if got:
return got
return ""
def stored_pcie_preview(entry, width=22):
"""
Short label from optional fiber_ports[].pcie object (manual or external tooling).
Supported keys (all optional):
bus_address / pci_address — e.g. 0000:c1:00.0 (display shortens leading 0000:)
switch_chip / chip — e.g. PEX8718
slot_port / downstream_port — integer downstream slot index (e.g. monitor "Port 0")
board_label — e.g. H3
board_serial — optional host board serial string
adapter_port — host adapter lane, e.g. SFP 14 from monitor tools
sfp_serial — transceiver serial for that lane (from catalog or manual)
link_capability, link_state — free-text strings
"""
if not isinstance(entry, dict):
return ""
pcie = entry.get("pcie")
if not isinstance(pcie, dict):
return ""
bus = pcie.get("bus_address") or pcie.get("pci_address") or ""
if isinstance(bus, str):
bus = bus.strip()
if bus.startswith("0000:"):
bus = bus[5:]
chip = pcie.get("switch_chip") or pcie.get("chip") or ""
if isinstance(chip, str):
chip = chip.strip()
board = pcie.get("board_label") or pcie.get("board") or ""
if isinstance(board, str):
board = board.strip()
slot = pcie.get("slot_port")
if slot is None:
slot = pcie.get("downstream_port")
ap = pcie.get("adapter_port")
sfp_sn = pcie.get("sfp_serial")
if isinstance(sfp_sn, str) and sfp_sn.strip():
sfp_sn = sfp_sn.strip()
if len(sfp_sn) > 10:
sfp_sn = sfp_sn[-10:]
else:
sfp_sn = ""
bits = []
if board:
bits.append(board)
if chip:
bits.append(chip)
if slot is not None and str(slot).strip() != "":
bits.append(f"slot{slot}")
if ap is not None and str(ap).strip() != "":
bits.append(f"SFP{ap}")
if sfp_sn:
bits.append(sfp_sn)
if bus:
bits.append(bus)
s = " ".join(bits) if bits else ""
if not s:
return ""
s = s.replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
def _prompt_pcie_fields_interactive(pcie_start: dict) -> dict:
"""TTY prompts; non-empty answers override or set keys. EOF on first line → return pcie_start."""
pcie = dict(pcie_start) if isinstance(pcie_start, dict) else {}
labels = [
(
"bus_address",
"PCI bus / BDF (e.g. 0000:c1:00.0 or c1:00.0)",
"str",
),
(
"switch_chip",
"PCIe switch chip (e.g. PEX8718)",
"str",
),
(
"slot_port",
'Downstream slot index (integer; monitor "Port 0" → 0)',
"int",
),
(
"board_label",
"Board label (e.g. H3)",
"str",
),
(
"board_serial",
"Board serial (optional, e.g. 4C-0A-3D-…)",
"str",
),
(
"adapter_port",
"Host adapter / SFP lane (integer, e.g. 14)",
"int",
),
(
"sfp_serial",
"SFP module serial (optional)",
"str",
),
(
"link_capability",
"Link capability (e.g. Gen3 x4)",
"str",
),
(
"link_state",
"Link state (e.g. Gen3 x4)",
"str",
),
]
for key, label, kind in labels:
try:
raw = input(f" {label} [empty=keep previous]: ").strip()
except EOFError:
print(" (EOF — leaving earlier PCIe answers as-is.)", flush=True)
return pcie
if not raw:
continue
if kind == "int":
try:
pcie[key] = int(raw, 10)
except ValueError:
print(f" Ignored {key!r} (not an integer).", flush=True)
else:
pcie[key] = raw
return pcie
def prompt_pcie_metadata_for_calibrate(existing_pcie):
"""
Optional TTY step after assigning a fiber id during panel calibrate.
Prints the Adnacom card list (no paste): pick ``1````6`` then SFP lane ``1````4``,
or ``m`` for typed fields, ``c`` to clear, Enter to keep.
Returns ``(action, payload)``:
``("keep", None)`` — leave ``entry['pcie']`` unchanged (still from copied base map).
``("clear", None)`` — remove ``pcie`` from this entry.
``("set", dict)`` — replace with ``dict`` (caller drops ``pcie`` if dict is empty).
"""
print_catalog_menu()
try:
r = input(
" PCIe? [Enter/s/skip=keep, 16=card, m=manual, c=clear, Ctrl-C=save & exit calibrate]: "
).strip().lower()
except EOFError:
return ("keep", None)
if r in ("s", "skip"):
return ("keep", None)
if r in ("c", "clear"):
return ("clear", None)
if r in ("m", "manual"):
base = existing_pcie if isinstance(existing_pcie, dict) else {}
newp = _prompt_pcie_fields_interactive(base)
return ("set", newp)
if r.isdigit():
n = int(r, 10)
if 1 <= n <= len(ADNACOM_KNOWN_CARDS):
card = ADNACOM_KNOWN_CARDS[n - 1]
bdf = short_bdf(card["bus_address"])
tail = board_serial_tail(card["board_serial"])
print(
f" Selected card {n}: {bdf} board SN …{tail} (pick SFP 14 in Monitor)",
flush=True,
)
try:
lane_s = input(
" SFP lane on that card [14], Enter=cancel: "
).strip()
except EOFError:
return ("keep", None)
if not lane_s:
print(" Cancelled — PCIe unchanged.", flush=True)
return ("keep", None)
try:
lane = int(lane_s, 10)
except ValueError:
print(" Not an integer — PCIe unchanged.", flush=True)
return ("keep", None)
if lane < 1 or lane > 4:
print(" Use 14 — PCIe unchanged.", flush=True)
return ("keep", None)
newp = pcie_from_card_and_lane(card, lane)
print(
f" Stored: {bdf} SFP{lane} module SN {newp['sfp_serial']}",
flush=True,
)
return ("set", newp)
if r:
print(" Unrecognised — PCIe unchanged.", flush=True)
return ("keep", None)