"""Load/save fiber_map.json and legacy panel_map.json; parse entries and chip fields.""" import json import os import re import sys import time from hubmgr.adnacom_pcie_catalog import ( ADNACOM_KNOWN_CARDS, board_serial_tail, pcie_from_card_and_lane, print_catalog_menu, short_bdf, ) from hubmgr.constants import PANEL_SLOTS from hubmgr.paths import fiber_map_path, panel_map_path def ensure_fiber_map_document(doc): if not isinstance(doc, dict): raise ValueError("map root must be a JSON object") out = dict(doc) fp = out.get("fiber_ports") if fp is None: out["fiber_ports"] = {} elif not isinstance(fp, dict): raise ValueError("fiber_ports must be a JSON object") return out def document_from_legacy_panel_array(slots): fiber_ports = {} for idx, slot in enumerate(slots): if slot is not None: fiber_ports[str(idx + 1)] = {"hub": slot[0], "port": slot[1]} return {"fiber_ports": fiber_ports} def load_fiber_map_document(): """ Load routing map: prefer fiber_map.json (object keyed by fiber port). If missing, migrate in-memory from legacy panel_map.json (24-slot array). Returns None if neither file exists. """ fpath = fiber_map_path() ppath = panel_map_path() if os.path.isfile(fpath): with open(fpath, encoding="utf-8") as f: return ensure_fiber_map_document(json.load(f)) if os.path.isfile(ppath): slots = read_panel_map_file(ppath) return ensure_fiber_map_document(document_from_legacy_panel_array(slots)) return None def load_fiber_map_or_exit(): doc = load_fiber_map_document() if doc is None: print( f"Missing {fiber_map_path()} (or legacy {panel_map_path()}).\n" " Copy fiber_map.example.json → fiber_map.json and set fiber_ports " '(e.g. "5": {"hub": 1, "port": 2, "ssh": "pi@192.168.1.39"}). ' "Use ssh / remote / host+user when Acroname hubs are reached via SSH.", file=sys.stderr, flush=True, ) sys.exit(1) return doc def fiber_sort_key(key): s = str(key).strip() if s.isdigit(): return (0, int(s)) return (1, s) def parse_panel_map_entry(entry): """Return (hub_1based, port_0based) or None if unmapped / invalid.""" if entry is None: return None if isinstance(entry, str): entry = entry.strip() if not entry or entry.lower() == "null": return None try: h_s, p_s = entry.split(".", 1) return int(h_s), int(p_s) except ValueError: return None if isinstance(entry, dict): try: return int(entry["hub"]), int(entry["port"]) except (KeyError, TypeError, ValueError): return None return None def fiber_entry_hub_port(entry): return parse_panel_map_entry(entry) def fiber_ssh_target(entry): """ Optional SSH destination for a fiber_ports entry (hubs live on another host). - "ssh": "user@host" or "user@ip" - or "remote": same - or "host": "ip" with optional "user" (default root) """ if not isinstance(entry, dict): return None s = entry.get("ssh") or entry.get("remote") if isinstance(s, str) and s.strip(): return s.strip() host = entry.get("host") if isinstance(host, str) and host.strip(): user = entry.get("user") u = user.strip() if isinstance(user, str) and user.strip() else "root" return f"{u}@{host.strip()}" return None def chip_fields_from_lsusb_lines(lines): """Build fiber_map.json fields from full lsusb lines (see fiber chip … save).""" if not lines: return {} ids = [] descs = [] for line in lines: m = re.search(r"ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s*(.*)", line, re.I) if m: ids.append(f"{m.group(1)}:{m.group(2)}".lower()) descs.append(m.group(3).strip()) out = { "usb_lsusb_lines": list(lines), "chip_profiled_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } if ids: out["usb_id"] = ids[0] out["chip_type"] = descs[0] if descs[0] else ids[0] if len(ids) > 1: out["usb_ids"] = ids return out def stored_chip_preview(entry, width=26): """Short label for status tables from saved probe metadata.""" if not isinstance(entry, dict): return "" for k in ("chip_type", "usb_description", "usb_id"): v = entry.get(k) if isinstance(v, str) and v.strip(): s = v.strip().replace("\n", " ") if len(s) > width: return s[: width - 2] + ".." return s return "" def stored_pcie_preview(entry, width=22): """ Short label from optional fiber_ports[].pcie object (manual or external tooling). Supported keys (all optional): bus_address / pci_address — e.g. 0000:c1:00.0 (display shortens leading 0000:) switch_chip / chip — e.g. PEX8718 slot_port / downstream_port — integer downstream slot index (e.g. monitor "Port 0") board_label — e.g. H3 board_serial — optional host board serial string adapter_port — host adapter lane, e.g. SFP 1–4 from monitor tools sfp_serial — transceiver serial for that lane (from catalog or manual) link_capability, link_state — free-text strings """ if not isinstance(entry, dict): return "" pcie = entry.get("pcie") if not isinstance(pcie, dict): return "" bus = pcie.get("bus_address") or pcie.get("pci_address") or "" if isinstance(bus, str): bus = bus.strip() if bus.startswith("0000:"): bus = bus[5:] chip = pcie.get("switch_chip") or pcie.get("chip") or "" if isinstance(chip, str): chip = chip.strip() board = pcie.get("board_label") or pcie.get("board") or "" if isinstance(board, str): board = board.strip() slot = pcie.get("slot_port") if slot is None: slot = pcie.get("downstream_port") ap = pcie.get("adapter_port") sfp_sn = pcie.get("sfp_serial") if isinstance(sfp_sn, str) and sfp_sn.strip(): sfp_sn = sfp_sn.strip() if len(sfp_sn) > 10: sfp_sn = sfp_sn[-10:] else: sfp_sn = "" bits = [] if board: bits.append(board) if chip: bits.append(chip) if slot is not None and str(slot).strip() != "": bits.append(f"slot{slot}") if ap is not None and str(ap).strip() != "": bits.append(f"SFP{ap}") if sfp_sn: bits.append(sfp_sn) if bus: bits.append(bus) s = " ".join(bits) if bits else "" if not s: return "" s = s.replace("\n", " ") if len(s) > width: return s[: width - 2] + ".." return s def _prompt_pcie_fields_interactive(pcie_start: dict) -> dict: """TTY prompts; non-empty answers override or set keys. EOF on first line → return pcie_start.""" pcie = dict(pcie_start) if isinstance(pcie_start, dict) else {} labels = [ ( "bus_address", "PCI bus / BDF (e.g. 0000:c1:00.0 or c1:00.0)", "str", ), ( "switch_chip", "PCIe switch chip (e.g. PEX8718)", "str", ), ( "slot_port", 'Downstream slot index (integer; monitor "Port 0" → 0)', "int", ), ( "board_label", "Board label (e.g. H3)", "str", ), ( "board_serial", "Board serial (optional, e.g. 4C-0A-3D-…)", "str", ), ( "adapter_port", "Host adapter / SFP lane (integer, e.g. 1–4)", "int", ), ( "sfp_serial", "SFP module serial (optional)", "str", ), ( "link_capability", "Link capability (e.g. Gen3 x4)", "str", ), ( "link_state", "Link state (e.g. Gen3 x4)", "str", ), ] for key, label, kind in labels: try: raw = input(f" {label} [empty=keep previous]: ").strip() except EOFError: print(" (EOF — leaving earlier PCIe answers as-is.)", flush=True) return pcie if not raw: continue if kind == "int": try: pcie[key] = int(raw, 10) except ValueError: print(f" Ignored {key!r} (not an integer).", flush=True) else: pcie[key] = raw return pcie def prompt_pcie_metadata_for_calibrate(existing_pcie): """ Optional TTY step after assigning a fiber id during panel calibrate. Prints the Adnacom card list (no paste): pick ``1``–``6`` then SFP lane ``1``–``4``, or ``m`` for typed fields, ``c`` to clear, Enter to keep. Returns ``(action, payload)``: ``("keep", None)`` — leave ``entry['pcie']`` unchanged (still from copied base map). ``("clear", None)`` — remove ``pcie`` from this entry. ``("set", dict)`` — replace with ``dict`` (caller drops ``pcie`` if dict is empty). """ print_catalog_menu() try: r = input( " PCIe? [Enter=keep, 1–6=card from list, m=manual, c=clear]: " ).strip().lower() except EOFError: return ("keep", None) if r in ("c", "clear"): return ("clear", None) if r in ("m", "manual"): base = existing_pcie if isinstance(existing_pcie, dict) else {} newp = _prompt_pcie_fields_interactive(base) return ("set", newp) if r.isdigit(): n = int(r, 10) if 1 <= n <= len(ADNACOM_KNOWN_CARDS): card = ADNACOM_KNOWN_CARDS[n - 1] bdf = short_bdf(card["bus_address"]) tail = board_serial_tail(card["board_serial"]) print( f" Selected card {n}: {bdf} board SN …{tail} (pick SFP 1–4 in Monitor)", flush=True, ) try: lane_s = input( " SFP lane on that card [1–4], Enter=cancel: " ).strip() except EOFError: return ("keep", None) if not lane_s: print(" Cancelled — PCIe unchanged.", flush=True) return ("keep", None) try: lane = int(lane_s, 10) except ValueError: print(" Not an integer — PCIe unchanged.", flush=True) return ("keep", None) if lane < 1 or lane > 4: print(" Use 1–4 — PCIe unchanged.", flush=True) return ("keep", None) newp = pcie_from_card_and_lane(card, lane) print( f" Stored: {bdf} SFP{lane} module SN {newp['sfp_serial']}", flush=True, ) return ("set", newp) if r: print(" Unrecognised — PCIe unchanged.", flush=True) return ("keep", None) def read_panel_map_file(path): """Load and normalize panel map JSON to a list of length PANEL_SLOTS.""" with open(path, encoding="utf-8") as f: data = json.load(f) if not isinstance(data, list): raise ValueError("panel map must be a JSON array") slots = [parse_panel_map_entry(x) for x in data] while len(slots) < PANEL_SLOTS: slots.append(None) return slots[:PANEL_SLOTS]