refactor(flows): rename async iperf entrypoints for clarity
Replace a-prefixed coroutines with descriptive names: run_traffic, start_traffic, stop_traffic, plot_histograms, shutdown_async_generators, analyze_ks_clusters. Sync helpers unchanged; update flows package docstring, README, and docs/flows.md. Made-with: Cursor
This commit is contained in:
parent
1bcefa93c8
commit
ab345cbae8
|
|
@ -9,7 +9,7 @@ This repository ships that distribution (**`fiwicontrol`** on PyPI / `pip`) with
|
|||
1. **`fiwicontrol.commands`** — run commands on remote rigs via OpenSSH or **`ush`** (`Command`, `CommandManager`, `ssh_node`). Remote Pi bootstrap: **`python3 -m fiwicontrol.commands <ip> --remote-repo …`** (see **`docs/install.md`**). Must not import **`fiwicontrol.power`**.
|
||||
2. **`fiwicontrol.lab`** — USB discovery (Acroname / Monsoon) and **`configs/*.ini`** inventory load + verify. Imports **`fiwicontrol.commands`** for SSH discovery only.
|
||||
3. **`fiwicontrol.power`** — **`Power`** (``.on()`` / ``.off()`` / ``.voltage()`` / …) via **`AcronamePower`** and **`MonsoonPower`**, plus a small CLI (**`--discovery-json`**, **`--list-power-devices`**, **`--verify-inventory`** with **`-c`**). Re-exports **`fiwicontrol.lab`** discovery/inventory for compatibility. May import **`fiwicontrol.commands`** and **`fiwicontrol.lab`**; the reverse is forbidden.
|
||||
4. **`fiwicontrol.flows`** — async **iperf**-driven traffic **flows** over **`ssh`** (remote server/client processes, sampling, histogram / KS tooling). Lives in **`flows/flows.py`**, structured for **Python 3.11** (**`asyncio.timeout`**, **`asyncio.gather`**, **`await`** entrypoints such as **`iperf_flow.arun`**; sync **`run`** / **`commence`** / … call **`asyncio.run`** only when no loop is already running). Depends on **SciPy**, **NumPy**, **Matplotlib**, and a compatible **iperf** binary on endpoints—use **`pip install -e ".[flows]"`** when you need it; **`import fiwicontrol.flows`** alone stays lightweight (lazy load). Not yet integrated with **`ssh_node`** or **`iperf 2` on Pi 5** roadmap text above—that wiring is follow-on work.
|
||||
4. **`fiwicontrol.flows`** — async **iperf**-driven traffic **flows** over **`ssh`** (remote server/client processes, sampling, histogram / KS tooling). Lives in **`flows/flows.py`**, structured for **Python 3.11** (**`asyncio.timeout`**, **`asyncio.gather`**, **`await iperf_flow.run_traffic`** and related coroutines; sync **`run`** / **`commence`** / … call **`asyncio.run`** only when no loop is already running). Depends on **SciPy**, **NumPy**, **Matplotlib**, and a compatible **iperf** binary on endpoints—use **`pip install -e ".[flows]"`** when you need it; **`import fiwicontrol.flows`** alone stays lightweight (lazy load). Not yet integrated with **`ssh_node`** or **`iperf 2` on Pi 5** roadmap text above—that wiring is follow-on work.
|
||||
5. **`fiwicontrol.spc`** — **SPC** primitives: **`ShewhartControlChart`** (individuals / MR limits) and **`HotellingT2`** (Phase I mean–covariance, *T*² and UCL). Under **`spc/`** (not **`flows/`**). Optional **`pip install -e ".[spc]"`** (**NumPy**, **SciPy**); **`import fiwicontrol.spc`** is lazy until you reference a class.
|
||||
|
||||
## Near-term focus: PCIe hot-swap testing
|
||||
|
|
|
|||
|
|
@ -31,16 +31,16 @@ Code is structured for **Python 3.11+**:
|
|||
|
||||
### Async entrypoints (preferred)
|
||||
|
||||
Use these **inside** an async function (same task / event loop as the rest of your app):
|
||||
Use these **inside** an async function (same task / event loop as the rest of your app). The **`await`** makes asynchrony obvious at the call site, so names are plain verbs (no **`a*`** prefix).
|
||||
|
||||
| Async API | Role |
|
||||
| Coroutine | Role |
|
||||
|-----------|------|
|
||||
| **`await iperf_flow.arun(...)`** | Full run: optional preclean, start RX/TX, optional traffic wait, stop clients/servers. |
|
||||
| **`await iperf_flow.acommence(...)`** | Start servers and clients (no full **`arun`** teardown path). |
|
||||
| **`await iperf_flow.acease(...)`** | Signal stop to TX then RX for selected flows. |
|
||||
| **`await iperf_flow.aplot(...)`** | Schedule histogram async plots under a timeout. |
|
||||
| **`await iperf_flow.aclose_loop()`** | **`await`** **`shutdown_asyncgens()`** on the **running** loop (rare; advanced). |
|
||||
| **`await flow.acompute_ks_table(...)`** | KS table / clustering work for one **`iperf_flow`** instance. |
|
||||
| **`await iperf_flow.run_traffic(...)`** | Full run: optional preclean, start RX/TX, optional traffic wait, stop clients/servers. |
|
||||
| **`await iperf_flow.start_traffic(...)`** | Start servers and clients only (no **`run_traffic`** wait/teardown path). |
|
||||
| **`await iperf_flow.stop_traffic(...)`** | Signal stop to TX then RX for selected flows. |
|
||||
| **`await iperf_flow.plot_histograms(...)`** | Schedule histogram async plots under a timeout. |
|
||||
| **`await iperf_flow.shutdown_async_generators()`** | **`await`** **`shutdown_asyncgens()`** on the **running** loop (rare; advanced). |
|
||||
| **`await flow.analyze_ks_clusters(...)`** | KS table, optional plots, and clustering / linkage for one **`iperf_flow`** instance. |
|
||||
|
||||
Example skeleton:
|
||||
|
||||
|
|
@ -50,14 +50,14 @@ from fiwicontrol.flows import iperf_flow
|
|||
|
||||
async def main() -> None:
|
||||
# construct iperf_flow instances (server/client endpoints) …
|
||||
await iperf_flow.arun(time=30, flows="all", preclean=True)
|
||||
await iperf_flow.run_traffic(time=30, flows="all", preclean=True)
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
### Sync wrappers (legacy / scripts)
|
||||
|
||||
**`iperf_flow.run`**, **`commence`**, **`plot`**, **`cease`**, **`sleep`**, **`compute_ks_table`**, and **`close_loop`** call **`asyncio.run(...)`** **only when no event loop is already running**. If you call them while **`asyncio.get_running_loop()`** succeeds, they raise **`RuntimeError`** instructing you to use the **`a*`** async methods instead.
|
||||
**`iperf_flow.run`**, **`commence`**, **`plot`**, **`cease`**, **`sleep`**, **`compute_ks_table`**, and **`close_loop`** call **`asyncio.run(...)`** **only when no event loop is already running**. If you call them while **`asyncio.get_running_loop()`** succeeds, they raise **`RuntimeError`** instructing you to use the **`await …`** coroutines listed above instead.
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -73,7 +73,7 @@ Construct **`iperf_flow`** with **`server`** and **`client`** objects that expos
|
|||
|
||||
## SSH and cleanup
|
||||
|
||||
**`iperf_flow.cleanup`** runs **`ssh user@host pkill iperf`** (configurable **`sshcmd`** / **`user`**). **`arun`** can **`preclean`** those hosts before starting traffic.
|
||||
**`iperf_flow.cleanup`** runs **`ssh user@host pkill iperf`** (configurable **`sshcmd`** / **`user`**). **`run_traffic`** (and **`start_traffic`** with **`preclean=True`**) can **`preclean`** those hosts before starting traffic.
|
||||
|
||||
For workstation SSH quirks (Fedora **`ssh_config.d`**), see **`docs/install.md`** and **`FIWI_SSH_CONFIG`** (pytest sets a minimal config by default).
|
||||
|
||||
|
|
|
|||
|
|
@ -10,9 +10,13 @@ optional dependencies (**SciPy**, **NumPy**, **Matplotlib**, …). Use
|
|||
|
||||
Public symbols are loaded lazily so ``import fiwicontrol.flows`` stays light.
|
||||
|
||||
From async code, prefer awaiting :meth:`fiwicontrol.flows.flows.iperf_flow.arun` (and
|
||||
``acommence`` / ``aplot`` / ``acease`` / ``acompute_ks_table``). The legacy ``run`` /
|
||||
``commence`` / … methods call :func:`asyncio.run` only when **no** event loop is already
|
||||
From async code, prefer ``await`` on :meth:`fiwicontrol.flows.flows.iperf_flow.run_traffic`,
|
||||
:meth:`~fiwicontrol.flows.flows.iperf_flow.start_traffic`,
|
||||
:meth:`~fiwicontrol.flows.flows.iperf_flow.stop_traffic`,
|
||||
:meth:`~fiwicontrol.flows.flows.iperf_flow.plot_histograms`,
|
||||
:meth:`~fiwicontrol.flows.flows.iperf_flow.shutdown_async_generators`, and (per flow)
|
||||
:meth:`~fiwicontrol.flows.flows.iperf_flow.analyze_ks_clusters`. The synchronous ``run`` /
|
||||
``commence`` / … helpers call :func:`asyncio.run` only when **no** event loop is already
|
||||
running (Python 3.11+).
|
||||
"""
|
||||
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ def _run_coro(coro: collections.abc.Coroutine) -> None:
|
|||
else:
|
||||
raise RuntimeError(
|
||||
"iperf_flow synchronous helpers cannot run under an active event loop; "
|
||||
"use the async entrypoints (e.g. await iperf_flow.arun(...)) instead."
|
||||
"use the async entrypoints (e.g. await iperf_flow.run_traffic(...)) instead."
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -75,14 +75,14 @@ class iperf_flow(object):
|
|||
return list(iperf_flow.instances)
|
||||
|
||||
@classmethod
|
||||
async def aclose_loop(cls) -> None:
|
||||
async def shutdown_async_generators(cls) -> None:
|
||||
"""Shut down async generators on the **running** loop (advanced cleanup)."""
|
||||
await asyncio.get_running_loop().shutdown_asyncgens()
|
||||
|
||||
@classmethod
|
||||
def close_loop(cls) -> None:
|
||||
"""Sync wrapper for :meth:`aclose_loop` (no active loop)."""
|
||||
_run_coro(cls.aclose_loop())
|
||||
"""Sync wrapper for :meth:`shutdown_async_generators` (no active loop)."""
|
||||
_run_coro(cls.shutdown_async_generators())
|
||||
|
||||
@classmethod
|
||||
def sleep(cls, delay: float = 0, text: str | None = None, stoptext: str | None = None) -> None:
|
||||
|
|
@ -90,7 +90,7 @@ class iperf_flow(object):
|
|||
_run_coro(_sleep_logs(delay, text, stoptext))
|
||||
|
||||
@classmethod
|
||||
async def arun(
|
||||
async def run_traffic(
|
||||
cls,
|
||||
time=None,
|
||||
amount=None,
|
||||
|
|
@ -187,9 +187,12 @@ class iperf_flow(object):
|
|||
parallel=None,
|
||||
epoch_sync=False,
|
||||
) -> None:
|
||||
"""Run traffic flows (uses :func:`asyncio.run` when no loop is running)."""
|
||||
"""Blocking full traffic run (uses :func:`asyncio.run` when no loop is running).
|
||||
|
||||
From async code, use ``await`` :meth:`run_traffic` instead.
|
||||
"""
|
||||
_run_coro(
|
||||
cls.arun(
|
||||
cls.run_traffic(
|
||||
time=time,
|
||||
amount=amount,
|
||||
flows=flows,
|
||||
|
|
@ -202,7 +205,7 @@ class iperf_flow(object):
|
|||
)
|
||||
|
||||
@classmethod
|
||||
async def acommence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) -> None:
|
||||
async def start_traffic(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) -> None:
|
||||
if flows == 'all':
|
||||
flows = iperf_flow.get_instances()
|
||||
if not flows:
|
||||
|
|
@ -234,10 +237,10 @@ class iperf_flow(object):
|
|||
|
||||
@classmethod
|
||||
def commence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) -> None:
|
||||
_run_coro(cls.acommence(time=time, flows=flows, sample_delay=sample_delay, io_timer=io_timer, preclean=preclean))
|
||||
_run_coro(cls.start_traffic(time=time, flows=flows, sample_delay=sample_delay, io_timer=io_timer, preclean=preclean))
|
||||
|
||||
@classmethod
|
||||
async def aplot(cls, flows='all', title='None', directory='None') -> None:
|
||||
async def plot_histograms(cls, flows='all', title='None', directory='None') -> None:
|
||||
if flows == 'all':
|
||||
flows = iperf_flow.get_instances()
|
||||
|
||||
|
|
@ -266,10 +269,10 @@ class iperf_flow(object):
|
|||
|
||||
@classmethod
|
||||
def plot(cls, flows='all', title='None', directory='None') -> None:
|
||||
_run_coro(cls.aplot(flows=flows, title=title, directory=directory))
|
||||
_run_coro(cls.plot_histograms(flows=flows, title=title, directory=directory))
|
||||
|
||||
@classmethod
|
||||
async def acease(cls, flows='all') -> None:
|
||||
async def stop_traffic(cls, flows='all') -> None:
|
||||
if flows == 'all':
|
||||
flows = iperf_flow.get_instances()
|
||||
|
||||
|
|
@ -285,7 +288,7 @@ class iperf_flow(object):
|
|||
|
||||
@classmethod
|
||||
def cease(cls, flows='all') -> None:
|
||||
_run_coro(cls.acease(flows=flows))
|
||||
_run_coro(cls.stop_traffic(flows=flows))
|
||||
|
||||
@classmethod
|
||||
async def cleanup(cls, host=None, sshcmd='/usr/bin/ssh', user='root') :
|
||||
|
|
@ -468,7 +471,7 @@ class iperf_flow(object):
|
|||
def stats(self):
|
||||
logging.info('stats')
|
||||
|
||||
async def acompute_ks_table(self, plot=True, directory='.', title=None) -> None:
|
||||
async def analyze_ks_clusters(self, plot=True, directory='.', title=None) -> None:
|
||||
if len(self.histogram_names) < 1:
|
||||
tmp = "***Failed. Expected 1 histogram_names, but instead got {0}".format(len(self.histogram_names))
|
||||
logging.info(tmp)
|
||||
|
|
@ -538,8 +541,8 @@ class iperf_flow(object):
|
|||
pass
|
||||
|
||||
def compute_ks_table(self, plot=True, directory='.', title=None) -> None:
|
||||
"""Sync wrapper; prefer :meth:`acompute_ks_table` inside async code."""
|
||||
_run_coro(self.acompute_ks_table(plot=plot, directory=directory, title=title))
|
||||
"""Sync wrapper; in async code use ``await flow.analyze_ks_clusters(...)``."""
|
||||
_run_coro(self.analyze_ks_clusters(plot=plot, directory=directory, title=title))
|
||||
|
||||
def dump_stats(self, directory='.') :
|
||||
logging.info("\n********************** dump_stats for flow {} **********************".format(self.name))
|
||||
|
|
|
|||
Loading…
Reference in New Issue