1043 lines
45 KiB
Python
1043 lines
45 KiB
Python
"""Acroname BrainStem hub manager: connect, power, panel/fiber map, calibrate."""
|
||
|
||
import asyncio
|
||
import copy
|
||
import json
|
||
import os
|
||
import shutil
|
||
import sys
|
||
import time
|
||
|
||
import hubmgr.brainstem_loader as stemmod
|
||
from hubmgr.constants import PANEL_SLOTS
|
||
from hubmgr.paths import fiber_map_path
|
||
from hubmgr import fiber_map_io as fm
|
||
from hubmgr import remote_ssh as rs
|
||
from hubmgr import usb_probe as usb
|
||
|
||
|
||
class AcronameManager:
|
||
def __init__(self):
    """Load the BrainStem bindings and start with an empty hub list.

    ``hubs`` collects the stem objects opened later by the connect helpers;
    ``SUCCESS`` caches the library's ``NO_ERROR`` result code so other
    methods can compare call results against it.
    """
    stemmod.load_brainstem()
    self.hubs = []
    result_codes = stemmod.brainstem.result.Result
    self.SUCCESS = result_codes.NO_ERROR
|
||
|
||
def _enumerate_usb_specs(self):
    """Return link specs from BrainStem USB discovery, sorted by serial number.

    ``brainstem.discover.findAllModules`` is tried first; if that lookup
    raises AttributeError, ``brainstem.discovery.Discovery.findAll`` is used
    instead. Any other failure -- including a failure of the fallback
    itself -- is reported on stderr and results in an empty list.

    Returns:
        list: link specs ordered by ``serial_number``; ``[]`` when discovery
        found nothing or raised.
    """
    try:
        try:
            found = stemmod.brainstem.discover.findAllModules(stemmod.brainstem.link.Spec.USB)
        except AttributeError:
            # The fallback lives inside the outer ``try`` on purpose: an
            # ImportError (or any error) raised here must be reported below
            # rather than escape from this "returns [] on failure" helper.
            from brainstem.discovery import Discovery

            found = Discovery.findAll(stemmod.brainstem.link.Spec.USB)
    except Exception as exc:
        print(f"BrainStem findAllModules(USB) raised: {exc}", file=sys.stderr, flush=True)
        return []
    if not found:
        return []
    # Sorting by serial keeps the Hub 1 / Hub 2 numbering stable between runs.
    return sorted(found, key=lambda spec: spec.serial_number)
|
||
|
||
@staticmethod
def _hub_stem_classes_for_spec(spec, alternate=False):
    """
    Return the stem classes to try for ``spec``, in order of preference.

    The stem class has to match the hardware or connectFromSpec fails, so
    the class whose ``defs.MODEL_USBHUB_*`` constant equals ``spec.model``
    (when both are available) is listed first, followed by the remaining
    classes in the default order 2x4, 3p, 3c.

    With alternate=True the fixed order 3p → 3c → 2x4 is returned without
    looking at the model hint.
    """
    stemmod.load_brainstem()
    if alternate:
        return [
            stemmod.brainstem.stem.USBHub3p,
            stemmod.brainstem.stem.USBHub3c,
            stemmod.brainstem.stem.USBHub2x4,
        ]

    model = getattr(spec, "model", None)
    defs = getattr(stemmod.brainstem, "defs", None)
    ordered = []
    if defs is not None and model is not None:
        hinted = (
            (getattr(defs, "MODEL_USBHUB_2X4", None), stemmod.brainstem.stem.USBHub2x4),
            (getattr(defs, "MODEL_USBHUB_3P", None), stemmod.brainstem.stem.USBHub3p),
            (getattr(defs, "MODEL_USBHUB_3C", None), stemmod.brainstem.stem.USBHub3c),
        )
        for model_id, stem_cls in hinted:
            if model_id is None or model != model_id:
                continue
            ordered.append(stem_cls)

    # Append whatever the model hint did not already put in front.
    default_order = (
        stemmod.brainstem.stem.USBHub2x4,
        stemmod.brainstem.stem.USBHub3p,
        stemmod.brainstem.stem.USBHub3c,
    )
    for stem_cls in default_order:
        if stem_cls not in ordered:
            ordered.append(stem_cls)
    return ordered
