# Design: `fiwicontrol.commands` — asyncio execution (SSH / `ush`)

**Status:** Current

**Package:** **`fiwicontrol.commands`** (FiWiControl repository).

**Implementation file:** **`src/fiwicontrol/commands/node_control.py`**.

The main entry type is **`ssh_node`**, which provides SSH and **`ush`** transports to a rig.

**Scope:** Asyncio-based SSH/`ush` rig control: **`ssh_node`**, **`ssh_session`**, **`Command`**, **`CommandManager`**, concurrent execution via **`asyncio`**, line-oriented streaming, and logging.

**Language:** **Python 3.11+** (stdlib **`asyncio.TaskGroup`**, **`asyncio.timeout`**, and related APIs assumed available).

**Remote access:** For **`sshtype="ssh"`**, **passwordless SSH is required**. Commands are executed via **`ssh`** without an interactive TTY, so **key-based authentication** (or any non-interactive auth that does not prompt) must already work for **`root@<ipaddr>`** (or your chosen user if you extend argv). Password prompts, interactive MFA, or passphrase-protected keys that must be unlocked interactively will cause failures or indefinite waits on this path.

---

## Running tests

Integration tests for **`node_control.py`** live in **`tests/test_node_control.py`**. They open real **`ssh`** sessions to **`root@$FIWI_REMOTE_IP`** (default **`192.168.1.39`**), so **passwordless SSH must work** for that host. Tests are **opt-in**: set **`FIWI_RUN_REMOTE_TESTS=1`**, otherwise pytest **skips** the module (so default CI and laptop runs do not require a rig).

**From a terminal** (repository root, e.g. **`~/Code/FiWiControl`**):

```bash
cd ~/Code/FiWiControl

export FIWI_RUN_REMOTE_TESTS=1
# Optional: target a different host (default is 192.168.1.39 if unset)
export FIWI_REMOTE_IP=192.168.1.39

# Verbose pytest (recommended)
python3 -m pytest tests/test_node_control.py -v

# Quiet summary only
python3 -m pytest tests/test_node_control.py -q
```

**Without pytest**, the file is also a **`unittest`** script (it sets **`FIWI_RUN_REMOTE_TESTS=1`** if unset, so you do not need that variable for a direct script run):

```bash
cd ~/Code/FiWiControl
python tests/test_node_control.py --remote-ip 192.168.1.39
# More asyncio logging:
python tests/test_node_control.py --remote-ip 192.168.1.39 --debug
```

**`pytest`** picks up **`pythonpath = ["src"]`** from **`pyproject.toml`**, so you do **not** need **`PYTHONPATH=src`** when you run from the repo root as above. After **`pip install -e .`**, imports resolve the same way.

**Example successful output** (line order and timings vary; captured with **`python3 -m pytest tests/test_node_control.py -v`** on Linux):

```text
============================= test session starts ==============================
platform linux -- Python 3.13.11, pytest-9.0.3, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /home/.../FiWiControl
configfile: pyproject.toml
plugins: anyio-4.8.0
collecting ... collected 6 items

tests/test_node_control.py::TestRemoteSingleRunCommands::test_dmesg_tail PASSED [ 16%]
tests/test_node_control.py::TestRemoteSingleRunCommands::test_ls_home_directory PASSED [ 33%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_command_manager_add_and_self_clean PASSED [ 50%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_01_uname_r_every_1s_10_times PASSED [ 66%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_02_pwd_and_ls_every_1s_5_times PASSED [ 83%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_03_forever_can_be_stopped PASSED [100%]

============================== 6 passed in 16.47s ==============================
```

**Fedora / `ssh_config.d`:** pytest loads **`tests/conftest.py`**, which uses **`setdefault`** to point **`FIWI_SSH_CONFIG`** at **`tests/fixtures/ssh_config_minimal`** when that file exists. **`ssh`** is then invoked as **`ssh -F …`** and does not read broken system drop-ins under **`/etc/ssh/ssh_config.d/`**. Override with your own **`FIWI_SSH_CONFIG`** if needed. For manual **`ssh`**, export the same variable or fix the system file permissions (see **`docs/install.md`**).

If SSH is not configured, you will see failures (connection refused, auth errors, or timeouts) instead of **`PASSED`**.

**Package import smoke test** (no network; optional):

```bash
cd ~/Code/FiWiControl
python3 -m pytest tests/test_package_layout.py -q
```

---

## How to use (examples)

Implementation lives in **`src/fiwicontrol/commands/node_control.py`**. All remote I/O is **async**: call it from a coroutine under **`asyncio.run()`** or your application event loop.

### Prerequisites (remote targets)

1. **Passwordless SSH:** From the machine running this code, **`ssh root@<ipaddr> true`** (or equivalent) must succeed **without** typing a password. Configure **`~/.ssh/authorized_keys`** on the target and use an unencrypted key or a key held by **`ssh-agent`**, as appropriate for your environment. For **`sshtype="ssh"`**, **`ssh_session.post_cmd`** always passes an explicit **`root@<hostname>`** (**`ssh_session.user`** defaults to **`root`**), not a bare host, so the remote user is never your workstation login name.
2. Network reachability and correct **`ipaddr`** (or hostname) on the node.
3. Optional: **`BatchMode=yes`** in **`ssh_config`** for fail-fast behavior when keys are missing (recommended for automation).

### Remote host: Python on the rig (power / discovery)

**`ssh_node.rexec`** runs shell commands on the rig over the **same** non-interactive **`ssh root@<ipaddr>`** path as the prerequisites above. Anything you run remotely, including **`python3 -m fiwicontrol.power --discovery-json`** for inventory or USB discovery, needs that **Python environment installed on the node**, not only on your laptop.

**Full checklist** (SSH keys, **`git clone`** on the Pi, **`python3 -m fiwicontrol.commands …`**, PEP 668 / **`--break-system-packages`**, verification **`pytest`**):

→ **`docs/install.md`**, section **“Remote host setup”**

Power-specific behaviour (what discovery returns, INI format): **`docs/power-control-and-inventory.md`**.

### Imports

Preferred (re-exported from **`fiwicontrol.commands`**):

```python
import asyncio
from fiwicontrol.commands import Command, CommandManager, ssh_node
```

Equivalent submodule import (same symbols):

```python
from fiwicontrol.commands.node_control import Command, CommandManager, ssh_node
```

Install the package (editable: **`pip install -e .`** from the **FiWiControl** repo root; distribution name **`fiwicontrol`**). For **`pytest`** without installing, **`pythonpath = ["src"]`** is set in **`pyproject.toml`**.

### Create an `ssh_node` (rig / execution target)

**`ssh_node`** is a **class** in **`fiwicontrol.commands`**: one instance represents **one** reachable rig (OpenSSH or **`ush`**). It is not the package name; the package is **`fiwicontrol.commands`**.

**SSH (default):** targets **`root@ipaddr`** with a ControlMaster socket under **`/tmp/controlmasters_<ip>`** when **`ssh_controlmaster=True`** (default for **`rexec`** / **`Command`** sessions).

```python
node = ssh_node(
    name="rack-pi",
    ipaddr="192.168.1.39",
    ssh_controlmaster=False,  # optional; tests often disable for simplicity
    silent_mode=False,        # False: stream stdout/stderr lines to logging
)
```

**`ush`** instead of plain **`ssh`**:

```python
node = ssh_node(name="rig", ipaddr="10.0.0.2", sshtype="ush")
```

**Jump host (`relay`):** argv becomes **`ssh root@<relay>`** followed by the inner **`ush`** or **`ssh`**.

```python
node = ssh_node(name="behind-jump", ipaddr="10.0.0.5", relay="bastion.example.com")
```

Keep a **strong reference** to each node you care about; the class registry (**`WeakSet`**) only lists nodes still alive from the rest of your program (§3.3).

### One-shot remote command (`rexec`)

**`rexec`** builds an **`ssh_session`**, runs **`post_cmd`**, and returns the session. Output accumulates in **`session.results`** (bytes); line-oriented output also goes to the logger unless **`silent_mode=True`**.

```python
async def run_once():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    session = await node.rexec(
        cmd="ls ~",
        IO_TIMEOUT=15.0,       # idle I/O watchdog (reset on each received chunk)
        CMD_TIMEOUT=30,        # wall-clock cap for the remote command
        CONNECT_TIMEOUT=20.0,
        repeat=None,           # omit or False: single subprocess lifecycle
    )
    text = session.results.decode("utf-8", errors="replace").strip()
    print(text)


asyncio.run(run_once())
```

### Repeating commands (`Command`)

**`Command`** drives **separate** **`ssh_session.post_cmd`** invocations on a schedule (**`interval`** in seconds, deadline-based on the event-loop clock; see §5.3).

- **`interval=None`** or **`<= 0`**: run **once**, then stop.
- **`interval > 0`**: repeat until **`count`** is reached or **`stop()`** is called.
- **`count`**: a finite integer, or **`Command.COUNT_FOREVER`** (**`-1`**) to run until **`stop()`**.

```python
async def poll_uname():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
    await cmd.start()
    await asyncio.wait_for(cmd.task, timeout=25.0)
    print("runs:", cmd.run_count)  # expect 10


asyncio.run(poll_uname())
```

Stop a long-running / infinite command:

```python
async def stop_forever():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="date", interval=2.0, count=Command.COUNT_FOREVER)
    await cmd.start()
    await asyncio.sleep(5)
    await cmd.stop()


asyncio.run(stop_forever())
```

Optional **`log_file`**: each run’s output is appended under a banner. Per-run timeouts: **`io_timeout`**, **`cmd_timeout`**, **`connect_timeout`** (these override the node defaults).

### Many commands (`CommandManager`)

**`CommandManager`** assigns IDs, starts tasks, **removes finished commands** from **`list_active()`**, and records **exception/timeout** stops in **`list_abnormal_history()`**.

```python
async def managed():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    mgr = CommandManager()
    cid_a = await mgr.add_command(node=node, cmd="uname -r", interval=0.5, count=4)
    cid_b = await mgr.add_command(node=node, cmd="pwd", interval=0.5, count=4)
    await asyncio.sleep(3)
    print("active:", mgr.list_active())
    print("abnormal:", mgr.list_abnormal_history())
    await mgr.stop_all()


asyncio.run(managed())
```

### Parallel one-shots (`TaskGroup` / `gather`)

For several **independent** **`rexec`**-style calls, schedule them together:

```python
async def parallel():
    a = ssh_node(name="a", ipaddr="192.168.1.10", ssh_controlmaster=False)
    b = ssh_node(name="b", ipaddr="192.168.1.11", ssh_controlmaster=False)

    async def one(node, cmd):
        s = await node.rexec(cmd=cmd, IO_TIMEOUT=15.0, CMD_TIMEOUT=30, CONNECT_TIMEOUT=20.0)
        return s.results.decode("utf-8", errors="replace")

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(one(a, "hostname"))
        t2 = tg.create_task(one(b, "hostname"))
    # Both tasks have finished once the TaskGroup block exits.
    print(t1.result(), t2.result())


asyncio.run(parallel())
```

Use **`asyncio.gather(..., return_exceptions=True)`** when you want every outcome without **`TaskGroup`** fail-fast behavior.

### Consoles (`open_consoles` / `close_consoles`)

These class methods walk **`ssh_node.get_instances()`**, clean up leftover **`dmesg`** processes on SSH nodes, and optionally open long-lived ControlMaster **`dmesg -w`** streams (§3.4). Call them only when your rig expects this pattern.

```python
await ssh_node.open_consoles(silent_mode=False)
# ... use nodes ...
await ssh_node.close_consoles()
```

**Tests:** see **[Running tests](#running-tests)** at the top of this document.

---

## 1. Goals

1. **Streaming with line buffering:** Decode subprocess output, split on line boundaries, and deliver each line to logging (including a shared central log) without waiting for the process to finish.
2. **Run-to-completion:** Await a single completion that means the subprocess exited (or was terminated on timeout) and stdout/stderr are drained, with line buffers flushed per the EOF policy (§10.3).
3. **Concurrent execution from a prepared set:** Build coroutines (e.g. several **`post_cmd`** calls), then run them **concurrently** under **`asyncio.TaskGroup`** (Python 3.11+). Use **`asyncio.gather`** when a simple “wait for all results” shape fits better.
4. **Scheduled / periodic execution:** Support one-shot and repeating commands through a single **`Command`** type with explicit lifecycle (`start`/`stop`) and fixed-period scheduling semantics.

---

## 2. Architectural overview

| Type | Responsibility |
| --- | --- |
| **`ssh_node`** (class) | Per-host configuration: identity, `ssh`/`ush` argv prefix, relay, ControlMaster socket path, console bookkeeping. Registry of instances. **Does not** own subprocess I/O or interval scheduling. |
| **`ssh_session`** (class) | One SSH child process for a command or stream: **`post_cmd`**, **`close`**, **`SubprocessProtocol`** with **`pipe_data_received`**, timeouts, communicate-like completion. Reads settings from **`ssh_node`** (e.g. via attribute delegation). |
| **`SSHReaderProtocol`** (nested class on `ssh_session`) | Subprocess protocol callbacks: `connection_made`, `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timer hooks. |
| **`Command`** (class) | Managed lifecycle for repeated or one-shot runs driven by **`ssh_node`** / **`ssh_session`**: **`interval`**, **`count`**, **`start`/`stop`**, log/sink attachment. Each execution uses **`ssh_session.post_cmd`**. |
| **`CommandManager`** (class) | Dynamic command registry: add running commands, stop one/all, self-clean completed commands, keep abnormal-stop history (exception / timeout). |

**Concurrent runs:** Prefer **`asyncio.TaskGroup`** for structured concurrency; use **`asyncio.gather`** when appropriate.

**Per-line logging:** Use **`logging.Logger`** and/or a **callable** sink; a formal **`LineSink`** protocol type is optional and only worth adding if multiple implementations need a shared interface.

**Library asyncio rule:** Public APIs that do I/O are **`async`**. Coroutines and protocols use **`asyncio.get_running_loop()`**. The application (or **`asyncio.run`**) starts the event loop before calling into the library.

---

## 3. `ssh_node`

### 3.1 Role

| Concern | `ssh_node` |
| --- | --- |
| **`name`**, **`ipaddr`**, optional **`device`**, **`devip`** | Yes |
| **`sshtype`** (`ssh` / `ush`), **`relay`**, **`self.ssh`** argv prefix | Yes |
| **`ssh_controlmaster`**, **`controlmasters`** (SSH only; off for `ush`) | Yes |
| **`ssh_console_session`**, console task references | Yes (bookkeeping) |
| Subprocess streaming / **`pipe_data_received`** | **No**, owned by **`ssh_session`** |
| Interval scheduling, periodic task lists | **No**, owned by **`Command`** (callers hold **`Command`** instances and **`Task`** handles) |

### 3.2 Constructor parameters

- **`name`**, **`ipaddr`**: label and target for argv assembly (`root@…` applied when building the remote command).
- **`relay`**: optional jump host; argv begins with `ssh root@<relay>`, then the inner `ush` or `ssh`.
- **`ssh_controlmaster`**: enable the ControlMaster path for `ssh`; forced off for `ush`.
- **`silent_mode`**: passed into sessions to control line logging behavior.
- **`console`**: optional rig-facing flag.

### 3.3 Registry

Instances register in a class-level **`WeakSet`** (`instances`). **`get_instances()`** returns a snapshot of nodes that are **still alive** from the rest of the program’s perspective.

**`WeakSet` properties:**

- The registry supports **enumeration** (e.g. **`open_consoles`** over nodes the application is using). **`WeakSet`** stores **weak references** only: node **lifetime** follows the **strong references** held by the application. When those go away, the node may be **collected** and then **disappears from `instances`** automatically.
- **`get_instances()`** therefore reflects nodes that are **still strongly referenced** elsewhere (variables, rig object, collections).
- Callers keep a **strong reference** to each node they care about for as long as that node should appear in global enumeration.

**Alternative pattern:** an explicit **list of nodes** passed into orchestration functions also works; **`WeakSet`** fits a **central index** without that list becoming the sole long-lived owner of every node.

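The registry behavior can be reproduced in a few lines. This is a minimal sketch, not the `ssh_node` implementation: the `Node` class and its attribute names are hypothetical, and only the `WeakSet` bookkeeping is modelled.

```python
import gc
import weakref


class Node:
    """Hypothetical stand-in for ssh_node; only the registry is modelled."""

    instances = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        Node.instances.add(self)  # weak reference only; does not keep self alive

    @classmethod
    def get_instances(cls) -> list:
        # Snapshot: a list of the nodes that are alive at call time.
        return list(cls.instances)


kept = Node("kept")
dropped = Node("dropped")
names_before = sorted(n.name for n in Node.get_instances())

del dropped   # the last strong reference goes away
gc.collect()  # CPython frees on refcount; collect() covers other runtimes
names_after = sorted(n.name for n in Node.get_instances())
```

The snapshot list itself holds strong references, so a node stays alive for as long as a caller keeps the returned list around.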
### 3.4 Class-level orchestration

**`open_consoles`**, **`close_consoles`**: multi-node cleanup (`pkill dmesg`), ControlMaster + long-lived console streams. Both are **`async`** and require a running loop.

Stopping scheduled work: callers **`await cmd.stop()`** and/or call **`task.cancel()`** on the **`Task`** from **`Command.start()`** (exact API TBD).

### 3.5 Instance methods

| Method | Behavior |
| --- | --- |
| **`rexec`** | Constructs **`ssh_session`**, runs **`post_cmd`** with IO / command / connect timeouts. To run many **`rexec`**-style operations in parallel, use **`asyncio.TaskGroup`** (or **`gather`** if that shape fits). |
| **`clean`** | **`async`**: remote cleanup (e.g. `pkill dmesg`) via a one-off subprocess. |
| **`close_console`** | **`async`**: tear down **`ssh_console_session`**. |

Scheduled / interval execution belongs to **`Command`** (§5). **`ssh_node`** covers identity, transport, registry, and console bookkeeping.

### 3.6 Optional utilities

Thin **`async`** helpers (e.g. sleep delays) may exist as **module-level functions** when useful.

---

## 4. `ssh_session`

### 4.1 Role

One logical SSH subprocess: spawn via **`loop.subprocess_exec`** with a **`SubprocessProtocol`**, stream stdout/stderr through **`pipe_data_received`**, enforce connect / wall / IO-idle timeouts, and expose communicate-like completion.

### 4.2 Relationship to `ssh_node`

The session holds a reference to **`ssh_node`** and delegates missing attributes (e.g. **`ipaddr`**, **`ssh`**, **`controlmasters`**) for argv and ControlMaster paths.

### 4.3 Operations

- **`post_cmd(cmd, IO_TIMEOUT, CMD_TIMEOUT, …)`**: Build the full **`ssh`** argv (including **`-o ControlPath`** / master setup when **`control_master`** is set). A **`repeat`** flag that loops **inside one subprocess lifecycle** is distinct from **`Command`**’s separate invocations on a schedule (§5).
- **`close`**: Terminate console / master as required (**`async`**).

### 4.4 Inner protocol

**`SSHReaderProtocol`** (name may match implementation): implements **`pipe_data_received`**, **`pipe_connection_lost`**, **`process_exited`**, watchdog timers via **`get_running_loop().call_later`**, and an optional **`LoggerAdapter`** for prefixed logs.

---

## 5. `Command`

### 5.1 Name and scope

The type is **`Command`** (or **`SshCommand`** if `Command` collides in consumer code). SSH is the default transport; **repetition** is modeled with an **`interval`** field on the same type.

### 5.2 Responsibilities

| Concern | Owner |
| --- | --- |
| Remote command string / argv fragment | **`Command`** |
| **`interval`**: `None` or `<= 0` → one execution per start/stop (or defined one-shot semantics); `> 0` → period between scheduled fires | **`Command`** |
| **`count`**: finite run count or **`Command.COUNT_FOREVER`** for never-ending | **`Command`** |
| Optional **`initial_delay`**, jitter | **`Command`** |
| **`start()` / `stop()`**, background **`Task`**, cancellation, log flush | **`Command`** |
| Log / line output target | **`Command`**: typically **`logging.Logger`** or a **callable**; formal **`LineSink`** protocol optional |
| **Completion timer** (wall-clock cap per run) | **`Command`**: default passed into each **`post_cmd`** as **`CMD_TIMEOUT`** (name may match implementation). Maximum time from the start of that invocation until subprocess exit (or kill). Catches hung remote commands that never finish. |
| **I/O idle timer** | **`Command`**: default passed into each **`post_cmd`** as **`IO_TIMEOUT`**. Maximum time **without** stdout/stderr bytes; the timer resets on **`pipe_data_received`**. Catches “stuck but still running” cases where the process produces no output. |
| Subprocess + protocol | **`ssh_session`** only: each tick does **`await post_cmd(..., IO_TIMEOUT=…, CMD_TIMEOUT=…)`**; timers are enforced inside **`SSHReaderProtocol`** |

**Timers on `Command`:** **`IO_TIMEOUT`** and **`CMD_TIMEOUT`** address **idle output** vs **overall run length**. **`Command`** supplies defaults and passes them into each **`post_cmd`**; **`SSHReaderProtocol`** implements the watchdogs.

**Split of roles:** **`Command`** schedules runs and timer values; **`ssh_session`** runs **`post_cmd`** and owns **`pipe_data_received`** streaming.

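To make the two timers concrete, here is a minimal sketch of an idle watchdog that resets on data, plus a wall-clock cap, both built on `loop.call_later`. The `Watchdogs` class, the `demo` driver, and the reason strings are hypothetical; the real `SSHReaderProtocol` may organize this differently, and where this sketch only reports the expiry, a real protocol would terminate the transport.

```python
import asyncio


class Watchdogs:
    """Hypothetical helper: idle timer that resets on data, plus a wall-clock cap."""

    def __init__(self, io_timeout: float, cmd_timeout: float, on_expire) -> None:
        self._loop = asyncio.get_running_loop()
        self._io_timeout = io_timeout
        self._on_expire = on_expire
        self._idle = self._loop.call_later(io_timeout, self._fire, "io-idle")
        self._wall = self._loop.call_later(cmd_timeout, self._fire, "cmd-wall")

    def data_received(self) -> None:
        # Any stdout/stderr bytes restart the idle timer; the wall timer keeps running.
        self._idle.cancel()
        self._idle = self._loop.call_later(self._io_timeout, self._fire, "io-idle")

    def cancel(self) -> None:
        self._idle.cancel()
        self._wall.cancel()

    def _fire(self, reason: str) -> None:
        self.cancel()            # whichever timer fires first disarms the other
        self._on_expire(reason)  # a real protocol would terminate the transport here


async def demo(chunks: int, gap: float, io_timeout: float, cmd_timeout: float) -> str:
    """Simulate `chunks` output chunks arriving `gap` seconds apart."""
    fired = asyncio.get_running_loop().create_future()
    dog = Watchdogs(io_timeout, cmd_timeout, fired.set_result)
    for _ in range(chunks):
        await asyncio.sleep(gap)
        if fired.done():
            break
        dog.data_received()
    return await fired


# Output stops after three chunks: the idle timer expires.
idle_reason = asyncio.run(demo(chunks=3, gap=0.01, io_timeout=0.05, cmd_timeout=5.0))
# Output keeps flowing, but the run exceeds the wall-clock cap.
wall_reason = asyncio.run(demo(chunks=50, gap=0.01, io_timeout=0.5, cmd_timeout=0.1))
```

The first run models a process that goes quiet; the second models a chatty process that never finishes, which only the wall-clock cap can catch.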
### 5.3 Scheduling repeats

A repeating **`Command`** uses **event-loop time** so that each **period** is measured **between scheduled fire times**, independent of how long **`post_cmd`** took. Either of two mechanisms can be used:

1. **Deadline-based:** at tick start, **`next_fire = loop.time() + period`**. After **`await post_cmd(...)`**, wait **`max(0.0, next_fire - loop.time())`** (or an equivalent **`Event.wait`** with timeout). Document the behavior when **`post_cmd`** exceeds **`period`** (immediate next run vs. aligning to the next **`next_fire`**).
2. **`loop.call_at`** / **`call_later`:** schedule the next tick at **`loop.time() + period`** (or the computed deadline).

The wait between ticks is the time remaining until **`next_fire`** (or the next **`call_at`** / **`call_later`** fire), so the **period** stays anchored to the event-loop clock regardless of **`post_cmd`** duration.

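The deadline-based variant can be sketched as follows. This is an illustration under stated assumptions, not the `Command` implementation: `run_periodic` and `slow_job` are hypothetical names, the job is a plain coroutine function rather than `post_cmd`, and an overrunning job is followed by an immediate next run (one of the two policies the text leaves open).

```python
import asyncio


async def run_periodic(job, period: float, count: int) -> list[float]:
    """Run `job` `count` times; return each start time relative to the first."""
    loop = asyncio.get_running_loop()
    starts: list[float] = []
    for i in range(count):
        tick_start = loop.time()
        next_fire = tick_start + period  # deadline for the following tick
        starts.append(tick_start)
        await job()
        if i + 1 < count:
            # Sleep only for what is left of the period. If the job overran,
            # max() yields 0.0 and the next run starts immediately.
            await asyncio.sleep(max(0.0, next_fire - loop.time()))
    return [t - starts[0] for t in starts]


async def slow_job() -> None:
    await asyncio.sleep(0.05)  # stands in for a remote command taking 50 ms


offsets = asyncio.run(run_periodic(slow_job, period=0.1, count=3))
gaps = [b - a for a, b in zip(offsets, offsets[1:])]
```

With a 50 ms job and a 100 ms period, the gaps between starts stay near 100 ms; a naive `sleep(period)` after each run would give about 150 ms.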
### 5.4 `Command` vs bare `Task`

A coroutine **`run_command_loop(...)`** with deadline scheduling plus **`asyncio.create_task`** is enough for callers who want a minimal API surface. **`Command`** is optional sugar for **`start`/`stop`**, named fields, and encapsulation; add it when those ergonomics are worth the type.

### 5.5 API sketch

```text
cmd = Command(node, cmd="...", interval=5.0, line_sink=...)
cmd = Command(node, cmd="...", interval=None, line_sink=...)
await cmd.start()
await cmd.stop()
```

### 5.6 Command manager

`CommandManager` supports dynamic add/remove of commands while the system is running:

```text
manager = CommandManager()
cid = await manager.add_command(node=node, cmd="uname -r", interval=1.0, count=10)
await manager.stop_command(cid)
await manager.stop_all()
active = manager.list_active()
history = manager.list_abnormal_history()
```

Self-cleaning behavior:

- Completed commands are removed from active state automatically.
- Abnormal stop records (exception/timeout) are retained in history.

---

## 6. Concurrent execution (`TaskGroup` / `gather`)

**Requires Python 3.11+** for **`TaskGroup`** as documented here.

- **Preparation:** build coroutines (e.g. **`session.post_cmd("cmd", ...)`** for several sessions).
- **Execution (preferred):** **`async with asyncio.TaskGroup() as tg:`** then **`tg.create_task(coro)`** for each coroutine. **`TaskGroup`** cancels siblings on first failure (structured concurrency).
- **Alternative:** **`await asyncio.gather(*coros, return_exceptions=True)`** when callers want all outcomes without **`TaskGroup`** fail-fast behavior.

One top-level **`await`** runs the whole set concurrently.

---

## 7. Functional requirements

| ID | Requirement |
| --- | --- |
| R1 | Line-oriented streaming: decode, buffer, split on `\n`, optional final fragment on EOF (§10.3). |
| R2 | Central logging via **logger and/or callable**; supports shared and per-session targets. |
| R3 | Completion: process ended, stdin closed if used, pipes EOF’d, line buffers flushed (§9.2, §10). |
| R4 | Prepared coroutines run **concurrently** via **`asyncio.TaskGroup`** (or **`gather`** when that shape fits); **Python 3.11+**. |
| R5 | **`ush`**, **`ssh`**, relay, ControlMaster consoles, connect / wall / IO-idle timeouts: all compose with streaming and completion as described here. |

---

## 8. Non-functional requirements

| ID | Requirement |
| --- | --- |
| N1 | Library I/O runs under **`asyncio.get_running_loop()`**; programs start work with **`asyncio.run`** or an application event loop. |
| N2 | **Python 3.11+** for implementation and documentation examples. |
| N3 | Backpressure: document whether logging is assumed fast, or use bounded queues / warnings. |
| N4 | Cancelling a command **`Task`** tears down subprocess and protocol cleanly. |

---

## 9. Line sink and completion

### 9.1 Per-line output

Use **`logging.Logger`**, a **callable** `(stream, line) -> None`, or both. Normalize **`\r\n`** and strip **`\r`** for display. A dedicated **`LineSink`** **protocol** is optional sugar.

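A callable sink of that shape might look like the sketch below. The names `LineSink`, `normalize`, and `fan_out`, the logger name `rig.pi`, and the sample lines are all hypothetical; the only thing taken from the text is the `(stream, line) -> None` signature and the CR stripping.

```python
import logging
from typing import Callable

LineSink = Callable[[str, str], None]  # (stream, line) -> None


def normalize(line: str) -> str:
    """Drop the trailing CR left over from CRLF line endings."""
    return line.rstrip("\r")


def fan_out(*sinks: LineSink) -> LineSink:
    """Combine several sinks so one call delivers the line to all of them."""

    def deliver(stream: str, line: str) -> None:
        clean = normalize(line)
        for sink in sinks:
            sink(stream, clean)

    return deliver


log = logging.getLogger("rig.pi")  # hypothetical logger name
captured: list[tuple[str, str]] = []

sink = fan_out(
    lambda stream, line: log.info("[%s] %s", stream, line),  # Logger target
    lambda stream, line: captured.append((stream, line)),    # plain callable
)
sink("stdout", "6.1.0-rpi7\r")
sink("stderr", "warning: clock skew")
```

Because both targets share one signature, a logger and a test capture list can be attached side by side without a formal protocol type.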
### 9.2 Communicate-like completion

Completion means:

1. Subprocess has exited (return code available).
2. Stdout and stderr reached EOF (**`pipe_connection_lost`** for fds 1 and 2 as used).
3. Protocol line buffers flushed (**§10.3**); **`process_exited`** handled; completion event or future set.

---

## 10. Streaming: `SubprocessProtocol` and `pipe_data_received`

The SSH streaming path uses **`loop.subprocess_exec`** with a **`SubprocessProtocol`**. **Line assembly happens in `pipe_data_received(fd, data)`** on arbitrary byte chunks from the transport.

### 10.1 Pipeline

| Stage | Mechanism |
| --- | --- |
| Ingress | **`pipe_data_received(fd, data)`**: `fd` 1 = stdout, 2 = stderr |
| Buffer / decode | Per-stream byte buffer; UTF-8 (replace or strict), configurable |
| Line split | On `\n`, invoke **`on_line`** → logger / callable |
| EOF | **`pipe_connection_lost`**: flush trailing fragment per §10.3 |
| Done | **`process_exited`** + both pipes closed + internal finished state → complete await |

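The ingress and completion stages of this pipeline can be tried against a local child process instead of `ssh`. The sketch below is an assumption-laden illustration: `CollectingProtocol` and `run` are hypothetical names, line splitting and timeouts are left out, and the child is simply the current Python interpreter. It completes only once the process has exited and both output pipes have reported EOF.

```python
import asyncio
import sys


class CollectingProtocol(asyncio.SubprocessProtocol):
    """Hypothetical protocol: done once the process exited and both pipes hit EOF."""

    def __init__(self, done: asyncio.Future) -> None:
        self.done = done
        self.chunks = {1: bytearray(), 2: bytearray()}
        self.open_pipes = {1, 2}
        self.exited = False

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        self.chunks[fd].extend(data)  # fd 1 = stdout, fd 2 = stderr

    def pipe_connection_lost(self, fd: int, exc) -> None:
        self.open_pipes.discard(fd)   # EOF on that pipe
        self._maybe_finish()

    def process_exited(self) -> None:
        self.exited = True
        self._maybe_finish()

    def _maybe_finish(self) -> None:
        # Completion barrier: exit AND both pipes closed, in whatever order they arrive.
        if self.exited and not self.open_pipes and not self.done.done():
            self.done.set_result(None)


async def run(argv: list[str]) -> tuple[int, bytes, bytes]:
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    transport, protocol = await loop.subprocess_exec(
        lambda: CollectingProtocol(done), *argv, stdin=asyncio.subprocess.DEVNULL
    )
    try:
        await done
        return (
            transport.get_returncode(),
            bytes(protocol.chunks[1]),
            bytes(protocol.chunks[2]),
        )
    finally:
        transport.close()


code = "import sys; print('out'); print('err', file=sys.stderr)"
returncode, out, err = asyncio.run(run([sys.executable, "-c", code]))
```

Waiting for both conditions matters because `process_exited` and the two `pipe_connection_lost` callbacks can arrive in any order.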
### 10.2 Transport vs logical lines

**Ingress:** **`pipe_data_received`** delivers **byte chunks** (fragments and coalesced data). **Line boundaries** are applied in the protocol buffer: accumulate, split on **`\n`**, emit one logical line per **`on_line`** call (Tcl **`gets`**-style delivery to parsers and loggers).

### 10.3 EOF / trailing fragment

| Mode | Behavior |
| --- | --- |
| **Default** | If the buffer is non-empty at EOF, emit one final line without requiring a trailing `\n`. |
| **Strict** | Drop or tag the partial trailing line (configurable). |

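A per-stream buffer with both EOF policies could be sketched like this. `LineAssembler`, its `strict_eof` flag, and the sample chunks are hypothetical; strict mode is modelled here as "drop" (the text also allows tagging), and decoding uses UTF-8 with replacement.

```python
class LineAssembler:
    """Hypothetical per-stream buffer: byte chunks in, complete text lines out."""

    def __init__(self, on_line, strict_eof: bool = False) -> None:
        self._buf = bytearray()
        self._on_line = on_line
        self._strict_eof = strict_eof

    def feed(self, data: bytes) -> None:
        """Call from pipe_data_received with each raw chunk."""
        self._buf.extend(data)
        while True:
            index = self._buf.find(b"\n")
            if index < 0:
                return  # keep the partial line for the next chunk
            raw = bytes(self._buf[:index])
            del self._buf[: index + 1]
            self._emit(raw)

    def eof(self) -> None:
        """Call from pipe_connection_lost."""
        # Default policy: a non-empty remainder becomes one final line.
        # Strict policy (as modelled here): the remainder is dropped.
        if self._buf and not self._strict_eof:
            self._emit(bytes(self._buf))
        self._buf.clear()

    def _emit(self, raw: bytes) -> None:
        # Decode per complete line so a multi-byte character split
        # across two chunks is reassembled before decoding.
        self._on_line(raw.decode("utf-8", errors="replace").rstrip("\r"))


lines: list[str] = []
assembler = LineAssembler(lines.append)
for chunk in (b"Linux pi 6.1", b".0\r\nup 3 da", b"ys\nno newline"):
    assembler.feed(chunk)
assembler.eof()
```

After the loop, `lines` holds three entries: the two newline-terminated lines (with the CR stripped from the first) and the unterminated fragment emitted at EOF.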
### 10.4 Tcl-aligned behavior

- **Input:** after internal buffering, **`on_line`** receives **complete lines** (same idea as Tcl **`gets`** on a readable channel).
- **Output (Tcl `fconfigure -buffering line`):** In Tcl, **line-buffered** mode on a channel **flushes outgoing data when a newline is written** (and when the buffer fills). This matters for interactive use, sockets, and pipes, where output should appear promptly. For this module, apply the same idea when **writing to the subprocess stdin** (if used): **flush after each line** (writes ending with **`\n`**, or an explicit flush) so the peer sees the data without waiting for a full stdio buffer.

---

## 11. Timeouts

| Timeout | Semantics |
| --- | --- |
| **Connect** | From spawn until **`connection_made`**; cancel the timer when connected. |
| **Command (wall)** | Maximum lifetime; on expiry, cancel watchdogs and **`terminate`** the transport. |
| **IO idle** | Reset on each **`pipe_data_received`**; on fire, **`terminate`**. |

If IO idle fires, completion still runs after terminate + drain (output may be partial).

---

## 12. Central log file

Use **`logging`** with a **`FileHandler`**, or a **callable** that writes timestamped lines. Prefer writes from the event loop thread; for heavy I/O, optionally use an **`asyncio.Queue`** plus a single writer task.

---

## 13. API sketch (illustrative names)

```text
async with asyncio.TaskGroup() as tg:
    tg.create_task(session_a.post_cmd("cmd1", ...))
    tg.create_task(session_b.post_cmd("cmd2", ...))

cmd = Command(node, cmd="...", interval=10.0, line_sink=...)
await cmd.start()
await cmd.stop()
```

---

## 14. Open decisions
|
||
|
||
1. Formal `**LineSink**` protocol vs **logger + callable** only.
|
||
2. **stderr:** merged to stdout vs separate channels.
|
||
3. Public name `**Command`** vs `**SshCommand**`.
|
||
4. Whether `**Command**` ships in v1 or callers use `**create_task(run_command_loop(...))**` only.
|
||
|
||
---

## 15. Specification checklist

- Line emission, EOF, and CR handling documented in implementation.
- Concurrent execution documented for **Python 3.11+** with **`TaskGroup`** (and **`gather`** where noted).
- Completion = process exit + pipe EOF + buffer flush.
- Central logging path documented.
- Timeout and cancellation behavior documented.
- Repeating **`Command`** scheduling uses **deadline-based** waits or **`call_at`/`call_later`** (§5.3).
- Automated tests cover the scenarios in **§16**.

---

## 16. Test cases

This section lists **behaviors to verify** in automated tests (unit, functional, or integration as appropriate). Implementation may use mocks, local subprocess stubs, or real SSH depending on CI constraints.

### 16.1 `ssh_session` / `post_cmd` / completion

- **Happy path:** Remote command exits 0; completion fires after subprocess exit, stdout/stderr EOF, and protocol teardown.
- **Non-zero exit:** Process exits with failure; completion still reached; return code or error path is observable as designed.
- **Communicate-like barrier:** Completion does not report done until **`process_exited`** and pipe **`pipe_connection_lost`** semantics are satisfied (§9.2).

### 16.2 Streaming and line buffering (`pipe_data_received`)

- **Chunked input:** Data arriving as multiple arbitrary chunks still yields **correct line splits** on **`\n`**.
- **CR / CRLF:** **`\r\n`** and trailing **`\r`** handling matches §10.2 / §10.3.
- **EOF partial line:** Buffer non-empty at EOF emits **one final line** under default policy (§10.3).
- **Stdout vs stderr:** If stderr is a separate channel, both streams are decoded and routed per sink/logger configuration.

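
The chunking, CR, and EOF cases above can be exercised against a small line splitter. This is a sketch under stated assumptions, not the module's actual buffer: the class name `LineBuffer` is hypothetical, and the policy shown (split on `\n`, drop one trailing `\r`, emit a non-empty remainder at EOF) is an assumed reading of §10.2 / §10.3.

```python
from typing import Callable


class LineBuffer:
    """Hypothetical splitter: accumulates chunks and emits complete lines."""

    def __init__(self, emit: Callable[[str], None]) -> None:
        self._buf = bytearray()
        self._emit = emit

    def feed(self, data: bytes) -> None:
        """Called once per received chunk; chunk boundaries are arbitrary."""
        self._buf += data
        while (idx := self._buf.find(b"\n")) != -1:
            raw = bytes(self._buf[:idx])
            del self._buf[: idx + 1]  # consume the line and its newline
            self._emit(self._decode(raw))

    def eof(self) -> None:
        """Assumed default policy: a non-empty remainder becomes a final line."""
        if self._buf:
            raw = bytes(self._buf)
            self._buf.clear()
            self._emit(self._decode(raw))

    @staticmethod
    def _decode(raw: bytes) -> str:
        if raw.endswith(b"\r"):  # CRLF: drop the single trailing CR
            raw = raw[:-1]
        return raw.decode("utf-8", errors="replace")
```

A test can feed chunks that split a line mid-word, split a `\r\n` pair across chunks, and end without a newline, then compare the emitted list with the expected lines.
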
### 16.3 Timeouts

- **Connect timeout:** Fires when transport does not reach connected state in time; subprocess is terminated; completion reached.
- **Command (wall / `CMD_TIMEOUT`):** Long-running command is terminated at wall limit; completion reached.
- **IO idle (`IO_TIMEOUT`):** No **`pipe_data_received`** for the idle interval triggers terminate; completion reached (output may be partial) (§11).

### 16.4 `Command` (scheduled execution)

- **One-shot (`interval` unset / zero):** **`start`** runs at most one **`post_cmd`** cycle per lifecycle (per defined semantics); **`stop`** ends cleanly.
- **Repeating (`interval` > 0):** Multiple invocations occur; **period** between **scheduled** fires follows **deadline** or **`call_at`** rules (§5.3), not “sleep N after work” drift.
- **Finite count:** `count=N` executes exactly N runs.
- **Never-ending:** `count=Command.COUNT_FOREVER` runs until explicit stop/cancel.
- **Timers on `Command`:** **`IO_TIMEOUT`** and **`CMD_TIMEOUT`** passed through to each **`post_cmd`** invocation behave as in §16.3.
- **Stop / cancel:** **`stop()`** or **`Task`** cancellation ends the loop; no leaked tasks or open log handles.

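
The difference between deadline-based scheduling and "sleep N after work" can be sketched as below. The helper `run_repeating` is hypothetical and only illustrates the timing rule; it assumes each run is scheduled at `start + n * interval` on the loop clock, which is one plausible reading of §5.3.

```python
import asyncio
from typing import Awaitable, Callable


async def run_repeating(
    work: Callable[[], Awaitable[None]], interval: float, count: int
) -> list[float]:
    """Run `work` `count` times on fixed deadlines; return start offsets."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    offsets = []
    for n in range(count):
        deadline = start + n * interval  # absolute, independent of work time
        delay = deadline - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        offsets.append(loop.time() - start)
        await work()
    return offsets
```

With an interval of 0.2 s and work that takes 0.1 s, the third run starts near 0.4 s. A loop that slept for the full interval after each run would start it near 0.6 s, and the gap would keep growing with every iteration.
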
### 16.4.1 `CommandManager`

- **Add + self-clean:** Added finite commands appear in `list_active()` then auto-remove on normal completion.
- **Stop one / stop all:** Explicit stop removes active entries.
- **Abnormal history:** Stops caused by an exception or timeout are retained in `list_abnormal_history()`.

### 16.5 Concurrent execution (Python 3.11+)

- **`TaskGroup`:** Multiple **`post_cmd`** (or equivalent) tasks run **overlapped**; first failure cancels siblings if using default **`TaskGroup`** semantics.
- **`gather` (when used):** **`return_exceptions=True`** path collects all results without aborting early, if that pattern is supported in helpers.

### 16.6 `ssh_node` / registry

- **`WeakSet`:** After the application drops its last strong reference to a node, the node **disappears** from **`get_instances()`** following collection (§3.3).
- **Strong reference:** While the app holds a node, it appears in **`get_instances()`** when enumeration runs.

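
A minimal sketch of the registry behavior under test, assuming a class-level `weakref.WeakSet`. The class `RigNode` is a hypothetical stand-in for `ssh_node`; only `get_instances()` is a name taken from this document. The explicit `gc.collect()` is there because immediate collection on the last `del` is a CPython detail rather than a language guarantee.

```python
import gc
import weakref


class RigNode:
    """Hypothetical stand-in for ssh_node with a weak instance registry."""

    _instances: "weakref.WeakSet[RigNode]" = weakref.WeakSet()

    def __init__(self, ipaddr: str) -> None:
        self.ipaddr = ipaddr
        RigNode._instances.add(self)  # weak reference only

    @classmethod
    def get_instances(cls) -> list["RigNode"]:
        # Snapshot as a list so callers never iterate the live WeakSet.
        return list(cls._instances)


def registry_sizes() -> tuple[int, int]:
    """Return the registry size while a node is held and after it is dropped."""
    node = RigNode("192.0.2.10")
    held = len(RigNode.get_instances())
    del node
    gc.collect()
    return held, len(RigNode.get_instances())
```

Note that the list returned by `get_instances()` holds strong references, so a test must drop that list as well before expecting a node to disappear.
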
### 16.7 Transport configuration

- **`ssh` vs `ush`:** Argv shape and **`ssh_controlmaster`** interaction match §3 / §4 (as far as test doubles allow).
- **Relay:** Jump-host argv prefix is built correctly when **`relay`** is set.

### 16.8 Logging / sink

- **Per-line delivery:** Logger or callable receives **one line at a time** after buffering (§9.1, §10.2).
- **Central vs per-session:** If both modes exist, each receives the expected lines (no cross-contamination unless designed).

### 16.9 Consoles / ControlMaster (if exercised in test suite)

- **`open_consoles` / `close_consoles`:** Lifecycle can be covered with integration tests or marked **optional** when real hardware or SSH fixtures are unavailable.

---

Implemented coverage reference: `tests/test_node_control.py`

*End of design document.*