171 lines
7.1 KiB
Python
Executable File
171 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import sys
|
|
import time
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import brainstem
|
|
|
|
class AcronameManager:
|
|
def __init__(self):
|
|
self.hubs = []
|
|
self.SUCCESS = brainstem.result.Result.NO_ERROR
|
|
|
|
def connect(self):
|
|
"""Finds all hubs and sorts by serial number for consistent Hub 1/2 naming."""
|
|
try:
|
|
# v2.12 API discovery call
|
|
specs = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
|
|
except AttributeError:
|
|
# Fallback for older SDK variations
|
|
from brainstem.discovery import Discovery
|
|
specs = Discovery.findAll(brainstem.link.Spec.USB)
|
|
|
|
specs.sort(key=lambda x: x.serial_number)
|
|
|
|
for spec in specs:
|
|
stem = brainstem.stem.USBHub3p()
|
|
res = stem.connectFromSpec(spec)
|
|
if res == self.SUCCESS:
|
|
self.hubs.append(stem)
|
|
|
|
if not self.hubs:
|
|
print("Error: No Acroname hubs found.")
|
|
return False
|
|
return True
|
|
|
|
def _parse_target(self, target_str):
|
|
if target_str.lower() == 'all':
|
|
return 'all', 'all'
|
|
try:
|
|
h_num, p_idx = target_str.split('.')
|
|
h_idx = 'all' if h_num.lower() == 'all' else int(h_num) - 1
|
|
p_idx = 'all' if p_idx.lower() == 'all' else int(p_idx)
|
|
return h_idx, p_idx
|
|
except ValueError:
|
|
print(f"Format Error: Use '1.4' for Hub 1 Port 4, or 'all'")
|
|
sys.exit(1)
|
|
|
|
def _sample_inrush(self, stem, port, sample_duration=0.3):
|
|
"""Captures peak current and duration for the reboot report."""
|
|
samples = []
|
|
start_time = time.time()
|
|
while (time.time() - start_time) < sample_duration:
|
|
curr = stem.usb.getPortCurrent(port).value / 1000.0
|
|
samples.append((time.time() - start_time, curr))
|
|
|
|
peak_current = max(s[1] for s in samples) if samples else 0.0
|
|
duration = 0
|
|
if peak_current > 15.0:
|
|
for t, c in samples:
|
|
if c >= peak_current * 0.9:
|
|
duration = t
|
|
return {"peak": peak_current, "duration": duration * 1000}
|
|
|
|
def status(self, target_str="all"):
|
|
if not self.hubs and not self.connect(): return
|
|
h_target, p_target = self._parse_target(target_str)
|
|
print(f"{'Identity':<8} | {'Port':<5} | {'Power':<7} | {'Current (mA)':<12}")
|
|
print("-" * 55)
|
|
for i, stem in enumerate(self.hubs):
|
|
if h_target != 'all' and i != h_target: continue
|
|
ports = range(8) if p_target == 'all' else [p_target]
|
|
for port in ports:
|
|
pwr_val = stem.usb.getPortState(port).value
|
|
pwr_str = "ON" if (pwr_val & 1) else "OFF"
|
|
raw_curr = stem.usb.getPortCurrent(port).value / 1000.0
|
|
current = raw_curr if abs(raw_curr) > 15.0 else 0.0
|
|
print(f"Hub {i+1:<3} | {port:<5} | {pwr_str:<7} | {current:<12.2f}")
|
|
|
|
async def _async_reboot_port(self, h_idx, stem, port, delay, stagger, skip_empty):
|
|
"""Reboots a port and captures inrush vs steady state data."""
|
|
current_ma = stem.usb.getPortCurrent(port).value / 1000.0
|
|
if skip_empty and current_ma < 15.0:
|
|
return {"hub": h_idx+1, "port": port, "status": "Skipped", "peak": 0.0, "duration": 0.0, "steady": 0.0}
|
|
|
|
stem.usb.setPortDisable(port)
|
|
await asyncio.sleep(delay)
|
|
if stagger > 0:
|
|
await asyncio.sleep(stagger * port)
|
|
|
|
stem.usb.setPortEnable(port)
|
|
# Capture the immediate spike
|
|
inrush_stats = self._sample_inrush(stem, port)
|
|
|
|
# Increased to 2 seconds for radio stabilization
|
|
await asyncio.sleep(2.0)
|
|
steady_ma = stem.usb.getPortCurrent(port).value / 1000.0
|
|
|
|
return {
|
|
"hub": h_idx+1, "port": port, "status": "Rebooted",
|
|
"peak": inrush_stats['peak'], "duration": inrush_stats['duration'],
|
|
"steady": steady_ma if steady_ma > 15.0 else 0.0
|
|
}
|
|
|
|
def reboot(self, target_str, delay=2, stagger=0.2, skip_empty=True):
|
|
if not self.hubs and not self.connect(): return
|
|
h_target, p_target = self._parse_target(target_str)
|
|
|
|
tasks = []
|
|
for i, stem in enumerate(self.hubs):
|
|
if h_target != 'all' and i != h_target: continue
|
|
ports = range(8) if p_target == 'all' else [p_target]
|
|
for port in ports:
|
|
tasks.append(self._async_reboot_port(i, stem, port, delay, stagger, skip_empty))
|
|
|
|
if tasks:
|
|
print(f"{'Identity':<8} | {'Port':<5} | {'Action':<10} | {'Peak(mA)':<10} | {'Steady(mA)':<12} | {'Settle(ms)':<10}")
|
|
print("-" * 75)
|
|
results = asyncio.run(self._run_tasks(tasks))
|
|
for r in sorted(results, key=lambda x: (x['hub'], x['port'])):
|
|
print(f"Hub {r['hub']:<3} | {r['port']:<5} | {r['status']:<10} | {r['peak']:<10.2f} | {r['steady']:<12.2f} | {r['duration']:<10.2f}")
|
|
|
|
async def _run_tasks(self, tasks):
|
|
return await asyncio.gather(*tasks)
|
|
|
|
def power(self, mode, target_str):
|
|
if not self.hubs and not self.connect(): return
|
|
h_target, p_target = self._parse_target(target_str)
|
|
for i, stem in enumerate(self.hubs):
|
|
if h_target != 'all' and i != h_target: continue
|
|
ports = range(8) if p_target == 'all' else [p_target]
|
|
for port in ports:
|
|
if mode.lower() == 'on': stem.usb.setPortEnable(port)
|
|
else: stem.usb.setPortDisable(port)
|
|
self.status(target_str)
|
|
|
|
def setup_udev(self):
|
|
if not self.hubs and not self.connect(): return
|
|
rule_path = "/etc/udev/rules.d/99-acroname.rules"
|
|
lines = ['# Acroname Hub Permissions\nSUBSYSTEM=="usb", ATTR{idVendor}=="24ff", MODE="0666", GROUP="plugdev"']
|
|
for i, stem in enumerate(self.hubs):
|
|
res = stem.system.getSerialNumber()
|
|
if res.error == self.SUCCESS:
|
|
sn = f"{res.value:08X}"
|
|
lines.append(f'SUBSYSTEM=="usb", ATTR{{idVendor}}=="24ff", ATTR{{serial}}=="{sn}", SYMLINK+="acroname_hub{i+1}"')
|
|
with open("99-acroname.rules", "w") as f: f.write("\n".join(lines))
|
|
print(f"udev rules generated. Install with: sudo mv 99-acroname.rules {rule_path}")
|
|
|
|
def verify(self):
|
|
for i in range(1, 3):
|
|
link = f"/dev/acroname_hub{i}"
|
|
if os.path.exists(link): print(f"[OK] Hub {i} -> {os.path.realpath(link)}")
|
|
else: print(f"[ERROR] {link} not found.")
|
|
|
|
def disconnect(self):
|
|
for stem in self.hubs: stem.disconnect()
|
|
|
|
if __name__ == "__main__":
|
|
mgr = AcronameManager()
|
|
try:
|
|
cmd = sys.argv[1].lower() if len(sys.argv) > 1 else "status"
|
|
target = sys.argv[2] if len(sys.argv) > 2 else "all"
|
|
if cmd == "status": mgr.status(target)
|
|
elif cmd in ["on", "off"]: mgr.power(cmd, target)
|
|
elif cmd in ["reboot", "reboot-force"]:
|
|
mgr.reboot(target, skip_empty=(cmd == "reboot"))
|
|
elif cmd == "setup": mgr.setup_udev()
|
|
elif cmd == "verify": mgr.verify()
|
|
finally:
|
|
mgr.disconnect()
|