diff --git a/README.md b/README.md index 6d4131b..48c6484 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ This repository ships that distribution (**`fiwicontrol`** on PyPI / `pip`) with 1. **`fiwicontrol.commands`** — run commands on remote rigs via OpenSSH or **`ush`** (`Command`, `CommandManager`, `ssh_node`). Remote Pi bootstrap: **`python3 -m fiwicontrol.commands --remote-repo …`** (see **`docs/install.md`**). Must not import **`fiwicontrol.power`**. 2. **`fiwicontrol.lab`** — USB discovery (Acroname / Monsoon) and **`configs/*.ini`** inventory load + verify. Imports **`fiwicontrol.commands`** for SSH discovery only. 3. **`fiwicontrol.power`** — **`Power`** (``.on()`` / ``.off()`` / ``.voltage()`` / …) via **`AcronamePower`** and **`MonsoonPower`**, plus a small CLI (**`--discovery-json`**, **`--list-power-devices`**, **`--verify-inventory`** with **`-c`**). Re-exports **`fiwicontrol.lab`** discovery/inventory for compatibility. May import **`fiwicontrol.commands`** and **`fiwicontrol.lab`**; the reverse is forbidden. -4. **`fiwicontrol.flows`** — async **iperf**-driven traffic **flows** over **`ssh`** (remote server/client processes, sampling, histogram / KS tooling). Lives in **`flows/flows.py`**, structured for **Python 3.11** (**`asyncio.timeout`**, **`asyncio.gather`**, **`await`** entrypoints such as **`iperf_flow.arun`**; sync **`run`** / **`commence`** / … call **`asyncio.run`** only when no loop is already running). Depends on **SciPy**, **NumPy**, **Matplotlib**, and a compatible **iperf** binary on endpoints—use **`pip install -e ".[flows]"`** when you need it; **`import fiwicontrol.flows`** alone stays lightweight (lazy load). Not yet integrated with **`ssh_node`** or **`iperf 2` on Pi 5** roadmap text above—that wiring is follow-on work. +4. **`fiwicontrol.flows`** — async **iperf**-driven traffic **flows** over **`ssh`** (remote server/client processes, sampling, histogram / KS tooling). Lives in **`flows/flows.py`**, structured for **Python 3.11** (**`asyncio.timeout`**, **`asyncio.gather`**, **`await iperf_flow.run_traffic`** and related coroutines; sync **`run`** / **`commence`** / … call **`asyncio.run`** only when no loop is already running). Depends on **SciPy**, **NumPy**, **Matplotlib**, and a compatible **iperf** binary on endpoints—use **`pip install -e ".[flows]"`** when you need it; **`import fiwicontrol.flows`** alone stays lightweight (lazy load). Not yet integrated with **`ssh_node`** or **`iperf 2` on Pi 5** roadmap text above—that wiring is follow-on work. 5. **`fiwicontrol.spc`** — **SPC** primitives: **`ShewhartControlChart`** (individuals / MR limits) and **`HotellingT2`** (Phase I mean–covariance, *T*² and UCL). Under **`spc/`** (not **`flows/`**). Optional **`pip install -e ".[spc]"`** (**NumPy**, **SciPy**); **`import fiwicontrol.spc`** is lazy until you reference a class. ## Near-term focus: PCIe hot-swap testing diff --git a/docs/flows.md b/docs/flows.md index 68cbbc1..282740f 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -31,16 +31,16 @@ Code is structured for **Python 3.11+**: ### Async entrypoints (preferred) -Use these **inside** an async function (same task / event loop as the rest of your app): +Use these **inside** an async function (same task / event loop as the rest of your app). The **`await`** makes asynchrony obvious at the call site, so names are plain verbs (no **`a*`** prefix). -| Async API | Role | +| Coroutine | Role | |-----------|------| -| **`await iperf_flow.arun(...)`** | Full run: optional preclean, start RX/TX, optional traffic wait, stop clients/servers. | -| **`await iperf_flow.acommence(...)`** | Start servers and clients (no full **`arun`** teardown path). | -| **`await iperf_flow.acease(...)`** | Signal stop to TX then RX for selected flows. | -| **`await iperf_flow.aplot(...)`** | Schedule histogram async plots under a timeout. | -| **`await iperf_flow.aclose_loop()`** | **`await`** **`shutdown_asyncgens()`** on the **running** loop (rare; advanced). | -| **`await flow.acompute_ks_table(...)`** | KS table / clustering work for one **`iperf_flow`** instance. | +| **`await iperf_flow.run_traffic(...)`** | Full run: optional preclean, start RX/TX, optional traffic wait, stop clients/servers. | +| **`await iperf_flow.start_traffic(...)`** | Start servers and clients only (no **`run_traffic`** wait/teardown path). | +| **`await iperf_flow.stop_traffic(...)`** | Signal stop to TX then RX for selected flows. | +| **`await iperf_flow.plot_histograms(...)`** | Schedule histogram async plots under a timeout. | +| **`await iperf_flow.shutdown_async_generators()`** | **`await`** **`shutdown_asyncgens()`** on the **running** loop (rare; advanced). | +| **`await flow.analyze_ks_clusters(...)`** | KS table, optional plots, and clustering / linkage for one **`iperf_flow`** instance. | Example skeleton: @@ -50,14 +50,14 @@ from fiwicontrol.flows import iperf_flow async def main() -> None: # construct iperf_flow instances (server/client endpoints) … - await iperf_flow.arun(time=30, flows="all", preclean=True) + await iperf_flow.run_traffic(time=30, flows="all", preclean=True) asyncio.run(main()) ``` ### Sync wrappers (legacy / scripts) -**`iperf_flow.run`**, **`commence`**, **`plot`**, **`cease`**, **`sleep`**, **`compute_ks_table`**, and **`close_loop`** call **`asyncio.run(...)`** **only when no event loop is already running**. If you call them while **`asyncio.get_running_loop()`** succeeds, they raise **`RuntimeError`** instructing you to use the **`a*`** async methods instead. +**`iperf_flow.run`**, **`commence`**, **`plot`**, **`cease`**, **`sleep`**, **`compute_ks_table`**, and **`close_loop`** call **`asyncio.run(...)`** **only when no event loop is already running**. If you call them while **`asyncio.get_running_loop()`** succeeds, they raise **`RuntimeError`** instructing you to use the **`await …`** coroutines listed above instead. --- @@ -73,7 +73,7 @@ Construct **`iperf_flow`** with **`server`** and **`client`** objects that expos ## SSH and cleanup -**`iperf_flow.cleanup`** runs **`ssh user@host pkill iperf`** (configurable **`sshcmd`** / **`user`**). **`arun`** can **`preclean`** those hosts before starting traffic. +**`iperf_flow.cleanup`** runs **`ssh user@host pkill iperf`** (configurable **`sshcmd`** / **`user`**). **`run_traffic`** (and **`start_traffic`** with **`preclean=True`**) can **`preclean`** those hosts before starting traffic. For workstation SSH quirks (Fedora **`ssh_config.d`**), see **`docs/install.md`** and **`FIWI_SSH_CONFIG`** (pytest sets a minimal config by default). diff --git a/src/fiwicontrol/flows/__init__.py b/src/fiwicontrol/flows/__init__.py index 034836a..e1a6e1d 100644 --- a/src/fiwicontrol/flows/__init__.py +++ b/src/fiwicontrol/flows/__init__.py @@ -10,9 +10,13 @@ optional dependencies (**SciPy**, **NumPy**, **Matplotlib**, …). Use Public symbols are loaded lazily so ``import fiwicontrol.flows`` stays light. -From async code, prefer awaiting :meth:`fiwicontrol.flows.flows.iperf_flow.arun` (and -``acommence`` / ``aplot`` / ``acease`` / ``acompute_ks_table``). The legacy ``run`` / -``commence`` / … methods call :func:`asyncio.run` only when **no** event loop is already +From async code, prefer ``await`` on :meth:`fiwicontrol.flows.flows.iperf_flow.run_traffic`, +:meth:`~fiwicontrol.flows.flows.iperf_flow.start_traffic`, +:meth:`~fiwicontrol.flows.flows.iperf_flow.stop_traffic`, +:meth:`~fiwicontrol.flows.flows.iperf_flow.plot_histograms`, +:meth:`~fiwicontrol.flows.flows.iperf_flow.shutdown_async_generators`, and (per flow) +:meth:`~fiwicontrol.flows.flows.iperf_flow.analyze_ks_clusters`. The synchronous ``run`` / +``commence`` / … helpers call :func:`asyncio.run` only when **no** event loop is already running (Python 3.11+). """ diff --git a/src/fiwicontrol/flows/flows.py b/src/fiwicontrol/flows/flows.py index d277160..61b6dff 100644 --- a/src/fiwicontrol/flows/flows.py +++ b/src/fiwicontrol/flows/flows.py @@ -58,7 +58,7 @@ def _run_coro(coro: collections.abc.Coroutine) -> None: else: raise RuntimeError( "iperf_flow synchronous helpers cannot run under an active event loop; " - "use the async entrypoints (e.g. await iperf_flow.arun(...)) instead." + "use the async entrypoints (e.g. await iperf_flow.run_traffic(...)) instead." ) @@ -75,14 +75,14 @@ class iperf_flow(object): return list(iperf_flow.instances) @classmethod - async def aclose_loop(cls) -> None: + async def shutdown_async_generators(cls) -> None: """Shut down async generators on the **running** loop (advanced cleanup).""" await asyncio.get_running_loop().shutdown_asyncgens() @classmethod def close_loop(cls) -> None: - """Sync wrapper for :meth:`aclose_loop` (no active loop).""" - _run_coro(cls.aclose_loop()) + """Sync wrapper for :meth:`shutdown_async_generators` (no active loop).""" + _run_coro(cls.shutdown_async_generators()) @classmethod def sleep(cls, delay: float = 0, text: str | None = None, stoptext: str | None = None) -> None: @@ -90,7 +90,7 @@ class iperf_flow(object): _run_coro(_sleep_logs(delay, text, stoptext)) @classmethod - async def arun( + async def run_traffic( cls, time=None, amount=None, @@ -187,9 +187,12 @@ class iperf_flow(object): parallel=None, epoch_sync=False, ) -> None: - """Run traffic flows (uses :func:`asyncio.run` when no loop is running).""" + """Blocking full traffic run (uses :func:`asyncio.run` when no loop is running). + + From async code, use ``await`` :meth:`run_traffic` instead. + """ _run_coro( - cls.arun( + cls.run_traffic( time=time, amount=amount, flows=flows, @@ -202,7 +205,7 @@ class iperf_flow(object): ) @classmethod - async def acommence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) -> None: + async def start_traffic(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) -> None: if flows == 'all': flows = iperf_flow.get_instances() if not flows: @@ -234,10 +237,10 @@ class iperf_flow(object): @classmethod def commence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) -> None: - _run_coro(cls.acommence(time=time, flows=flows, sample_delay=sample_delay, io_timer=io_timer, preclean=preclean)) + _run_coro(cls.start_traffic(time=time, flows=flows, sample_delay=sample_delay, io_timer=io_timer, preclean=preclean)) @classmethod - async def aplot(cls, flows='all', title='None', directory='None') -> None: + async def plot_histograms(cls, flows='all', title='None', directory='None') -> None: if flows == 'all': flows = iperf_flow.get_instances() @@ -266,10 +269,10 @@ class iperf_flow(object): @classmethod def plot(cls, flows='all', title='None', directory='None') -> None: - _run_coro(cls.aplot(flows=flows, title=title, directory=directory)) + _run_coro(cls.plot_histograms(flows=flows, title=title, directory=directory)) @classmethod - async def acease(cls, flows='all') -> None: + async def stop_traffic(cls, flows='all') -> None: if flows == 'all': flows = iperf_flow.get_instances() @@ -285,7 +288,7 @@ class iperf_flow(object): @classmethod def cease(cls, flows='all') -> None: - _run_coro(cls.acease(flows=flows)) + _run_coro(cls.stop_traffic(flows=flows)) @classmethod async def cleanup(cls, host=None, sshcmd='/usr/bin/ssh', user='root') : @@ -468,7 +471,7 @@ class iperf_flow(object): def stats(self): logging.info('stats') - async def acompute_ks_table(self, plot=True, directory='.', title=None) -> None: + async def analyze_ks_clusters(self, plot=True, directory='.', title=None) -> None: if len(self.histogram_names) < 1: tmp = "***Failed. Expected 1 histogram_names, but instead got {0}".format(len(self.histogram_names)) logging.info(tmp) @@ -538,8 +541,8 @@ class iperf_flow(object): pass def compute_ks_table(self, plot=True, directory='.', title=None) -> None: - """Sync wrapper; prefer :meth:`acompute_ks_table` inside async code.""" - _run_coro(self.acompute_ks_table(plot=plot, directory=directory, title=title)) + """Sync wrapper; in async code use ``await flow.analyze_ks_clusters(...)``.""" + _run_coro(self.analyze_ks_clusters(plot=plot, directory=directory, title=title)) def dump_stats(self, directory='.') : logging.info("\n********************** dump_stats for flow {} **********************".format(self.name))