FiWiManager/tests/check_inrush.py

486 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Measure **USB inrush** on a local BrainStem hub port.
**Default — on-hub sampling** (``reflex/inrush.reflex``):
- Load the compiled ``.map`` into a hub store (ReflexLoader / HubTool), then run this script.
- Re-arms the map (``slotDisable`` / ``slotEnable``) to run ``mapEnable`` so Timer[0] and scratchpad
reset, optionally power-cycles the downstream port, **waits** ``--sample-ms`` on the host while
the hub samples current at its timer rate (e.g. 100 µs), then reads **scratchpad** via
``pointer[N]`` (offsets match ``reflex/inrush.reflex``).
**Legacy — host sampling** (``--host-sample``):
- Poll ``getPortCurrent`` on a deadline grid (default 1 ms). No Reflex map required.
**Standalone**::

    python tests/check_inrush.py 1 0
    python tests/check_inrush.py 1 0 --host-sample
    python tests/check_inrush.py --config uax24 2 4 --map-slot 1 --json

``hub`` is **1-based**; ``port`` is **0-based**. On-hub mode: ``reflex/inrush.reflex`` is built for
**port 0** only unless you recompile with another index. ``--config`` sets ``FIWI_CONFIG``.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import time
# Parent of the directory containing this file; placed first on sys.path so
# the function-local ``fiwi`` imports below can be resolved from there.
_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir)
)
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

# Byte offsets into the Reflex scratchpad; must match reflex/inrush.reflex.
_PAD_PEAK_UA = 0
_PAD_TICKS = 4
_PAD_ABOVE_THR_TICKS = 8
_PAD_SAMPLE_PERIOD_US = 12

# Reflex uses 50 mA threshold (50000 µA); host --threshold-ma does not change on-hub tallies.
_REFLEX_THRESHOLD_MA = 50.0
def _configure_paths() -> None:
    """Call ``fiwi.paths.configure`` with ``_ROOT``.

    ``fiwi.paths`` is imported inside the function rather than at module top;
    ``_ROOT`` is inserted into ``sys.path`` earlier in this file.
    """
    import fiwi.paths as fiwi_paths

    fiwi_paths.configure(_ROOT)
def _sleep_until(deadline: float) -> None:
    """Block until ``time.perf_counter()`` has reached ``deadline``.

    Sleeps for the remaining time and then re-checks the clock, looping until
    the deadline has passed. Returns immediately when the deadline is already
    in the past.
    """
    now = time.perf_counter()
    while not (now >= deadline):
        time.sleep(deadline - now)
        now = time.perf_counter()
def _i32(v: int) -> int:
    """Reinterpret the low 32 bits of ``v`` as a signed two's-complement value."""
    low = v & 0xFFFFFFFF
    # Bit 31 set means the value is negative in 32-bit two's complement.
    return low - 0x100000000 if low & 0x80000000 else low
def _read_pad_i32(stem, pointer_index: int, byte_offset: int, ok) -> int | None:
"""Read signed 32-bit from reflex scratchpad at byte offset."""
try:
ptr = stem.pointer[pointer_index]
except (IndexError, AttributeError, TypeError):
return None
pm = getattr(ptr, "POINTER_MODE_STATIC", 0)
try:
ptr.setMode(pm)
except Exception:
pass
if ptr.setOffset(byte_offset) != ok:
return None
r = ptr.getInt()
if r.error != ok:
return None
return _i32(int(r.value))
def _rearm_map_slot(stem, store_index: int, slot: int, ok) -> bool:
    """Disable then enable a store slot to re-trigger ``mapEnable`` on the loaded reflex map.

    Returns ``False`` (after printing a hint to stderr) when
    ``stem.store[store_index]`` cannot be obtained, and ``False`` when either
    ``slotDisable`` or ``slotEnable`` does not return ``ok``. Returns ``True``
    once both calls have succeeded.
    """
    try:
        map_store = stem.store[store_index]
    except (IndexError, AttributeError, TypeError):
        print(
            "check_inrush: hub has no store at this index (on-hub mode needs a loaded .map).",
            file=sys.stderr,
            flush=True,
        )
        return False

    # (store method, seconds to pause after it succeeds)
    steps = (("slotDisable", 0.05), ("slotEnable", 0.02))
    for method_name, settle_s in steps:
        if getattr(map_store, method_name)(slot) != ok:
            return False
        time.sleep(settle_s)
    return True
