#!/usr/bin/env python3
"""
Measure **USB inrush** on a local BrainStem hub port.

**Default — on-hub sampling** (``reflex/inrush.reflex``):

- Load the compiled ``.map`` into a hub store (ReflexLoader / HubTool), then run this script.
- Re-arms the map (``slotDisable`` / ``slotEnable``) to run ``mapEnable`` so Timer[0] and scratchpad
  reset, optionally power-cycles the downstream port, **waits** ``--sample-ms`` on the host while the
  hub samples current at its timer rate (e.g. 100 µs), then reads **scratchpad** via ``pointer[N]``
  (offsets match ``reflex/inrush.reflex``).

**Legacy — host sampling** (``--host-sample``):

- Poll ``getPortCurrent`` on a deadline grid (default 1 ms). No Reflex map required.

**Standalone**::

    python tests/check_inrush.py 1 0
    python tests/check_inrush.py 1 0 --host-sample
    python tests/check_inrush.py --config uax24 2 4 --map-slot 1 --json

``hub`` is **1-based**; ``port`` is **0-based**. On-hub mode: ``reflex/inrush.reflex`` is built for **port 0**
only unless you recompile with another index. ``--config`` sets ``FIWI_CONFIG``.
"""

from __future__ import annotations

import argparse
import json
import math
import os
import sys
import time

_ROOT = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

# Must match reflex/inrush.reflex scratchpad layout (byte offsets into pad).
_PAD_PEAK_UA = 0
_PAD_TICKS = 4
_PAD_ABOVE_THR_TICKS = 8
_PAD_SAMPLE_PERIOD_US = 12

# Reflex uses 50 mA threshold (50000 µA); host --threshold-ma does not change on-hub tallies.
_REFLEX_THRESHOLD_MA = 50.0


def _configure_paths() -> None:
    """Point ``fiwi.paths`` at the repository root computed in ``_ROOT``."""
    import fiwi.paths as paths_mod

    paths_mod.configure(_ROOT)


def _sleep_until(deadline: float) -> None:
    """Block until ``time.perf_counter()`` reaches ``deadline``.

    ``time.sleep`` is re-issued in a loop so that an early wake-up still
    lands on (or after) the deadline.
    """
    while True:
        now = time.perf_counter()
        if now >= deadline:
            return
        time.sleep(deadline - now)


def _i32(v: int) -> int:
    """Reinterpret the low 32 bits of ``v`` as a signed two's-complement integer."""
    v &= 0xFFFFFFFF
    if v >= 0x80000000:
        return v - 0x100000000
    return v


def _read_pad_i32(stem, pointer_index: int, byte_offset: int, ok) -> int | None:
    """Read signed 32-bit from reflex scratchpad at byte offset.

    Args:
        stem: Hub object exposing a ``pointer`` sequence.
        pointer_index: Index into ``stem.pointer``.
        byte_offset: Offset passed to ``setOffset`` before ``getInt``.
        ok: Value the hub API returns on success.

    Returns:
        The signed value, or ``None`` if the pointer is missing, the offset
        could not be set, or the read reported an error.
    """
    try:
        ptr = stem.pointer[pointer_index]
    except (IndexError, AttributeError, TypeError):
        return None
    pm = getattr(ptr, "POINTER_MODE_STATIC", 0)
    try:
        ptr.setMode(pm)
    except Exception:
        # Deliberately best-effort: a failure here is ignored and the read is
        # still attempted; setOffset/getInt below report real failures.
        pass
    if ptr.setOffset(byte_offset) != ok:
        return None
    r = ptr.getInt()
    if r.error != ok:
        return None
    return _i32(int(r.value))


def _rearm_map_slot(stem, store_index: int, slot: int, ok) -> bool:
    """Disable/enable store slot to re-trigger ``mapEnable`` on the loaded reflex map.

    Returns:
        ``True`` when both ``slotDisable`` and ``slotEnable`` returned ``ok``;
        ``False`` when the store is missing or either call failed.
    """
    try:
        st = stem.store[store_index]
    except (IndexError, AttributeError, TypeError):
        print(
            "check_inrush: hub has no store at this index (on-hub mode needs a loaded .map).",
            file=sys.stderr,
            flush=True,
        )
        return False
    if st.slotDisable(slot) != ok:
        return False
    time.sleep(0.05)
    if st.slotEnable(slot) != ok:
        return False
    time.sleep(0.02)
    return True


def _serial_text(stem, ok) -> str:
    """Return the hub serial number as ``0xXXXXXXXX``, or ``"?"`` if the read failed."""
    sn_res = stem.system.getSerialNumber()
    return f"0x{sn_res.value:08X}" if sn_res.error == ok else "?"


