UmberHubManager/fiwi/fiber_map_io.py

356 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Load/save fiber_map.json; parse hub/port bindings and chip metadata."""
import json
import os
import re
import sys
import time
from fiwi.adnacom_pcie_catalog import (
ADNACOM_KNOWN_CARDS,
board_serial_tail,
pcie_from_card_and_lane,
print_catalog_menu,
short_bdf,
)
from fiwi.paths import fiber_map_path
def ensure_fiber_map_document(doc):
if not isinstance(doc, dict):
raise ValueError("map root must be a JSON object")
out = dict(doc)
fp = out.get("fiber_ports")
if fp is None:
out["fiber_ports"] = {}
elif not isinstance(fp, dict):
raise ValueError("fiber_ports must be a JSON object")
return out
def load_fiber_map_document():
"""Load ``fiber_map.json`` if present; return None if missing."""
fpath = fiber_map_path()
if not os.path.isfile(fpath):
return None
with open(fpath, encoding="utf-8") as f:
return ensure_fiber_map_document(json.load(f))
def load_fiber_map_or_exit():
doc = load_fiber_map_document()
if doc is None:
print(
f"Missing {fiber_map_path()}.\n"
" Copy fiber_map.example.json → fiber_map.json and set fiber_ports "
'(e.g. "5": {"hub": 1, "port": 2, "ssh": "pi@192.168.1.39"}). '
"Use ssh / remote / host+user when USB hubs are reached via SSH.",
file=sys.stderr,
flush=True,
)
sys.exit(1)
return doc
def fiber_sort_key(key):
s = str(key).strip()
if s.isdigit():
return (0, int(s))
return (1, s)
def parse_hub_port_entry(entry):
"""Return (hub_1based, port_0based) or None if unmapped / invalid."""
if entry is None:
return None
if isinstance(entry, str):
entry = entry.strip()
if not entry or entry.lower() == "null":
return None
try:
h_s, p_s = entry.split(".", 1)
return int(h_s), int(p_s)
except ValueError:
return None
if isinstance(entry, dict):
try:
return int(entry["hub"]), int(entry["port"])
except (KeyError, TypeError, ValueError):
return None
return None
def fiber_entry_hub_port(entry):
return parse_hub_port_entry(entry)
def fiber_ssh_target(entry):
"""
Optional SSH destination for a fiber_ports entry (hubs live on another host).
- "ssh": "user@host" or "user@ip"
- or "remote": same
- or "host": "ip" with optional "user" (default root)
"""
if not isinstance(entry, dict):
return None
s = entry.get("ssh") or entry.get("remote")
if isinstance(s, str) and s.strip():
return s.strip()
host = entry.get("host")
if isinstance(host, str) and host.strip():
user = entry.get("user")
u = user.strip() if isinstance(user, str) and user.strip() else "root"
return f"{u}@{host.strip()}"
return None
def chip_fields_from_lsusb_lines(lines):
"""Build fiber_map.json fields from full lsusb lines (see fiber chip … save)."""
if not lines:
return {}
ids = []
descs = []
for line in lines:
m = re.search(r"ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s*(.*)", line, re.I)
if m:
ids.append(f"{m.group(1)}:{m.group(2)}".lower())
descs.append(m.group(3).strip())
out = {
"usb_lsusb_lines": list(lines),
"chip_profiled_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
if ids:
out["usb_id"] = ids[0]
out["chip_type"] = descs[0] if descs[0] else ids[0]
if len(ids) > 1:
out["usb_ids"] = ids
return out
def stored_chip_preview(entry, width=26):
"""Short label for status tables from saved probe metadata."""
if not isinstance(entry, dict):
return ""
def _take(v):
if not isinstance(v, str) or not v.strip():
return ""
s = v.strip().replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
for k in ("chip_type",):
got = _take(entry.get(k))
if got:
return got
wlan = entry.get("wlan")
if isinstance(wlan, dict):
prim = wlan.get("primary")
if isinstance(prim, dict):
for k in ("chip_label", "product"):
got = _take(prim.get(k))
if got:
return got
for k in ("usb_description", "usb_id"):
got = _take(entry.get(k))
if got:
return got
return ""
def stored_pcie_preview(entry, width=22):
"""
Short label from optional fiber_ports[].pcie object (manual or external tooling).
Supported keys (all optional):
bus_address / pci_address — e.g. 0000:c1:00.0 (display shortens leading 0000:)
switch_chip / chip — e.g. PEX8718
slot_port / downstream_port — integer downstream slot index (e.g. monitor "Port 0")
board_label — e.g. H3
board_serial — optional host board serial string
adapter_port — host adapter lane, e.g. SFP 14 from monitor tools
sfp_serial — transceiver serial for that lane (from catalog or manual)
link_capability, link_state — free-text strings
"""
if not isinstance(entry, dict):
return ""
pcie = entry.get("pcie")
if not isinstance(pcie, dict):
return ""
bus = pcie.get("bus_address") or pcie.get("pci_address") or ""
if isinstance(bus, str):
bus = bus.strip()
if bus.startswith("0000:"):
bus = bus[5:]
chip = pcie.get("switch_chip") or pcie.get("chip") or ""
if isinstance(chip, str):
chip = chip.strip()
board = pcie.get("board_label") or pcie.get("board") or ""
if isinstance(board, str):
board = board.strip()
slot = pcie.get("slot_port")
if slot is None:
slot = pcie.get("downstream_port")
ap = pcie.get("adapter_port")
sfp_sn = pcie.get("sfp_serial")
if isinstance(sfp_sn, str) and sfp_sn.strip():
sfp_sn = sfp_sn.strip()
if len(sfp_sn) > 10:
sfp_sn = sfp_sn[-10:]
else:
sfp_sn = ""
bits = []
if board:
bits.append(board)
if chip:
bits.append(chip)
if slot is not None and str(slot).strip() != "":
bits.append(f"slot{slot}")
if ap is not None and str(ap).strip() != "":
bits.append(f"SFP{ap}")
if sfp_sn:
bits.append(sfp_sn)
if bus:
bits.append(bus)
s = " ".join(bits) if bits else ""
if not s:
return ""
s = s.replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
def _prompt_pcie_fields_interactive(pcie_start: dict) -> dict:
"""TTY prompts; non-empty answers override or set keys. EOF on first line → return pcie_start."""
pcie = dict(pcie_start) if isinstance(pcie_start, dict) else {}
labels = [
(
"bus_address",
"PCI bus / BDF (e.g. 0000:c1:00.0 or c1:00.0)",
"str",
),
(
"switch_chip",
"PCIe switch chip (e.g. PEX8718)",
"str",
),
(
"slot_port",
'Downstream slot index (integer; monitor "Port 0" → 0)',
"int",
),
(
"board_label",
"Board label (e.g. H3)",
"str",
),
(
"board_serial",
"Board serial (optional, e.g. 4C-0A-3D-…)",
"str",
),
(
"adapter_port",
"Host adapter / SFP lane (integer, e.g. 14)",
"int",
),
(
"sfp_serial",
"SFP module serial (optional)",
"str",
),
(
"link_capability",
"Link capability (e.g. Gen3 x4)",
"str",
),
(
"link_state",
"Link state (e.g. Gen3 x4)",
"str",
),
]
for key, label, kind in labels:
try:
raw = input(f" {label} [empty=keep previous]: ").strip()
except EOFError:
print(" (EOF — leaving earlier PCIe answers as-is.)", flush=True)
return pcie
if not raw:
continue
if kind == "int":
try:
pcie[key] = int(raw, 10)
except ValueError:
print(f" Ignored {key!r} (not an integer).", flush=True)
else:
pcie[key] = raw
return pcie
def prompt_pcie_metadata_for_calibrate(existing_pcie):
"""
Optional TTY step after assigning a fiber id during panel calibrate.
Prints the Adnacom card list (no paste): pick ``1````6`` then SFP lane ``1````4``,
or ``m`` for typed fields, ``c`` to clear, Enter to keep.
Returns ``(action, payload)``:
``("keep", None)`` — leave ``entry['pcie']`` unchanged (still from copied base map).
``("clear", None)`` — remove ``pcie`` from this entry.
``("set", dict)`` — replace with ``dict`` (caller drops ``pcie`` if dict is empty).
"""
print_catalog_menu()
try:
r = input(
" PCIe? [Enter=keep, 16=card from list, m=manual, c=clear]: "
).strip().lower()
except EOFError:
return ("keep", None)
if r in ("c", "clear"):
return ("clear", None)
if r in ("m", "manual"):
base = existing_pcie if isinstance(existing_pcie, dict) else {}
newp = _prompt_pcie_fields_interactive(base)
return ("set", newp)
if r.isdigit():
n = int(r, 10)
if 1 <= n <= len(ADNACOM_KNOWN_CARDS):
card = ADNACOM_KNOWN_CARDS[n - 1]
bdf = short_bdf(card["bus_address"])
tail = board_serial_tail(card["board_serial"])
print(
f" Selected card {n}: {bdf} board SN …{tail} (pick SFP 14 in Monitor)",
flush=True,
)
try:
lane_s = input(
" SFP lane on that card [14], Enter=cancel: "
).strip()
except EOFError:
return ("keep", None)
if not lane_s:
print(" Cancelled — PCIe unchanged.", flush=True)
return ("keep", None)
try:
lane = int(lane_s, 10)
except ValueError:
print(" Not an integer — PCIe unchanged.", flush=True)
return ("keep", None)
if lane < 1 or lane > 4:
print(" Use 14 — PCIe unchanged.", flush=True)
return ("keep", None)
newp = pcie_from_card_and_lane(card, lane)
print(
f" Stored: {bdf} SFP{lane} module SN {newp['sfp_serial']}",
flush=True,
)
return ("set", newp)
if r:
print(" Unrecognised — PCIe unchanged.", flush=True)
return ("keep", None)