def _measure_inrush_on_hub(
    c,
    hub_idx_0: int,
    port: int,
    *,
    off_s: float,
    sample_s: float,
    power_cycle: bool,
    store_index: int,
    map_slot: int,
    pointer_index: int,
    rearm_map: bool,
) -> dict[str, object]:
    """Run one on-hub inrush measurement and return the scratchpad results.

    Sequence: optionally disable the port and wait ``off_s``; optionally
    re-arm the reflex map via ``_rearm_map_slot``; optionally re-enable the
    port; wait ``sample_s`` on the host; then read the four scratchpad words
    (peak µA, ticks, above-threshold ticks, sample period µs) through
    ``_read_pad_i32``.

    Args:
        c: Object exposing ``hubs`` (indexable) and ``SUCCESS``.
        hub_idx_0: 0-based index into ``c.hubs``.
        port: Downstream port index passed to ``setPortDisable`` / ``setPortEnable``.
        off_s: Seconds to keep the port disabled when power-cycling.
        sample_s: Seconds to wait on the host before reading the scratchpad.
        power_cycle: Whether to disable/enable the port around the re-arm.
        store_index: ``stem.store`` index used for the re-arm.
        map_slot: Store slot used for the re-arm.
        pointer_index: ``stem.pointer`` index used to read the scratchpad.
        rearm_map: Whether to re-arm the map before sampling.

    Returns:
        On success, a dict with ``"mode": "on-hub"`` and the measured values.
        On failure, ``{"error": "rearm_map_failed"}`` or a dict with
        ``"error": "scratchpad_read_failed"`` plus ``hub``, ``port`` and ``serial``.
    """
    stem = c.hubs[hub_idx_0]
    ok = c.SUCCESS

    if power_cycle:
        stem.usb.setPortDisable(port)
        time.sleep(off_s)

    if rearm_map and not _rearm_map_slot(stem, store_index, map_slot, ok):
        if power_cycle:
            # The port was switched off above; turn it back on so a failed
            # re-arm does not leave the downstream device without power.
            stem.usb.setPortEnable(port)
        return {"error": "rearm_map_failed"}

    if power_cycle:
        stem.usb.setPortEnable(port)
    time.sleep(sample_s)

    # All four words are read before any of them is checked.
    pad_offsets = (
        _PAD_PEAK_UA,
        _PAD_TICKS,
        _PAD_ABOVE_THR_TICKS,
        _PAD_SAMPLE_PERIOD_US,
    )
    pad_values = [
        _read_pad_i32(stem, pointer_index, offset, ok) for offset in pad_offsets
    ]

    sn_res = stem.system.getSerialNumber()
    serial = f"0x{sn_res.value:08X}" if sn_res.error == ok else "?"

    if any(value is None for value in pad_values):
        return {
            "error": "scratchpad_read_failed",
            "hub": hub_idx_0 + 1,
            "port": port,
            "serial": serial,
        }

    peak_ua, ticks, above_ticks, period_us = pad_values
    peak_ma = peak_ua / 1000.0
    # Negative counters/periods are clamped to zero before deriving durations.
    p_us = max(int(period_us), 0)
    t = max(int(ticks), 0)
    at = max(int(above_ticks), 0)

    return {
        "mode": "on-hub",
        "hub": hub_idx_0 + 1,
        "port": port,
        "serial": serial,
        "peak_ma": round(peak_ma, 3),
        "peak_ua": peak_ua,
        "ticks": t,
        "sample_period_us": p_us,
        "observation_us": t * p_us,
        "above_threshold_ticks": at,
        "above_threshold_us": at * p_us,
        "reflex_threshold_ma": _REFLEX_THRESHOLD_MA,
        "sample_window_ms": round(sample_s * 1000.0, 3),
        "power_cycled": power_cycle,
        "map_store_index": store_index,
        "map_slot": map_slot,
        "pointer_index": pointer_index,
    }
