#!/usr/bin/env python3 import sys import time import asyncio import os import subprocess import brainstem class AcronameManager: def __init__(self): self.hubs = [] self.SUCCESS = brainstem.result.Result.NO_ERROR def connect(self): """Finds all hubs and sorts by serial number for consistent Hub 1/2 naming.""" try: # v2.12 API discovery call specs = brainstem.discover.findAllModules(brainstem.link.Spec.USB) except AttributeError: # Fallback for older SDK variations from brainstem.discovery import Discovery specs = Discovery.findAll(brainstem.link.Spec.USB) specs.sort(key=lambda x: x.serial_number) for spec in specs: stem = brainstem.stem.USBHub3p() res = stem.connectFromSpec(spec) if res == self.SUCCESS: self.hubs.append(stem) if not self.hubs: print("Error: No Acroname hubs found.") return False return True def _parse_target(self, target_str): if target_str.lower() == 'all': return 'all', 'all' try: h_num, p_idx = target_str.split('.') h_idx = 'all' if h_num.lower() == 'all' else int(h_num) - 1 p_idx = 'all' if p_idx.lower() == 'all' else int(p_idx) return h_idx, p_idx except ValueError: print(f"Format Error: Use '1.4' for Hub 1 Port 4, or 'all'") sys.exit(1) def _sample_inrush(self, stem, port, sample_duration=0.3): """Captures peak current and duration for the reboot report.""" samples = [] start_time = time.time() while (time.time() - start_time) < sample_duration: curr = stem.usb.getPortCurrent(port).value / 1000.0 samples.append((time.time() - start_time, curr)) peak_current = max(s[1] for s in samples) if samples else 0.0 duration = 0 if peak_current > 15.0: for t, c in samples: if c >= peak_current * 0.9: duration = t return {"peak": peak_current, "duration": duration * 1000} def status(self, target_str="all"): if not self.hubs and not self.connect(): return h_target, p_target = self._parse_target(target_str) print(f"{'Identity':<8} | {'Port':<5} | {'Power':<7} | {'Current (mA)':<12}") print("-" * 55) for i, stem in enumerate(self.hubs): if h_target != 'all' and i != h_target: continue ports = range(8) if p_target == 'all' else [p_target] for port in ports: pwr_val = stem.usb.getPortState(port).value pwr_str = "ON" if (pwr_val & 1) else "OFF" raw_curr = stem.usb.getPortCurrent(port).value / 1000.0 current = raw_curr if abs(raw_curr) > 15.0 else 0.0 print(f"Hub {i+1:<3} | {port:<5} | {pwr_str:<7} | {current:<12.2f}") async def _async_reboot_port(self, h_idx, stem, port, delay, stagger, skip_empty): """Reboots a port and captures inrush vs steady state data.""" current_ma = stem.usb.getPortCurrent(port).value / 1000.0 if skip_empty and current_ma < 15.0: return {"hub": h_idx+1, "port": port, "status": "Skipped", "peak": 0.0, "duration": 0.0, "steady": 0.0} stem.usb.setPortDisable(port) await asyncio.sleep(delay) if stagger > 0: await asyncio.sleep(stagger * port) stem.usb.setPortEnable(port) # Capture the immediate spike inrush_stats = self._sample_inrush(stem, port) # Increased to 2 seconds for radio stabilization await asyncio.sleep(2.0) steady_ma = stem.usb.getPortCurrent(port).value / 1000.0 return { "hub": h_idx+1, "port": port, "status": "Rebooted", "peak": inrush_stats['peak'], "duration": inrush_stats['duration'], "steady": steady_ma if steady_ma > 15.0 else 0.0 } def reboot(self, target_str, delay=2, stagger=0.2, skip_empty=True): if not self.hubs and not self.connect(): return h_target, p_target = self._parse_target(target_str) tasks = [] for i, stem in enumerate(self.hubs): if h_target != 'all' and i != h_target: continue ports = range(8) if p_target == 'all' else [p_target] for port in ports: tasks.append(self._async_reboot_port(i, stem, port, delay, stagger, skip_empty)) if tasks: print(f"{'Identity':<8} | {'Port':<5} | {'Action':<10} | {'Peak(mA)':<10} | {'Steady(mA)':<12} | {'Settle(ms)':<10}") print("-" * 75) results = asyncio.run(self._run_tasks(tasks)) for r in sorted(results, key=lambda x: (x['hub'], x['port'])): print(f"Hub {r['hub']:<3} | {r['port']:<5} | {r['status']:<10} | {r['peak']:<10.2f} | {r['steady']:<12.2f} | {r['duration']:<10.2f}") async def _run_tasks(self, tasks): return await asyncio.gather(*tasks) def power(self, mode, target_str): if not self.hubs and not self.connect(): return h_target, p_target = self._parse_target(target_str) for i, stem in enumerate(self.hubs): if h_target != 'all' and i != h_target: continue ports = range(8) if p_target == 'all' else [p_target] for port in ports: if mode.lower() == 'on': stem.usb.setPortEnable(port) else: stem.usb.setPortDisable(port) self.status(target_str) def setup_udev(self): if not self.hubs and not self.connect(): return rule_path = "/etc/udev/rules.d/99-acroname.rules" lines = ['# Acroname Hub Permissions\nSUBSYSTEM=="usb", ATTR{idVendor}=="24ff", MODE="0666", GROUP="plugdev"'] for i, stem in enumerate(self.hubs): res = stem.system.getSerialNumber() if res.error == self.SUCCESS: sn = f"{res.value:08X}" lines.append(f'SUBSYSTEM=="usb", ATTR{{idVendor}}=="24ff", ATTR{{serial}}=="{sn}", SYMLINK+="acroname_hub{i+1}"') with open("99-acroname.rules", "w") as f: f.write("\n".join(lines)) print(f"udev rules generated. Install with: sudo mv 99-acroname.rules {rule_path}") def verify(self): for i in range(1, 3): link = f"/dev/acroname_hub{i}" if os.path.exists(link): print(f"[OK] Hub {i} -> {os.path.realpath(link)}") else: print(f"[ERROR] {link} not found.") def disconnect(self): for stem in self.hubs: stem.disconnect() if __name__ == "__main__": mgr = AcronameManager() try: cmd = sys.argv[1].lower() if len(sys.argv) > 1 else "status" target = sys.argv[2] if len(sys.argv) > 2 else "all" if cmd == "status": mgr.status(target) elif cmd in ["on", "off"]: mgr.power(cmd, target) elif cmd in ["reboot", "reboot-force"]: mgr.reboot(target, skip_empty=(cmd == "reboot")) elif cmd == "setup": mgr.setup_udev() elif cmd == "verify": mgr.verify() finally: mgr.disconnect()