def _measure_inrush_on_hub(
    c,
    hub_idx_0: int,
    port: int,
    *,
    off_s: float,
    sample_s: float,
    power_cycle: bool,
    store_index: int,
    map_slot: int,
    pointer_index: int,
    rearm_map: bool,
) -> dict[str, object]:
    """Run one on-hub measurement and return the scratchpad results.

    Sequence: optionally power the port off for ``off_s``, optionally re-arm
    the map slot, optionally power the port back on, wait ``sample_s`` on the
    host, then read the four scratchpad words at the ``_PAD_*`` offsets.

    Args:
        c: Concentrator exposing ``hubs`` and ``SUCCESS``.
        hub_idx_0: 0-based index into ``c.hubs``.
        port: Downstream port index.
        off_s: Seconds to keep the port disabled when power-cycling.
        sample_s: Seconds to wait on the host before reading the scratchpad.
        power_cycle: Disable/enable the port around the measurement.
        store_index: ``stem.store`` index holding the map.
        map_slot: Slot within that store.
        pointer_index: ``stem.pointer`` index used for scratchpad reads.
        rearm_map: Re-arm the map slot before sampling.

    Returns:
        A result dict with ``"mode": "on-hub"``, or a dict whose ``"error"``
        key is ``"rearm_map_failed"`` or ``"scratchpad_read_failed"``.
    """
    stem = c.hubs[hub_idx_0]
    ok = c.SUCCESS

    if power_cycle:
        stem.usb.setPortDisable(port)
        time.sleep(off_s)

    if rearm_map and not _rearm_map_slot(stem, store_index, map_slot, ok):
        # The port was switched off above; restore power before bailing out so
        # a failed re-arm does not leave the downstream device unpowered.
        if power_cycle:
            stem.usb.setPortEnable(port)
        return {"error": "rearm_map_failed"}

    if power_cycle:
        stem.usb.setPortEnable(port)

    time.sleep(sample_s)

    peak_ua = _read_pad_i32(stem, pointer_index, _PAD_PEAK_UA, ok)
    ticks = _read_pad_i32(stem, pointer_index, _PAD_TICKS, ok)
    above_ticks = _read_pad_i32(stem, pointer_index, _PAD_ABOVE_THR_TICKS, ok)
    period_us = _read_pad_i32(stem, pointer_index, _PAD_SAMPLE_PERIOD_US, ok)

    serial = _serial_text(stem, ok)

    if peak_ua is None or ticks is None or above_ticks is None or period_us is None:
        return {
            "error": "scratchpad_read_failed",
            "hub": hub_idx_0 + 1,
            "port": port,
            "serial": serial,
        }

    peak_ma = peak_ua / 1000.0
    # Negative counters are clamped to zero before computing durations.
    p_us = max(int(period_us), 0)
    t = max(int(ticks), 0)
    at = max(int(above_ticks), 0)
    obs_us = t * p_us
    above_us = at * p_us

    return {
        "mode": "on-hub",
        "hub": hub_idx_0 + 1,
        "port": port,
        "serial": serial,
        "peak_ma": round(peak_ma, 3),
        "peak_ua": peak_ua,
        "ticks": t,
        "sample_period_us": p_us,
        "observation_us": obs_us,
        "above_threshold_ticks": at,
        "above_threshold_us": above_us,
        "reflex_threshold_ma": _REFLEX_THRESHOLD_MA,
        "sample_window_ms": round(sample_s * 1000.0, 3),
        "power_cycled": power_cycle,
        "map_store_index": store_index,
        "map_slot": map_slot,
        "pointer_index": pointer_index,
    }


