FiWiManager/hubmgr/fiber_map_io.py

164 lines
4.9 KiB
Python

"""Load/save fiber_map.json and legacy panel_map.json; parse entries and chip fields."""
import json
import os
import re
import sys
import time
from hubmgr.constants import PANEL_SLOTS
from hubmgr.paths import fiber_map_path, panel_map_path
def ensure_fiber_map_document(doc):
if not isinstance(doc, dict):
raise ValueError("map root must be a JSON object")
out = dict(doc)
fp = out.get("fiber_ports")
if fp is None:
out["fiber_ports"] = {}
elif not isinstance(fp, dict):
raise ValueError("fiber_ports must be a JSON object")
return out
def document_from_legacy_panel_array(slots):
fiber_ports = {}
for idx, slot in enumerate(slots):
if slot is not None:
fiber_ports[str(idx + 1)] = {"hub": slot[0], "port": slot[1]}
return {"fiber_ports": fiber_ports}
def load_fiber_map_document():
"""
Load routing map: prefer fiber_map.json (object keyed by fiber port).
If missing, migrate in-memory from legacy panel_map.json (24-slot array).
Returns None if neither file exists.
"""
fpath = fiber_map_path()
ppath = panel_map_path()
if os.path.isfile(fpath):
with open(fpath, encoding="utf-8") as f:
return ensure_fiber_map_document(json.load(f))
if os.path.isfile(ppath):
slots = read_panel_map_file(ppath)
return ensure_fiber_map_document(document_from_legacy_panel_array(slots))
return None
def load_fiber_map_or_exit():
doc = load_fiber_map_document()
if doc is None:
print(
f"Missing {fiber_map_path()} (or legacy {panel_map_path()}).\n"
" Copy fiber_map.example.json → fiber_map.json and set fiber_ports "
'(e.g. "5": {"hub": 1, "port": 2, "ssh": "pi@192.168.1.39"}). '
"Use ssh / remote / host+user when Acroname hubs are reached via SSH.",
file=sys.stderr,
flush=True,
)
sys.exit(1)
return doc
def fiber_sort_key(key):
s = str(key).strip()
if s.isdigit():
return (0, int(s))
return (1, s)
def parse_panel_map_entry(entry):
"""Return (hub_1based, port_0based) or None if unmapped / invalid."""
if entry is None:
return None
if isinstance(entry, str):
entry = entry.strip()
if not entry or entry.lower() == "null":
return None
try:
h_s, p_s = entry.split(".", 1)
return int(h_s), int(p_s)
except ValueError:
return None
if isinstance(entry, dict):
try:
return int(entry["hub"]), int(entry["port"])
except (KeyError, TypeError, ValueError):
return None
return None
def fiber_entry_hub_port(entry):
return parse_panel_map_entry(entry)
def fiber_ssh_target(entry):
"""
Optional SSH destination for a fiber_ports entry (hubs live on another host).
- "ssh": "user@host" or "user@ip"
- or "remote": same
- or "host": "ip" with optional "user" (default root)
"""
if not isinstance(entry, dict):
return None
s = entry.get("ssh") or entry.get("remote")
if isinstance(s, str) and s.strip():
return s.strip()
host = entry.get("host")
if isinstance(host, str) and host.strip():
user = entry.get("user")
u = user.strip() if isinstance(user, str) and user.strip() else "root"
return f"{u}@{host.strip()}"
return None
def chip_fields_from_lsusb_lines(lines):
"""Build fiber_map.json fields from full lsusb lines (see fiber chip … save)."""
if not lines:
return {}
ids = []
descs = []
for line in lines:
m = re.search(r"ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s*(.*)", line, re.I)
if m:
ids.append(f"{m.group(1)}:{m.group(2)}".lower())
descs.append(m.group(3).strip())
out = {
"usb_lsusb_lines": list(lines),
"chip_profiled_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
if ids:
out["usb_id"] = ids[0]
out["chip_type"] = descs[0] if descs[0] else ids[0]
if len(ids) > 1:
out["usb_ids"] = ids
return out
def stored_chip_preview(entry, width=26):
"""Short label for status tables from saved probe metadata."""
if not isinstance(entry, dict):
return ""
for k in ("chip_type", "usb_description", "usb_id"):
v = entry.get(k)
if isinstance(v, str) and v.strip():
s = v.strip().replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
return ""
def read_panel_map_file(path):
"""Load and normalize panel map JSON to a list of length PANEL_SLOTS."""
with open(path, encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("panel map must be a JSON array")
slots = [parse_panel_map_entry(x) for x in data]
while len(slots) < PANEL_SLOTS:
slots.append(None)
return slots[:PANEL_SLOTS]