5.2 KiB
FiWiControl flows — async iperf over SSH
Package: **fiwicontrol.flows**
Implementation: **src/fiwicontrol/flows/flows.py** (lazy-loaded from **fiwicontrol/flows/__init__.py**)
This module drives classic iperf (often iperf 2.x builds) on remote hosts over **ssh, parses streaming output into per-flow statistics, and supports histograms, Kolmogorov–Smirnov tables, and plotting (SciPy / Matplotlib). It is aimed at lab traffic and repeatable measurement, not at replacing **fiwicontrol.commands for general rig control.
Install
Flows pull optional heavy dependencies:
cd ~/Code/FiWiControl
python3 -m pip install -e ".[flows]"
**[flows]** includes NumPy, SciPy, and Matplotlib. The module also imports **tkinter** and expects a compatible **iperf** binary on each endpoint (paths in code default to **/usr/local/bin/iperf** on remotes and **/usr/bin/iperf** for the class default—adjust in your harness or fork if your layout differs).
**import fiwicontrol.flows** does not import **flows.py** immediately; symbols such as **iperf_flow** load on first attribute access.
Python 3.11 asyncio model
Code is structured for Python 3.11+:
**asyncio.timeout** plus**asyncio.gather**replace older**ensure_future(..., loop=...)**+**asyncio.wait**+**run_until_complete**patterns.- Subprocess transports use
**asyncio.get_running_loop().subprocess_exec(...)**.
Async entrypoints (preferred)
Use these inside an async function (same task / event loop as the rest of your app). The **await** makes asynchrony obvious at the call site, so names are plain verbs (no **a*** prefix).
| Coroutine | Role |
|---|---|
**await iperf_flow.run_traffic(...)** |
Full run: optional preclean, start RX/TX, optional traffic wait, stop clients/servers. |
**await iperf_flow.start_traffic(...)** |
Start servers and clients only (no **run_traffic** wait/teardown path). |
**await iperf_flow.stop_traffic(...)** |
Signal stop to TX then RX for selected flows. |
**await iperf_flow.plot_histograms(...)** |
Schedule histogram async plots under a timeout. |
**await iperf_flow.shutdown_async_generators()** |
**await** **shutdown_asyncgens()** on the running loop (rare; advanced). |
**await flow.analyze_ks_clusters(...)** |
KS table, optional plots, and clustering / linkage for one **iperf_flow** instance. |
Example skeleton:
import asyncio
from fiwicontrol.flows import iperf_flow
async def main() -> None:
# construct iperf_flow instances (server/client endpoints) …
await iperf_flow.run_traffic(time=30, flows="all", preclean=True)
asyncio.run(main())
Sync wrappers (scripts)
**iperf_flow.run**, **commence**, **plot**, **cease**, **sleep**, **compute_ks_table**, and **close_loop** call **asyncio.run(...)** only when no event loop is already running. If you call them while **asyncio.get_running_loop()** succeeds, they raise **RuntimeError** instructing you to use the **await …** coroutines listed above instead.
Core types
**iperf_flow**— One logical flow; holds**iperf_server**/**iperf_client**, stats dict, histogram metadata.**iperf_flow.instances**tracks active flows (**get_instances()**).**iperf_server**/**iperf_client**— Build**ssh user@host iperf ...**argv and drive**asynciosubprocess protocols** to parse iperf lines into**flowstats**.**flow_histogram**— PDF / histogram helpers and**gnuplot**integration for plots.
Construct **iperf_flow** with **server** and **client** objects that expose **ipaddr** (and optionally **device** for bind/interface). See **flows.py** **iperf_flow.__init__** for the full parameter surface (TOS, UDP/TCP, offered load, trip times, etc.).
SSH and cleanup
**iperf_flow.cleanup** runs **ssh user@host pkill iperf** (configurable **sshcmd** / **user**). **run_traffic** (and **start_traffic** with **preclean=True**) can **preclean** those hosts before starting traffic.
For workstation SSH quirks (Fedora **ssh_config.d**), see **docs/install.md** and **FIWI_SSH_CONFIG** (pytest sets a minimal config by default).
Integration status
**flows** is a standalone library in this repo: it is not wired into **ssh_node** or the Pi 5 iperf 2 lab narrative in **README.md** in this release.
See also
**README.md**— package list (**fronthaul**,**telemetry**,**radio**,**concentrator**,**fabric**, …),**src/fiwicontrol/**tree, and roadmap context.**docs/node-control-asyncio-design.md**— asyncio patterns for**ssh_node**.**docs/install.md**— workstation install and SSH notes.