Calibrate PCIe metadata, Adnacom catalog, hybrid calibrate env.

Add optional fiber_ports.pcie (board_serial, sfp_serial) and PCIe column in
panel/fiber status. During panel calibrate, prompt to pick known H3 cards 1–6
and SFP lane 1–4 from adnacom_pcie_catalog, or manual/clear/keep.

HUB_MANAGER_CALIBRATE_REMOTES in remote_ssh.env merges into calibrate SSH
hosts so one panel calibrate run can cover local and remotes without repeating
--ssh. Document in remote_ssh.env.example and help.

Made-with: Cursor
This commit is contained in:
Robert McMahon 2026-03-29 17:36:05 -07:00
parent e7869dd9f2
commit 6d2fb4b957
7 changed files with 448 additions and 18 deletions

View File

@ -6,7 +6,18 @@
"port": 0,
"usb_id": "0bda:8812",
"chip_type": "Realtek RTL8812AU",
"chip_profiled_at": "2026-03-27T12:00:00Z"
"chip_profiled_at": "2026-03-27T12:00:00Z",
"pcie": {
"bus_address": "0000:c1:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-5B-13-03",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"adapter_port": 1,
"sfp_serial": "6C82510221389"
}
},
"2": { "hub": 1, "port": 1 },
"5": { "hub": 1, "port": 4 },

View File

