Add hub discovery, panel map, SSH remote helper, and Pi deps.

Support USBHub2x4 and USBHub3p discovery with correct downstream port counts,
patch-panel mapping via panel_map.json, lazy brainstem import and diagnostics,
SSH forwarding with optional remote_ssh.env defaults, and requirements.txt
for brainstem. Ignore local panel_map.json, remote_ssh.env, and __pycache__.

Made-with: Cursor
This commit is contained in:
Robert McMahon 2026-03-25 22:44:56 -07:00
parent 4b9010de87
commit 0d90434ee4
5 changed files with 285 additions and 2 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
env/
__pycache__/
hub_manager.py~
arc.txt
panel_map.json
remote_ssh.env
.hub_manager_remote

View File

@ -3,9 +3,13 @@ import sys
import time
import asyncio
import os
import json
import shlex
import subprocess
import shutil
PANEL_SLOTS = 24
# Loaded on first use so we can print diagnostics before touching native BrainStem code.
brainstem = None
_BS_C = None
@ -26,6 +30,97 @@ def _load_brainstem():
return brainstem
def _panel_map_path():
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "panel_map.json")
_REMOTE_SSH_ENV_KEYS = frozenset(
{
"HUB_MANAGER_REMOTE_PYTHON",
"HUB_MANAGER_REMOTE_SCRIPT",
"HUB_MANAGER_SSH_BIN",
"HUB_MANAGER_SSH_OPTS",
}
)
def _apply_remote_ssh_env_file():
"""
Load remote_ssh.env (or .hub_manager_remote) from this scripts directory.
Uses os.environ.setdefault so real environment variables still win.
Put this file on the machine where you run `hub_manager.py --ssh` (e.g. Fedora), not on the Pi.
Values are paths as seen on the SSH *destination* (the Pi).
"""
base = os.path.dirname(os.path.abspath(__file__))
for fname in ("remote_ssh.env", ".hub_manager_remote"):
path = os.path.join(base, fname)
if not os.path.isfile(path):
continue
try:
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, val = line.partition("=")
key, val = key.strip(), val.strip()
if len(val) >= 2 and val[0] == val[-1] and val[0] in "'\"":
val = val[1:-1]
if key in _REMOTE_SSH_ENV_KEYS:
os.environ.setdefault(key, val)
except OSError:
continue
break
def _ssh_forward(remote_host, remote_args):
"""
Run hub_manager on a remote machine (e.g. Raspberry Pi with USB hubs attached).
Does not import brainstem locally only needs OpenSSH client.
Optional file next to this script (on the PC where you run --ssh): remote_ssh.env
See remote_ssh.env.example. Env vars still override the file.
Env (paths are as seen *on the remote* after SSH login):
HUB_MANAGER_REMOTE_PYTHON default: python3 (use /env/bin/python3 if brainstem is in a venv)
HUB_MANAGER_REMOTE_SCRIPT default: /usr/local/bin/hub_manager.py
HUB_MANAGER_SSH_BIN default: ssh
HUB_MANAGER_SSH_OPTS extra ssh args, shell-quoted string, e.g. -o BatchMode=yes
"""
_apply_remote_ssh_env_file()
py = os.environ.get("HUB_MANAGER_REMOTE_PYTHON", "python3")
script = os.environ.get("HUB_MANAGER_REMOTE_SCRIPT", "/usr/local/bin/hub_manager.py")
ssh_bin = os.environ.get("HUB_MANAGER_SSH_BIN", "ssh")
extra = shlex.split(os.environ.get("HUB_MANAGER_SSH_OPTS", ""))
cmd = [ssh_bin, *extra, remote_host, py, script, *remote_args]
print(f"hub_manager: ssh {remote_host}{py} {script} {' '.join(remote_args)}", file=sys.stderr, flush=True)
proc = subprocess.run(cmd)
return proc.returncode if proc.returncode is not None else 1
def _parse_panel_map_entry(entry):
"""Return (hub_1based, port_0based) or None if unmapped / invalid."""
if entry is None:
return None
if isinstance(entry, str):
entry = entry.strip()
if not entry or entry.lower() == "null":
return None
try:
h_s, p_s = entry.split(".", 1)
return int(h_s), int(p_s)
except ValueError:
return None
if isinstance(entry, dict):
try:
return int(entry["hub"]), int(entry["port"])
except (KeyError, TypeError, ValueError):
return None
return None
def _lsusb_acroname_lines():
try:
lsusb_bin = shutil.which("lsusb")
@ -331,10 +426,108 @@ class AcronameManager:
if os.path.exists(link): print(f"[OK] Hub {i} -> {os.path.realpath(link)}")
else: print(f"[ERROR] {link} not found.")
def _load_panel_map(self):
"""24 slots: index 0 = patch panel port 1. Each entry: (hub_1based, port_0based) or None."""
path = _panel_map_path()
if not os.path.isfile(path):
print(
f"Missing {path}. Copy panel_map.example.json to panel_map.json and set each slot to "
f'{{"hub": 1, "port": 0}} or "1.0" (hub 1-based, port 0-based), or null if unused.',
file=sys.stderr,
flush=True,
)
sys.exit(1)
try:
with open(path, encoding="utf-8") as f:
data = json.load(f)
except (OSError, json.JSONDecodeError) as exc:
print(f"Cannot read panel map {path}: {exc}", file=sys.stderr, flush=True)
sys.exit(1)
if not isinstance(data, list):
print(f"panel_map.json must be a JSON array of {PANEL_SLOTS} entries.", file=sys.stderr, flush=True)
sys.exit(1)
slots = [_parse_panel_map_entry(x) for x in data]
while len(slots) < PANEL_SLOTS:
slots.append(None)
if len(slots) > PANEL_SLOTS:
slots = slots[:PANEL_SLOTS]
return slots
def _panel_target_for(self, panel_1based):
if panel_1based < 1 or panel_1based > PANEL_SLOTS:
print(f"Patch panel port must be 1{PANEL_SLOTS}, got {panel_1based}", file=sys.stderr, flush=True)
sys.exit(1)
slots = self._load_panel_map()
mapped = slots[panel_1based - 1]
if mapped is None:
print(f"Patch panel port {panel_1based} is not mapped (null in panel_map.json).", file=sys.stderr, flush=True)
sys.exit(1)
hub_1, port_0 = mapped
return f"{hub_1}.{port_0}"
def panel_status(self):
"""Show all 24 patch panel positions, mapping, and power state when connected."""
slots = self._load_panel_map()
if not self.hubs and not self.connect():
return
print(f"{'Panel':<7} | {'Hub.Port':<10} | {'Power':<7} | {'Current (mA)':<12}")
print("-" * 50)
for idx in range(PANEL_SLOTS):
panel_n = idx + 1
mapped = slots[idx]
if mapped is None:
print(f"{panel_n:<7} | {'':<10} | {'':<7} | {'':<12}")
continue
hub_1, port_0 = mapped
h_idx = hub_1 - 1
if h_idx < 0 or h_idx >= len(self.hubs):
print(f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'?':<7} | {'no hub':<12}")
continue
stem = self.hubs[h_idx]
nports = self._port_count(stem)
if port_0 < 0 or port_0 >= nports:
print(f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'?':<7} | {'bad map':<12}")
continue
port = port_0
pwr_val = stem.usb.getPortState(port).value
pwr_str = "ON" if (pwr_val & 1) else "OFF"
raw_curr = stem.usb.getPortCurrent(port).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(f"{panel_n:<7} | {hub_1}.{port_0:<9} | {pwr_str:<7} | {current:<12.2f}")
def panel_power(self, mode, panel_1based):
tgt = self._panel_target_for(panel_1based)
print(f"Patch panel {panel_1based} → hub target {tgt} ({mode})", flush=True)
self.power(mode, tgt)
def panel_reboot(self, panel_1based, skip_empty=True):
tgt = self._panel_target_for(panel_1based)
cmd = "reboot" if skip_empty else "reboot-force"
print(f"Patch panel {panel_1based} → hub target {tgt} ({cmd})", flush=True)
self.reboot(tgt, skip_empty=skip_empty)
def disconnect(self):
for stem in self.hubs: stem.disconnect()
if __name__ == "__main__":
argv = sys.argv[1:]
if len(argv) >= 2 and argv[0] in ("--ssh", "--remote"):
remote_host = argv[1]
rest = argv[2:]
if not rest:
print(
"Usage: hub_manager.py --ssh user@host <command> [args...]\n"
" Example: hub_manager.py --ssh pi@192.168.1.39 discover\n"
" If brainstem is in a Pi venv: copy remote_ssh.env.example → remote_ssh.env next to\n"
" this script on the PC where you run --ssh (paths in the file are on the Pi).\n"
" Or export HUB_MANAGER_REMOTE_PYTHON / HUB_MANAGER_REMOTE_SCRIPT.\n"
" On the Pi: pip install -r requirements.txt in that venv; udev 24ff.",
file=sys.stderr,
flush=True,
)
sys.exit(2)
sys.exit(_ssh_forward(remote_host, rest))
# If this never appears but the prompt returns, you are not executing this file.
# If stderr shows this but nothing else, `import brainstem` may be aborting the process.
os.write(2, b"hub_manager: start\n")
@ -342,6 +535,14 @@ if __name__ == "__main__":
_load_brainstem()
except Exception as exc:
print(f"hub_manager: failed to import brainstem: {exc}", file=sys.stderr, flush=True)
if isinstance(exc, ImportError):
print(
" If this text came from `hub_manager.py --ssh …`: the remote used system python3 by default.\n"
" On your PC export HUB_MANAGER_REMOTE_PYTHON to the Pi venvs python and\n"
" HUB_MANAGER_REMOTE_SCRIPT to that hub_manager.py (absolute paths on the Pi).",
file=sys.stderr,
flush=True,
)
sys.exit(1)
mgr = AcronameManager()
try:
@ -349,6 +550,38 @@ if __name__ == "__main__":
target = sys.argv[2] if len(sys.argv) > 2 else "all"
if cmd == "status": mgr.status(target)
elif cmd == "discover": mgr.discover()
elif cmd == "panel":
if len(sys.argv) < 3:
print(
"Usage: hub_manager.py panel status\n"
" hub_manager.py panel on|off <panel_port>\n"
" hub_manager.py panel reboot|reboot-force <panel_port>\n"
" <panel_port> is 124; map in panel_map.json (copy from panel_map.example.json).",
file=sys.stderr,
flush=True,
)
sys.exit(2)
sub = sys.argv[2].lower()
if sub == "status":
mgr.panel_status()
elif sub in ("on", "off"):
if len(sys.argv) < 4:
print(f"Usage: hub_manager.py panel {sub} <1-24>", file=sys.stderr, flush=True)
sys.exit(2)
mgr.panel_power(sub, int(sys.argv[3]))
elif sub == "reboot":
if len(sys.argv) < 4:
print("Usage: hub_manager.py panel reboot <1-24>", file=sys.stderr, flush=True)
sys.exit(2)
mgr.panel_reboot(int(sys.argv[3]), skip_empty=True)
elif sub == "reboot-force":
if len(sys.argv) < 4:
print("Usage: hub_manager.py panel reboot-force <1-24>", file=sys.stderr, flush=True)
sys.exit(2)
mgr.panel_reboot(int(sys.argv[3]), skip_empty=False)
else:
print(f"Unknown panel subcommand: {sub!r}", file=sys.stderr, flush=True)
sys.exit(2)
elif cmd in ["on", "off"]: mgr.power(cmd, target)
elif cmd in ["reboot", "reboot-force"]:
mgr.reboot(target, skip_empty=(cmd == "reboot"))
@ -359,12 +592,22 @@ if __name__ == "__main__":
"Usage: hub_manager.py <command> [target]\n"
" discover — list hubs (serial, port count); no port I/O\n"
" status [target] — default command; target like all, 1.3, all.2\n"
" on|off [target] reboot|reboot-force [target] setup verify"
" panel status — patch panel 124 (see panel_map.json)\n"
" panel on|off|reboot|reboot-force <n>\n"
" on|off [target] reboot|reboot-force [target] setup verify\n"
"\n"
"Remote (hubs on another host — no local brainstem needed):\n"
" hub_manager.py --ssh user@host discover\n"
" remote_ssh.env next to hub_manager.py (see remote_ssh.env.example) or env vars:\n"
" HUB_MANAGER_REMOTE_PYTHON remote interpreter (default python3)\n"
" HUB_MANAGER_REMOTE_SCRIPT remote script path (default /usr/local/bin/hub_manager.py)\n"
" HUB_MANAGER_SSH_OPTS e.g. '-o BatchMode=yes'\n"
" Pi: pip install -r requirements.txt in the venv you point REMOTE_PYTHON at; udev 24ff."
)
else:
print(f"Unknown command: {cmd!r}", file=sys.stderr, flush=True)
print(
"Try: discover | status | on | off | reboot | reboot-force | setup | verify | help",
"Try: --ssh user@host … | discover | status | panel | on | off | reboot | … | help",
file=sys.stderr,
flush=True,
)

26
panel_map.example.json Normal file
View File

@ -0,0 +1,26 @@
[
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
]

6
remote_ssh.env.example Normal file
View File

@ -0,0 +1,6 @@
# Copy to remote_ssh.env (same folder as hub_manager.py on the machine where you run --ssh, e.g. Fedora).
# Paths below are on the SSH *destination* (Raspberry Pi), not your laptop.
# Environment variables override these lines if both are set.
HUB_MANAGER_REMOTE_PYTHON=/home/rjmcmahon/Code/acroname/env/bin/python3
HUB_MANAGER_REMOTE_SCRIPT=/home/rjmcmahon/Code/acroname/hub_manager.py

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
# Acroname BrainStem Python API (install on the machine that has USB hubs attached).
# In a virtualenv: pip install -r requirements.txt (do not use --user)
# System Python without a venv: pip install --user -r requirements.txt
brainstem