def _measure_inrush_host(
    c,
    hub_idx_0: int,
    port: int,
    *,
    off_s: float,
    sample_s: float,
    interval_s: float,
    threshold_ma: float,
    power_cycle: bool,
) -> dict[str, object]:
    """Poll ``getPortCurrent`` from the host on a fixed deadline grid.

    Args:
        c: Concentrator exposing ``hubs`` and ``SUCCESS``.
        hub_idx_0: 0-based index into ``c.hubs``.
        port: Downstream port index.
        off_s: Seconds to keep the port disabled when power-cycling.
        sample_s: Nominal length of the sampling window in seconds.
        interval_s: Seconds between reads (clamped to at least 0.0005).
        threshold_ma: Threshold for the above-threshold tally, in mA.
        power_cycle: Disable/enable the port before sampling.

    Returns:
        A result dict with ``"mode": "host-sample"``. ``read_errors`` counts
        reads whose error code was not ``c.SUCCESS``; those reads are excluded
        from the peak and threshold tallies. ``peak_ma`` is 0.0 when no read
        succeeded.
    """
    stem = c.hubs[hub_idx_0]
    ok = c.SUCCESS

    if power_cycle:
        stem.usb.setPortDisable(port)
        time.sleep(off_s)
        stem.usb.setPortEnable(port)

    interval_s = max(interval_s, 0.0005)
    n_samples = max(1, int(round(sample_s / interval_s)))

    t0 = time.perf_counter()
    peak_ma = float("-inf")
    peak_first_s = 0.0
    first_above_s: float | None = None
    above_count = 0
    read_errors = 0

    for i in range(n_samples):
        # Deadlines are computed from t0 so per-read latency does not accumulate.
        _sleep_until(t0 + i * interval_s)
        rel_s = time.perf_counter() - t0
        cr = stem.usb.getPortCurrent(port)
        if cr.error != ok:
            read_errors += 1
            continue
        ma = cr.value / 1000.0
        if math.isnan(ma):
            continue
        if ma > peak_ma:
            peak_ma = ma
            peak_first_s = rel_s
        if ma > threshold_ma:
            above_count += 1
            if first_above_s is None:
                first_above_s = rel_s

    t_end = time.perf_counter()
    if peak_ma == float("-inf"):
        # No usable reading at all; see read_errors for the reason.
        peak_ma = 0.0
    above_s = above_count * interval_s

    serial = _serial_text(stem, ok)

    return {
        "mode": "host-sample",
        "hub": hub_idx_0 + 1,
        "port": port,
        "serial": serial,
        "peak_ma": round(peak_ma, 3),
        "peak_time_ms": round(peak_first_s * 1000.0, 3),
        "first_above_threshold_ms": (
            None if first_above_s is None else round(first_above_s * 1000.0, 3)
        ),
        "above_threshold_ma": threshold_ma,
        "above_threshold_ms": round(above_s * 1000.0, 3),
        "sample_count": n_samples,
        "read_errors": read_errors,
        "sample_window_ms": round(sample_s * 1000.0, 3),
        "elapsed_ms": round((t_end - t0) * 1000.0, 3),
        "interval_ms": round(interval_s * 1000.0, 3),
        "power_cycled": power_cycle,
    }