@ -53,7 +53,7 @@ class AcronameManager:
stemmod.brainstem.stem.USBHub2x4,
]
model = getattr(spec, "model", None)
defs = getattr(brainstem, "defs", None)
defs = getattr(stemmod.brainstem, "defs", None)
preferred = []
if defs is not None and model is not None:
for mid, cls in (
@ -353,10 +353,11 @@ class AcronameManager:
if need_local and not self.hubs and not self.connect():
return
print(
f"{'Panel':<7} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | {'Chip (saved)':<28}",
f"{'Panel':<7} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | "
f"{'Chip (saved)':<28} | {'PCIe (saved)':<22}",
flush=True,
)
print("-" * 95)
print("-" * 120)
for idx in range(PANEL_SLOTS):
panel_n = idx + 1
key = str(panel_n)
@ -364,8 +365,12 @@ class AcronameManager:
tup = fm.fiber_entry_hub_port(entry) if entry is not None else None
ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
chip_s = fm.stored_chip_preview(entry)
pcie_s = fm.stored_pcie_preview(entry)
if tup is None:
print(f"{panel_n:<7} | {'':<10} | {'':<18} | {'':<5} | {'':<8} | {chip_s:<28}", flush=True)
print(
f"{panel_n:<7} | {'':<10} | {'':<18} | {'':<5} | {'':<8} | {chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
hub_1, port_0 = tup
route = (ssh if ssh else "local")[:18]
@ -378,14 +383,16 @@ class AcronameManager:
else:
pwr, cur = rs.parse_status_line_for_hub_port(out, hub_1, port_0)
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | {chip_s:<28}",
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
h_idx = hub_1 - 1
if h_idx < 0 or h_idx >= len(self.hubs):
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | {chip_s:<28}",
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
@ -393,7 +400,8 @@ class AcronameManager:
nports = self._port_count(stem)
if port_0 < 0 or port_0 >= nports:
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | {chip_s:<28}",
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
@ -402,7 +410,8 @@ class AcronameManager:
raw_curr = stem.usb.getPortCurrent(port_0).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | {chip_s:<28}",
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
@ -544,10 +553,11 @@ class AcronameManager:
ports = doc["fiber_ports"]
keys = sorted(ports.keys(), key=fm.fiber_sort_key)
print(
f"{'Fiber':<8} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | {'Chip (saved)':<28}",
f"{'Fiber':<8} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | "
f"{'Chip (saved)':<28} | {'PCIe (saved)':<22}",
flush=True,
)
print("-" * 95)
print("-" * 120)
need_local = any(
fm.fiber_entry_hub_port(ports[k]) is not None
and not fm.fiber_ssh_target(ports[k] if isinstance(ports[k], dict) else None)
@ -560,8 +570,12 @@ class AcronameManager:
tup = fm.fiber_entry_hub_port(entry)
ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
chip_s = fm.stored_chip_preview(entry)
pcie_s = fm.stored_pcie_preview(entry)
if tup is None:
print(f"{key!s:<8} | {'':<10} | {'':<18} | {'':<5} | {'':<8} | {chip_s:<28}", flush=True)
print(
f"{key!s:<8} | {'':<10} | {'':<18} | {'':<5} | {'':<8} | {chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
hub_1, port_0 = tup
route = (ssh if ssh else "local")[:18]
@ -574,14 +588,16 @@ class AcronameManager:
else:
pwr, cur = rs.parse_status_line_for_hub_port(out, hub_1, port_0)
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | {chip_s:<28}",
f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
h_idx = hub_1 - 1
if h_idx < 0 or h_idx >= len(self.hubs):
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | {chip_s:<28}",
f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
@ -589,7 +605,8 @@ class AcronameManager:
nports = self._port_count(stem)
if port_0 < 0 or port_0 >= nports:
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | {chip_s:<28}",
f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
continue
@ -598,7 +615,8 @@ class AcronameManager:
raw_curr = stem.usb.getPortCurrent(port_0).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | {chip_s:<28}",
f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | "
f"{chip_s:<28} | {pcie_s:<22}",
flush=True,
)
@ -700,12 +718,33 @@ class AcronameManager:
seen_h.add(s)
cli_hosts.append(s)
rs.apply_remote_ssh_env_file()
env_rem = os.environ.get("HUB_MANAGER_CALIBRATE_REMOTES", "").strip()
if env_rem:
added_from_env = []
for part in env_rem.split(","):
s = part.strip()
if s and s not in seen_h:
seen_h.add(s)
cli_hosts.append(s)
added_from_env.append(s)
if added_from_env:
print(
f"hub_manager: calibrate remotes from HUB_MANAGER_CALIBRATE_REMOTES: "
f"{', '.join(added_from_env)}",
file=sys.stderr,
flush=True,
)
# Always try hard to open local hubs for calibrate: full errors + stem-class retry (see _connect_specs).
saw_specs_connect_failed = False
local_ok = bool(self.hubs)
if not local_ok:
specs_pre = self._enumerate_usb_specs()
if specs_pre:
local_ok = self._connect_specs(specs_pre, quiet=False)
if not local_ok:
saw_specs_connect_failed = True
if not local_ok and cli_hosts:
print(
"hub_manager: Local USB shows Acroname module(s) but no hub opened — "
@ -735,6 +774,20 @@ class AcronameManager:
steps = steps[: max(0, limit)]
if not steps:
if saw_specs_connect_failed and not cli_hosts:
print(
"hub_manager: This PC sees Acroname USB module(s) in BrainStem discovery but connectFromSpec "
"failed, and no remote host was given for calibrate.\n"
" If your hubs are on a Raspberry Pi (or another machine), run from here:\n"
" python3 hub_manager.py panel calibrate merge --ssh pi@<pi-address>\n"
" (repeat --ssh for each host). Or put in fiber_map.json next to hub_manager.py:\n"
' "calibrate_remotes": ["pi@<pi-address>"]\n'
" Use remote_ssh.env on this PC if the Pi uses a venv path for python / script.\n"
" If the hubs are really plugged into *this* Fedora box, fix USB access (udev 24ff, plugdev, "
"unplug/replug) until `python3 hub_manager.py discover` opens them.",
file=sys.stderr,
flush=True,
)
print(
"Nothing to calibrate: no local Acroname hubs and no remote ports "
"(use --ssh user@host or calibrate_remotes in fiber_map.json).",
@ -785,7 +838,9 @@ class AcronameManager:
"Order: all local hub ports (hub 1 port 0 first), then each --ssh hosts ports in order.\n"
"Each step: lsusb snapshot (port OFF) → ON (~2s) → new lsusb lines (chip hint) → fiber id, s=skip, q=quit.\n"
"When you map a fiber, usb_id / chip_type are saved if new lsusb lines appeared.\n"
"Remote steps store ssh in fiber_map.json automatically.",
"Remote steps store ssh in fiber_map.json automatically.\n"
"After each fiber id you can pick PCIe by number: 16 = known Adnacom H3 card, then SFP 14 "
"(no paste), or m=manual / c=clear / Enter=keep. Edit fiber_map.json anytime (see example).",
flush=True,
)
@ -895,6 +950,14 @@ class AcronameManager:
):
entry.pop(k, None)
entry.update(fm.chip_fields_from_lsusb_lines(chip_hint_lines))
action, pdata = fm.prompt_pcie_metadata_for_calibrate(entry.get("pcie"))
if action == "clear":
entry.pop("pcie", None)
elif action == "set" and pdata is not None:
if pdata:
entry["pcie"] = pdata
else:
entry.pop("pcie", None)
ports[key] = entry
self._write_fiber_map_document(doc)

View File

@ -0,0 +1,146 @@
"""
Known Adnacom Monitor H3 / PEX8718 boards (PCI BDF, board serial, SFP serials per lane).
Populated from operator screenshots; used during panel calibrate so PCIe metadata can be
chosen with number keys (no copy/paste).
"""
from typing import Any, Dict, List
# SFP order is monitor "SFP 1" … "SFP 4" (adapter_port 14).
ADNACOM_KNOWN_CARDS: List[Dict[str, Any]] = [
{
"bus_address": "0000:01:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-B2-13-13",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"sfp_serials": [
"6C82510221225",
"6C82510221222",
"6C82510221223",
"6C82510221224",
],
},
{
"bus_address": "0000:07:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-B2-13-19",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"sfp_serials": [
"6C82510221226",
"6C82601120819",
"6C82512030665",
"6C82601120820",
],
},
{
"bus_address": "0000:21:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-02-13-15",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"sfp_serials": [
"6C82601120813",
"6C82601120811",
"6C82601120814",
"6C82601120816",
],
},
{
"bus_address": "0000:c1:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-5B-13-03",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"sfp_serials": [
"6C82510221389",
"6C82510221388",
"6C82510221387",
"6C82510221386",
],
},
{
"bus_address": "0000:c7:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-02-13-1A",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"sfp_serials": [
"6C82601120815",
"6C82601120818",
"6C82512030663",
"6C82601120817",
],
},
{
"bus_address": "0000:e2:00.0",
"board_label": "H3",
"board_serial": "4C-0A-3D-02-13-17",
"switch_chip": "PEX8718",
"slot_port": 0,
"link_capability": "Gen3 x4",
"link_state": "Gen3 x4",
"sfp_serials": [
"6C82510221229",
"6C82510221230",
"6C82510221228",
"6C82510221227",
],
},
]
def short_bdf(bus_address: str) -> str:
s = (bus_address or "").strip()
if s.startswith("0000:"):
return s[5:]
return s
def board_serial_tail(board_serial: str) -> str:
s = (board_serial or "").strip()
if len(s) <= 10:
return s
return s[-10:]
def print_catalog_menu() -> None:
print(" Known Adnacom H3 / PEX8718 cards (match the dropdown in Adnacom Monitor):", flush=True)
for i, c in enumerate(ADNACOM_KNOWN_CARDS, start=1):
bdf = short_bdf(c["bus_address"])
tail = board_serial_tail(c["board_serial"])
print(
f" {i}) {bdf:>8} {c['board_label']} board SN …{tail}",
flush=True,
)
print(" m) Type PCIe fields manually (long-form prompts)", flush=True)
def pcie_from_card_and_lane(card: Dict[str, Any], sfp_lane_1_4: int) -> Dict[str, Any]:
if sfp_lane_1_4 < 1 or sfp_lane_1_4 > 4:
raise ValueError("sfp_lane_1_4 must be 14")
serials = card["sfp_serials"]
serial = serials[sfp_lane_1_4 - 1]
return {
"bus_address": card["bus_address"],
"board_label": card["board_label"],
"board_serial": card["board_serial"],
"switch_chip": card["switch_chip"],
"slot_port": int(card["slot_port"]),
"link_capability": card["link_capability"],
"link_state": card["link_state"],
"adapter_port": sfp_lane_1_4,
"sfp_serial": serial,
}

View File

@ -122,7 +122,8 @@ def main() -> int:
" hub_manager.py panel on|off <panel_port>\n"
" hub_manager.py panel reboot|reboot-force <panel_port>\n"
" hub_manager.py panel calibrate [merge] [<N>] [--ssh user@host] …\n"
" calibrate: local hub ports first, then each --ssh host (and calibrate_remotes in JSON).\n"
" calibrate: local hub ports first, then each --ssh host, calibrate_remotes in JSON, and/or\n"
" HUB_MANAGER_CALIBRATE_REMOTES in remote_ssh.env (comma-separated) for one-command hybrid.\n"
" merge / N as before; remote steps set \"ssh\" on new fiber_ports entries.\n"
" <panel_port> is 124; use power fiber-port for arbitrary ids.\n"
" Preset: fiber_map.rpi20.json → fiber_map.json for 8+8+4 → fiber ports 120.",
@ -182,11 +183,13 @@ def main() -> int:
" HUB_MANAGER_REMOTE_PYTHON remote interpreter (default python3)\n"
" HUB_MANAGER_REMOTE_SCRIPT remote script path (default /usr/local/bin/hub_manager.py)\n"
" HUB_MANAGER_SSH_OPTS e.g. '-o BatchMode=yes'\n"
" HUB_MANAGER_CALIBRATE_REMOTES optional comma-separated user@host for panel calibrate (no --ssh needed)\n"
" Pi: pip install -r requirements.txt in the venv you point REMOTE_PYTHON at; udev 24ff.\n"
"\n"
"fiber_map.json fiber_ports entries may set ssh routing (hubs on another machine):\n"
' "ssh": "user@host" or "remote": "" or "host": "ip", "user": "pi"\n'
" On the SSH destination, the same fiber id should be local (omit ssh) so commands are not re-forwarded.\n"
' Optional "pcie": { bus, switch, slot, adapter_port, sfp_serial, board_serial, … } — calibrate can fill via 16+SFP.\n'
"\n"
"Hybrid calibrate: put {\"calibrate_remotes\": [\"pi@ip\"]} in fiber_map.json or pass --ssh per host;\n"
" order is all local downstream ports, then each remotes ports (see calibrate-ports-json on the Pi)."

View File

@ -6,6 +6,13 @@ import re
import sys
import time
from hubmgr.adnacom_pcie_catalog import (
ADNACOM_KNOWN_CARDS,
board_serial_tail,
pcie_from_card_and_lane,
print_catalog_menu,
short_bdf,
)
from hubmgr.constants import PANEL_SLOTS
from hubmgr.paths import fiber_map_path, panel_map_path
@ -151,6 +158,200 @@ def stored_chip_preview(entry, width=26):
return ""
def stored_pcie_preview(entry, width=22):
"""
Short label from optional fiber_ports[].pcie object (manual or external tooling).
Supported keys (all optional):
bus_address / pci_address e.g. 0000:c1:00.0 (display shortens leading 0000:)
switch_chip / chip e.g. PEX8718
slot_port / downstream_port integer downstream slot index (e.g. monitor "Port 0")
board_label e.g. H3
board_serial optional host board serial string
adapter_port host adapter lane, e.g. SFP 14 from monitor tools
sfp_serial transceiver serial for that lane (from catalog or manual)
link_capability, link_state free-text strings
"""
if not isinstance(entry, dict):
return ""
pcie = entry.get("pcie")
if not isinstance(pcie, dict):
return ""
bus = pcie.get("bus_address") or pcie.get("pci_address") or ""
if isinstance(bus, str):
bus = bus.strip()
if bus.startswith("0000:"):
bus = bus[5:]
chip = pcie.get("switch_chip") or pcie.get("chip") or ""
if isinstance(chip, str):
chip = chip.strip()
board = pcie.get("board_label") or pcie.get("board") or ""
if isinstance(board, str):
board = board.strip()
slot = pcie.get("slot_port")
if slot is None:
slot = pcie.get("downstream_port")
ap = pcie.get("adapter_port")
sfp_sn = pcie.get("sfp_serial")
if isinstance(sfp_sn, str) and sfp_sn.strip():
sfp_sn = sfp_sn.strip()
if len(sfp_sn) > 10:
sfp_sn = sfp_sn[-10:]
else:
sfp_sn = ""
bits = []
if board:
bits.append(board)
if chip:
bits.append(chip)
if slot is not None and str(slot).strip() != "":
bits.append(f"slot{slot}")
if ap is not None and str(ap).strip() != "":
bits.append(f"SFP{ap}")
if sfp_sn:
bits.append(sfp_sn)
if bus:
bits.append(bus)
s = " ".join(bits) if bits else ""
if not s:
return ""
s = s.replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
def _prompt_pcie_fields_interactive(pcie_start: dict) -> dict:
"""TTY prompts; non-empty answers override or set keys. EOF on first line → return pcie_start."""
pcie = dict(pcie_start) if isinstance(pcie_start, dict) else {}
labels = [
(
"bus_address",
"PCI bus / BDF (e.g. 0000:c1:00.0 or c1:00.0)",
"str",
),
(
"switch_chip",
"PCIe switch chip (e.g. PEX8718)",
"str",
),
(
"slot_port",
'Downstream slot index (integer; monitor "Port 0" → 0)',
"int",
),
(
"board_label",
"Board label (e.g. H3)",
"str",
),
(
"board_serial",
"Board serial (optional, e.g. 4C-0A-3D-…)",
"str",
),
(
"adapter_port",
"Host adapter / SFP lane (integer, e.g. 14)",
"int",
),
(
"sfp_serial",
"SFP module serial (optional)",
"str",
),
(
"link_capability",
"Link capability (e.g. Gen3 x4)",
"str",
),
(
"link_state",
"Link state (e.g. Gen3 x4)",
"str",
),
]
for key, label, kind in labels:
try:
raw = input(f" {label} [empty=keep previous]: ").strip()
except EOFError:
print(" (EOF — leaving earlier PCIe answers as-is.)", flush=True)
return pcie
if not raw:
continue
if kind == "int":
try:
pcie[key] = int(raw, 10)
except ValueError:
print(f" Ignored {key!r} (not an integer).", flush=True)
else:
pcie[key] = raw
return pcie
def prompt_pcie_metadata_for_calibrate(existing_pcie):
"""
Optional TTY step after assigning a fiber id during panel calibrate.
Prints the Adnacom card list (no paste): pick ``1````6`` then SFP lane ``1````4``,
or ``m`` for typed fields, ``c`` to clear, Enter to keep.
Returns ``(action, payload)``:
``("keep", None)`` leave ``entry['pcie']`` unchanged (still from copied base map).
``("clear", None)`` remove ``pcie`` from this entry.
``("set", dict)`` replace with ``dict`` (caller drops ``pcie`` if dict is empty).
"""
print_catalog_menu()
try:
r = input(
" PCIe? [Enter=keep, 16=card from list, m=manual, c=clear]: "
).strip().lower()
except EOFError:
return ("keep", None)
if r in ("c", "clear"):
return ("clear", None)
if r in ("m", "manual"):
base = existing_pcie if isinstance(existing_pcie, dict) else {}
newp = _prompt_pcie_fields_interactive(base)
return ("set", newp)
if r.isdigit():
n = int(r, 10)
if 1 <= n <= len(ADNACOM_KNOWN_CARDS):
card = ADNACOM_KNOWN_CARDS[n - 1]
bdf = short_bdf(card["bus_address"])
tail = board_serial_tail(card["board_serial"])
print(
f" Selected card {n}: {bdf} board SN …{tail} (pick SFP 14 in Monitor)",
flush=True,
)
try:
lane_s = input(
" SFP lane on that card [14], Enter=cancel: "
).strip()
except EOFError:
return ("keep", None)
if not lane_s:
print(" Cancelled — PCIe unchanged.", flush=True)
return ("keep", None)
try:
lane = int(lane_s, 10)
except ValueError:
print(" Not an integer — PCIe unchanged.", flush=True)
return ("keep", None)
if lane < 1 or lane > 4:
print(" Use 14 — PCIe unchanged.", flush=True)
return ("keep", None)
newp = pcie_from_card_and_lane(card, lane)
print(
f" Stored: {bdf} SFP{lane} module SN {newp['sfp_serial']}",
flush=True,
)
return ("set", newp)
if r:
print(" Unrecognised — PCIe unchanged.", flush=True)
return ("keep", None)
def read_panel_map_file(path):
"""Load and normalize panel map JSON to a list of length PANEL_SLOTS."""
with open(path, encoding="utf-8") as f:

View File

@ -14,6 +14,8 @@ REMOTE_SSH_ENV_KEYS = frozenset(
"HUB_MANAGER_REMOTE_SCRIPT",
"HUB_MANAGER_SSH_BIN",
"HUB_MANAGER_SSH_OPTS",
# Comma-separated user@host; panel calibrate adds these without repeating --ssh on CLI.
"HUB_MANAGER_CALIBRATE_REMOTES",
}
)

View File

@ -12,3 +12,7 @@
HUB_MANAGER_REMOTE_PYTHON=/home/rjmcmahon/Code/acroname/env/bin/python3
HUB_MANAGER_REMOTE_SCRIPT=/home/rjmcmahon/Code/acroname/hub_manager.py
# Optional: comma-separated Pi (or other) hosts. Then one command does local + remote calibrate:
# python3 hub_manager.py panel calibrate
# HUB_MANAGER_CALIBRATE_REMOTES=pi@192.168.1.50,pi@192.168.1.51