def _measure_inrush_host(
    c,
    hub_idx_0: int,
    port: int,
    *,
    off_s: float,
    sample_s: float,
    interval_s: float,
    threshold_ma: float,
    power_cycle: bool,
) -> dict[str, object]:
    """Measure inrush by polling ``getPortCurrent`` from the host.

    Optionally power-cycles the port (disable, wait ``off_s``, enable), then
    takes ``round(sample_s / interval_s)`` readings (at least one) on a fixed
    deadline grid starting at the first sample. ``interval_s`` is clamped to a
    minimum of 0.5 ms. Readings whose ``error`` is not ``c.SUCCESS`` count as
    samples but are excluded from the peak and threshold tallies.

    Returns a dict with ``"mode": "host-sample"``, the peak current in mA
    (``0.0`` when no reading succeeded), when the peak and the first
    above-threshold reading occurred, the time above threshold, and timing
    metadata.
    """
    stem = c.hubs[hub_idx_0]
    ok = c.SUCCESS

    if power_cycle:
        stem.usb.setPortDisable(port)
        time.sleep(off_s)
        stem.usb.setPortEnable(port)

    interval_s = max(interval_s, 0.0005)
    n_samples = max(1, int(round(sample_s / interval_s)))

    taken = 0
    t0 = time.perf_counter()
    peak_ma = float("-inf")
    peak_first_s = 0.0
    first_above_s: float | None = None
    above_count = 0

    for k in range(n_samples):
        _sleep_until(t0 + k * interval_s)
        rel_s = time.perf_counter() - t0
        reading = stem.usb.getPortCurrent(port)
        taken += 1
        if reading.error != ok:
            continue
        ma = reading.value / 1000.0
        if math.isnan(ma):
            continue
        if ma > peak_ma:
            # Strict comparison: only the first occurrence of the maximum is timestamped.
            peak_ma = ma
            peak_first_s = rel_s
        if ma > threshold_ma:
            above_count += 1
            if first_above_s is None:
                first_above_s = rel_s
    t_end = time.perf_counter()

    if peak_ma == float("-inf") or math.isnan(peak_ma):
        peak_ma = 0.0
    above_s = above_count * interval_s

    sn_res = stem.system.getSerialNumber()
    serial = f"0x{sn_res.value:08X}" if sn_res.error == ok else "?"

    if first_above_s is None:
        first_above_ms = None
    else:
        first_above_ms = round(first_above_s * 1000.0, 3)

    return {
        "mode": "host-sample",
        "hub": hub_idx_0 + 1,
        "port": port,
        "serial": serial,
        "peak_ma": round(peak_ma, 3),
        "peak_time_ms": round(peak_first_s * 1000.0, 3),
        "first_above_threshold_ms": first_above_ms,
        "above_threshold_ma": threshold_ma,
        "above_threshold_ms": round(above_s * 1000.0, 3),
        "sample_count": taken,
        "sample_window_ms": round(sample_s * 1000.0, 3),
        "elapsed_ms": round((t_end - t0) * 1000.0, 3),
        "interval_ms": round(interval_s * 1000.0, 3),
        "power_cycled": power_cycle,
    }
