UmberHubManager/hub_manager.py

1498 lines
59 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import sys
import time
import re
import asyncio
import os
import json
import copy
import shlex
import subprocess
import shutil
PANEL_SLOTS = 24
# Loaded on first use so we can print diagnostics before touching native BrainStem code.
brainstem = None
_BS_C = None
def _load_brainstem():
global brainstem, _BS_C
if brainstem is not None:
return brainstem
import brainstem as _bs
brainstem = _bs
try:
from brainstem import _BS_C as _c
_BS_C = _c
except ImportError:
_BS_C = None
return brainstem
def _panel_map_path():
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "panel_map.json")
def _fiber_map_path():
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "fiber_map.json")
def _ensure_fiber_map_document(doc):
if not isinstance(doc, dict):
raise ValueError("map root must be a JSON object")
out = dict(doc)
fp = out.get("fiber_ports")
if fp is None:
out["fiber_ports"] = {}
elif not isinstance(fp, dict):
raise ValueError("fiber_ports must be a JSON object")
return out
def _document_from_legacy_panel_array(slots):
fiber_ports = {}
for idx, slot in enumerate(slots):
if slot is not None:
fiber_ports[str(idx + 1)] = {"hub": slot[0], "port": slot[1]}
return {"fiber_ports": fiber_ports}
def _load_fiber_map_document():
"""
Load routing map: prefer fiber_map.json (object keyed by fiber port).
If missing, migrate in-memory from legacy panel_map.json (24-slot array).
Returns None if neither file exists.
"""
fiber_path = _fiber_map_path()
panel_path = _panel_map_path()
if os.path.isfile(fiber_path):
with open(fiber_path, encoding="utf-8") as f:
return _ensure_fiber_map_document(json.load(f))
if os.path.isfile(panel_path):
slots = _read_panel_map_file(panel_path)
return _ensure_fiber_map_document(_document_from_legacy_panel_array(slots))
return None
def _load_fiber_map_or_exit():
doc = _load_fiber_map_document()
if doc is None:
print(
f"Missing {_fiber_map_path()} (or legacy {_panel_map_path()}).\n"
" Copy fiber_map.example.json → fiber_map.json and set fiber_ports "
'(e.g. "5": {"hub": 1, "port": 2, "ssh": "pi@192.168.1.39"}). '
"Use ssh / remote / host+user when Acroname hubs are reached via SSH.",
file=sys.stderr,
flush=True,
)
sys.exit(1)
return doc
def _fiber_sort_key(key):
s = str(key).strip()
if s.isdigit():
return (0, int(s))
return (1, s)
def _fiber_entry_hub_port(entry):
tup = _parse_panel_map_entry(entry)
return tup
def _fiber_ssh_target(entry):
"""
Optional SSH destination for a fiber_ports entry (hubs live on another host).
- "ssh": "user@host" or "user@ip"
- or "remote": same
- or "host": "ip" with optional "user" (default root)
"""
if not isinstance(entry, dict):
return None
s = entry.get("ssh") or entry.get("remote")
if isinstance(s, str) and s.strip():
return s.strip()
host = entry.get("host")
if isinstance(host, str) and host.strip():
user = entry.get("user")
u = user.strip() if isinstance(user, str) and user.strip() else "root"
return f"{u}@{host.strip()}"
return None
_REMOTE_SSH_ENV_KEYS = frozenset(
{
"HUB_MANAGER_REMOTE_PYTHON",
"HUB_MANAGER_REMOTE_SCRIPT",
"HUB_MANAGER_SSH_BIN",
"HUB_MANAGER_SSH_OPTS",
}
)
def _apply_remote_ssh_env_file():
"""
Load remote_ssh.env (or .hub_manager_remote) from this scripts directory.
Uses os.environ.setdefault so real environment variables still win.
Put this file on the machine where you run `hub_manager.py --ssh` (e.g. Fedora), not on the Pi.
Values are paths as seen on the SSH *destination* (the Pi).
"""
base = os.path.dirname(os.path.abspath(__file__))
for fname in ("remote_ssh.env", ".hub_manager_remote"):
path = os.path.join(base, fname)
if not os.path.isfile(path):
continue
try:
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, val = line.partition("=")
key, val = key.strip(), val.strip()
if len(val) >= 2 and val[0] == val[-1] and val[0] in "'\"":
val = val[1:-1]
if key in _REMOTE_SSH_ENV_KEYS:
os.environ.setdefault(key, val)
except OSError:
continue
break
def _ssh_forward(remote_host, remote_args):
"""
Run hub_manager on a remote machine (e.g. Raspberry Pi with USB hubs attached).
Does not import brainstem locally — only needs OpenSSH client.
Optional file next to this script (on the PC where you run --ssh): remote_ssh.env
See remote_ssh.env.example. Env vars still override the file.
Env (paths are as seen *on the remote* after SSH login):
HUB_MANAGER_REMOTE_PYTHON default: python3 (use …/env/bin/python3 if brainstem is in a venv)
HUB_MANAGER_REMOTE_SCRIPT default: /usr/local/bin/hub_manager.py
HUB_MANAGER_SSH_BIN default: ssh
HUB_MANAGER_SSH_OPTS extra ssh args, shell-quoted string, e.g. -o BatchMode=yes
"""
_apply_remote_ssh_env_file()
py = os.environ.get("HUB_MANAGER_REMOTE_PYTHON", "python3")
script = os.environ.get("HUB_MANAGER_REMOTE_SCRIPT", "/usr/local/bin/hub_manager.py")
ssh_bin = os.environ.get("HUB_MANAGER_SSH_BIN", "ssh")
extra = shlex.split(os.environ.get("HUB_MANAGER_SSH_OPTS", ""))
# Calibrate must have a PTY on the remote or input() gets EOF and each port toggles off instantly.
if (
len(remote_args) >= 2
and remote_args[0].lower() == "panel"
and remote_args[1].lower() == "calibrate"
and not any(x in ("-t", "-tt") for x in extra)
):
extra = ["-t", *extra]
cmd = [ssh_bin, *extra, remote_host, py, script, *remote_args]
print(f"hub_manager: ssh {remote_host}{py} {script} {' '.join(remote_args)}", file=sys.stderr, flush=True)
proc = subprocess.run(cmd, stdin=sys.stdin)
return proc.returncode if proc.returncode is not None else 1
def _ssh_forward_capture(remote_host, remote_args, timeout=90):
"""Run hub_manager on remote; return (exit_code, stdout, stderr). No TTY."""
_apply_remote_ssh_env_file()
py = os.environ.get("HUB_MANAGER_REMOTE_PYTHON", "python3")
script = os.environ.get("HUB_MANAGER_REMOTE_SCRIPT", "/usr/local/bin/hub_manager.py")
ssh_bin = os.environ.get("HUB_MANAGER_SSH_BIN", "ssh")
extra = shlex.split(os.environ.get("HUB_MANAGER_SSH_OPTS", ""))
cmd = [ssh_bin, *extra, remote_host, py, script, *remote_args]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
except subprocess.TimeoutExpired:
return 124, "", "ssh/hub_manager timed out"
return (
proc.returncode if proc.returncode is not None else 1,
proc.stdout or "",
proc.stderr or "",
)
def _parse_panel_calibrate_argv(args):
"""
panel calibrate [merge] [N] [--ssh user@host] ...
Returns (merge, limit, calibrate_ssh_hosts).
"""
merge = False
limit = None
hosts = []
i = 0
while i < len(args):
a = args[i]
low = a.lower()
if low == "merge":
merge = True
i += 1
continue
if low == "--ssh":
if i + 1 >= len(args):
print("panel calibrate: --ssh requires user@host", file=sys.stderr, flush=True)
sys.exit(2)
hosts.append(args[i + 1].strip())
i += 2
continue
if a.isdigit():
limit = int(a)
i += 1
continue
print(f"panel calibrate: unknown argument {a!r}", file=sys.stderr, flush=True)
sys.exit(2)
return merge, limit, hosts
def _parse_discover_stdout_for_calibrate_ports(stdout):
"""
Parse `discover` table lines: '1 | 0x........ | 8' → (1,0)…(1,7).
Skips header and pre-table noise.
"""
pairs = []
for line in stdout.splitlines():
raw = line.rstrip()
if "|" not in raw:
continue
if set(raw.strip()) <= {"-", " "}:
continue
parts = [p.strip() for p in raw.split("|")]
if len(parts) < 3:
continue
hub_s, _, n_s = parts[0], parts[1], parts[2]
if not hub_s.isdigit():
continue
try:
hub = int(hub_s)
nports = int(n_s)
except ValueError:
continue
if hub < 1 or nports < 1:
continue
for p in range(nports):
pairs.append((hub, p))
return pairs
def _fetch_calibrate_ports_json(ssh_host):
"""Ask remote hub_manager for [[hub,port], ...]; fall back to parsing `discover` if remote script is older."""
code, out, err = _ssh_forward_capture(ssh_host, ["calibrate-ports-json"], timeout=90)
out = out or ""
err = err or ""
json_pairs = None
if code == 0 and out.strip():
try:
data = json.loads(out.strip())
except json.JSONDecodeError:
json_pairs = None
else:
if isinstance(data, list):
json_pairs = []
for item in data:
if isinstance(item, (list, tuple)) and len(item) == 2:
try:
json_pairs.append((int(item[0]), int(item[1])))
except (TypeError, ValueError):
continue
else:
json_pairs = None
if json_pairs is not None:
return json_pairs
if "Unknown command" in (out + err):
print(
f"Remote {ssh_host!r} has no calibrate-ports-json; using discover fallback "
"(scp this hub_manager.py to the Pi to skip the extra SSH round trip).",
file=sys.stderr,
flush=True,
)
code2, out2, err2 = _ssh_forward_capture(ssh_host, ["discover"], timeout=120)
out2 = out2 or ""
if code2 != 0:
print(
f"discover on {ssh_host!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
file=sys.stderr,
flush=True,
)
if code != 0 or (out + err).strip():
print(
f"calibrate-ports-json on {ssh_host!r} (exit {code}): {(err or out).strip()[:400]}",
file=sys.stderr,
flush=True,
)
return []
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
if not pairs:
print(
f"Could not parse hub/port list from discover on {ssh_host!r}. "
"Ensure the Pi connects to its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
file=sys.stderr,
flush=True,
)
return []
print(
f"Using discover output for {ssh_host!r} ({len(pairs)} hub.port step(s)).",
file=sys.stderr,
flush=True,
)
return pairs
def _remote_hub_port_power(ssh_host, hub_1, port_0, enable):
sub = "on" if enable else "off"
code, _, err = _ssh_forward_capture(ssh_host, [sub, f"{hub_1}.{port_0}"])
return code, err
def _remote_port_power_feedback(ssh_host, hub_1, port_0):
code, out, err = _ssh_forward_capture(ssh_host, ["status", f"{hub_1}.{port_0}"])
if code != 0:
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
pwr, cur = _parse_status_line_for_hub_port(out, hub_1, port_0)
return f"remote hub reports {pwr}, {cur} mA"
def _parse_status_line_for_hub_port(stdout, hub_1, port_0):
"""Parse `status hub.port` table output for one port; return (power, current_str) or (?, ?)."""
for line in stdout.splitlines():
ln = line.strip()
if not ln or ln.startswith("-") or "Identity" in ln or "Hub" not in ln:
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) < 4:
continue
hub_part = parts[0].replace("Hub", "", 1).strip()
try:
h = int(hub_part)
p = int(parts[1])
except ValueError:
continue
if h == hub_1 and p == port_0:
return parts[2], parts[3]
return "?", "?"
def _dispatch_fiber_mapped_ssh_if_needed(argv):
"""
If the fiber map says this port is on another host (ssh / host+user), forward over SSH
without importing brainstem locally. Returns exit code, or None to continue normally.
"""
_apply_remote_ssh_env_file()
try:
doc = _load_fiber_map_document()
except (OSError, json.JSONDecodeError, ValueError):
return None
if doc is None:
return None
if (
len(argv) >= 5
and argv[0].lower() == "power"
and argv[2].lower() == "fiber-port"
and argv[4].lower() in ("on", "off")
):
try:
fid = int(argv[3])
except ValueError:
return None
entry = doc["fiber_ports"].get(str(fid))
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
if ssh:
return _ssh_forward(ssh, ["power", "fiber-port", str(fid), argv[4].lower()])
if len(argv) >= 3 and argv[0].lower() == "fiber" and argv[1].lower() == "chip":
try:
fid = int(argv[2])
except ValueError:
return None
entry = doc["fiber_ports"].get(str(fid))
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
if ssh:
extra = ["save"] if len(argv) >= 4 and argv[3].lower() == "save" else []
return _ssh_forward(ssh, ["fiber", "chip", str(fid), *extra])
if len(argv) >= 3 and argv[0].lower() == "panel":
sub = argv[1].lower()
if sub in ("on", "off", "reboot", "reboot-force"):
try:
pn = int(argv[2])
except ValueError:
return None
if pn < 1 or pn > PANEL_SLOTS:
return None
entry = doc["fiber_ports"].get(str(pn))
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
if ssh:
return _ssh_forward(ssh, ["panel", sub, str(pn)])
return None
def _lsusb_lines():
lsusb_bin = shutil.which("lsusb")
if not lsusb_bin:
return []
try:
proc = subprocess.run(
[lsusb_bin], capture_output=True, text=True, timeout=15
)
except (OSError, subprocess.TimeoutExpired):
return []
if proc.returncode != 0 or not proc.stdout:
return []
return proc.stdout.splitlines()
def _lsusb_new_devices(before_lines, after_lines):
"""Lines present in after but not before, excluding Acroname hub vendor lines."""
before = set(before_lines)
out = []
for ln in after_lines:
if ln in before:
continue
if "24ff:" in ln.lower():
continue
out.append(ln)
return out
def _chip_fields_from_lsusb_lines(lines):
"""Build fiber_map.json fields from full lsusb lines (see fiber chip … save)."""
if not lines:
return {}
ids = []
descs = []
for line in lines:
m = re.search(r"ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s*(.*)", line, re.I)
if m:
ids.append(f"{m.group(1)}:{m.group(2)}".lower())
descs.append(m.group(3).strip())
out = {
"usb_lsusb_lines": list(lines),
"chip_profiled_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
if ids:
out["usb_id"] = ids[0]
out["chip_type"] = descs[0] if descs[0] else ids[0]
if len(ids) > 1:
out["usb_ids"] = ids
return out
def _stored_chip_preview(entry, width=26):
"""Short label for status tables from saved probe metadata."""
if not isinstance(entry, dict):
return ""
for k in ("chip_type", "usb_description", "usb_id"):
v = entry.get(k)
if isinstance(v, str) and v.strip():
s = v.strip().replace("\n", " ")
if len(s) > width:
return s[: width - 2] + ".."
return s
return ""
def _read_panel_map_file(path):
"""Load and normalize panel map JSON to a list of length PANEL_SLOTS."""
with open(path, encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("panel map must be a JSON array")
slots = [_parse_panel_map_entry(x) for x in data]
while len(slots) < PANEL_SLOTS:
slots.append(None)
return slots[:PANEL_SLOTS]
def _parse_panel_map_entry(entry):
"""Return (hub_1based, port_0based) or None if unmapped / invalid."""
if entry is None:
return None
if isinstance(entry, str):
entry = entry.strip()
if not entry or entry.lower() == "null":
return None
try:
h_s, p_s = entry.split(".", 1)
return int(h_s), int(p_s)
except ValueError:
return None
if isinstance(entry, dict):
try:
return int(entry["hub"]), int(entry["port"])
except (KeyError, TypeError, ValueError):
return None
return None
def _lsusb_acroname_lines():
try:
lsusb_bin = shutil.which("lsusb")
if not lsusb_bin:
return []
out = subprocess.run(
[lsusb_bin],
capture_output=True,
text=True,
timeout=5,
)
if out.returncode != 0 or not out.stdout:
return []
return [ln for ln in out.stdout.splitlines() if "24ff:" in ln.lower() or " acroname" in ln.lower()]
except (OSError, subprocess.TimeoutExpired):
return []
class AcronameManager:
def __init__(self):
_load_brainstem()
self.hubs = []
self.SUCCESS = brainstem.result.Result.NO_ERROR
def _enumerate_usb_specs(self):
"""Return sorted link specs from BrainStem USB discovery, or [] on failure."""
try:
specs = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
except AttributeError:
from brainstem.discovery import Discovery
specs = Discovery.findAll(brainstem.link.Spec.USB)
except Exception as exc:
print(f"BrainStem findAllModules(USB) raised: {exc}", file=sys.stderr, flush=True)
return []
if not specs:
return []
specs = list(specs)
specs.sort(key=lambda x: x.serial_number)
return specs
@staticmethod
def _hub_stem_classes_for_spec(spec):
"""
Pick stem class for hardware. USBHub2x4 vs USBHub3p must match the device or
connectFromSpec fails (HubTool handles this; raw USBHub3p() does not).
"""
_load_brainstem()
model = getattr(spec, "model", None)
defs = getattr(brainstem, "defs", None)
preferred = []
if defs is not None and model is not None:
for mid, cls in (
(getattr(defs, "MODEL_USBHUB_2X4", None), brainstem.stem.USBHub2x4),
(getattr(defs, "MODEL_USBHUB_3P", None), brainstem.stem.USBHub3p),
(getattr(defs, "MODEL_USBHUB_3C", None), brainstem.stem.USBHub3c),
):
if mid is not None and model == mid:
preferred.append(cls)
for cls in (brainstem.stem.USBHub2x4, brainstem.stem.USBHub3p, brainstem.stem.USBHub3c):
if cls not in preferred:
preferred.append(cls)
return preferred
def _connect_from_spec(self, spec):
for cls in self._hub_stem_classes_for_spec(spec):
stem = cls()
res = stem.connectFromSpec(spec)
if res == self.SUCCESS:
return stem
try:
stem.disconnect()
except Exception:
pass
return None
def _connect_specs(self, specs):
"""Append successfully opened stems to self.hubs; return False if none opened."""
for spec in specs:
stem = self._connect_from_spec(spec)
if stem is not None:
self.hubs.append(stem)
if self.hubs:
return True
if specs:
print(
"Error: findAllModules(USB) reported device(s) but connectFromSpec failed for all.",
flush=True,
)
print(
" Use the correct stem type (USBHub2x4 for 4-port, USBHub3p for USBHub3+), "
"udev permissions (vendor 24ff), and BrainStem library version.",
flush=True,
)
else:
print("Error: No Acroname hubs found (findAllModules(USB) returned nothing).", flush=True)
acr = _lsusb_acroname_lines()
if acr:
print(" lsusb does see Acroname (kernel enumerates the device), e.g.:", flush=True)
for ln in acr[:8]:
print(f" {ln}", flush=True)
print(
" BrainStem still needs userland access: udev rules for vendor 24ff (MODE=0666 or "
"GROUP=plugdev + your user in that group), and the same libBrainStem version HubTool uses.",
flush=True,
)
return False
def connect(self):
"""Finds all hubs and sorts by serial number for consistent Hub 1/2 naming."""
return self._connect_specs(self._enumerate_usb_specs())
def _parse_target(self, target_str):
if target_str.lower() == 'all':
return 'all', 'all'
try:
h_num, p_idx = target_str.split('.')
h_idx = 'all' if h_num.lower() == 'all' else int(h_num) - 1
p_idx = 'all' if p_idx.lower() == 'all' else int(p_idx)
return h_idx, p_idx
except ValueError:
print(f"Format Error: Use '1.4' for Hub 1 Port 4, or 'all'")
sys.exit(1)
def _port_count(self, stem):
"""
Count of downstream ports you power/cycle (Type-A sides on 2x4 / 3p).
Prefer stem-class NUMBER_OF_DOWNSTREAM_USB over cmdPORT quantity: the latter
matches NUMBER_OF_PORTS and includes extra PORT entities (e.g. 6 vs 4 on USBHub2x4).
"""
cls = type(stem)
nd = getattr(cls, "NUMBER_OF_DOWNSTREAM_USB", None)
if isinstance(nd, int) and nd > 0:
return nd
nu = getattr(cls, "NUMBER_OF_USB_PORTS", None)
if isinstance(nu, int) and nu > 0:
return nu
if _BS_C is not None:
try:
res = stem.classQuantity(_BS_C.cmdPORT)
if res.error == self.SUCCESS and getattr(res, "value", 0) > 0:
return int(res.value)
except (AttributeError, TypeError, ValueError):
pass
return 8
def _ports_for_hub(self, stem, hub_display_num, p_target):
"""Resolve port indices for this stem; return None if an explicit port is out of range."""
n = self._port_count(stem)
if p_target == "all":
return list(range(n))
if p_target < 0 or p_target >= n:
print(
f"Error: Hub {hub_display_num} has {n} port(s) (indices 0{n - 1}); "
f"port {p_target} is invalid."
)
return None
return [p_target]
def discover(self):
"""USB module discovery only: serial and cmdPORT entity count per hub. No port power reads or changes."""
print("Scanning USB for Acroname modules (BrainStem discover)...", flush=True)
specs = self._enumerate_usb_specs()
if not specs:
print(
"findAllModules(USB): no link specs. If HubTool sees hubs, check udev (vendor 24ff), "
"group permissions, and that the BrainStem Python package matches HubTools library.",
flush=True,
)
acr = _lsusb_acroname_lines()
if acr:
print(" lsusb:", flush=True)
for ln in acr[:8]:
print(f" {ln}", flush=True)
return
print(f"findAllModules: {len(specs)} USB link spec(s)", flush=True)
for spec in specs:
print(
f" serial=0x{spec.serial_number:08X} model={getattr(spec, 'model', '?')} "
f"module={getattr(spec, 'module', '?')}",
flush=True,
)
if not self._connect_specs(specs):
return
print(f"{'Hub':<6} | {'Serial':<12} | {'Ports'}")
print("-" * 28)
for i, stem in enumerate(self.hubs):
sn_res = stem.system.getSerialNumber()
sn = f"0x{sn_res.value:08X}" if sn_res.error == self.SUCCESS else "?"
n = self._port_count(stem)
print(f"{i + 1:<6} | {sn:<12} | {n}")
def _sample_inrush(self, stem, port, sample_duration=0.3):
"""Captures peak current and duration for the reboot report."""
samples = []
start_time = time.time()
while (time.time() - start_time) < sample_duration:
curr = stem.usb.getPortCurrent(port).value / 1000.0
samples.append((time.time() - start_time, curr))
peak_current = max(s[1] for s in samples) if samples else 0.0
duration = 0
if peak_current > 15.0:
for t, c in samples:
if c >= peak_current * 0.9:
duration = t
return {"peak": peak_current, "duration": duration * 1000}
def status(self, target_str="all"):
if not self.hubs and not self.connect(): return
h_target, p_target = self._parse_target(target_str)
print(f"{'Identity':<8} | {'Port':<5} | {'Power':<7} | {'Current (mA)':<12}")
print("-" * 55)
for i, stem in enumerate(self.hubs):
if h_target != 'all' and i != h_target: continue
ports = self._ports_for_hub(stem, i + 1, p_target)
if ports is None:
return
for port in ports:
pwr_val = stem.usb.getPortState(port).value
pwr_str = "ON" if (pwr_val & 1) else "OFF"
raw_curr = stem.usb.getPortCurrent(port).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(f"Hub {i+1:<3} | {port:<5} | {pwr_str:<7} | {current:<12.2f}")
async def _async_reboot_port(self, h_idx, stem, port, delay, stagger, skip_empty):
"""Reboots a port and captures inrush vs steady state data."""
current_ma = stem.usb.getPortCurrent(port).value / 1000.0
if skip_empty and current_ma < 15.0:
return {"hub": h_idx+1, "port": port, "status": "Skipped", "peak": 0.0, "duration": 0.0, "steady": 0.0}
stem.usb.setPortDisable(port)
await asyncio.sleep(delay)
if stagger > 0:
await asyncio.sleep(stagger * port)
stem.usb.setPortEnable(port)
# Capture the immediate spike
inrush_stats = self._sample_inrush(stem, port)
# Increased to 2 seconds for radio stabilization
await asyncio.sleep(2.0)
steady_ma = stem.usb.getPortCurrent(port).value / 1000.0
return {
"hub": h_idx+1, "port": port, "status": "Rebooted",
"peak": inrush_stats['peak'], "duration": inrush_stats['duration'],
"steady": steady_ma if steady_ma > 15.0 else 0.0
}
def reboot(self, target_str, delay=2, stagger=0.2, skip_empty=True):
if not self.hubs and not self.connect(): return
h_target, p_target = self._parse_target(target_str)
tasks = []
for i, stem in enumerate(self.hubs):
if h_target != 'all' and i != h_target: continue
ports = self._ports_for_hub(stem, i + 1, p_target)
if ports is None:
return
for port in ports:
tasks.append(self._async_reboot_port(i, stem, port, delay, stagger, skip_empty))
if tasks:
print(f"{'Identity':<8} | {'Port':<5} | {'Action':<10} | {'Peak(mA)':<10} | {'Steady(mA)':<12} | {'Settle(ms)':<10}")
print("-" * 75)
results = asyncio.run(self._run_tasks(tasks))
for r in sorted(results, key=lambda x: (x['hub'], x['port'])):
print(f"Hub {r['hub']:<3} | {r['port']:<5} | {r['status']:<10} | {r['peak']:<10.2f} | {r['steady']:<12.2f} | {r['duration']:<10.2f}")
async def _run_tasks(self, tasks):
return await asyncio.gather(*tasks)
def power(self, mode, target_str):
if not self.hubs and not self.connect(): return
h_target, p_target = self._parse_target(target_str)
for i, stem in enumerate(self.hubs):
if h_target != 'all' and i != h_target: continue
ports = self._ports_for_hub(stem, i + 1, p_target)
if ports is None:
return
for port in ports:
if mode.lower() == 'on': stem.usb.setPortEnable(port)
else: stem.usb.setPortDisable(port)
self.status(target_str)
def setup_udev(self):
if not self.hubs and not self.connect(): return
rule_path = "/etc/udev/rules.d/99-acroname.rules"
lines = ['# Acroname Hub Permissions\nSUBSYSTEM=="usb", ATTR{idVendor}=="24ff", MODE="0666", GROUP="plugdev"']
for i, stem in enumerate(self.hubs):
res = stem.system.getSerialNumber()
if res.error == self.SUCCESS:
sn = f"{res.value:08X}"
lines.append(f'SUBSYSTEM=="usb", ATTR{{idVendor}}=="24ff", ATTR{{serial}}=="{sn}", SYMLINK+="acroname_hub{i+1}"')
with open("99-acroname.rules", "w") as f: f.write("\n".join(lines))
print(f"udev rules generated. Install with: sudo mv 99-acroname.rules {rule_path}")
def verify(self):
for i in range(1, 3):
link = f"/dev/acroname_hub{i}"
if os.path.exists(link): print(f"[OK] Hub {i} -> {os.path.realpath(link)}")
else: print(f"[ERROR] {link} not found.")
def panel_status(self):
"""Rack positions 124: mapping and power via local hubs or per-entry ssh."""
doc = _load_fiber_map_or_exit()
ports = doc["fiber_ports"]
need_local = any(
_fiber_entry_hub_port(ports.get(str(n))) is not None
and _fiber_ssh_target(ports.get(str(n)) if isinstance(ports.get(str(n)), dict) else None)
is None
for n in range(1, PANEL_SLOTS + 1)
)
if need_local and not self.hubs and not self.connect():
return
print(
f"{'Panel':<7} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | {'Chip (saved)':<28}",
flush=True,
)
print("-" * 95)
for idx in range(PANEL_SLOTS):
panel_n = idx + 1
key = str(panel_n)
entry = ports.get(key)
tup = _fiber_entry_hub_port(entry) if entry is not None else None
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
chip_s = _stored_chip_preview(entry)
if tup is None:
print(f"{panel_n:<7} | {'':<10} | {'':<18} | {'':<5} | {'':<8} | {chip_s:<28}", flush=True)
continue
hub_1, port_0 = tup
route = (ssh if ssh else "local")[:18]
if ssh:
code, out, err = _ssh_forward_capture(ssh, ["status", f"{hub_1}.{port_0}"])
if code != 0:
pwr, cur = "?", "?"
if err.strip():
route = f"{ssh} (err)"[:18]
else:
pwr, cur = _parse_status_line_for_hub_port(out, hub_1, port_0)
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | {chip_s:<28}",
flush=True,
)
continue
h_idx = hub_1 - 1
if h_idx < 0 or h_idx >= len(self.hubs):
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | {chip_s:<28}",
flush=True,
)
continue
stem = self.hubs[h_idx]
nports = self._port_count(stem)
if port_0 < 0 or port_0 >= nports:
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | {chip_s:<28}",
flush=True,
)
continue
pwr_val = stem.usb.getPortState(port_0).value
pwr_str = "ON" if (pwr_val & 1) else "OFF"
raw_curr = stem.usb.getPortCurrent(port_0).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(
f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | {chip_s:<28}",
flush=True,
)
def panel_power(self, mode, panel_1based):
if panel_1based < 1 or panel_1based > PANEL_SLOTS:
print(f"Panel port must be 1{PANEL_SLOTS}, got {panel_1based}", file=sys.stderr, flush=True)
sys.exit(1)
doc = _load_fiber_map_or_exit()
key = str(panel_1based)
entry = doc["fiber_ports"].get(key)
tup = _fiber_entry_hub_port(entry) if entry is not None else None
if tup is None:
print(
f"Panel {panel_1based} is not mapped (no fiber_ports[{key!r}] in fiber_map.json).",
file=sys.stderr,
flush=True,
)
sys.exit(1)
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
if ssh:
sys.exit(_ssh_forward(ssh, ["panel", mode, str(panel_1based)]))
tgt = f"{tup[0]}.{tup[1]}"
print(f"Panel {panel_1based} → hub target {tgt} ({mode})", flush=True)
self.power(mode, tgt)
def fiber_power(self, mode, fiber_port):
"""Power via fiber_map.json fiber_ports key (any positive integer id)."""
doc = _load_fiber_map_or_exit()
key = str(int(fiber_port))
entry = doc["fiber_ports"].get(key)
tup = _fiber_entry_hub_port(entry) if entry is not None else None
if tup is None:
print(
f"Fiber port {fiber_port} is not mapped (missing fiber_ports[{key!r}] in fiber_map.json).",
file=sys.stderr,
flush=True,
)
sys.exit(1)
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
if ssh:
sys.exit(_ssh_forward(ssh, ["power", "fiber-port", key, mode.lower()]))
tgt = f"{tup[0]}.{tup[1]}"
print(f"Fiber port {fiber_port} → hub target {tgt} ({mode})", flush=True)
self.power(mode, tgt)
def fiber_chip(self, fiber_port, save=False):
"""
Identify newly enumerated USB device(s) on this fibers hub port (lsusb diff).
If the port was off, turns it on briefly, snapshots, then restores previous power.
With save=True, merge usb_id / chip_type / usb_lsusb_lines into fiber_map.json when new lines appear.
"""
doc = _load_fiber_map_or_exit()
key = str(int(fiber_port))
entry = doc["fiber_ports"].get(key)
tup = _fiber_entry_hub_port(entry) if entry is not None else None
if tup is None:
print(
f"Fiber port {fiber_port} is not mapped (missing fiber_ports[{key!r}] in fiber_map.json).",
file=sys.stderr,
flush=True,
)
sys.exit(1)
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
if ssh:
extra = ["save"] if save else []
sys.exit(_ssh_forward(ssh, ["fiber", "chip", key, *extra]))
if not shutil.which("lsusb"):
print(
"lsusb not found in PATH; install usbutils (e.g. usbutils package) on this host.",
file=sys.stderr,
flush=True,
)
sys.exit(1)
if not self.hubs and not self.connect():
return
hub_1, port_0 = tup
h_idx = hub_1 - 1
if h_idx < 0 or h_idx >= len(self.hubs):
print(f"No hub {hub_1} connected.", file=sys.stderr, flush=True)
sys.exit(1)
stem = self.hubs[h_idx]
nports = self._port_count(stem)
if port_0 < 0 or port_0 >= nports:
print(f"Hub {hub_1} has no USB port index {port_0}.", file=sys.stderr, flush=True)
sys.exit(1)
before = _lsusb_lines()
st = stem.usb.getPortState(port_0)
if st.error != self.SUCCESS:
print(f"getPortState error: {st.error}", file=sys.stderr, flush=True)
sys.exit(1)
was_on = (st.value & 1) != 0
if not was_on:
self._set_hub_port_power(hub_1, port_0, True)
time.sleep(2.0)
after = _lsusb_lines()
new_devs = _lsusb_new_devices(before, after)
print(
f"Fiber port {fiber_port} → hub {hub_1} USB port {port_0} — USB identity (lsusb, new vs baseline):",
flush=True,
)
if new_devs:
for ln in new_devs:
print(f" {ln}", flush=True)
else:
print(
" (No new lsusb lines vs snapshot before this step.) Device may already have been plugged in.",
flush=True,
)
if not was_on:
self._set_hub_port_power(hub_1, port_0, False)
print(" Restored hub port power: OFF.", flush=True)
if save:
if new_devs:
doc2 = _load_fiber_map_or_exit()
ent = doc2["fiber_ports"].get(key)
base = dict(ent) if isinstance(ent, dict) else {}
for k in (
"usb_lsusb_lines",
"usb_id",
"usb_ids",
"chip_type",
"chip_profiled_at",
):
base.pop(k, None)
base.update(_chip_fields_from_lsusb_lines(new_devs))
doc2["fiber_ports"][key] = base
self._write_fiber_map_document(doc2)
print(" Saved chip metadata to fiber_map.json (usb_id, chip_type, usb_lsusb_lines).", flush=True)
else:
print(
" save: no new lsusb lines — not updating fiber_map.json (avoids wiping a good profile).",
flush=True,
)
def fiber_map_status(self):
"""All fiber_ports entries with hub.port and live power (local BrainStem or ssh status)."""
doc = _load_fiber_map_or_exit()
ports = doc["fiber_ports"]
keys = sorted(ports.keys(), key=_fiber_sort_key)
print(
f"{'Fiber':<8} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | {'Chip (saved)':<28}",
flush=True,
)
print("-" * 95)
need_local = any(
_fiber_entry_hub_port(ports[k]) is not None
and not _fiber_ssh_target(ports[k] if isinstance(ports[k], dict) else None)
for k in keys
)
if need_local and not self.hubs and not self.connect():
return
for key in keys:
entry = ports[key]
tup = _fiber_entry_hub_port(entry)
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
chip_s = _stored_chip_preview(entry)
if tup is None:
print(f"{key!s:<8} | {'':<10} | {'':<18} | {'':<5} | {'':<8} | {chip_s:<28}", flush=True)
continue
hub_1, port_0 = tup
route = (ssh if ssh else "local")[:18]
if ssh:
code, out, err = _ssh_forward_capture(ssh, ["status", f"{hub_1}.{port_0}"])
if code != 0:
pwr, cur = "?", "?"
if err.strip():
route = f"{ssh} (err)"[:18]
else:
pwr, cur = _parse_status_line_for_hub_port(out, hub_1, port_0)
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | {chip_s:<28}",
flush=True,
)
continue
h_idx = hub_1 - 1
if h_idx < 0 or h_idx >= len(self.hubs):
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | {chip_s:<28}",
flush=True,
)
continue
stem = self.hubs[h_idx]
nports = self._port_count(stem)
if port_0 < 0 or port_0 >= nports:
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | {chip_s:<28}",
flush=True,
)
continue
pwr_val = stem.usb.getPortState(port_0).value
pwr_str = "ON" if (pwr_val & 1) else "OFF"
raw_curr = stem.usb.getPortCurrent(port_0).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(
f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | {chip_s:<28}",
flush=True,
)
def panel_reboot(self, panel_1based, skip_empty=True):
if panel_1based < 1 or panel_1based > PANEL_SLOTS:
print(f"Panel port must be 1{PANEL_SLOTS}, got {panel_1based}", file=sys.stderr, flush=True)
sys.exit(1)
doc = _load_fiber_map_or_exit()
key = str(panel_1based)
entry = doc["fiber_ports"].get(key)
tup = _fiber_entry_hub_port(entry) if entry is not None else None
if tup is None:
print(
f"Panel {panel_1based} is not mapped (no fiber_ports[{key!r}] in fiber_map.json).",
file=sys.stderr,
flush=True,
)
sys.exit(1)
ssh = _fiber_ssh_target(entry) if isinstance(entry, dict) else None
sub = "reboot" if skip_empty else "reboot-force"
if ssh:
sys.exit(_ssh_forward(ssh, ["panel", sub, str(panel_1based)]))
tgt = f"{tup[0]}.{tup[1]}"
print(f"Panel {panel_1based} → hub target {tgt} ({sub})", flush=True)
self.reboot(tgt, skip_empty=skip_empty)
def _ordered_downstream_ports(self):
"""Hub 1 port 0, 1, … then hub 2 … (BrainStem hub order after connect)."""
out = []
for i, stem in enumerate(self.hubs):
for port in range(self._port_count(stem)):
out.append((i + 1, port))
return out
def _set_hub_port_power(self, hub_1, port_0, enable):
h_idx = hub_1 - 1
stem = self.hubs[h_idx]
if enable:
stem.usb.setPortEnable(port_0)
time.sleep(0.25)
else:
stem.usb.setPortDisable(port_0)
def _port_power_feedback(self, hub_1, port_0):
"""Return short status string after a change (hub power state + optional current)."""
h_idx = hub_1 - 1
stem = self.hubs[h_idx]
st = stem.usb.getPortState(port_0)
if st.error != self.SUCCESS:
return f"getPortState error {st.error}"
on = (st.value & 1) != 0
cur = stem.usb.getPortCurrent(port_0)
ma = cur.value / 1000.0 if cur.error == self.SUCCESS else None
bits = f"hub reports {'ON' if on else 'OFF'}"
if ma is not None and abs(ma) > 15.0:
bits += f", ~{ma:.0f} mA"
return bits
def _write_fiber_map_document(self, doc):
path = _fiber_map_path()
out = _ensure_fiber_map_document(doc)
with open(path, "w", encoding="utf-8") as f:
json.dump(out, f, indent=2)
f.write("\n")
print(f"Wrote {path}", flush=True)
def panel_calibrate(self, merge=False, limit=None, calibrate_ssh_hosts=None):
"""
Walk downstream USB ports in hub order: local hubs first, then each --ssh / calibrate_remotes host.
You type the fiber port id for each step; writes one fiber_map.json (adds ssh on remote steps).
"""
calibrate_ssh_hosts = list(calibrate_ssh_hosts or [])
try:
existing = _load_fiber_map_document()
except (OSError, json.JSONDecodeError, ValueError) as exc:
print(f"Could not load existing map: {exc}", file=sys.stderr, flush=True)
return
if merge and existing is not None:
doc = copy.deepcopy(existing)
elif existing is not None and isinstance(existing, dict):
doc = {k: copy.deepcopy(v) for k, v in existing.items() if k != "fiber_ports"}
doc["fiber_ports"] = {}
else:
doc = {"fiber_ports": {}}
doc = _ensure_fiber_map_document(doc)
seen_h = set()
cli_hosts = []
for h in calibrate_ssh_hosts:
s = str(h).strip()
if s and s not in seen_h:
seen_h.add(s)
cli_hosts.append(s)
cr = doc.get("calibrate_remotes")
if isinstance(cr, list):
for x in cr:
s = str(x).strip()
if s and s not in seen_h:
seen_h.add(s)
cli_hosts.append(s)
local_ok = bool(self.hubs) or self.connect()
local_ordered = self._ordered_downstream_ports() if local_ok else []
steps = []
for hub_1, port_0 in local_ordered:
steps.append((None, hub_1, port_0))
for host in cli_hosts:
remote_pairs = _fetch_calibrate_ports_json(host)
for hub_1, port_0 in remote_pairs:
steps.append((host, hub_1, port_0))
if limit is not None:
steps = steps[: max(0, limit)]
if not steps:
print(
"Nothing to calibrate: no local Acroname hubs and no remote ports "
"(use --ssh user@host or calibrate_remotes in fiber_map.json).",
file=sys.stderr,
flush=True,
)
return
n_loc = sum(1 for s in steps if s[0] is None)
n_rem = len(steps) - n_loc
print(
f"Fiber map calibrate: {len(steps)} step(s) — {n_loc} local, {n_rem} via ssh.\n"
"Order: all local hub ports (hub 1 port 0 first), then each --ssh hosts ports in order.\n"
"Each step: port ON (~2s) — enter fiber port id, s=skip, q=quit.\n"
"Remote steps store ssh in fiber_map.json automatically.",
flush=True,
)
ports = doc["fiber_ports"]
for ssh_host, hub_1, port_0 in steps:
route = "local" if ssh_host is None else ssh_host
print(f"\n>>> ON {route} hub {hub_1} USB port {port_0}", flush=True)
try:
if ssh_host is None:
self._set_hub_port_power(hub_1, port_0, True)
print(f" {self._port_power_feedback(hub_1, port_0)}", flush=True)
else:
rc, rerr = _remote_hub_port_power(ssh_host, hub_1, port_0, True)
if rc != 0:
print(f" remote on failed ({rc}): {rerr.strip()[:200]}", flush=True)
else:
time.sleep(0.25)
print(f" {_remote_port_power_feedback(ssh_host, hub_1, port_0)}", flush=True)
time.sleep(2.0)
try:
line = input("Which fiber port id? [s=skip q=quit]: ").strip()
except EOFError:
print(
"\n*** No input (stdin closed). Use a TTY, e.g. ssh -t … panel calibrate … ***\n",
flush=True,
)
line = "q"
finally:
if ssh_host is None:
self._set_hub_port_power(hub_1, port_0, False)
print(f">>> OFF local hub {hub_1} USB port {port_0} ({self._port_power_feedback(hub_1, port_0)})", flush=True)
else:
_remote_hub_port_power(ssh_host, hub_1, port_0, False)
print(
f">>> OFF {ssh_host} hub {hub_1} USB port {port_0}",
flush=True,
)
low = line.lower()
if low == "q":
break
if low == "s" or not line:
continue
try:
pn = int(line)
except ValueError:
print(" Ignored (not an integer).", flush=True)
continue
if pn < 1:
print(" Ignored (fiber port id must be >= 1).", flush=True)
continue
key = str(pn)
prev = _fiber_entry_hub_port(ports.get(key))
prev_ssh = _fiber_ssh_target(ports.get(key)) if isinstance(ports.get(key), dict) else None
if prev is not None and (prev != (hub_1, port_0) or (ssh_host or None) != (prev_ssh or None)):
ps = f" ssh={prev_ssh!r}" if prev_ssh else ""
print(f" Note: fiber port {pn} was {prev[0]}.{prev[1]}{ps}; now {hub_1}.{port_0} @ {route}", flush=True)
base = ports.get(key)
if isinstance(base, dict):
entry = dict(base)
else:
entry = {}
entry["hub"] = hub_1
entry["port"] = port_0
if ssh_host:
entry["ssh"] = ssh_host
for k in ("remote", "host", "user"):
entry.pop(k, None)
else:
entry.pop("ssh", None)
entry.pop("remote", None)
entry.pop("host", None)
entry.pop("user", None)
ports[key] = entry
self._write_fiber_map_document(doc)
def disconnect(self):
for stem in self.hubs: stem.disconnect()
if __name__ == "__main__":
argv = sys.argv[1:]
if len(argv) >= 2 and argv[0] in ("--ssh", "--remote"):
remote_host = argv[1]
rest = argv[2:]
if not rest:
print(
"Usage: hub_manager.py --ssh user@host <command> [args...]\n"
" Example: hub_manager.py --ssh pi@192.168.1.39 discover\n"
" If brainstem is in a Pi venv: copy remote_ssh.env.example → remote_ssh.env next to\n"
" this script on the PC where you run --ssh (paths in the file are on the Pi).\n"
" Or export HUB_MANAGER_REMOTE_PYTHON / HUB_MANAGER_REMOTE_SCRIPT.\n"
" On the Pi: pip install -r requirements.txt in that venv; udev 24ff.",
file=sys.stderr,
flush=True,
)
sys.exit(2)
sys.exit(_ssh_forward(remote_host, rest))
rc_ssh_map = _dispatch_fiber_mapped_ssh_if_needed(argv)
if rc_ssh_map is not None:
sys.exit(rc_ssh_map)
# If this never appears but the prompt returns, you are not executing this file.
# If stderr shows this but nothing else, `import brainstem` may be aborting the process.
os.write(2, b"hub_manager: start\n")
try:
_load_brainstem()
except Exception as exc:
print(f"hub_manager: failed to import brainstem: {exc}", file=sys.stderr, flush=True)
if isinstance(exc, ImportError):
print(
" If this text came from `hub_manager.py --ssh …`: the remote used system python3 by default.\n"
" On your PC export HUB_MANAGER_REMOTE_PYTHON to the Pi venvs python and\n"
" HUB_MANAGER_REMOTE_SCRIPT to that hub_manager.py (absolute paths on the Pi).",
file=sys.stderr,
flush=True,
)
sys.exit(1)
mgr = AcronameManager()
try:
cmd = sys.argv[1].lower() if len(sys.argv) > 1 else "status"
target = sys.argv[2] if len(sys.argv) > 2 else "all"
if cmd == "status": mgr.status(target)
elif cmd == "calibrate-ports-json":
if not mgr.hubs and not mgr.connect():
print("[]", flush=True)
else:
pairs = mgr._ordered_downstream_ports()
print(json.dumps([[h, p] for h, p in pairs]), flush=True)
elif cmd == "discover": mgr.discover()
elif cmd == "power":
if len(sys.argv) < 5 or sys.argv[2].lower() != "fiber-port":
print(
"Usage: hub_manager.py power fiber-port <fiber_port_id> on|off\n"
" Uses fiber_map.json; per-entry ssh / host+user forwards to that host (see help).",
file=sys.stderr,
flush=True,
)
sys.exit(2)
try:
fp_n = int(sys.argv[3])
except ValueError:
print("fiber_port_id must be an integer.", file=sys.stderr, flush=True)
sys.exit(2)
mode = sys.argv[4].lower()
if mode not in ("on", "off"):
print("Last argument must be on or off.", file=sys.stderr, flush=True)
sys.exit(2)
mgr.fiber_power(mode, fp_n)
elif cmd == "fiber":
if len(sys.argv) < 3:
print(
"Usage: hub_manager.py fiber status\n"
" hub_manager.py fiber chip <fiber_port_id> [save]\n"
" status — hub.port, Route, power, and saved chip preview from fiber_map.json\n"
" chip — lsusb diff on that USB port; add save to store usb_id / chip_type in the map",
file=sys.stderr,
flush=True,
)
sys.exit(2)
sub = sys.argv[2].lower()
if sub == "status":
mgr.fiber_map_status()
elif sub == "chip":
if len(sys.argv) < 4:
print(
"Usage: hub_manager.py fiber chip <fiber_port_id> [save]",
file=sys.stderr,
flush=True,
)
sys.exit(2)
try:
chip_fp = int(sys.argv[3])
except ValueError:
print("fiber_port_id must be an integer.", file=sys.stderr, flush=True)
sys.exit(2)
save_chip = len(sys.argv) >= 5 and sys.argv[4].lower() == "save"
mgr.fiber_chip(chip_fp, save=save_chip)
else:
print(f"Unknown fiber subcommand: {sub!r}", file=sys.stderr, flush=True)
sys.exit(2)
elif cmd == "panel":
if len(sys.argv) < 3:
print(
"Usage: hub_manager.py panel status\n"
" hub_manager.py panel on|off <panel_port>\n"
" hub_manager.py panel reboot|reboot-force <panel_port>\n"
" hub_manager.py panel calibrate [merge] [<N>] [--ssh user@host] …\n"
" calibrate: local hub ports first, then each --ssh host (and calibrate_remotes in JSON).\n"
" merge / N as before; remote steps set \"ssh\" on new fiber_ports entries.\n"
" <panel_port> is 124; use power fiber-port for arbitrary ids.\n"
" Preset: fiber_map.rpi20.json → fiber_map.json for 8+8+4 → fiber ports 120.",
file=sys.stderr,
flush=True,
)
sys.exit(2)
sub = sys.argv[2].lower()
if sub == "status":
mgr.panel_status()
elif sub == "calibrate":
args = sys.argv[3:]
merge, limit, cal_hosts = _parse_panel_calibrate_argv(args)
mgr.panel_calibrate(merge=merge, limit=limit, calibrate_ssh_hosts=cal_hosts)
elif sub in ("on", "off"):
if len(sys.argv) < 4:
print(f"Usage: hub_manager.py panel {sub} <1-24>", file=sys.stderr, flush=True)
sys.exit(2)
mgr.panel_power(sub, int(sys.argv[3]))
elif sub == "reboot":
if len(sys.argv) < 4:
print("Usage: hub_manager.py panel reboot <1-24>", file=sys.stderr, flush=True)
sys.exit(2)
mgr.panel_reboot(int(sys.argv[3]), skip_empty=True)
elif sub == "reboot-force":
if len(sys.argv) < 4:
print("Usage: hub_manager.py panel reboot-force <1-24>", file=sys.stderr, flush=True)
sys.exit(2)
mgr.panel_reboot(int(sys.argv[3]), skip_empty=False)
else:
print(f"Unknown panel subcommand: {sub!r}", file=sys.stderr, flush=True)
sys.exit(2)
elif cmd in ["on", "off"]: mgr.power(cmd, target)
elif cmd in ["reboot", "reboot-force"]:
mgr.reboot(target, skip_empty=(cmd == "reboot"))
elif cmd == "setup": mgr.setup_udev()
elif cmd == "verify": mgr.verify()
elif cmd in ("help", "-h", "--help"):
print(
"Usage: hub_manager.py <command> [target]\n"
" discover — list hubs (serial, port count); no port I/O\n"
" status [target] — default command; target like all, 1.3, all.2\n"
" fiber status — fiber_ports + power (local or per-entry ssh / host+user)\n"
" fiber chip <id> [save] — lsusb probe; save stores usb_id / chip_type in fiber_map.json\n"
" power fiber-port <id> on|off — power by fiber key (ssh forward if map says so)\n"
" panel status — rack positions 124 (fiber ids 124 in fiber_map.json)\n"
" panel calibrate [merge] [N] [--ssh user@host]… — hybrid local + ssh hubs → one fiber_map.json\n"
" panel on|off|reboot|reboot-force <n>\n"
" on|off [target] reboot|reboot-force [target] setup verify\n"
"\n"
"Remote (hubs on another host — no local brainstem needed):\n"
" hub_manager.py --ssh user@host discover\n"
" remote_ssh.env next to hub_manager.py (see remote_ssh.env.example) or env vars:\n"
" HUB_MANAGER_REMOTE_PYTHON remote interpreter (default python3)\n"
" HUB_MANAGER_REMOTE_SCRIPT remote script path (default /usr/local/bin/hub_manager.py)\n"
" HUB_MANAGER_SSH_OPTS e.g. '-o BatchMode=yes'\n"
" Pi: pip install -r requirements.txt in the venv you point REMOTE_PYTHON at; udev 24ff.\n"
"\n"
"fiber_map.json fiber_ports entries may set ssh routing (hubs on another machine):\n"
' "ssh": "user@host" or "remote": "" or "host": "ip", "user": "pi"\n'
" On the SSH destination, the same fiber id should be local (omit ssh) so commands are not re-forwarded.\n"
"\n"
"Hybrid calibrate: put {\"calibrate_remotes\": [\"pi@ip\"]} in fiber_map.json or pass --ssh per host;\n"
" order is all local downstream ports, then each remotes ports (see calibrate-ports-json on the Pi)."
)
else:
print(f"Unknown command: {cmd!r}", file=sys.stderr, flush=True)
print(
"Try: --ssh user@host … | discover | calibrate-ports-json | status | fiber | power | panel | … | help",
file=sys.stderr,
flush=True,
)
sys.exit(2)
finally:
mgr.disconnect()