486 lines
15 KiB
Python
486 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Measure **USB inrush** on a local BrainStem hub port.
|
|
|
|
**Default — on-hub sampling** (``reflex/inrush.reflex``):
|
|
|
|
- Load the compiled ``.map`` into a hub store (ReflexLoader / HubTool), then run this script.
|
|
- Re-arms the map (``slotDisable`` / ``slotEnable``) to run ``mapEnable`` so Timer[0] and scratchpad
|
|
reset, optionally power-cycles the downstream port, **waits** ``--sample-ms`` on the host while
|
|
the hub samples current at its timer rate (e.g. 100 µs), then reads **scratchpad** via
|
|
``pointer[N]`` (offsets match ``reflex/inrush.reflex``).
|
|
|
|
**Legacy — host sampling** (``--host-sample``):
|
|
|
|
- Poll ``getPortCurrent`` on a deadline grid (default 1 ms). No Reflex map required.
|
|
|
|
**Standalone**::
|
|
|
|
python tests/check_inrush.py 1 0
|
|
python tests/check_inrush.py 1 0 --host-sample
|
|
python tests/check_inrush.py --config uax24 2 4 --map-slot 1 --json
|
|
|
|
``hub`` is **1-based**; ``port`` is **0-based**. On-hub mode: ``reflex/inrush.reflex`` is built for
|
|
**port 0** only unless you recompile with another index. ``--config`` sets ``FIWI_CONFIG``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
_ROOT = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
|
|
if _ROOT not in sys.path:
|
|
sys.path.insert(0, _ROOT)
|
|
|
|
# Must match reflex/inrush.reflex scratchpad layout (byte offsets into pad).
|
|
_PAD_PEAK_UA = 0
|
|
_PAD_TICKS = 4
|
|
_PAD_ABOVE_THR_TICKS = 8
|
|
_PAD_SAMPLE_PERIOD_US = 12
|
|
|
|
# Reflex uses 50 mA threshold (50000 µA); host --threshold-ma does not change on-hub tallies.
|
|
_REFLEX_THRESHOLD_MA = 50.0
|
|
|
|
|
|
def _configure_paths() -> None:
|
|
import fiwi.paths as paths_mod
|
|
|
|
paths_mod.configure(_ROOT)
|
|
|
|
|
|
def _sleep_until(deadline: float) -> None:
|
|
while True:
|
|
now = time.perf_counter()
|
|
if now >= deadline:
|
|
return
|
|
time.sleep(deadline - now)
|
|
|
|
|
|
def _i32(v: int) -> int:
|
|
v &= 0xFFFFFFFF
|
|
if v >= 0x80000000:
|
|
return v - 0x100000000
|
|
return v
|
|
|
|
|
|
def _read_pad_i32(stem, pointer_index: int, byte_offset: int, ok) -> int | None:
|
|
"""Read signed 32-bit from reflex scratchpad at byte offset."""
|
|
try:
|
|
ptr = stem.pointer[pointer_index]
|
|
except (IndexError, AttributeError, TypeError):
|
|
return None
|
|
pm = getattr(ptr, "POINTER_MODE_STATIC", 0)
|
|
try:
|
|
ptr.setMode(pm)
|
|
except Exception:
|
|
pass
|
|
if ptr.setOffset(byte_offset) != ok:
|
|
return None
|
|
r = ptr.getInt()
|
|
if r.error != ok:
|
|
return None
|
|
return _i32(int(r.value))
|
|
|
|
|
|
def _rearm_map_slot(stem, store_index: int, slot: int, ok) -> bool:
|
|
"""Disable/enable store slot to re-trigger ``mapEnable`` on the loaded reflex map."""
|
|
try:
|
|
st = stem.store[store_index]
|
|
except (IndexError, AttributeError, TypeError):
|
|
print(
|
|
"check_inrush: hub has no store at this index (on-hub mode needs a loaded .map).",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
return False
|
|
if st.slotDisable(slot) != ok:
|
|
return False
|
|
time.sleep(0.05)
|
|
if st.slotEnable(slot) != ok:
|
|
return False
|
|
time.sleep(0.02)
|
|
return True
|
|
|
|
|
|
def _measure_inrush_on_hub(
|
|
c,
|
|
hub_idx_0: int,
|
|
port: int,
|
|
*,
|
|
off_s: float,
|
|
sample_s: float,
|
|
power_cycle: bool,
|
|
store_index: int,
|
|
map_slot: int,
|
|
pointer_index: int,
|
|
rearm_map: bool,
|
|
) -> dict[str, object]:
|
|
stem = c.hubs[hub_idx_0]
|
|
ok = c.SUCCESS
|
|
|
|
if power_cycle:
|
|
stem.usb.setPortDisable(port)
|
|
time.sleep(off_s)
|
|
|
|
if rearm_map:
|
|
if not _rearm_map_slot(stem, store_index, map_slot, ok):
|
|
return {"error": "rearm_map_failed"}
|
|
|
|
if power_cycle:
|
|
stem.usb.setPortEnable(port)
|
|
|
|
time.sleep(sample_s)
|
|
|
|
peak_ua = _read_pad_i32(stem, pointer_index, _PAD_PEAK_UA, ok)
|
|
ticks = _read_pad_i32(stem, pointer_index, _PAD_TICKS, ok)
|
|
above_ticks = _read_pad_i32(stem, pointer_index, _PAD_ABOVE_THR_TICKS, ok)
|
|
period_us = _read_pad_i32(stem, pointer_index, _PAD_SAMPLE_PERIOD_US, ok)
|
|
|
|
sn_res = stem.system.getSerialNumber()
|
|
serial = f"0x{sn_res.value:08X}" if sn_res.error == ok else "?"
|
|
|
|
if peak_ua is None or ticks is None or above_ticks is None or period_us is None:
|
|
return {
|
|
"error": "scratchpad_read_failed",
|
|
"hub": hub_idx_0 + 1,
|
|
"port": port,
|
|
"serial": serial,
|
|
}
|
|
|
|
peak_ma = peak_ua / 1000.0
|
|
p_us = max(int(period_us), 0)
|
|
t = max(int(ticks), 0)
|
|
at = max(int(above_ticks), 0)
|
|
obs_us = t * p_us
|
|
above_us = at * p_us
|
|
|
|
return {
|
|
"mode": "on-hub",
|
|
"hub": hub_idx_0 + 1,
|
|
"port": port,
|
|
"serial": serial,
|
|
"peak_ma": round(peak_ma, 3),
|
|
"peak_ua": peak_ua,
|
|
"ticks": t,
|
|
"sample_period_us": p_us,
|
|
"observation_us": obs_us,
|
|
"above_threshold_ticks": at,
|
|
"above_threshold_us": above_us,
|
|
"reflex_threshold_ma": _REFLEX_THRESHOLD_MA,
|
|
"sample_window_ms": round(sample_s * 1000.0, 3),
|
|
"power_cycled": power_cycle,
|
|
"map_store_index": store_index,
|
|
"map_slot": map_slot,
|
|
"pointer_index": pointer_index,
|
|
}
|
|
|
|
|
|
def _measure_inrush_host(
|
|
c,
|
|
hub_idx_0: int,
|
|
port: int,
|
|
*,
|
|
off_s: float,
|
|
sample_s: float,
|
|
interval_s: float,
|
|
threshold_ma: float,
|
|
power_cycle: bool,
|
|
) -> dict[str, object]:
|
|
stem = c.hubs[hub_idx_0]
|
|
ok = c.SUCCESS
|
|
|
|
if power_cycle:
|
|
stem.usb.setPortDisable(port)
|
|
time.sleep(off_s)
|
|
stem.usb.setPortEnable(port)
|
|
|
|
interval_s = max(interval_s, 0.0005)
|
|
n_samples = max(1, int(round(sample_s / interval_s)))
|
|
|
|
samples: list[tuple[float, float]] = []
|
|
t0 = time.perf_counter()
|
|
peak_ma = float("-inf")
|
|
peak_first_s = 0.0
|
|
first_above_s: float | None = None
|
|
above_count = 0
|
|
|
|
for i in range(n_samples):
|
|
_sleep_until(t0 + i * interval_s)
|
|
rel_s = time.perf_counter() - t0
|
|
cr = stem.usb.getPortCurrent(port)
|
|
if cr.error != ok:
|
|
ma = float("nan")
|
|
else:
|
|
ma = cr.value / 1000.0
|
|
samples.append((rel_s, ma))
|
|
|
|
if not math.isnan(ma):
|
|
if ma > peak_ma:
|
|
peak_ma = ma
|
|
peak_first_s = rel_s
|
|
if ma > threshold_ma:
|
|
above_count += 1
|
|
if first_above_s is None:
|
|
first_above_s = rel_s
|
|
|
|
t_end = time.perf_counter()
|
|
if peak_ma == float("-inf") or math.isnan(peak_ma):
|
|
peak_ma = 0.0
|
|
|
|
above_s = above_count * interval_s
|
|
|
|
sn_res = stem.system.getSerialNumber()
|
|
serial = f"0x{sn_res.value:08X}" if sn_res.error == ok else "?"
|
|
|
|
return {
|
|
"mode": "host-sample",
|
|
"hub": hub_idx_0 + 1,
|
|
"port": port,
|
|
"serial": serial,
|
|
"peak_ma": round(peak_ma, 3),
|
|
"peak_time_ms": round(peak_first_s * 1000.0, 3),
|
|
"first_above_threshold_ms": (
|
|
None
|
|
if first_above_s is None
|
|
else round(first_above_s * 1000.0, 3)
|
|
),
|
|
"above_threshold_ma": threshold_ma,
|
|
"above_threshold_ms": round(above_s * 1000.0, 3),
|
|
"sample_count": len(samples),
|
|
"sample_window_ms": round(sample_s * 1000.0, 3),
|
|
"elapsed_ms": round((t_end - t0) * 1000.0, 3),
|
|
"interval_ms": round(interval_s * 1000.0, 3),
|
|
"power_cycled": power_cycle,
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser(
|
|
description="USB inrush: default on-hub (reflex/inrush.reflex scratchpad); optional host polling.",
|
|
)
|
|
p.add_argument("hub", type=int, help="Hub index 1-based (first hub = 1)")
|
|
p.add_argument("port", type=int, help="Downstream port index 0-based")
|
|
p.add_argument(
|
|
"-c",
|
|
"--config",
|
|
metavar="PROFILE_OR_INI",
|
|
help="Set FIWI_CONFIG (profile name or absolute *.ini)",
|
|
)
|
|
p.add_argument(
|
|
"--host-sample",
|
|
action="store_true",
|
|
help="Poll getPortCurrent on the PC (legacy); default is on-hub reflex scratchpad.",
|
|
)
|
|
p.add_argument(
|
|
"--off-ms",
|
|
type=float,
|
|
default=250.0,
|
|
help="Milliseconds off before re-enable when power-cycling (default 250)",
|
|
)
|
|
p.add_argument(
|
|
"--sample-ms",
|
|
type=float,
|
|
default=500.0,
|
|
help="After re-arm (+ optional power-on), wait this long on host while hub samples (default 500)",
|
|
)
|
|
p.add_argument(
|
|
"--interval-ms",
|
|
type=float,
|
|
default=1.0,
|
|
help="(host-sample only) ms between getPortCurrent reads (default 1)",
|
|
)
|
|
p.add_argument(
|
|
"--threshold-ma",
|
|
type=float,
|
|
default=50.0,
|
|
help="(host-sample only) threshold for above-threshold tally",
|
|
)
|
|
p.add_argument(
|
|
"--no-power-cycle",
|
|
action="store_true",
|
|
help="Do not disable/enable the downstream port",
|
|
)
|
|
p.add_argument(
|
|
"--map-store-index",
|
|
type=int,
|
|
default=1,
|
|
help="stem.store index for slotDisable/Enable (USBHub3p: 0 internal, 1 RAM; default 1)",
|
|
)
|
|
p.add_argument(
|
|
"--map-slot",
|
|
type=int,
|
|
default=0,
|
|
help="Store slot number where inrush.map is loaded (default 0)",
|
|
)
|
|
p.add_argument(
|
|
"--pointer-index",
|
|
type=int,
|
|
default=0,
|
|
help="stem.pointer index for reflex scratchpad (default 0)",
|
|
)
|
|
p.add_argument(
|
|
"--no-rearm-map",
|
|
action="store_true",
|
|
help="Do not slotDisable/slotEnable before measure (counters/timer may be stale)",
|
|
)
|
|
p.add_argument("--json", action="store_true", help="Print one JSON object on stdout")
|
|
args = p.parse_args()
|
|
|
|
if args.config:
|
|
os.environ["FIWI_CONFIG"] = args.config.strip()
|
|
|
|
if args.hub < 1:
|
|
print("hub must be >= 1", file=sys.stderr, flush=True)
|
|
return 2
|
|
if args.port < 0:
|
|
print("port must be >= 0", file=sys.stderr, flush=True)
|
|
return 2
|
|
|
|
off_s = max(args.off_ms / 1000.0, 0.0)
|
|
sample_s = max(args.sample_ms / 1000.0, 0.01)
|
|
interval_s = max(args.interval_ms / 1000.0, 0.0005)
|
|
|
|
_configure_paths()
|
|
from fiwi.concentrator import FiWiConcentrator
|
|
|
|
c = FiWiConcentrator()
|
|
try:
|
|
if not c.connect():
|
|
print("No local USB power-control hubs connected.", file=sys.stderr, flush=True)
|
|
return 1
|
|
hi = args.hub - 1
|
|
if hi < 0 or hi >= len(c.hubs):
|
|
print(
|
|
f"Hub {args.hub} invalid (have {len(c.hubs)} hub(s)).",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
return 1
|
|
stem = c.hubs[hi]
|
|
n = c._port_count(stem)
|
|
if args.port < 0 or args.port >= n:
|
|
print(
|
|
f"Port {args.port} out of range for hub {args.hub} (0..{n - 1}).",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
return 1
|
|
|
|
if not args.host_sample and args.port != 0:
|
|
print(
|
|
"check_inrush: on-hub reflex/inrush.reflex monitors port 0 only; "
|
|
f"you asked for port {args.port}. Recompile Reflex or use --host-sample.",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
|
|
if args.host_sample:
|
|
out = _measure_inrush_host(
|
|
c,
|
|
hi,
|
|
args.port,
|
|
off_s=off_s,
|
|
sample_s=sample_s,
|
|
interval_s=interval_s,
|
|
threshold_ma=args.threshold_ma,
|
|
power_cycle=not args.no_power_cycle,
|
|
)
|
|
else:
|
|
out = _measure_inrush_on_hub(
|
|
c,
|
|
hi,
|
|
args.port,
|
|
off_s=off_s,
|
|
sample_s=sample_s,
|
|
power_cycle=not args.no_power_cycle,
|
|
store_index=args.map_store_index,
|
|
map_slot=args.map_slot,
|
|
pointer_index=args.pointer_index,
|
|
rearm_map=not args.no_rearm_map,
|
|
)
|
|
|
|
if out.get("error") == "rearm_map_failed":
|
|
print(
|
|
"check_inrush: could not re-arm map (store slotDisable/Enable failed). "
|
|
"Load reflex/inrush.map into --map-store-index / --map-slot, or try --no-rearm-map.",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
return 1
|
|
if out.get("error") == "scratchpad_read_failed":
|
|
print(
|
|
"check_inrush: scratchpad read failed (pointer offset / hub type). "
|
|
"Confirm inrush.map is loaded, mapEnable ran, and pointer index matches Reflex.",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
return 1
|
|
|
|
if args.json:
|
|
print(json.dumps(out), flush=True)
|
|
else:
|
|
print(
|
|
f"Hub {out['hub']} port {out['port']} serial {out['serial']}",
|
|
flush=True,
|
|
)
|
|
if out.get("mode") == "on-hub":
|
|
print(f" mode: on-hub reflex (scratchpad via pointer {args.pointer_index})", flush=True)
|
|
print(
|
|
f" peak current: {out['peak_ma']} mA (peak_ua={out['peak_ua']})",
|
|
flush=True,
|
|
)
|
|
print(
|
|
f" hub ticks: {out['ticks']} @ {out['sample_period_us']} µs → "
|
|
f"~{out['observation_us']} µs on timer grid",
|
|
flush=True,
|
|
)
|
|
print(
|
|
f" > {_REFLEX_THRESHOLD_MA} mA (reflex): ~{out['above_threshold_us']} µs "
|
|
f"({out['above_threshold_ticks']} ticks)",
|
|
flush=True,
|
|
)
|
|
print(
|
|
f" host wait: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']} "
|
|
f"store[{out['map_store_index']}] slot {out['map_slot']}",
|
|
flush=True,
|
|
)
|
|
if out["ticks"] == 0 and out.get("sample_period_us", 0) == 0:
|
|
print(
|
|
" ! ticks and sample_period_us are 0 — map likely not running or wrong pointer.",
|
|
file=sys.stderr,
|
|
flush=True,
|
|
)
|
|
else:
|
|
fat = out["first_above_threshold_ms"]
|
|
fat_s = f"{fat} ms" if fat is not None else "n/a"
|
|
print(f" mode: host-sample (getPortCurrent loop)", flush=True)
|
|
print(
|
|
f" peak current: {out['peak_ma']} mA (when max first rose: {out['peak_time_ms']} ms)",
|
|
flush=True,
|
|
)
|
|
print(
|
|
f" first > {out['above_threshold_ma']} mA: {fat_s}",
|
|
flush=True,
|
|
)
|
|
print(
|
|
f" > {out['above_threshold_ma']} mA for ~{out['above_threshold_ms']} ms "
|
|
f"({out['sample_count']} samples @ {out['interval_ms']} ms; wall {out['elapsed_ms']} ms)",
|
|
flush=True,
|
|
)
|
|
print(
|
|
f" nominal window: {out['sample_window_ms']} ms power_cycled: {out['power_cycled']}",
|
|
flush=True,
|
|
)
|
|
return 0
|
|
finally:
|
|
c.disconnect()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|