def main() -> int:
p = argparse.ArgumentParser(
description="USB inrush: default on-hub (reflex/inrush.reflex scratchpad); optional host polling.",
)
p.add_argument("hub", type=int, help="Hub index 1-based (first hub = 1)")
p.add_argument("port", type=int, help="Downstream port index 0-based")
p.add_argument(
"-c",
"--config",
metavar="PROFILE_OR_INI",
help="Set FIWI_CONFIG (profile name or absolute *.ini)",
)
p.add_argument(
"--host-sample",
action="store_true",
help="Poll getPortCurrent on the PC (legacy); default is on-hub reflex scratchpad.",
)
p.add_argument(
"--off-ms",
type=float,
default=250.0,
help="Milliseconds off before re-enable when power-cycling (default 250)",
)
p.add_argument(
"--sample-ms",
type=float,
default=500.0,
help="After re-arm (+ optional power-on), wait this long on host while hub samples (default 500)",
)
p.add_argument(
"--interval-ms",
type=float,
default=1.0,
help="(host-sample only) ms between getPortCurrent reads (default 1)",
)
p.add_argument(
"--threshold-ma",
type=float,
default=50.0,
help="(host-sample only) threshold for above-threshold tally",
)
p.add_argument(
"--no-power-cycle",
action="store_true",
help="Do not disable/enable the downstream port",
)
p.add_argument(
"--map-store-index",
type=int,
default=1,
help="stem.store index for slotDisable/Enable (USBHub3p: 0 internal, 1 RAM; default 1)",
)
p.add_argument(
"--map-slot",
type=int,
default=0,
help="Store slot number where inrush.map is loaded (default 0)",
)
p.add_argument(
"--pointer-index",
type=int,
default=0,
help="stem.pointer index for reflex scratchpad (default 0)",
)
p.add_argument(
"--no-rearm-map",
action="store_true",
help="Do not slotDisable/slotEnable before measure (counters/timer may be stale)",
)
p.add_argument("--json", action="store_true", help="Print one JSON object on stdout")
args = p.parse_args()
if args.config:
os.environ["FIWI_CONFIG"] = args.config.strip()
if args.hub < 1:
print("hub must be >= 1", file=sys.stderr, flush=True)
return 2
if args.port < 0:
print("port must be >= 0", file=sys.stderr, flush=True)
return 2
off_s = max(args.off_ms / 1000.0, 0.0)
sample_s = max(args.sample_ms / 1000.0, 0.01)
interval_s = max(args.interval_ms / 1000.0, 0.0005)
_configure_paths()
from fiwi.concentrator import FiWiConcentrator
c = FiWiConcentrator()
try:
if not c.connect():
print("No local USB power-control hubs connected.", file=sys.stderr, flush=True)
return 1
hi = args.hub - 1
if hi < 0 or hi >= len(c.hubs):
print(
f"Hub {args.hub} invalid (have {len(c.hubs)} hub(s)).",
file=sys.stderr,
flush=True,
)
return 1
stem = c.hubs[hi]
n = c._port_count(stem)
if args.port < 0 or args.port >= n:
print(
f"Port {args.port} out of range for hub {args.hub} (0..{n - 1}).",
file=sys.stderr,
flush=True,
)
return 1
if not args.host_sample and args.port != 0:
print(
"check_inrush: on-hub reflex/inrush.reflex monitors port 0 only; "
f"you asked for port {args.port}. Recompile Reflex or use --host-sample.",
file=sys.stderr,
flush=True,
)
if args.host_sample:
out = _measure_inrush_host(
c,
hi,
args.port,
off_s=off_s,
sample_s=sample_s,
interval_s=interval_s,
threshold_ma=args.threshold_ma,
power_cycle=not args.no_power_cycle,
)
else:
out = _measure_inrush_on_hub(
c,
hi,
args.port,
off_s=off_s,
sample_s=sample_s,
power_cycle=not args.no_power_cycle,
store_index=args.map_store_index,
map_slot=args.map_slot,
pointer_index=args.pointer_index,
rearm_map=not args.no_rearm_map,
)
if out.get("error") == "rearm_map_failed":
print(
"check_inrush: could not re-arm map (store slotDisable/Enable failed). "
"Load reflex/inrush.map into --map-store-index / --map-slot, or try --no-rearm-map.",
file=sys.stderr,
flush=True,
)
return 1
if out.get("error") == "scratchpad_read_failed":
print(
"check_inrush: scratchpad read failed (pointer offset / hub type). "
"Confirm inrush.map is loaded, mapEnable ran, and pointer index matches Reflex.",
file=sys.stderr,
flush=True,
)
return 1
if args.json:
print(json.dumps(out), flush=True)
else:
print(
f"Hub {out['hub']} port {out['port']} serial {out['serial']}",
flush=True,
)
if out.get("mode") == "on-hub":
print(f" mode: on-hub reflex (scratchpad via pointer {args.pointer_index})", flush=True)
print(
f" peak current: {out['peak_ma']} mA (peak_ua={out['peak_ua']})",
flush=True,
)
print(
f" hub ticks: {out['ticks']} @ {out['sample_period_us']} µs → "
f"~{out['observation_us']} µs on timer grid",
flush=True,
)
print(
f" > {_REFLEX_THRESHOLD_MA} mA (reflex): ~{out['above_threshold_us']} µs "
f"({out['above_threshold_ticks']} ticks)",
flush=True,
)
print(
f" host wait: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']} "
f"store[{out['map_store_index']}] slot {out['map_slot']}",
flush=True,
)
if out["ticks"] == 0 and out.get("sample_period_us", 0) == 0:
print(
" ! ticks and sample_period_us are 0 — map likely not running or wrong pointer.",
file=sys.stderr,
flush=True,
)
else:
fat = out["first_above_threshold_ms"]
fat_s = f"{fat} ms" if fat is not None else "n/a"
print(f" mode: host-sample (getPortCurrent loop)", flush=True)
print(
f" peak current: {out['peak_ma']} mA (when max first rose: {out['peak_time_ms']} ms)",
flush=True,
)
print(
f" first > {out['above_threshold_ma']} mA: {fat_s}",
flush=True,
)
print(
f" > {out['above_threshold_ma']} mA for ~{out['above_threshold_ms']} ms "
f"({out['sample_count']} samples @ {out['interval_ms']} ms; wall {out['elapsed_ms']} ms)",
flush=True,
)
print(
f" nominal window: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']}",
flush=True,
)
return 0
finally:
c.disconnect()
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())