|
||
|
||
def _connect_from_spec(self, spec, alternate=False):
|
||
for cls in self._hub_stem_classes_for_spec(spec, alternate=alternate):
|
||
stem = cls()
|
||
res = stem.connectFromSpec(spec)
|
||
if res == self.SUCCESS:
|
||
return stem
|
||
try:
|
||
stem.disconnect()
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
def _connect_specs(self, specs, quiet=False):
|
||
"""Append successfully opened stems to self.hubs; return False if none opened."""
|
||
first_pass_ok = False
|
||
for spec in specs:
|
||
stem = self._connect_from_spec(spec, alternate=False)
|
||
if stem is not None:
|
||
self.hubs.append(stem)
|
||
first_pass_ok = bool(self.hubs)
|
||
|
||
if not first_pass_ok and specs:
|
||
time.sleep(0.45)
|
||
for spec in specs:
|
||
stem = self._connect_from_spec(spec, alternate=True)
|
||
if stem is not None:
|
||
self.hubs.append(stem)
|
||
|
||
if self.hubs and not first_pass_ok and not quiet:
|
||
print(
|
||
"hub_manager: local hub(s) opened after retry (alternate USBHub3p → USBHub3c → USBHub2x4 order).",
|
||
flush=True,
|
||
)
|
||
|
||
if self.hubs:
|
||
return True
|
||
|
||
if specs:
|
||
if quiet:
|
||
print(
|
||
"hub_manager: local USB shows Acroname module(s) but BrainStem connectFromSpec failed "
|
||
"(udev 24ff / stem type / library); continuing without local hubs.",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
else:
|
||
print(
|
||
"Error: findAllModules(USB) reported device(s) but connectFromSpec failed for all.",
|
||
flush=True,
|
||
)
|
||
print(
|
||
" Use the correct stem type (USBHub2x4 for 4-port, USBHub3p for USBHub3+), "
|
||
"udev permissions (vendor 24ff), and BrainStem library version.",
|
||
flush=True,
|
||
)
|
||
else:
|
||
if quiet:
|
||
print(
|
||
"hub_manager: no local Acroname hubs found; continuing (e.g. --ssh calibrate).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
else:
|
||
print("Error: No Acroname hubs found (findAllModules(USB) returned nothing).", flush=True)
|
||
acr = usb.lsusb_acroname_lines()
|
||
if acr:
|
||
print(" lsusb does see Acroname (kernel enumerates the device), e.g.:", flush=True)
|
||
for ln in acr[:8]:
|
||
print(f" {ln}", flush=True)
|
||
print(
|
||
" BrainStem still needs userland access: udev rules for vendor 24ff (MODE=0666 or "
|
||
"GROUP=plugdev + your user in that group), and the same libBrainStem version HubTool uses.",
|
||
flush=True,
|
||
)
|
||
return False
|
||
|
||
def connect(self, quiet=False):
    """Discover USB hubs (sorted by serial number) and open them.

    Returns whatever ``_connect_specs`` returns for the discovered specs;
    ``quiet`` is passed through unchanged.
    """
    discovered = self._enumerate_usb_specs()
    return self._connect_specs(discovered, quiet=quiet)
|
||
|
||
def _parse_target(self, target_str):
|
||
if target_str.lower() == 'all':
|
||
return 'all', 'all'
|
||
try:
|
||
h_num, p_idx = target_str.split('.')
|
||
h_idx = 'all' if h_num.lower() == 'all' else int(h_num) - 1
|
||
p_idx = 'all' if p_idx.lower() == 'all' else int(p_idx)
|
||
return h_idx, p_idx
|
||
except ValueError:
|
||
print(f"Format Error: Use '1.4' for Hub 1 Port 4, or 'all'")
|
||
sys.exit(1)
|
||
|
||
def _port_count(self, stem):
|
||
"""
|
||
Count of downstream ports you power/cycle (Type-A sides on 2x4 / 3p).
|
||
Prefer stem-class NUMBER_OF_DOWNSTREAM_USB over cmdPORT quantity: the latter
|
||
matches NUMBER_OF_PORTS and includes extra PORT entities (e.g. 6 vs 4 on USBHub2x4).
|
||
"""
|
||
cls = type(stem)
|
||
nd = getattr(cls, "NUMBER_OF_DOWNSTREAM_USB", None)
|
||
if isinstance(nd, int) and nd > 0:
|
||
return nd
|
||
nu = getattr(cls, "NUMBER_OF_USB_PORTS", None)
|
||
if isinstance(nu, int) and nu > 0:
|
||
return nu
|
||
if stemmod._BS_C is not None:
|
||
try:
|
||
res = stem.classQuantity(stemmod._BS_C.cmdPORT)
|
||
if res.error == self.SUCCESS and getattr(res, "value", 0) > 0:
|
||
return int(res.value)
|
||
except (AttributeError, TypeError, ValueError):
|
||
pass
|
||
return 8
|
||
|
||
def _ports_for_hub(self, stem, hub_display_num, p_target):
|
||
"""Resolve port indices for this stem; return None if an explicit port is out of range."""
|
||
n = self._port_count(stem)
|
||
if p_target == "all":
|
||
return list(range(n))
|
||
if p_target < 0 or p_target >= n:
|
||
print(
|
||
f"Error: Hub {hub_display_num} has {n} port(s) (indices 0–{n - 1}); "
|
||
f"port {p_target} is invalid."
|
||
)
|
||
return None
|
||
return [p_target]
|
||
|
||
def discover(self):
    """List discovered USB modules, then each opened hub's serial and port count.

    Prints the link specs found by discovery, opens them via
    ``_connect_specs`` and prints one table row per hub. Port power is
    neither read nor changed here.
    """
    print("Scanning USB for Acroname modules (BrainStem discover)...", flush=True)
    specs = self._enumerate_usb_specs()
    if not specs:
        print(
            "findAllModules(USB): no link specs. If HubTool sees hubs, check udev (vendor 24ff), "
            "group permissions, and that the BrainStem Python package matches HubTool’s library.",
            flush=True,
        )
        lsusb_hits = usb.lsusb_acroname_lines()
        if lsusb_hits:
            print(" lsusb:", flush=True)
            for hit in lsusb_hits[:8]:
                print(f" {hit}", flush=True)
        return

    print(f"findAllModules: {len(specs)} USB link spec(s)", flush=True)
    for spec in specs:
        model = getattr(spec, 'model', '?')
        module = getattr(spec, 'module', '?')
        print(
            f" serial=0x{spec.serial_number:08X} model={model} "
            f"module={module}",
            flush=True,
        )

    if not self._connect_specs(specs):
        return

    print(f"{'Hub':<6} | {'Serial':<12} | {'Ports'}")
    print("-" * 28)
    for hub_no, stem in enumerate(self.hubs, start=1):
        serial = stem.system.getSerialNumber()
        if serial.error == self.SUCCESS:
            sn = f"0x{serial.value:08X}"
        else:
            sn = "?"
        n = self._port_count(stem)
        print(f"{hub_no:<6} | {sn:<12} | {n}")
|
||
|
||
def _sample_inrush(self, stem, port, sample_duration=0.3):
|
||
"""Captures peak current and duration for the reboot report."""
|
||
samples = []
|
||
start_time = time.time()
|
||
while (time.time() - start_time) < sample_duration:
|
||
curr = stem.usb.getPortCurrent(port).value / 1000.0
|
||
samples.append((time.time() - start_time, curr))
|
||
|
||
peak_current = max(s[1] for s in samples) if samples else 0.0
|
||
duration = 0
|
||
if peak_current > 15.0:
|
||
for t, c in samples:
|
||
if c >= peak_current * 0.9:
|
||
duration = t
|
||
return {"peak": peak_current, "duration": duration * 1000}
|
||
|
||
def status(self, target_str="all"):
    """Print a power/current table for the targeted hub ports.

    Connects first when no hub is open and returns silently if that fails.
    Power is shown as ON when bit 0 of the port state is set; a current
    whose magnitude does not exceed 15.0 is displayed as 0.00. An invalid
    explicit port stops the listing.
    """
    if not (self.hubs or self.connect()):
        return
    hub_sel, port_sel = self._parse_target(target_str)
    print(f"{'Identity':<8} | {'Port':<5} | {'Power':<7} | {'Current (mA)':<12}")
    print("-" * 55)
    for hub_idx, stem in enumerate(self.hubs):
        if hub_sel != 'all' and hub_idx != hub_sel:
            continue
        selected = self._ports_for_hub(stem, hub_idx + 1, port_sel)
        if selected is None:
            return
        for port in selected:
            state_bits = stem.usb.getPortState(port).value
            pwr_str = "ON" if (state_bits & 1) else "OFF"
            measured = stem.usb.getPortCurrent(port).value / 1000.0
            current = measured if abs(measured) > 15.0 else 0.0
            print(f"Hub {hub_idx+1:<3} | {port:<5} | {pwr_str:<7} | {current:<12.2f}")
|
||
|
||
async def _async_reboot_port(self, h_idx, stem, port, delay, stagger, skip_empty):
|
||
"""Reboots a port and captures inrush vs steady state data."""
|
||
current_ma = stem.usb.getPortCurrent(port).value / 1000.0
|
||
if skip_empty and current_ma < 15.0:
|
||
return {"hub": h_idx+1, "port": port, "status": "Skipped", "peak": 0.0, "duration": 0.0, "steady": 0.0}
|
||
|
||
stem.usb.setPortDisable(port)
|
||
await asyncio.sleep(delay)
|
||
if stagger > 0:
|
||
await asyncio.sleep(stagger * port)
|
||
|
||
stem.usb.setPortEnable(port)
|
||
# Capture the immediate spike
|
||
inrush_stats = self._sample_inrush(stem, port)
|
||
|
||
# Increased to 2 seconds for radio stabilization
|
||
await asyncio.sleep(2.0)
|
||
steady_ma = stem.usb.getPortCurrent(port).value / 1000.0
|
||
|
||
return {
|
||
"hub": h_idx+1, "port": port, "status": "Rebooted",
|
||
"peak": inrush_stats['peak'], "duration": inrush_stats['duration'],
|
||
"steady": steady_ma if steady_ma > 15.0 else 0.0
|
||
}
|
||
|
||
def reboot(self, target_str, delay=2, stagger=0.2, skip_empty=True):
    """Power-cycle the targeted ports concurrently and print a report table.

    Args:
        target_str: target understood by ``_parse_target``.
        delay: seconds each port stays off.
        stagger: extra per-port off time factor handed to
            ``_async_reboot_port``.
        skip_empty: skip ports that currently draw almost no current.

    Returns None. Nothing is rebooted when no hub can be connected, when an
    explicit port is out of range for a selected hub, or when the target
    selects no port.
    """
    if not self.hubs and not self.connect():
        return
    h_target, p_target = self._parse_target(target_str)

    # Resolve and validate every (hub, stem, port) before creating a single
    # coroutine: bailing out after coroutine objects exist would leave them
    # un-awaited ("coroutine ... was never awaited" RuntimeWarning).
    plan = []
    for i, stem in enumerate(self.hubs):
        if h_target != 'all' and i != h_target:
            continue
        ports = self._ports_for_hub(stem, i + 1, p_target)
        if ports is None:
            return
        plan.extend((i, stem, port) for port in ports)

    if not plan:
        return

    print(f"{'Identity':<8} | {'Port':<5} | {'Action':<10} | {'Peak(mA)':<10} | {'Steady(mA)':<12} | {'Settle(ms)':<10}")
    print("-" * 75)
    tasks = [
        self._async_reboot_port(i, stem, port, delay, stagger, skip_empty)
        for i, stem, port in plan
    ]
    results = asyncio.run(self._run_tasks(tasks))
    for r in sorted(results, key=lambda x: (x['hub'], x['port'])):
        print(f"Hub {r['hub']:<3} | {r['port']:<5} | {r['status']:<10} | {r['peak']:<10.2f} | {r['steady']:<12.2f} | {r['duration']:<10.2f}")
|
||
|
||
async def _run_tasks(self, tasks):
|
||
return await asyncio.gather(*tasks)
|
||
|
||
def power(self, mode, target_str):
    """Enable or disable the targeted ports, then print their status.

    Args:
        mode: ``"on"`` (case-insensitive) enables the ports; any other value
            disables them.
        target_str: target understood by ``_parse_target``.

    Returns:
        bool: True when the request was applied; False when no hub could be
        connected, the named hub is not connected, or an explicit port is
        out of range.
    """
    if not self.hubs and not self.connect():
        print(
            "Error: No Acroname hubs connected (cannot change port power).",
            file=sys.stderr,
            flush=True,
        )
        return False
    h_target, p_target = self._parse_target(target_str)

    # An explicit hub that is not connected would match nothing in the loop
    # below and the call would still report success; fail loudly instead.
    if h_target != 'all' and not 0 <= h_target < len(self.hubs):
        print(
            f"Error: Hub {h_target + 1} is not connected ({len(self.hubs)} hub(s) available).",
            file=sys.stderr,
            flush=True,
        )
        return False

    enable = mode.lower() == "on"
    for i, stem in enumerate(self.hubs):
        if h_target != 'all' and i != h_target:
            continue
        ports = self._ports_for_hub(stem, i + 1, p_target)
        if ports is None:
            return False
        for port in ports:
            if enable:
                stem.usb.setPortEnable(port)
            else:
                stem.usb.setPortDisable(port)
    self.status(target_str)
    return True
|
||
|
||
def setup_udev(self):
    """Write ``99-acroname.rules`` to the current directory and print install help.

    The file starts with a permissions rule for vendor 24ff and gets one
    SYMLINK rule (``acroname_hub<N>``) per connected hub whose serial number
    could be read. Returns without writing when no hub can be connected.
    """
    if not (self.hubs or self.connect()):
        return
    rule_path = "/etc/udev/rules.d/99-acroname.rules"
    rules = ['# Acroname Hub Permissions\nSUBSYSTEM=="usb", ATTR{idVendor}=="24ff", MODE="0666", GROUP="plugdev"']
    for hub_no, stem in enumerate(self.hubs, start=1):
        serial = stem.system.getSerialNumber()
        if serial.error != self.SUCCESS:
            continue
        sn = f"{serial.value:08X}"
        rules.append(f'SUBSYSTEM=="usb", ATTR{{idVendor}}=="24ff", ATTR{{serial}}=="{sn}", SYMLINK+="acroname_hub{hub_no}"')
    with open("99-acroname.rules", "w") as rules_file:
        rules_file.write("\n".join(rules))
    print(f"udev rules generated. Install with: sudo mv 99-acroname.rules {rule_path}")
|
||
|
||
def verify(self):
    """Report whether ``/dev/acroname_hub1`` and ``/dev/acroname_hub2`` exist.

    Prints an ``[OK]`` line with the resolved path for each link that
    exists and an ``[ERROR]`` line for each one that does not.
    """
    for hub_no in (1, 2):
        link = f"/dev/acroname_hub{hub_no}"
        if not os.path.exists(link):
            print(f"[ERROR] {link} not found.")
            continue
        print(f"[OK] Hub {hub_no} -> {os.path.realpath(link)}")
|
||
|
||
def panel_status(self):
    """Print one row per panel position 1..PANEL_SLOTS with mapping and power.

    Entries with an ssh target are queried through ``rs.ssh_forward_capture``;
    the rest are read from the locally connected hubs. Local hubs are only
    connected when at least one mapped entry has no ssh target, and the
    method returns early if that connection fails. Unmapped positions are
    shown with dashes.
    """
    doc = fm.load_fiber_map_or_exit()
    ports = doc["fiber_ports"]

    def _needs_local_hub(slot):
        # True when the slot is mapped to a hub port and has no ssh target.
        slot_entry = ports.get(str(slot))
        if fm.fiber_entry_hub_port(slot_entry) is None:
            return False
        return fm.fiber_ssh_target(slot_entry if isinstance(slot_entry, dict) else None) is None

    need_local = any(_needs_local_hub(slot) for slot in range(1, PANEL_SLOTS + 1))
    if need_local and not self.hubs and not self.connect():
        return

    print(
        f"{'Panel':<7} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | "
        f"{'Chip (saved)':<28} | {'PCIe (saved)':<22}",
        flush=True,
    )
    print("-" * 120)

    for panel_n in range(1, PANEL_SLOTS + 1):
        entry = ports.get(str(panel_n))
        tup = None if entry is None else fm.fiber_entry_hub_port(entry)
        ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
        chip_s = fm.stored_chip_preview(entry)
        pcie_s = fm.stored_pcie_preview(entry)

        if tup is None:
            print(
                f"{panel_n:<7} | {'—':<10} | {'—':<18} | {'—':<5} | {'—':<8} | {chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        hub_1, port_0 = tup
        route = (ssh or "local")[:18]

        if ssh:
            # Remote entry: ask the remote host for this hub.port status.
            code, out, err = rs.ssh_forward_capture(ssh, ["status", f"{hub_1}.{port_0}"])
            if code == 0:
                pwr, cur = rs.parse_status_line_for_hub_port(out, hub_1, port_0)
            else:
                pwr, cur = "?", "?"
                if err.strip():
                    route = f"{ssh} (err)"[:18]
            print(
                f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | "
                f"{chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        h_idx = hub_1 - 1
        if not 0 <= h_idx < len(self.hubs):
            print(
                f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | "
                f"{chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        stem = self.hubs[h_idx]
        if not 0 <= port_0 < self._port_count(stem):
            print(
                f"{panel_n:<7} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | "
                f"{chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        state_bits = stem.usb.getPortState(port_0).value
        pwr_str = "ON" if (state_bits & 1) else "OFF"
        measured = stem.usb.getPortCurrent(port_0).value / 1000.0
        # Readings whose magnitude does not exceed 15.0 are displayed as zero.
        current = measured if abs(measured) > 15.0 else 0.0
        print(
            f"{panel_n:<7} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | "
            f"{chip_s:<28} | {pcie_s:<22}",
            flush=True,
        )
|
||
|
||
def panel_power(self, mode, panel_1based):
    """Switch power for the hub port mapped to a panel position.

    Exits with status 1 when the position is outside 1..PANEL_SLOTS, is not
    mapped in the fiber map, or the local power change fails. Entries with
    an ssh target are forwarded to that host and the process exits with the
    forwarded command's return value.
    """
    if not 1 <= panel_1based <= PANEL_SLOTS:
        print(f"Panel port must be 1–{PANEL_SLOTS}, got {panel_1based}", file=sys.stderr, flush=True)
        sys.exit(1)

    doc = fm.load_fiber_map_or_exit()
    key = str(panel_1based)
    entry = doc["fiber_ports"].get(key)
    tup = None if entry is None else fm.fiber_entry_hub_port(entry)
    if tup is None:
        print(
            f"Panel {panel_1based} is not mapped (no fiber_ports[{key!r}] in fiber_map.json).",
            file=sys.stderr,
            flush=True,
        )
        sys.exit(1)

    ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
    if ssh:
        forwarded = rs.ssh_forward(ssh, ["panel", mode, str(panel_1based)])
        sys.exit(forwarded)

    tgt = f"{tup[0]}.{tup[1]}"
    print(f"Panel {panel_1based} → hub target {tgt} ({mode})", flush=True)
    applied = self.power(mode, tgt)
    if not applied:
        sys.exit(1)
|
||
|
||
def fiber_power(self, mode, fiber_port):
    """Switch power for the hub port stored under a ``fiber_ports`` key.

    ``fiber_port`` is converted with ``int`` and used as the string key into
    the fiber map. Exits with status 1 when the key is not mapped or the
    local power change fails; entries with an ssh target are forwarded to
    that host and the process exits with the forwarded return value.
    """
    doc = fm.load_fiber_map_or_exit()
    key = str(int(fiber_port))
    entry = doc["fiber_ports"].get(key)
    tup = None if entry is None else fm.fiber_entry_hub_port(entry)
    if tup is None:
        print(
            f"Fiber port {fiber_port} is not mapped (missing fiber_ports[{key!r}] in fiber_map.json).",
            file=sys.stderr,
            flush=True,
        )
        sys.exit(1)

    ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
    if ssh:
        forwarded = rs.ssh_forward(ssh, ["power", "fiber-port", key, mode.lower()])
        sys.exit(forwarded)

    tgt = f"{tup[0]}.{tup[1]}"
    print(f"Fiber port {fiber_port} → hub target {tgt} ({mode})", flush=True)
    applied = self.power(mode, tgt)
    if not applied:
        sys.exit(1)
|
||
|
||
def fiber_chip(self, fiber_port, save=False):
    """
    Identify newly enumerated USB device(s) on this fiber's hub port (lsusb diff).

    If the port was off, it is turned on, given two seconds, snapshotted, and
    then switched off again. The switch-off runs in a ``finally`` block so
    the port is restored even if the lsusb snapshot or diff raises.

    With save=True, merge usb_id / chip_type / usb_lsusb_lines into
    fiber_map.json when new lines appear; with no new lines the stored
    profile is left untouched.

    Args:
        fiber_port: fiber id; converted with ``int`` and used as the string
            key into ``fiber_ports``.
        save: persist the detected chip fields to the fiber map.

    Exits with status 1 when the fiber is unmapped, lsusb is missing, the
    mapped hub/port does not exist, or the port state cannot be read.
    Entries with an ssh target are forwarded to that host instead. Returns
    silently when no local hub can be connected.
    """
    doc = fm.load_fiber_map_or_exit()
    key = str(int(fiber_port))
    entry = doc["fiber_ports"].get(key)
    tup = fm.fiber_entry_hub_port(entry) if entry is not None else None
    if tup is None:
        print(
            f"Fiber port {fiber_port} is not mapped (missing fiber_ports[{key!r}] in fiber_map.json).",
            file=sys.stderr,
            flush=True,
        )
        sys.exit(1)

    ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
    if ssh:
        extra = ["save"] if save else []
        sys.exit(rs.ssh_forward(ssh, ["fiber", "chip", key, *extra]))

    if not shutil.which("lsusb"):
        print(
            "lsusb not found in PATH; install usbutils (e.g. usbutils package) on this host.",
            file=sys.stderr,
            flush=True,
        )
        sys.exit(1)

    if not self.hubs and not self.connect():
        return

    hub_1, port_0 = tup
    h_idx = hub_1 - 1
    if h_idx < 0 or h_idx >= len(self.hubs):
        print(f"No hub {hub_1} connected.", file=sys.stderr, flush=True)
        sys.exit(1)
    stem = self.hubs[h_idx]
    nports = self._port_count(stem)
    if port_0 < 0 or port_0 >= nports:
        print(f"Hub {hub_1} has no USB port index {port_0}.", file=sys.stderr, flush=True)
        sys.exit(1)

    # Baseline snapshot is taken before the port is (possibly) powered on.
    before = usb.lsusb_lines()
    st = stem.usb.getPortState(port_0)
    if st.error != self.SUCCESS:
        print(f"getPortState error: {st.error}", file=sys.stderr, flush=True)
        sys.exit(1)
    was_on = (st.value & 1) != 0

    new_devs = []
    try:
        if not was_on:
            self._set_hub_port_power(hub_1, port_0, True)
            # Give the device time to enumerate before the second snapshot.
            time.sleep(2.0)
        after = usb.lsusb_lines()
        new_devs = usb.lsusb_new_devices(before, after)
        print(
            f"Fiber port {fiber_port} → hub {hub_1} USB port {port_0} — USB identity (lsusb, new vs baseline):",
            flush=True,
        )
        if new_devs:
            for ln in new_devs:
                print(f" {ln}", flush=True)
        else:
            print(
                " (No new lsusb lines vs snapshot before this step.) Device may already have been plugged in.",
                flush=True,
            )
    finally:
        # Always put the port back the way it was found, even on error.
        if not was_on:
            self._set_hub_port_power(hub_1, port_0, False)
            print(" Restored hub port power: OFF.", flush=True)

    if not save:
        return

    if not new_devs:
        print(
            " save: no new lsusb lines — not updating fiber_map.json (avoids wiping a good profile).",
            flush=True,
        )
        return

    # Reload the map so the write is based on the current file contents.
    doc2 = fm.load_fiber_map_or_exit()
    ent = doc2["fiber_ports"].get(key)
    base = dict(ent) if isinstance(ent, dict) else {}
    # Drop stale chip fields before merging the freshly detected ones.
    for k in (
        "usb_lsusb_lines",
        "usb_id",
        "usb_ids",
        "chip_type",
        "chip_profiled_at",
    ):
        base.pop(k, None)
    base.update(fm.chip_fields_from_lsusb_lines(new_devs))
    doc2["fiber_ports"][key] = base
    self._write_fiber_map_document(doc2)
    print(" Saved chip metadata to fiber_map.json (usb_id, chip_type, usb_lsusb_lines).", flush=True)
|
||
|
||
def fiber_map_status(self):
    """Print one row per ``fiber_ports`` entry with hub.port and live power.

    Keys are listed in ``fm.fiber_sort_key`` order. Entries with an ssh
    target are queried through ``rs.ssh_forward_capture``; the others are
    read from the locally connected hubs, which are connected only if some
    mapped entry has no ssh target. The header is printed before that
    connection attempt, and the method returns if the attempt fails.
    """
    doc = fm.load_fiber_map_or_exit()
    ports = doc["fiber_ports"]
    keys = sorted(ports.keys(), key=fm.fiber_sort_key)

    print(
        f"{'Fiber':<8} | {'Hub.Port':<10} | {'Route':<18} | {'Pwr':<5} | {'mA':<8} | "
        f"{'Chip (saved)':<28} | {'PCIe (saved)':<22}",
        flush=True,
    )
    print("-" * 120)

    def _needs_local_hub(map_key):
        # True when the entry is mapped to a hub port and has no ssh target.
        candidate = ports[map_key]
        if fm.fiber_entry_hub_port(candidate) is None:
            return False
        return not fm.fiber_ssh_target(candidate if isinstance(candidate, dict) else None)

    need_local = any(_needs_local_hub(k) for k in keys)
    if need_local and not self.hubs and not self.connect():
        return

    for key in keys:
        entry = ports[key]
        tup = fm.fiber_entry_hub_port(entry)
        ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
        chip_s = fm.stored_chip_preview(entry)
        pcie_s = fm.stored_pcie_preview(entry)

        if tup is None:
            print(
                f"{key!s:<8} | {'—':<10} | {'—':<18} | {'—':<5} | {'—':<8} | {chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        hub_1, port_0 = tup
        route = (ssh or "local")[:18]

        if ssh:
            # Remote entry: ask the remote host for this hub.port status.
            code, out, err = rs.ssh_forward_capture(ssh, ["status", f"{hub_1}.{port_0}"])
            if code == 0:
                pwr, cur = rs.parse_status_line_for_hub_port(out, hub_1, port_0)
            else:
                pwr, cur = "?", "?"
                if err.strip():
                    route = f"{ssh} (err)"[:18]
            print(
                f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr:<5} | {cur:<8} | "
                f"{chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        h_idx = hub_1 - 1
        if not 0 <= h_idx < len(self.hubs):
            print(
                f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'no hub':<8} | "
                f"{chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        stem = self.hubs[h_idx]
        if not 0 <= port_0 < self._port_count(stem):
            print(
                f"{key!s:<8} | {hub_1}.{port_0:<9} | {'local':<18} | {'?':<5} | {'bad map':<8} | "
                f"{chip_s:<28} | {pcie_s:<22}",
                flush=True,
            )
            continue

        state_bits = stem.usb.getPortState(port_0).value
        pwr_str = "ON" if (state_bits & 1) else "OFF"
        measured = stem.usb.getPortCurrent(port_0).value / 1000.0
        # Readings whose magnitude does not exceed 15.0 are displayed as zero.
        current = measured if abs(measured) > 15.0 else 0.0
        print(
            f"{key!s:<8} | {hub_1}.{port_0:<9} | {route:<18} | {pwr_str:<5} | {current:<8.2f} | "
            f"{chip_s:<28} | {pcie_s:<22}",
            flush=True,
        )
|
||
|
||
def panel_reboot(self, panel_1based, skip_empty=True):
    """Power-cycle the hub port mapped to a panel position.

    Exits with status 1 when the position is outside 1..PANEL_SLOTS or is
    not mapped. Entries with an ssh target are forwarded as ``panel reboot``
    (or ``panel reboot-force`` when ``skip_empty`` is False) and the process
    exits with the forwarded return value; otherwise ``reboot`` is called
    for the mapped local target.
    """
    if not 1 <= panel_1based <= PANEL_SLOTS:
        print(f"Panel port must be 1–{PANEL_SLOTS}, got {panel_1based}", file=sys.stderr, flush=True)
        sys.exit(1)

    doc = fm.load_fiber_map_or_exit()
    key = str(panel_1based)
    entry = doc["fiber_ports"].get(key)
    tup = None if entry is None else fm.fiber_entry_hub_port(entry)
    if tup is None:
        print(
            f"Panel {panel_1based} is not mapped (no fiber_ports[{key!r}] in fiber_map.json).",
            file=sys.stderr,
            flush=True,
        )
        sys.exit(1)

    ssh = fm.fiber_ssh_target(entry) if isinstance(entry, dict) else None
    if skip_empty:
        sub = "reboot"
    else:
        sub = "reboot-force"
    if ssh:
        forwarded = rs.ssh_forward(ssh, ["panel", sub, str(panel_1based)])
        sys.exit(forwarded)

    tgt = f"{tup[0]}.{tup[1]}"
    print(f"Panel {panel_1based} → hub target {tgt} ({sub})", flush=True)
    self.reboot(tgt, skip_empty=skip_empty)
|
||
|
||
def _ordered_downstream_ports(self):
|
||
"""Hub 1 port 0, 1, … then hub 2 … (BrainStem hub order after connect)."""
|
||
out = []
|
||
for i, stem in enumerate(self.hubs):
|
||
for port in range(self._port_count(stem)):
|
||
out.append((i + 1, port))
|
||
return out
|
||
|
||
def _set_hub_port_power(self, hub_1, port_0, enable):
|
||
h_idx = hub_1 - 1
|
||
stem = self.hubs[h_idx]
|
||
if enable:
|
||
stem.usb.setPortEnable(port_0)
|
||
time.sleep(0.25)
|
||
else:
|
||
stem.usb.setPortDisable(port_0)
|
||
|
||
def _calibrate_step_power_off(self, ssh_host, hub_1, port_0):
|
||
"""Turn off downstream port at end of one calibrate step (after PCIe prompts if mapped)."""
|
||
if ssh_host is None:
|
||
self._set_hub_port_power(hub_1, port_0, False)
|
||
print(
|
||
f">>> OFF local hub {hub_1} USB port {port_0} ({self._port_power_feedback(hub_1, port_0)})",
|
||
flush=True,
|
||
)
|
||
else:
|
||
rs.remote_hub_port_power(ssh_host, hub_1, port_0, False)
|
||
print(
|
||
f">>> OFF {ssh_host} hub {hub_1} USB port {port_0}",
|
||
flush=True,
|
||
)
|
||
|
||
def _port_power_feedback(self, hub_1, port_0):
|
||
"""Return short status string after a change (hub power state + optional current)."""
|
||
h_idx = hub_1 - 1
|
||
stem = self.hubs[h_idx]
|
||
st = stem.usb.getPortState(port_0)
|
||
if st.error != self.SUCCESS:
|
||
return f"getPortState error {st.error}"
|
||
on = (st.value & 1) != 0
|
||
cur = stem.usb.getPortCurrent(port_0)
|
||
ma = cur.value / 1000.0 if cur.error == self.SUCCESS else None
|
||
bits = f"hub reports {'ON' if on else 'OFF'}"
|
||
if ma is not None and abs(ma) > 15.0:
|
||
bits += f", ~{ma:.0f} mA"
|
||
return bits
|
||
|
||
def _write_fiber_map_document(self, doc):
    """Normalise ``doc`` and write it to the fiber map file as indented JSON.

    The document is passed through ``fm.ensure_fiber_map_document`` and
    written as UTF-8 with a two-space indent and a trailing newline; the
    destination path is printed afterwards.
    """
    destination = fiber_map_path()
    normalized = fm.ensure_fiber_map_document(doc)
    with open(destination, "w", encoding="utf-8") as handle:
        json.dump(normalized, handle, indent=2)
        handle.write("\n")
    print(f"Wrote {destination}", flush=True)
|
||
|
||
def panel_calibrate(self, merge=False, limit=None, calibrate_ssh_hosts=None):
    """
    Interactively map downstream USB ports to fiber port ids and save fiber_map.json.

    Ports are walked in hub order: local hubs first, then each remote host.
    For every step the port is powered ON, new lsusb lines are shown as a
    chip hint, and you type the fiber port id (s/Enter = skip, q = quit).
    Remote steps store the ssh target in the map entry.

    merge: when true and an existing map was loaded, start from a deep copy
        of it. Otherwise other top-level keys of an existing dict map are
        kept but "fiber_ports" starts empty.
    limit: when not None, only the first max(0, limit) steps are run.
    calibrate_ssh_hosts: iterable of ssh targets. Hosts from the map's
        "calibrate_remotes" list and from the comma-separated
        HUB_MANAGER_CALIBRATE_REMOTES environment variable are appended;
        duplicates (after stripping) are dropped, first occurrence wins.

    Returns None. Returns early (writing nothing) when the existing map
    cannot be loaded or when there is nothing to calibrate.
    Raises SystemExit(130) on Ctrl-C during a step or the PCIe prompts; the
    map is written before exiting even if powering the port off fails.
    """
    calibrate_ssh_hosts = list(calibrate_ssh_hosts or [])
    try:
        existing = fm.load_fiber_map_document()
    except (OSError, json.JSONDecodeError, ValueError) as exc:
        print(f"Could not load existing map: {exc}", file=sys.stderr, flush=True)
        return
    if merge and existing is not None:
        doc = copy.deepcopy(existing)
    elif existing is not None and isinstance(existing, dict):
        # Fresh calibrate: keep everything except the old port assignments.
        doc = {k: copy.deepcopy(v) for k, v in existing.items() if k != "fiber_ports"}
        doc["fiber_ports"] = {}
    else:
        doc = {"fiber_ports": {}}
    doc = fm.ensure_fiber_map_document(doc)

    seen_h = set()
    cli_hosts = []

    def _add_host(raw):
        """Record a stripped, not-yet-seen host; return it, or None if skipped."""
        s = str(raw).strip()
        if s and s not in seen_h:
            seen_h.add(s)
            cli_hosts.append(s)
            return s
        return None

    def _save_and_exit(powered, step_host, step_hub, step_port):
        """Power the step's port off if needed, always save the map, exit 130."""
        try:
            if powered:
                self._calibrate_step_power_off(step_host, step_hub, step_port)
        finally:
            # Saving must not depend on the power-off succeeding.
            self._write_fiber_map_document(doc)
        raise SystemExit(130)

    # Host sources, in priority order: arguments, map file, environment.
    for h in calibrate_ssh_hosts:
        _add_host(h)
    cr = doc.get("calibrate_remotes")
    if isinstance(cr, list):
        for x in cr:
            _add_host(x)

    rs.apply_remote_ssh_env_file()
    env_rem = os.environ.get("HUB_MANAGER_CALIBRATE_REMOTES", "").strip()
    if env_rem:
        added_from_env = []
        for part in env_rem.split(","):
            s = _add_host(part)
            if s is not None:
                added_from_env.append(s)
        if added_from_env:
            print(
                f"hub_manager: calibrate remotes from HUB_MANAGER_CALIBRATE_REMOTES: "
                f"{', '.join(added_from_env)}",
                file=sys.stderr,
                flush=True,
            )

    # Always try hard to open local hubs for calibrate: full errors + stem-class retry (see _connect_specs).
    saw_specs_connect_failed = False
    local_ok = bool(self.hubs)
    if not local_ok:
        specs_pre = self._enumerate_usb_specs()
        if specs_pre:
            local_ok = self._connect_specs(specs_pre, quiet=False)
            if not local_ok:
                saw_specs_connect_failed = True
            if not local_ok and cli_hosts:
                print(
                    "hub_manager: Local USB shows Acroname module(s) but no hub opened — "
                    "this calibrate run will skip local ports and use only --ssh.\n"
                    " Fix Fedora access, then re-run to include local + Pi in one pass:\n"
                    " python3 hub_manager.py setup && sudo install -m 0644 99-acroname.rules /etc/udev/rules.d/\n"
                    " (setup needs a working connect; if it still fails, generic vendor rule:)\n"
                    ' echo \'SUBSYSTEM=="usb", ATTR{idVendor}=="24ff", MODE="0666"\' | sudo tee /etc/udev/rules.d/99-acroname.rules\n'
                    " sudo udevadm control --reload-rules && sudo udevadm trigger\n"
                    " Unplug/replug the hub.\n",
                    file=sys.stderr,
                    flush=True,
                )
        else:
            # Discovery found nothing: fall back to the regular connect path.
            local_ok = self.connect(quiet=bool(cli_hosts))
    local_ordered = self._ordered_downstream_ports() if local_ok else []

    # Each step is (ssh_host or None for local, hub_1based, port_0based).
    steps = [(None, hub_1, port_0) for hub_1, port_0 in local_ordered]
    for host in cli_hosts:
        remote_pairs = rs.fetch_calibrate_ports_json(host)
        if not remote_pairs:
            print(
                f"hub_manager: WARNING: 0 remote calibrate steps from {host!r} — SSH hub_manager returned "
                "no port list (often: Pi used system python3 without brainstem). On this PC set "
                "HUB_MANAGER_REMOTE_PYTHON and HUB_MANAGER_REMOTE_SCRIPT in remote_ssh.env to paths "
                "that exist on the Pi (venv python3 + hub_manager.py). See remote_ssh.env.example.",
                file=sys.stderr,
                flush=True,
            )
        # "or ()" keeps a None result from crashing right after the warning.
        for hub_1, port_0 in remote_pairs or ():
            steps.append((host, hub_1, port_0))

    if limit is not None:
        steps = steps[: max(0, limit)]

    if not steps:
        if saw_specs_connect_failed and not cli_hosts:
            print(
                "hub_manager: This PC sees Acroname USB module(s) in BrainStem discovery but connectFromSpec "
                "failed, and no remote host was given for calibrate.\n"
                " If your hubs are on a Raspberry Pi (or another machine), run from here:\n"
                " python3 hub_manager.py panel calibrate merge --ssh pi@<pi-address>\n"
                " (repeat --ssh for each host). Or put in fiber_map.json next to hub_manager.py:\n"
                ' "calibrate_remotes": ["pi@<pi-address>"]\n'
                " Use remote_ssh.env on this PC if the Pi uses a venv path for python / script.\n"
                " If the hubs are really plugged into *this* Fedora box, fix USB access (udev 24ff, plugdev, "
                "unplug/replug) until `python3 hub_manager.py discover` opens them.",
                file=sys.stderr,
                flush=True,
            )
        print(
            "Nothing to calibrate: no local Acroname hubs and no remote ports "
            "(use --ssh user@host or calibrate_remotes in fiber_map.json).",
            file=sys.stderr,
            flush=True,
        )
        return

    n_loc = sum(1 for s in steps if s[0] is None)
    n_rem = len(steps) - n_loc

    # Baseline: only one downstream port should be powered per step.
    if n_loc > 0 and self.hubs:
        nlp = sum(self._port_count(s) for s in self.hubs)
        print(f"Turning OFF every local downstream USB port (baseline, {nlp} port(s))…", flush=True)
        for stem in self.hubs:
            for p in range(self._port_count(stem)):
                stem.usb.setPortDisable(p)
        time.sleep(0.6)
        print(" Local baseline done.", flush=True)

    # Remote hosts in first-appearance order; each port in the step list is
    # switched off with its own SSH call.
    remote_hosts_ordered = []
    for sh, _, _ in steps:
        if sh is not None and sh not in remote_hosts_ordered:
            remote_hosts_ordered.append(sh)
    for host in remote_hosts_ordered:
        pairs = sorted({(h, p) for s_h, h, p in steps if s_h == host})
        n_off = len(pairs)
        print(
            f"Turning OFF every downstream USB port on {host} (baseline, {n_off} SSH round trip(s))…",
            flush=True,
        )
        for i, (h, p) in enumerate(pairs, start=1):
            rc, rerr = rs.remote_hub_port_power(host, h, p, False)
            if rc != 0:
                print(
                    f" [{i}/{n_off}] off {h}.{p} → exit {rc}: {(rerr or '').strip()[:120]}",
                    flush=True,
                )
            else:
                print(f" [{i}/{n_off}] off {h}.{p} ok", flush=True)
        time.sleep(0.6)
        print(f" Remote baseline done for {host}.", flush=True)

    print(
        f"Fiber map calibrate: {len(steps)} step(s) — {n_loc} local, {n_rem} via ssh.\n"
        "All downstream ports were turned OFF first so only one port is ON per step.\n"
        "Order: all local hub ports (hub 1 port 0 first), then each --ssh host’s ports in order.\n"
        "Each step: lsusb snapshot (port OFF) → ON (~2s) → new lsusb lines (chip hint) → fiber id, s=skip, q=quit.\n"
        "Port stays ON through optional PCIe prompts, then powers OFF. Ctrl-C anytime saves fiber_map.json and exits.\n"
        "When you map a fiber, usb_id / chip_type are saved if new lsusb lines appeared.\n"
        "Remote steps store ssh in fiber_map.json automatically.\n"
        "After each fiber id you can pick PCIe by number: 1–6 = known Adnacom H3 card, then SFP 1–4 "
        "(no paste), or m=manual / c=clear / Enter=keep. Edit fiber_map.json anytime (see example).",
        flush=True,
    )

    ports = doc["fiber_ports"]
    for ssh_host, hub_1, port_0 in steps:
        route = "local" if ssh_host is None else ssh_host
        chip_hint_lines = []
        step_powered = False
        line = ""
        try:
            # The OFF-state snapshot runs inside the try so that Ctrl-C
            # during a slow (remote) lsusb still saves the map.
            before_lsusb = (
                usb.lsusb_lines()
                if ssh_host is None
                else rs.remote_lsusb_lines(ssh_host)
            )
            print(f"\n>>> ON {route} hub {hub_1} USB port {port_0}", flush=True)
            # Flag the port as possibly powered *before* switching it, so an
            # interrupt in the middle of power-on still turns it back off.
            step_powered = True
            if ssh_host is None:
                self._set_hub_port_power(hub_1, port_0, True)
                print(f" {self._port_power_feedback(hub_1, port_0)}", flush=True)
            else:
                rc, rmsg = rs.remote_hub_port_power(ssh_host, hub_1, port_0, True)
                if rc != 0:
                    snippet = (rmsg or "").strip()[:800]
                    print(
                        f" remote on failed (exit {rc})"
                        + (f": {snippet}" if snippet else ""),
                        flush=True,
                    )
                    step_powered = False
                else:
                    time.sleep(0.25)
                    print(f" {rs.remote_port_power_feedback(ssh_host, hub_1, port_0)}", flush=True)
            # Give the device time to show up before the second snapshot.
            time.sleep(2.0)
            if ssh_host is not None and not step_powered:
                print(
                    " Skipping this calibrate step (remote ON failed; fix SSH / Pi hub_manager exit code).",
                    flush=True,
                )
                continue
            after_lsusb = (
                usb.lsusb_lines()
                if ssh_host is None
                else rs.remote_lsusb_lines(ssh_host)
            )
            chip_hint_lines = usb.lsusb_new_devices(before_lsusb, after_lsusb)
            if chip_hint_lines:
                print(" USB / chip (lsusb lines new vs port OFF):", flush=True)
                for ln in chip_hint_lines:
                    print(f" {ln}", flush=True)
            elif before_lsusb or after_lsusb:
                print(
                    " (No new lsusb lines vs OFF snapshot — device may already be listed, or hub not downstream of host.)",
                    flush=True,
                )
            else:
                print(
                    " (lsusb unavailable — install usbutils on this machine / Pi.)",
                    flush=True,
                )
            try:
                line = input(
                    "Which fiber port id? [s=skip q=quit, Ctrl-C=save map & exit]: "
                ).strip()
            except EOFError:
                print(
                    "\n*** No input (stdin closed). Use a TTY, e.g. ssh -t … panel calibrate … ***\n",
                    flush=True,
                )
                # Treat a closed stdin as "quit" so the map is still written.
                line = "q"
        except KeyboardInterrupt:
            print(
                "\n*** Calibrate interrupted (Ctrl-C); writing fiber_map.json and powering off this port. ***\n",
                flush=True,
            )
            _save_and_exit(step_powered, ssh_host, hub_1, port_0)

        low = line.lower()
        if low == "q":
            if step_powered:
                self._calibrate_step_power_off(ssh_host, hub_1, port_0)
            break
        if low == "s" or not line:
            if step_powered:
                self._calibrate_step_power_off(ssh_host, hub_1, port_0)
            continue
        try:
            pn = int(line)
        except ValueError:
            print(" Ignored (not an integer).", flush=True)
            if step_powered:
                self._calibrate_step_power_off(ssh_host, hub_1, port_0)
            continue
        if pn < 1:
            print(" Ignored (fiber port id must be >= 1).", flush=True)
            if step_powered:
                self._calibrate_step_power_off(ssh_host, hub_1, port_0)
            continue

        key = str(pn)
        prev = fm.fiber_entry_hub_port(ports.get(key))
        prev_ssh = fm.fiber_ssh_target(ports.get(key)) if isinstance(ports.get(key), dict) else None
        # Tell the operator when this id previously pointed somewhere else.
        if prev is not None and (prev != (hub_1, port_0) or (ssh_host or None) != (prev_ssh or None)):
            ps = f" ssh={prev_ssh!r}" if prev_ssh else ""
            print(f" Note: fiber port {pn} was {prev[0]}.{prev[1]}{ps}; now {hub_1}.{port_0} @ {route}", flush=True)

        # Start from the existing entry (if any) so unrelated keys survive.
        base = ports.get(key)
        if isinstance(base, dict):
            entry = dict(base)
        else:
            entry = {}
        entry["hub"] = hub_1
        entry["port"] = port_0
        if ssh_host:
            entry["ssh"] = ssh_host
            for k in ("remote", "host", "user"):
                entry.pop(k, None)
        else:
            entry.pop("ssh", None)
            entry.pop("remote", None)
            entry.pop("host", None)
            entry.pop("user", None)
        if chip_hint_lines:
            # Replace stale chip fields only when this step produced new ones.
            for k in (
                "usb_lsusb_lines",
                "usb_id",
                "usb_ids",
                "chip_type",
                "chip_profiled_at",
            ):
                entry.pop(k, None)
            entry.update(fm.chip_fields_from_lsusb_lines(chip_hint_lines))
        try:
            action, pdata = fm.prompt_pcie_metadata_for_calibrate(entry.get("pcie"))
        except KeyboardInterrupt:
            print(
                "\n*** Interrupted during PCIe prompts; saving this port’s map row and powering off. ***\n",
                flush=True,
            )
            ports[key] = entry
            _save_and_exit(step_powered, ssh_host, hub_1, port_0)
        if action == "clear":
            entry.pop("pcie", None)
        elif action == "set" and pdata is not None:
            if pdata:
                entry["pcie"] = pdata
            else:
                entry.pop("pcie", None)
        ports[key] = entry
        if step_powered:
            self._calibrate_step_power_off(ssh_host, hub_1, port_0)

    self._write_fiber_map_document(doc)
|
||
|
||
def disconnect(self):
    """Call ``disconnect()`` on every stem in ``self.hubs``, in list order."""
    for hub in self.hubs:
        hub.disconnect()
|
||
|