468 lines
14 KiB
Python
468 lines
14 KiB
Python
"""Load/save fiber_map.json; parse hub/port bindings and chip metadata."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
from collections.abc import Iterable, Mapping
|
||
from dataclasses import dataclass
|
||
from typing import Any
|
||
|
||
from fiwi.adnacom_pcie_catalog import (
|
||
ADNACOM_KNOWN_CARDS,
|
||
board_serial_tail,
|
||
pcie_from_card_and_lane,
|
||
print_catalog_menu,
|
||
short_bdf,
|
||
)
|
||
from fiwi.paths import base_dir, fiber_map_path
|
||
|
||
|
||
def calibrate_remotes_hosts(doc) -> list[str]:
    """
    Return the ``user@host`` entries listed under ``doc['calibrate_remotes']``.

    The value may be a JSON **array** (each item is converted with ``str``) or a
    single comma-separated **string**. Every candidate is whitespace-stripped;
    blank candidates and candidates without an ``@`` are dropped.

    Returns an empty list when ``doc`` is not a dict or the key is missing or
    holds some other type.
    """
    if not isinstance(doc, dict):
        return []

    remotes = doc.get("calibrate_remotes")
    if isinstance(remotes, list):
        candidates = (str(item).strip() for item in remotes)
    elif isinstance(remotes, str):
        candidates = (piece.strip() for piece in remotes.split(","))
    else:
        return []

    return [host for host in candidates if host and "@" in host]
|
||
|
||
|
||
@dataclass(frozen=True)
class CalibrateSshTargetPlan:
    """
    Immutable set of resolved ``user@host`` hub targets for hybrid Fi-Wi
    (panel calibrate, ``off all``, relay).

    Ordering is stable: explicit CLI hosts come first, then
    ``doc['calibrate_remotes']``, then
    :attr:`~fiwi.ssh.SshNodeConfig.calibrate_remotes` (INI +
    ``FIWI_CALIBRATE_REMOTES`` / ``FIWI_REMOTE_HUBS``); each phase is deduped.
    """

    # Every resolved target, in the order described above.
    hosts: tuple[str, ...]
    # Hosts whose first appearance was only from config/env, not map or
    # ``extra_cli_hosts``.
    first_introduced_via_ssh_config: tuple[str, ...]
|
||
|
||
|
||
def resolve_calibrate_ssh_targets(
    doc: Mapping[str, Any] | None,
    *,
    extra_cli_hosts: Iterable[str] | None = None,
) -> CalibrateSshTargetPlan:
    """
    Merge every source of calibrate SSH targets into one ordered, deduped plan.

    Sources are consulted in this order: ``extra_cli_hosts`` (optional ``--ssh``),
    then the map's ``calibrate_remotes`` (only when ``doc`` is a dict), then the
    comma-separated ``calibrate_remotes`` of the loaded
    :class:`~fiwi.ssh.SshNodeConfig`. A host already seen in an earlier source is
    skipped; CLI and config entries must be non-blank and contain ``@``.

    Call after :func:`fiwi.paths.configure` / INI load so
    :class:`~fiwi.ssh.SshNodeConfig` resolves.
    """
    from fiwi.ssh import SshNodeConfig

    ordered: list[str] = []
    known: set[str] = set()
    config_only: list[str] = []

    def _usable(text: str) -> bool:
        # A target must be non-blank and look like ``user@host``.
        return bool(text) and "@" in text

    def _remember(target: str) -> bool:
        # Record ``target`` unless already present; report whether it was new.
        if target in known:
            return False
        known.add(target)
        ordered.append(target)
        return True

    # Phase 1: hosts given explicitly on the command line.
    for raw in extra_cli_hosts or ():
        candidate = str(raw).strip()
        if _usable(candidate):
            _remember(candidate)

    # Phase 2: hosts listed in the fiber map (already normalised by the helper).
    if isinstance(doc, dict):
        for candidate in calibrate_remotes_hosts(doc):
            _remember(candidate)

    # Phase 3: hosts from the SSH node config; track the ones that are new here.
    config_line = (SshNodeConfig.load().calibrate_remotes or "").strip()
    for piece in config_line.split(","):
        candidate = piece.strip()
        if _usable(candidate) and _remember(candidate):
            config_only.append(candidate)

    return CalibrateSshTargetPlan(
        hosts=tuple(ordered),
        first_introduced_via_ssh_config=tuple(config_only),
    )
|
||
|
||
|
||
def ensure_fiber_map_document(doc):
    """
    Validate a fiber-map root object and return a shallow copy of it.

    The copy always has a ``fiber_ports`` key: a missing or ``None`` value is
    replaced by an empty dict. The input dict itself is not modified.

    Raises:
        ValueError: if ``doc`` is not a dict, or ``fiber_ports`` is present
            but is not a dict.
    """
    if not isinstance(doc, dict):
        raise ValueError("map root must be a JSON object")

    ports = doc.get("fiber_ports")
    if ports is not None and not isinstance(ports, dict):
        raise ValueError("fiber_ports must be a JSON object")

    normalized = dict(doc)
    if ports is None:
        normalized["fiber_ports"] = {}
    return normalized
|
||
|
||
|
||
def load_fiber_map_document():
    """
    Load the configured fiber map if present; return ``None`` if missing.

    If the configured path (usually ``maps/fiber_map.json``) is absent but a
    legacy ``fiber_map.json`` exists in the install root, that file is loaded
    instead so older layouts keep working until you save (which writes the
    configured path).

    The parsed JSON is passed through :func:`ensure_fiber_map_document`, so a
    malformed root raises ``ValueError``.
    """

    def _read(path):
        # Parse one JSON file and normalise its root object.
        with open(path, encoding="utf-8") as handle:
            return ensure_fiber_map_document(json.load(handle))

    configured = fiber_map_path()
    if os.path.isfile(configured):
        return _read(configured)

    legacy = os.path.join(base_dir(), "fiber_map.json")
    is_same_file = os.path.normpath(os.path.abspath(legacy)) == os.path.normpath(
        os.path.abspath(configured)
    )
    # Only fall back when the legacy location is genuinely a different file.
    if not is_same_file and os.path.isfile(legacy):
        return _read(legacy)
    return None
|
||
|
||
|
||
def write_fiber_map_document(doc, *, path=None):
    """
    Write the fiber map as indented JSON followed by a trailing newline.

    Args:
        doc: Map root; validated/normalised via :func:`ensure_fiber_map_document`
            before anything is written.
        path: Destination file. Defaults to :func:`~fiwi.paths.fiber_map_path`.

    The destination's parent directory is created when it does not exist.
    """
    target = fiber_map_path() if path is None else path
    payload = ensure_fiber_map_document(doc)

    folder = os.path.dirname(os.path.abspath(target))
    if folder:
        os.makedirs(folder, exist_ok=True)

    with open(target, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2)
        handle.write("\n")
|
||
|
||
|
||
def load_fiber_map_or_exit():
    """
    Return the loaded fiber map document.

    When :func:`load_fiber_map_document` finds no map, setup guidance is
    printed to stderr and the process exits with status 1.
    """
    doc = load_fiber_map_document()
    if doc is not None:
        return doc

    guidance = "".join(
        (
            f"Missing {fiber_map_path()}.\n",
            " Copy fiber_map.example.json → maps/fiber_map.json (or set FIWI_FIBER_MAP) ",
            "and set fiber_ports ",
            '(e.g. "5": {"hub": 1, "port": 2, "ssh": "rjmcmahon@192.168.1.39"}). ',
            "Use ssh / remote / host+user when USB hubs are reached via SSH.",
        )
    )
    print(guidance, file=sys.stderr, flush=True)
    sys.exit(1)
    return doc
|
||
|
||
|
||
def fiber_sort_key(key):
    """
    Sort key that orders numeric fiber ids numerically, ahead of all other ids.

    Args:
        key: A fiber id (typically a ``fiber_ports`` key); converted with
            ``str`` and whitespace-stripped before inspection.

    Returns:
        ``(0, int_value)`` for purely decimal ids, otherwise ``(1, text)``, so
        numeric ids sort before textual ones and never compare int with str.
    """
    text = str(key).strip()
    # ``isdecimal`` rather than ``isdigit``: ``isdigit`` is also true for
    # characters such as "²" that ``int()`` rejects, which would raise
    # ValueError in the middle of a sort.
    if text.isdecimal():
        return (0, int(text))
    return (1, text)
|
||
|
||
|
||
def parse_hub_port_entry(entry):
    """
    Return (hub_1based, port_0based) or None if unmapped / invalid.

    Two encodings are understood:

    - a string ``"<hub>.<port>"`` (blank or ``"null"`` means unmapped);
    - a dict carrying ``"hub"`` and ``"port"`` values convertible with ``int``.

    Any other type, or a value that fails integer conversion, yields ``None``.
    """
    if isinstance(entry, dict):
        try:
            return int(entry["hub"]), int(entry["port"])
        except (KeyError, TypeError, ValueError):
            return None

    if not isinstance(entry, str):
        return None

    text = entry.strip()
    if text == "" or text.lower() == "null":
        return None

    # Split on the first dot only; a missing dot leaves the port text empty,
    # which fails integer conversion below.
    hub_text, _, port_text = text.partition(".")
    try:
        return int(hub_text), int(port_text)
    except ValueError:
        return None
|
||
|
||
|
||
def fiber_entry_hub_port(entry):
    """Alias for :func:`parse_hub_port_entry`; returns its result unchanged."""
    return parse_hub_port_entry(entry)
|
||
|
||
|
||
def fiber_ssh_target(entry):
    """
    Optional SSH destination for a fiber_ports entry (hubs live on another host).

    Recognised keys, in priority order:

    - "ssh": "user@host" or "user@ip"
    - or "remote": same
    - or "host": "ip" with optional "user" (default root)

    Returns the whitespace-stripped target string, or ``None`` when ``entry``
    is not a dict or carries none of the keys with a usable string value.
    """
    if not isinstance(entry, dict):
        return None

    # Inspect each key on its own so that a blank or non-string "ssh" value
    # cannot hide a valid "remote" value.
    for key in ("ssh", "remote"):
        value = entry.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()

    host = entry.get("host")
    if isinstance(host, str) and host.strip():
        user = entry.get("user")
        login = user.strip() if isinstance(user, str) and user.strip() else "root"
        return f"{login}@{host.strip()}"
    return None
|
||
|
||
|
||
def chip_fields_from_lsusb_lines(lines):
    """
    Build fiber_map.json fields from full lsusb lines (see fiber chip … save).

    Args:
        lines: Iterable of ``lsusb`` output lines (any iterable, including a
            one-shot iterator); ``None`` or an empty input yields ``{}``.

    Returns:
        A dict with:

        - ``usb_lsusb_lines``: the input lines as a list;
        - ``chip_profiled_at``: current UTC time, ``YYYY-MM-DDTHH:MM:SSZ``;
        - ``usb_id`` / ``chip_type``: lower-cased ``vvvv:pppp`` id of the first
          line containing ``ID vvvv:pppp`` and its trailing description (the id
          itself when the description is blank), only if some line matches;
        - ``usb_ids``: all matched ids, only when more than one line matches.
    """
    # Materialise once: the lines are both parsed and stored verbatim, so a
    # one-shot iterator must not be consumed by the parsing pass.
    captured = list(lines) if lines else []
    if not captured:
        return {}

    parsed = []
    for line in captured:
        found = re.search(r"ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s*(.*)", line, re.I)
        if found:
            usb_id = f"{found.group(1)}:{found.group(2)}".lower()
            parsed.append((usb_id, found.group(3).strip()))

    fields = {
        "usb_lsusb_lines": captured,
        "chip_profiled_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    if parsed:
        first_id, first_desc = parsed[0]
        fields["usb_id"] = first_id
        fields["chip_type"] = first_desc or first_id
        if len(parsed) > 1:
            fields["usb_ids"] = [usb_id for usb_id, _ in parsed]
    return fields
|
||
|
||
|
||
def stored_chip_preview(entry, width=26):
    """
    Short label for status tables from saved probe metadata.

    The first non-blank string among these wins: ``chip_type``, then
    ``wlan.primary.chip_label`` / ``wlan.primary.product``, then
    ``usb_description`` / ``usb_id``. Newlines become spaces, and a label
    longer than ``width`` is cut to ``width - 2`` characters plus ``".."``.

    Returns ``""`` when ``entry`` is not a dict or no candidate is usable.
    """
    if not isinstance(entry, dict):
        return ""

    def _clip(value):
        # Normalise one candidate to a display label ("" when unusable).
        if not isinstance(value, str):
            return ""
        text = value.strip()
        if not text:
            return ""
        text = text.replace("\n", " ")
        return text if len(text) <= width else text[: width - 2] + ".."

    candidates = [entry.get("chip_type")]

    wlan = entry.get("wlan")
    primary = wlan.get("primary") if isinstance(wlan, dict) else None
    if isinstance(primary, dict):
        candidates.extend(primary.get(name) for name in ("chip_label", "product"))

    candidates.extend(entry.get(name) for name in ("usb_description", "usb_id"))

    return next((label for label in map(_clip, candidates) if label), "")
|
||
|
||
|
||
def stored_pcie_preview(entry, width=22):
    """
    Short label from optional fiber_ports[].pcie object (manual or external tooling).

    Supported keys (all optional):
      bus_address / pci_address — e.g. 0000:c1:00.0 (display shortens leading 0000:)
      switch_chip / chip — e.g. PEX8718
      slot_port / downstream_port — integer downstream slot index (e.g. monitor "Port 0")
      board_label / board — e.g. H3
      board_serial — optional host board serial string
      adapter_port — host adapter lane, e.g. SFP 1–4 from monitor tools
      sfp_serial — transceiver serial for that lane (from catalog or manual)
      link_capability, link_state — free-text strings

    Only board, chip, slot, adapter port, the last 10 characters of the SFP
    serial and the bus address are rendered, space-separated in that order.
    A result longer than ``width`` is cut to ``width - 2`` characters plus
    ``".."``.

    Returns ``""`` when ``entry`` or its ``pcie`` value is not a dict, or when
    nothing is set.
    """
    if not isinstance(entry, dict):
        return ""
    pcie = entry.get("pcie")
    if not isinstance(pcie, dict):
        return ""

    def _text(*keys):
        # First truthy value among ``keys`` as stripped text. Non-string values
        # (e.g. a JSON number in a hand-edited map) are coerced so the final
        # join cannot raise TypeError.
        for key in keys:
            value = pcie.get(key)
            if value:
                return str(value).strip()
        return ""

    def _present(value):
        # True when a slot/lane value is set to something non-blank (0 counts).
        return value is not None and str(value).strip() != ""

    bus = _text("bus_address", "pci_address")
    if bus.startswith("0000:"):
        bus = bus[5:]
    chip = _text("switch_chip", "chip")
    board = _text("board_label", "board")

    slot = pcie.get("slot_port")
    if slot is None:
        slot = pcie.get("downstream_port")
    lane = pcie.get("adapter_port")

    # Only string serials are shown; keep the tail, which is the distinguishing part.
    serial = pcie.get("sfp_serial")
    serial = serial.strip()[-10:] if isinstance(serial, str) else ""

    parts = []
    if board:
        parts.append(board)
    if chip:
        parts.append(chip)
    if _present(slot):
        parts.append(f"slot{slot}")
    if _present(lane):
        parts.append(f"SFP{lane}")
    if serial:
        parts.append(serial)
    if bus:
        parts.append(bus)

    label = " ".join(parts)
    if not label:
        return ""
    label = label.replace("\n", " ")
    if len(label) > width:
        return label[: width - 2] + ".."
    return label
|
||
|
||
|
||
def _prompt_pcie_fields_interactive(pcie_start: dict) -> dict:
|
||
"""TTY prompts; non-empty answers override or set keys. EOF on first line → return pcie_start."""
|
||
pcie = dict(pcie_start) if isinstance(pcie_start, dict) else {}
|
||
labels = [
|
||
(
|
||
"bus_address",
|
||
"PCI bus / BDF (e.g. 0000:c1:00.0 or c1:00.0)",
|
||
"str",
|
||
),
|
||
(
|
||
"switch_chip",
|
||
"PCIe switch chip (e.g. PEX8718)",
|
||
"str",
|
||
),
|
||
(
|
||
"slot_port",
|
||
'Downstream slot index (integer; monitor "Port 0" → 0)',
|
||
"int",
|
||
),
|
||
(
|
||
"board_label",
|
||
"Board label (e.g. H3)",
|
||
"str",
|
||
),
|
||
(
|
||
"board_serial",
|
||
"Board serial (optional, e.g. 4C-0A-3D-…)",
|
||
"str",
|
||
),
|
||
(
|
||
"adapter_port",
|
||
"Host adapter / SFP lane (integer, e.g. 1–4)",
|
||
"int",
|
||
),
|
||
(
|
||
"sfp_serial",
|
||
"SFP module serial (optional)",
|
||
"str",
|
||
),
|
||
(
|
||
"link_capability",
|
||
"Link capability (e.g. Gen3 x4)",
|
||
"str",
|
||
),
|
||
(
|
||
"link_state",
|
||
"Link state (e.g. Gen3 x4)",
|
||
"str",
|
||
),
|
||
]
|
||
for key, label, kind in labels:
|
||
try:
|
||
raw = input(f" {label} [empty=keep previous]: ").strip()
|
||
except EOFError:
|
||
print(" (EOF — leaving earlier PCIe answers as-is.)", flush=True)
|
||
return pcie
|
||
if not raw:
|
||
continue
|
||
if kind == "int":
|
||
try:
|
||
pcie[key] = int(raw, 10)
|
||
except ValueError:
|
||
print(f" Ignored {key!r} (not an integer).", flush=True)
|
||
else:
|
||
pcie[key] = raw
|
||
return pcie
|
||
|
||
|
||
def _prompt_sfp_lane_for_card(n, card):
    """Ask which SFP lane of catalog card ``n`` to use; return ``(action, payload)``."""
    bdf = short_bdf(card["bus_address"])
    tail = board_serial_tail(card["board_serial"])
    print(
        f" Selected card {n}: {bdf} board SN …{tail} (pick SFP 1–4 in Monitor)",
        flush=True,
    )
    try:
        lane_s = input(" SFP lane on that card [1–4], Enter=cancel: ").strip()
    except EOFError:
        return ("keep", None)

    if not lane_s:
        print(" Cancelled — PCIe unchanged.", flush=True)
        return ("keep", None)
    try:
        lane = int(lane_s, 10)
    except ValueError:
        print(" Not an integer — PCIe unchanged.", flush=True)
        return ("keep", None)
    if lane < 1 or lane > 4:
        print(" Use 1–4 — PCIe unchanged.", flush=True)
        return ("keep", None)

    newp = pcie_from_card_and_lane(card, lane)
    print(
        f" Stored: {bdf} SFP{lane} module SN {newp['sfp_serial']}",
        flush=True,
    )
    return ("set", newp)


def prompt_pcie_metadata_for_calibrate(existing_pcie):
    """
    Optional TTY step after assigning a fiber id during panel calibrate.

    Prints the Adnacom card list (no paste): pick a card number then SFP lane
    ``1``–``4``, or ``m`` for typed fields, ``c`` to clear, Enter to keep.

    Args:
        existing_pcie: Current ``pcie`` dict of the entry (used as the starting
            point for manual entry); any non-dict value is treated as empty.

    Returns ``(action, payload)``:
      ``("keep", None)`` — leave ``entry['pcie']`` unchanged (still from copied base map).
      ``("clear", None)`` — remove ``pcie`` from this entry.
      ``("set", dict)`` — replace with ``dict`` (caller drops ``pcie`` if dict is empty).

    EOF at either prompt and any unrecognised answer result in ``("keep", None)``.
    """
    print_catalog_menu()
    try:
        r = input(
            " PCIe? [Enter/s/skip=keep, 1–6=card, m=manual, c=clear, Ctrl-C=save & exit calibrate]: "
        ).strip().lower()
    except EOFError:
        return ("keep", None)

    if r in ("s", "skip"):
        return ("keep", None)
    if r in ("c", "clear"):
        return ("clear", None)
    if r in ("m", "manual"):
        base = existing_pcie if isinstance(existing_pcie, dict) else {}
        return ("set", _prompt_pcie_fields_interactive(base))

    # ``isdecimal`` rather than ``isdigit``: ``isdigit`` is also true for
    # characters such as "²" that ``int()`` rejects, which would crash the
    # prompt instead of reporting an unrecognised answer.
    if r.isdecimal():
        n = int(r, 10)
        if 1 <= n <= len(ADNACOM_KNOWN_CARDS):
            return _prompt_sfp_lane_for_card(n, ADNACOM_KNOWN_CARDS[n - 1])

    # Anything else that was typed (including an out-of-range card number).
    if r:
        print(" Unrecognised — PCIe unchanged.", flush=True)
    return ("keep", None)
|
||
|
||
|