# FiWiControl flows — async iperf over SSH **Package:** `**fiwicontrol.flows`** **Implementation:** `**src/fiwicontrol/flows/flows.py`** (lazy-loaded from `**fiwicontrol/flows/__init__.py**`) This module drives **classic iperf** (often **iperf 2.x** builds) on **remote hosts** over `**ssh`**, parses streaming output into **per-flow statistics**, and supports **histograms**, **Kolmogorov–Smirnov** tables, and **plotting** (SciPy / Matplotlib). It is aimed at **lab traffic** and repeatable measurement, not at replacing `**fiwicontrol.commands`** for general rig control. --- ## Install Flows pull **optional** heavy dependencies: ```bash cd ~/Code/FiWiControl python3 -m pip install -e ".[flows]" ``` `**[flows]**` includes **NumPy**, **SciPy**, and **Matplotlib**. The module also imports `**tkinter`** and expects a compatible `**iperf**` binary on **each endpoint** (paths in code default to `**/usr/local/bin/iperf`** on remotes and `**/usr/bin/iperf**` for the class default—adjust in your harness or fork if your layout differs). `**import fiwicontrol.flows**` does **not** import `**flows.py`** immediately; symbols such as `**iperf_flow**` load on first attribute access. --- ## Python 3.11 asyncio model Code is structured for **Python 3.11+**: - `**asyncio.timeout`** plus `**asyncio.gather**` replace older `**ensure_future(..., loop=...)**` + `**asyncio.wait**` + `**run_until_complete**` patterns. - Subprocess transports use `**asyncio.get_running_loop().subprocess_exec(...)**`. ### Async entrypoints (preferred) Use these **inside** an async function (same task / event loop as the rest of your app). The `**await`** makes asynchrony obvious at the call site, so names are plain verbs (no `**a***` prefix). | Coroutine | Role | | -------------------------------------------------- | -------------------------------------------------------------------------------------- | | `**await iperf_flow.run_traffic(...)**` | Full run: optional preclean, start RX/TX, optional traffic wait, stop clients/servers. | | `**await iperf_flow.start_traffic(...)**` | Start servers and clients only (no `**run_traffic**` wait/teardown path). | | `**await iperf_flow.stop_traffic(...)**` | Signal stop to TX then RX for selected flows. | | `**await iperf_flow.plot_histograms(...)**` | Schedule histogram async plots under a timeout. | | `**await iperf_flow.shutdown_async_generators()**` | `**await**` `**shutdown_asyncgens()**` on the **running** loop (rare; advanced). | | `**await flow.analyze_ks_clusters(...)`** | KS table, optional plots, and clustering / linkage for one `**iperf_flow**` instance. | Example skeleton: ```python import asyncio from fiwicontrol.flows import iperf_flow async def main() -> None: # construct iperf_flow instances (server/client endpoints) … await iperf_flow.run_traffic(time=30, flows="all", preclean=True) asyncio.run(main()) ``` ### Sync wrappers (scripts) `**iperf_flow.run**`, `**commence**`, `**plot**`, `**cease**`, `**sleep**`, `**compute_ks_table**`, and `**close_loop**` call `**asyncio.run(...)**` **only when no event loop is already running**. If you call them while `**asyncio.get_running_loop()`** succeeds, they raise `**RuntimeError**` instructing you to use the `**await …**` coroutines listed above instead. --- ## Core types - `**iperf_flow**` — One logical flow; holds `**iperf_server**` / `**iperf_client**`, stats dict, histogram metadata. `**iperf_flow.instances**` tracks active flows ( `**get_instances()**` ). - `**iperf_server**` / `**iperf_client**` — Build `**ssh user@host iperf ...**` argv and drive `**asyncio` subprocess protocols** to parse iperf lines into `**flowstats`**. - `**flow_histogram**` — PDF / histogram helpers and `**gnuplot**` integration for plots. Construct `**iperf_flow**` with `**server**` and `**client**` objects that expose `**ipaddr**` (and optionally `**device**` for bind/interface). See `**flows.py**` `**iperf_flow.__init__**` for the full parameter surface (TOS, UDP/TCP, offered load, trip times, etc.). --- ## SSH and cleanup `**iperf_flow.cleanup**` runs `**ssh user@host pkill iperf**` (configurable `**sshcmd**` / `**user**`). `**run_traffic**` (and `**start_traffic**` with `**preclean=True**`) can `**preclean**` those hosts before starting traffic. For workstation SSH quirks (Fedora `**ssh_config.d**`), see `**docs/install.md**` and `**FIWI_SSH_CONFIG**` (pytest sets a minimal config by default). --- ## Integration status `**flows**` is a **standalone** library in this repo: it is **not** wired into `**ssh_node`** or the Pi 5 **iperf 2** lab narrative in `**README.md`** in this release. --- ## See also - `**README.md**` — package list (`**fronthaul**`, `**telemetry**`, `**radio**`, `**concentrator**`, `**fabric**`, …), `**src/fiwicontrol/**` tree, and roadmap context. - `**docs/node-control-asyncio-design.md**` — asyncio patterns for `**ssh_node**`. - `**docs/install.md**` — workstation install and SSH notes.