def main() -> int:
    """Parse the command line, run one measurement, and print the result.

    Returns:
        Process exit status: 0 on success, 1 on hub/measurement failure,
        2 on invalid ``hub``/``port`` arguments.
    """
    p = argparse.ArgumentParser(
        description="USB inrush: default on-hub (reflex/inrush.reflex scratchpad); optional host polling.",
    )
    p.add_argument("hub", type=int, help="Hub index 1-based (first hub = 1)")
    p.add_argument("port", type=int, help="Downstream port index 0-based")
    p.add_argument(
        "-c",
        "--config",
        metavar="PROFILE_OR_INI",
        help="Set FIWI_CONFIG (profile name or absolute *.ini)",
    )
    p.add_argument(
        "--host-sample",
        action="store_true",
        help="Poll getPortCurrent on the PC (legacy); default is on-hub reflex scratchpad.",
    )
    p.add_argument(
        "--off-ms",
        type=float,
        default=250.0,
        help="Milliseconds off before re-enable when power-cycling (default 250)",
    )
    p.add_argument(
        "--sample-ms",
        type=float,
        default=500.0,
        help="After re-arm (+ optional power-on), wait this long on host while hub samples (default 500)",
    )
    p.add_argument(
        "--interval-ms",
        type=float,
        default=1.0,
        help="(host-sample only) ms between getPortCurrent reads (default 1)",
    )
    p.add_argument(
        "--threshold-ma",
        type=float,
        default=50.0,
        help="(host-sample only) threshold for above-threshold tally",
    )
    p.add_argument(
        "--no-power-cycle",
        action="store_true",
        help="Do not disable/enable the downstream port",
    )
    p.add_argument(
        "--map-store-index",
        type=int,
        default=1,
        help="stem.store index for slotDisable/Enable (USBHub3p: 0 internal, 1 RAM; default 1)",
    )
    p.add_argument(
        "--map-slot",
        type=int,
        default=0,
        help="Store slot number where inrush.map is loaded (default 0)",
    )
    p.add_argument(
        "--pointer-index",
        type=int,
        default=0,
        help="stem.pointer index for reflex scratchpad (default 0)",
    )
    p.add_argument(
        "--no-rearm-map",
        action="store_true",
        help="Do not slotDisable/slotEnable before measure (counters/timer may be stale)",
    )
    p.add_argument("--json", action="store_true", help="Print one JSON object on stdout")
    args = p.parse_args()

    if args.config:
        os.environ["FIWI_CONFIG"] = args.config.strip()

    if args.hub < 1:
        print("hub must be >= 1", file=sys.stderr, flush=True)
        return 2
    if args.port < 0:
        print("port must be >= 0", file=sys.stderr, flush=True)
        return 2

    off_s = max(args.off_ms / 1000.0, 0.0)
    sample_s = max(args.sample_ms / 1000.0, 0.01)
    interval_s = max(args.interval_ms / 1000.0, 0.0005)

    _configure_paths()
    from fiwi.concentrator import FiWiConcentrator

    c = FiWiConcentrator()
    try:
        if not c.connect():
            print("No local USB power-control hubs connected.", file=sys.stderr, flush=True)
            return 1
        hi = args.hub - 1
        if not 0 <= hi < len(c.hubs):
            print(
                f"Hub {args.hub} invalid (have {len(c.hubs)} hub(s)).",
                file=sys.stderr,
                flush=True,
            )
            return 1
        stem = c.hubs[hi]
        n = c._port_count(stem)
        if not 0 <= args.port < n:
            print(
                f"Port {args.port} out of range for hub {args.hub} (0..{n - 1}).",
                file=sys.stderr,
                flush=True,
            )
            return 1

        if not args.host_sample and args.port != 0:
            # Warning only: the measurement still runs, since the map may have
            # been recompiled for another port index.
            print(
                "check_inrush: on-hub reflex/inrush.reflex monitors port 0 only; "
                f"you asked for port {args.port}. Recompile Reflex or use --host-sample.",
                file=sys.stderr,
                flush=True,
            )

        if args.host_sample:
            out = _measure_inrush_host(
                c,
                hi,
                args.port,
                off_s=off_s,
                sample_s=sample_s,
                interval_s=interval_s,
                threshold_ma=args.threshold_ma,
                power_cycle=not args.no_power_cycle,
            )
        else:
            out = _measure_inrush_on_hub(
                c,
                hi,
                args.port,
                off_s=off_s,
                sample_s=sample_s,
                power_cycle=not args.no_power_cycle,
                store_index=args.map_store_index,
                map_slot=args.map_slot,
                pointer_index=args.pointer_index,
                rearm_map=not args.no_rearm_map,
            )

        if out.get("error") == "rearm_map_failed":
            print(
                "check_inrush: could not re-arm map (store slotDisable/Enable failed). "
                "Load reflex/inrush.map into --map-store-index / --map-slot, or try --no-rearm-map.",
                file=sys.stderr,
                flush=True,
            )
            return 1
        if out.get("error") == "scratchpad_read_failed":
            print(
                "check_inrush: scratchpad read failed (pointer offset / hub type). "
                "Confirm inrush.map is loaded, mapEnable ran, and pointer index matches Reflex.",
                file=sys.stderr,
                flush=True,
            )
            return 1

        if args.json:
            print(json.dumps(out), flush=True)
        else:
            print(
                f"Hub {out['hub']} port {out['port']} serial {out['serial']}",
                flush=True,
            )
            if out.get("mode") == "on-hub":
                print(f" mode: on-hub reflex (scratchpad via pointer {args.pointer_index})", flush=True)
                print(
                    f" peak current: {out['peak_ma']} mA (peak_ua={out['peak_ua']})",
                    flush=True,
                )
                print(
                    f" hub ticks: {out['ticks']} @ {out['sample_period_us']} µs → "
                    f"~{out['observation_us']} µs on timer grid",
                    flush=True,
                )
                print(
                    f" > {_REFLEX_THRESHOLD_MA} mA (reflex): ~{out['above_threshold_us']} µs "
                    f"({out['above_threshold_ticks']} ticks)",
                    flush=True,
                )
                print(
                    f" host wait: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']} "
                    f"store[{out['map_store_index']}] slot {out['map_slot']}",
                    flush=True,
                )
                if out["ticks"] == 0 and out.get("sample_period_us", 0) == 0:
                    print(
                        " ! ticks and sample_period_us are 0 — map likely not running or wrong pointer.",
                        file=sys.stderr,
                        flush=True,
                    )
            else:
                fat = out["first_above_threshold_ms"]
                fat_s = f"{fat} ms" if fat is not None else "n/a"
                print(" mode: host-sample (getPortCurrent loop)", flush=True)
                print(
                    f" peak current: {out['peak_ma']} mA (when max first rose: {out['peak_time_ms']} ms)",
                    flush=True,
                )
                print(
                    f" first > {out['above_threshold_ma']} mA: {fat_s}",
                    flush=True,
                )
                print(
                    f" > {out['above_threshold_ma']} mA for ~{out['above_threshold_ms']} ms "
                    f"({out['sample_count']} samples @ {out['interval_ms']} ms; wall {out['elapsed_ms']} ms)",
                    flush=True,
                )
                print(
                    f" nominal window: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']}",
                    flush=True,
                )
                if out.get("read_errors"):
                    # Surface failed reads so a 0.0 mA peak is not mistaken for a real result.
                    print(
                        f" ! {out['read_errors']} of {out['sample_count']} getPortCurrent reads failed.",
                        file=sys.stderr,
                        flush=True,
                    )
        return 0
    finally:
        c.disconnect()


if __name__ == "__main__":
    raise SystemExit(main())