UmberHubManager/hub_manager.py

171 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import time
import asyncio
import os
import subprocess
import brainstem
class AcronameManager:
def __init__(self):
self.hubs = []
self.SUCCESS = brainstem.result.Result.NO_ERROR
def connect(self):
"""Finds all hubs and sorts by serial number for consistent Hub 1/2 naming."""
try:
# v2.12 API discovery call
specs = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
except AttributeError:
# Fallback for older SDK variations
from brainstem.discovery import Discovery
specs = Discovery.findAll(brainstem.link.Spec.USB)
specs.sort(key=lambda x: x.serial_number)
for spec in specs:
stem = brainstem.stem.USBHub3p()
res = stem.connectFromSpec(spec)
if res == self.SUCCESS:
self.hubs.append(stem)
if not self.hubs:
print("Error: No Acroname hubs found.")
return False
return True
def _parse_target(self, target_str):
if target_str.lower() == 'all':
return 'all', 'all'
try:
h_num, p_idx = target_str.split('.')
h_idx = 'all' if h_num.lower() == 'all' else int(h_num) - 1
p_idx = 'all' if p_idx.lower() == 'all' else int(p_idx)
return h_idx, p_idx
except ValueError:
print(f"Format Error: Use '1.4' for Hub 1 Port 4, or 'all'")
sys.exit(1)
def _sample_inrush(self, stem, port, sample_duration=0.3):
"""Captures peak current and duration for the reboot report."""
samples = []
start_time = time.time()
while (time.time() - start_time) < sample_duration:
curr = stem.usb.getPortCurrent(port).value / 1000.0
samples.append((time.time() - start_time, curr))
peak_current = max(s[1] for s in samples) if samples else 0.0
duration = 0
if peak_current > 15.0:
for t, c in samples:
if c >= peak_current * 0.9:
duration = t
return {"peak": peak_current, "duration": duration * 1000}
def status(self, target_str="all"):
if not self.hubs and not self.connect(): return
h_target, p_target = self._parse_target(target_str)
print(f"{'Identity':<8} | {'Port':<5} | {'Power':<7} | {'Current (mA)':<12}")
print("-" * 55)
for i, stem in enumerate(self.hubs):
if h_target != 'all' and i != h_target: continue
ports = range(8) if p_target == 'all' else [p_target]
for port in ports:
pwr_val = stem.usb.getPortState(port).value
pwr_str = "ON" if (pwr_val & 1) else "OFF"
raw_curr = stem.usb.getPortCurrent(port).value / 1000.0
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
print(f"Hub {i+1:<3} | {port:<5} | {pwr_str:<7} | {current:<12.2f}")
async def _async_reboot_port(self, h_idx, stem, port, delay, stagger, skip_empty):
"""Reboots a port and captures inrush vs steady state data."""
current_ma = stem.usb.getPortCurrent(port).value / 1000.0
if skip_empty and current_ma < 15.0:
return {"hub": h_idx+1, "port": port, "status": "Skipped", "peak": 0.0, "duration": 0.0, "steady": 0.0}
stem.usb.setPortDisable(port)
await asyncio.sleep(delay)
if stagger > 0:
await asyncio.sleep(stagger * port)
stem.usb.setPortEnable(port)
# Capture the immediate spike
inrush_stats = self._sample_inrush(stem, port)
# Increased to 2 seconds for radio stabilization
await asyncio.sleep(2.0)
steady_ma = stem.usb.getPortCurrent(port).value / 1000.0
return {
"hub": h_idx+1, "port": port, "status": "Rebooted",
"peak": inrush_stats['peak'], "duration": inrush_stats['duration'],
"steady": steady_ma if steady_ma > 15.0 else 0.0
}
def reboot(self, target_str, delay=2, stagger=0.2, skip_empty=True):
if not self.hubs and not self.connect(): return
h_target, p_target = self._parse_target(target_str)
tasks = []
for i, stem in enumerate(self.hubs):
if h_target != 'all' and i != h_target: continue
ports = range(8) if p_target == 'all' else [p_target]
for port in ports:
tasks.append(self._async_reboot_port(i, stem, port, delay, stagger, skip_empty))
if tasks:
print(f"{'Identity':<8} | {'Port':<5} | {'Action':<10} | {'Peak(mA)':<10} | {'Steady(mA)':<12} | {'Settle(ms)':<10}")
print("-" * 75)
results = asyncio.run(self._run_tasks(tasks))
for r in sorted(results, key=lambda x: (x['hub'], x['port'])):
print(f"Hub {r['hub']:<3} | {r['port']:<5} | {r['status']:<10} | {r['peak']:<10.2f} | {r['steady']:<12.2f} | {r['duration']:<10.2f}")
async def _run_tasks(self, tasks):
return await asyncio.gather(*tasks)
def power(self, mode, target_str):
if not self.hubs and not self.connect(): return
h_target, p_target = self._parse_target(target_str)
for i, stem in enumerate(self.hubs):
if h_target != 'all' and i != h_target: continue
ports = range(8) if p_target == 'all' else [p_target]
for port in ports:
if mode.lower() == 'on': stem.usb.setPortEnable(port)
else: stem.usb.setPortDisable(port)
self.status(target_str)
def setup_udev(self):
if not self.hubs and not self.connect(): return
rule_path = "/etc/udev/rules.d/99-acroname.rules"
lines = ['# Acroname Hub Permissions\nSUBSYSTEM=="usb", ATTR{idVendor}=="24ff", MODE="0666", GROUP="plugdev"']
for i, stem in enumerate(self.hubs):
res = stem.system.getSerialNumber()
if res.error == self.SUCCESS:
sn = f"{res.value:08X}"
lines.append(f'SUBSYSTEM=="usb", ATTR{{idVendor}}=="24ff", ATTR{{serial}}=="{sn}", SYMLINK+="acroname_hub{i+1}"')
with open("99-acroname.rules", "w") as f: f.write("\n".join(lines))
print(f"udev rules generated. Install with: sudo mv 99-acroname.rules {rule_path}")
def verify(self):
for i in range(1, 3):
link = f"/dev/acroname_hub{i}"
if os.path.exists(link): print(f"[OK] Hub {i} -> {os.path.realpath(link)}")
else: print(f"[ERROR] {link} not found.")
def disconnect(self):
for stem in self.hubs: stem.disconnect()
if __name__ == "__main__":
mgr = AcronameManager()
try:
cmd = sys.argv[1].lower() if len(sys.argv) > 1 else "status"
target = sys.argv[2] if len(sys.argv) > 2 else "all"
if cmd == "status": mgr.status(target)
elif cmd in ["on", "off"]: mgr.power(cmd, target)
elif cmd in ["reboot", "reboot-force"]:
mgr.reboot(target, skip_empty=(cmd == "reboot"))
elif cmd == "setup": mgr.setup_udev()
elif cmd == "verify": mgr.verify()
finally:
mgr.disconnect()