FiWiControl/docs/node-control-asyncio-design.md


# Design: `fiwicontrol.commands` — asyncio execution (SSH / `ush`)
**Status:** Current
**Package:** `fiwicontrol.commands` (FiWiControl repository).
**Implementation file:** `src/fiwicontrol/commands/node_control.py`.
The main entry type is `ssh_node` — SSH and `ush` transports to a rig.
**Scope:** Asyncio-based SSH/`ush` rig control: `ssh_node`, `ssh_session`, `Command`, `CommandManager`, concurrent execution via `asyncio`, line-oriented streaming, and logging.
**Language:** **Python 3.11+** (stdlib `asyncio.TaskGroup`, `asyncio.timeout`, and related APIs assumed available).
**Remote access:** For `sshtype="ssh"`, **passwordless SSH is required**. Commands are executed via `ssh` without an interactive TTY, so **key-based authentication** (or any non-interactive auth that does not prompt) must already work for `root@<ipaddr>` (or your chosen user if you extend argv). Password prompts, interactive MFA, or unlocking passphrase-protected keys in this non-interactive path will cause failures or indefinite waits.
---
## Running tests
Integration tests for `node_control.py` live in `tests/test_node_control.py`. They open real `ssh` sessions to `root@$FIWI_REMOTE_IP` (default `192.168.1.39`). **Passwordless SSH must work** for that host. Tests are **opt-in**: set `FIWI_RUN_REMOTE_TESTS=1` or pytest **skips** the module (so default CI / laptop runs do not require a rig).
**From a terminal** (repository root — e.g. `~/Code/FiWiControl`):
```bash
cd ~/Code/FiWiControl
export FIWI_RUN_REMOTE_TESTS=1
# Optional: target a different host (default is 192.168.1.39 if unset)
export FIWI_REMOTE_IP=192.168.1.39
# Verbose pytest (recommended)
python3 -m pytest tests/test_node_control.py -v
# Quiet summary only
python3 -m pytest tests/test_node_control.py -q
```
**Without pytest**, the file is also a `unittest` script (it sets `FIWI_RUN_REMOTE_TESTS=1` if unset, so you do not need that variable for a direct script run):
```bash
cd ~/Code/FiWiControl
python tests/test_node_control.py --remote-ip 192.168.1.39
# More asyncio logging:
python tests/test_node_control.py --remote-ip 192.168.1.39 --debug
```
`pytest` picks up `pythonpath = ["src"]` from `pyproject.toml`, so you do **not** need `PYTHONPATH=src` when you run from the repo root as above. After `pip install -e .`, imports resolve the same way.
**Example successful output** (line order and timings vary; captured with `python3 -m pytest tests/test_node_control.py -v` on Linux):
```text
============================= test session starts ==============================
platform linux -- Python 3.13.11, pytest-9.0.3, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /home/.../FiWiControl
configfile: pyproject.toml
plugins: anyio-4.8.0
collecting ... collected 6 items
tests/test_node_control.py::TestRemoteSingleRunCommands::test_dmesg_tail PASSED [ 16%]
tests/test_node_control.py::TestRemoteSingleRunCommands::test_ls_home_directory PASSED [ 33%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_command_manager_add_and_self_clean PASSED [ 50%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_01_uname_r_every_1s_10_times PASSED [ 66%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_02_pwd_and_ls_every_1s_5_times PASSED [ 83%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_03_forever_can_be_stopped PASSED [100%]
============================== 6 passed in 16.47s ==============================
```
**Fedora / `ssh_config.d`:** pytest loads `tests/conftest.py`, which `setdefault`s `FIWI_SSH_CONFIG` to `tests/fixtures/ssh_config_minimal` when that file exists, so `ssh` is invoked as `ssh -F …` and does not read broken system drop-ins under `/etc/ssh/ssh_config.d/`. Override with your own `FIWI_SSH_CONFIG` if needed. For manual `ssh`, export the same variable or fix system file permissions (see `docs/install.md`).
If SSH is not configured, you will see failures (connection refused, auth errors, or timeouts) instead of `PASSED`.
**Package import smoke** (no network; optional):
```bash
cd ~/Code/FiWiControl
python3 -m pytest tests/test_package_layout.py -q
```
---
## How to use (examples)
Implementation lives in `src/fiwicontrol/commands/node_control.py`. All remote I/O is **async** — call from a coroutine under `asyncio.run()` or your application event loop.
### Prerequisites (remote targets)
1. **Passwordless SSH** — From the machine running this code, `ssh root@<ipaddr> true` (or equivalent) must succeed **without** typing a password. Configure `~/.ssh/authorized_keys` on the target and use an unencrypted key or `ssh-agent` as appropriate for your environment. For `sshtype="ssh"`, `ssh_session.post_cmd` always passes an explicit `root@<hostname>` (`ssh_session.user` defaults to `root`), not a bare host, so the remote user is never your workstation login name.
2. Network reachability and correct `ipaddr` (or hostname) on the node.
3. Optional: `BatchMode=yes` in `ssh_config` for fail-fast behavior when keys are missing (recommended for automation).
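As a sketch, a client-side `ssh_config` entry enabling the fail-fast behavior above could look like this (the host pattern and timeout value are illustrative, not part of the design):

```text
Host 192.168.1.*
    User root
    BatchMode yes      # never prompt; fail immediately if keys are missing
    ConnectTimeout 5   # bound the connection attempt for automation
```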
### Remote host: Python on the rig (power / discovery)
`ssh_node.rexec` runs shell commands on the rig over the **same** non-interactive `ssh root@<ipaddr>` path as the prerequisites above. Anything you run remotely — including `python3 -m fiwicontrol.power --discovery-json` for inventory or USB discovery — needs that **Python environment installed on the node**, not only on your laptop.
**Full checklist** (SSH keys, `git clone` on the Pi, `python3 -m fiwicontrol.commands …`, PEP 668 / `--break-system-packages`, verification `pytest`):
`docs/install.md` — section **“Remote host setup”**
Power-specific behaviour (what discovery returns, INI format): `docs/power-control-and-inventory.md`.
### Imports
Preferred (re-exported from `fiwicontrol.commands`):
```python
import asyncio
from fiwicontrol.commands import Command, CommandManager, ssh_node
```
Equivalent submodule import (same symbols):
```python
from fiwicontrol.commands.node_control import Command, CommandManager, ssh_node
```
Install the package (editable: `pip install -e .` from the **FiWiControl** repo root; distribution name `fiwicontrol`). For `pytest` without installing, `pythonpath = ["src"]` is set in `pyproject.toml`.
### Create an `ssh_node` (rig / execution target)
`ssh_node` is a **class** in `fiwicontrol.commands`: one instance represents **one** reachable rig (OpenSSH or `ush`). It is not the package name; the package is `fiwicontrol.commands`.
**SSH (default)** — targets `root@ipaddr` with a ControlMaster socket under `/tmp/controlmasters_<ip>` when `ssh_controlmaster=True` (default for `rexec` / `Command` sessions).
```python
node = ssh_node(
    name="rack-pi",
    ipaddr="192.168.1.39",
    ssh_controlmaster=False,  # optional; tests often disable for simplicity
    silent_mode=False,        # False: stream stdout/stderr lines to logging
)
```
`ush` instead of plain `ssh`:
```python
node = ssh_node(name="rig", ipaddr="10.0.0.2", sshtype="ush")
```
**Jump host (`relay`)** — argv becomes `ssh root@<relay>` then inner `ush` or `ssh`.
```python
node = ssh_node(name="behind-jump", ipaddr="10.0.0.5", relay="bastion.example.com")
```
Keep a **strong reference** to each node you care about; the class registry (`WeakSet`) only lists nodes still alive from the rest of your program (§3.3).
### One-shot remote command (`rexec`)
`rexec` builds an `ssh_session`, runs `post_cmd`, and returns the session. Output accumulates in `session.results` (bytes); line-oriented output also goes to the logger unless `silent_mode=True`.
```python
async def run_once():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    session = await node.rexec(
        cmd="ls ~",
        IO_TIMEOUT=15.0,      # idle I/O watchdog (reset on each received chunk)
        CMD_TIMEOUT=30,       # wall-clock cap for the remote command
        CONNECT_TIMEOUT=20.0,
        repeat=None,          # omit or False: single subprocess lifecycle
    )
    text = session.results.decode("utf-8", errors="replace").strip()
    print(text)

asyncio.run(run_once())
```
### Repeating commands (`Command`)
`**Command**` drives **separate** `**ssh_session.post_cmd`** invocations on a schedule (`**interval**` in seconds, event-loop deadline-based — §5.3).
- `**interval=None**` or `**<= 0**`: run **once** then stop.
- `**interval > 0`**: repeat until `**count**` is reached or `**stop()**`.
- `**count**`: finite integer, or `**Command.COUNT_FOREVER**` (`**-1**`) until `**stop()**`.
```python
async def poll_uname():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
    await cmd.start()
    await asyncio.wait_for(cmd.task, timeout=25.0)
    print("runs:", cmd.run_count)  # expect 10

asyncio.run(poll_uname())
```
Stop a long-running / infinite command:
```python
async def stop_forever():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="date", interval=2.0, count=Command.COUNT_FOREVER)
    await cmd.start()
    await asyncio.sleep(5)
    await cmd.stop()

asyncio.run(stop_forever())
```
Optional `log_file`: each run's output is appended under a banner. Per-run timeouts: `io_timeout`, `cmd_timeout`, `connect_timeout` (override node defaults).
### Many commands (`CommandManager`)
`CommandManager` assigns IDs, starts tasks, **removes finished commands** from `list_active()`, and records **exception/timeout** stops in `list_abnormal_history()`.
```python
async def managed():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    mgr = CommandManager()
    cid_a = await mgr.add_command(node=node, cmd="uname -r", interval=0.5, count=4)
    cid_b = await mgr.add_command(node=node, cmd="pwd", interval=0.5, count=4)
    await asyncio.sleep(3)
    print("active:", mgr.list_active())
    print("abnormal:", mgr.list_abnormal_history())
    await mgr.stop_all()

asyncio.run(managed())
```
### Parallel one-shots (`TaskGroup` / `gather`)
For several **independent** `rexec`-style calls, schedule them together:
```python
async def parallel():
    a = ssh_node(name="a", ipaddr="192.168.1.10", ssh_controlmaster=False)
    b = ssh_node(name="b", ipaddr="192.168.1.11", ssh_controlmaster=False)

    async def one(node, cmd):
        s = await node.rexec(cmd=cmd, IO_TIMEOUT=15.0, CMD_TIMEOUT=30, CONNECT_TIMEOUT=20.0)
        return s.results.decode("utf-8", errors="replace")

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(one(a, "hostname"))
        t2 = tg.create_task(one(b, "hostname"))
    # results are only safe to read after the TaskGroup block exits
    print(t1.result(), t2.result())

asyncio.run(parallel())
```
Use `asyncio.gather(..., return_exceptions=True)` when you want every outcome without `TaskGroup` fail-fast behavior.
### Consoles (`open_consoles` / `close_consoles`)
Class methods walk `ssh_node.get_instances()`, clean `dmesg` on SSH nodes, and optionally open long-lived ControlMaster `dmesg -w` streams (§3.4). Call only when your rig expects this pattern.
```python
await ssh_node.open_consoles(silent_mode=False)
# ... use nodes ...
await ssh_node.close_consoles()
```
**Tests:** see **[Running tests](#running-tests)** at the top of this document.
---
## 1. Goals
1. **Streaming with line buffering** — Decode subprocess output, split on line boundaries, and deliver each line to logging (including a shared central log) without waiting for the process to finish.
2. **Run-to-completion** — Await a single completion that means: subprocess exited (or was terminated on timeout), and stdout/stderr are drained with line buffers flushed per the EOF policy (§10.3).
3. **Concurrent execution from a prepared set** — Build coroutines (e.g. several `post_cmd` calls), then run them **concurrently** under `asyncio.TaskGroup` (Python 3.11+). Use `asyncio.gather` when a simple “wait for all results” shape fits better.
4. **Scheduled / periodic execution** — Support one-shot and repeating commands through a single `Command` type with explicit lifecycle (`start`/`stop`) and fixed-period scheduling semantics.
---
## 2. Architectural overview
| Type | Responsibility |
| ------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `ssh_node` (class) | Per-host configuration: identity, `ssh`/`ush` argv prefix, relay, ControlMaster socket path, console bookkeeping. Registry of instances. **Does not** own subprocess I/O or interval scheduling. |
| `ssh_session` (class) | One SSH child process for a command or stream: `post_cmd`, `close`, `SubprocessProtocol` with `pipe_data_received`, timeouts, communicate-like completion. Reads settings from `ssh_node` (e.g. via attribute delegation). |
| `SSHReaderProtocol` (nested class on `ssh_session`) | Subprocess protocol callbacks: `connection_made`, `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timer hooks. |
| `Command` (class) | Managed lifecycle for repeated or one-shot runs driven by `ssh_node` / `ssh_session`: `interval`, `count`, `start`/`stop`, log/sink attachment. Each execution uses `ssh_session.post_cmd`. |
| `CommandManager` (class) | Dynamic command registry: add running commands, stop one/all, self-clean completed commands, keep abnormal-stop history (exception / timeout). |
**Concurrent runs:** Prefer `asyncio.TaskGroup` for structured concurrency; `asyncio.gather` when appropriate.
**Per-line logging:** Use `logging.Logger` and/or a **callable** sink; a formal `LineSink` protocol type is optional and only worth adding if multiple implementations need a shared interface.
**Library asyncio rule:** Public APIs that do I/O are `async`. Coroutines and protocols use `asyncio.get_running_loop()`. The application (or `asyncio.run`) starts the event loop before calling into the library.
---
## 3. `ssh_node`
### 3.1 Role
| Concern | `ssh_node` |
| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
| `name`, `ipaddr`, optional `device`, `devip` | Yes |
| `sshtype` (`ssh` / `ush`), `relay`, `self.ssh` argv prefix | Yes |
| `ssh_controlmaster`, `controlmasters` (SSH only; off for `ush`) | Yes |
| `ssh_console_session`, console task references | Yes (bookkeeping) |
| Subprocess streaming / `pipe_data_received` | **No** — `ssh_session` |
| Interval scheduling, periodic task lists | **No** — `Command` (callers hold `Command` instances and `Task` handles) |
### 3.2 Constructor parameters
- `name`, `ipaddr`: label and target for argv assembly (`root@…` applied when building the remote command).
- `relay`: optional jump host; argv begins with `ssh root@<relay>` then inner `ush` or `ssh`.
- `ssh_controlmaster`: enable the ControlMaster path for `ssh`; forced off for `ush`.
- `silent_mode`: passed into sessions for line logging behavior.
- `console`: optional rig-facing flag.
### 3.3 Registry
Instances register in a class-level `WeakSet` (`instances`). `get_instances()` returns a snapshot of nodes that are **still alive** from the rest of the program's perspective.
**`WeakSet` properties:**
- The registry supports **enumeration** (e.g. `open_consoles` over nodes the application is using). `WeakSet` stores **weak references** only: node **lifetime** follows **strong references** held by the application; when those go away, the node may be **collected** and **disappears from `instances`** automatically.
- `get_instances()` therefore reflects nodes that are **still strongly referenced** elsewhere (variables, rig object, collections).
- Callers keep a **strong reference** to each node they care about for as long as that node should appear in global enumeration.
**Alternative pattern:** an explicit **list of nodes** passed into orchestration functions also works; `WeakSet` fits a **central index** without that list becoming the sole long-lived owner of every node.
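The registry semantics above can be demonstrated with a minimal standalone sketch (a toy `Node` class, not the actual `ssh_node`): instances vanish from the `WeakSet` once the caller drops its last strong reference.

```python
# Toy illustration of the §3.3 registry pattern (names are illustrative).
import gc
import weakref

class Node:
    instances = weakref.WeakSet()  # class-level registry of live nodes

    def __init__(self, name):
        self.name = name
        Node.instances.add(self)   # register on construction

    @classmethod
    def get_instances(cls):
        return set(cls.instances)  # snapshot of strongly referenced nodes

keep = Node("kept")   # caller holds a strong reference
Node("dropped")       # no strong reference survives this statement
gc.collect()          # CPython frees it immediately; collect() for safety

names = {n.name for n in Node.get_instances()}
print(names)          # only the strongly referenced node remains: {'kept'}
```

This is why the document tells callers to keep their own references: the `WeakSet` is an index, never an owner.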
### 3.4 Class-level orchestration
`open_consoles`, `close_consoles`: multi-node cleanup (`pkill dmesg`), ControlMaster + long-lived console streams. Both are `async` and require a running loop.
Stopping scheduled work: callers `await cmd.stop()` and/or `task.cancel()` on the `Task` from `Command.start()` (exact API TBD).
### 3.5 Instance methods
| Method | Behavior |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `rexec` | Constructs an `ssh_session` and runs `post_cmd` with IO / command / connect timeouts. To run many `rexec`-style operations in parallel, use `asyncio.TaskGroup` (or `gather` if that shape fits). |
| `clean` | `async`: remote cleanup (e.g. `pkill dmesg`) via a one-off subprocess. |
| `close_console` | `async`: tear down `ssh_console_session`. |
Scheduled / interval execution belongs to `Command` (§5). `ssh_node` covers identity, transport, registry, and console bookkeeping.
### 3.6 Optional utilities
Thin `async` helpers (e.g. sleep delays) may exist as **module-level functions** when useful.
---
## 4. `ssh_session`
### 4.1 Role
One logical SSH subprocess: spawn via `loop.subprocess_exec` with a `SubprocessProtocol`, stream stdout/stderr through `pipe_data_received`, enforce connect / wall / IO-idle timeouts, and expose communicate-like completion.
### 4.2 Relationship to `ssh_node`
The session holds a reference to its `ssh_node` and delegates missing attributes (e.g. `ipaddr`, `ssh`, `controlmasters`) for argv and ControlMaster paths.
### 4.3 Operations
- `post_cmd(cmd, IO_TIMEOUT, CMD_TIMEOUT, …)` — Build the full `ssh` argv (including `-o ControlPath` / master setup when `control_master`). A `repeat` flag that loops **inside one subprocess lifecycle** is distinct from `Command`'s separate invocations on a schedule (§5).
- `close` — Terminate console / master as required (`async`).
### 4.4 Inner protocol
`SSHReaderProtocol` (name may match implementation): implements `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timers via `get_running_loop().call_later`, and optional `LoggerAdapter` for prefixed logs.
---
## 5. `Command`
### 5.1 Name and scope
The type is `Command` (or `SshCommand` if `Command` collides in consumer code). SSH is the default transport; **repetition** is modeled with an `interval` field on the same type.
### 5.2 Responsibilities
| Concern | Owner |
| ------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Remote command string / argv fragment | `Command` |
| `interval`: `None` or `0` → one execution per start/stop (or defined one-shot semantics); `> 0` → period between scheduled fires | `Command` |
| `count`: finite run count or `Command.COUNT_FOREVER` for never-ending | `Command` |
| Optional `initial_delay`, jitter | `Command` |
| `start()` / `stop()`, background `Task`, cancellation, log flush | `Command` |
| Log / line output target | `Command` — typically `logging.Logger` or a **callable**; formal `LineSink` protocol optional |
| **Completion timer** (wall-clock cap per run) | `Command` — default passed into each `post_cmd` as `CMD_TIMEOUT` (name may match implementation): max time from the start of that invocation until subprocess exit (or kill). Catches hung remote commands that never finish. |
| **I/O idle timer** | `Command` — default passed into each `post_cmd` as `IO_TIMEOUT`: max time **without** stdout/stderr bytes; the timer resets on `pipe_data_received`. Catches “stuck but still running” cases where the process produces no output. |
| Subprocess + protocol | `ssh_session` only — each tick is `await post_cmd(..., IO_TIMEOUT=…, CMD_TIMEOUT=…)`; timers enforced inside `SSHReaderProtocol` |
**Timers on `Command`:** `IO_TIMEOUT` and `CMD_TIMEOUT` address **idle output** vs **overall run length**. `Command` supplies defaults and passes them into each `post_cmd`; `SSHReaderProtocol` implements the watchdogs.
**Split of roles:** `Command` schedules runs and timer values; `ssh_session` runs `post_cmd` and owns `pipe_data_received` streaming.
### 5.3 Scheduling repeats
A repeating `Command` uses **event-loop time** so each **period** is measured **between scheduled fire times**, independent of how long `post_cmd` took:
1. **Deadline-based:** at tick start, `next_fire = loop.time() + period`. After `await post_cmd(...)`, wait `max(0.0, next_fire - loop.time())` (or an equivalent `Event.wait` with timeout). Document behavior when `post_cmd` exceeds `period` (immediate next run vs align to the next `next_fire`).
2. **`loop.call_at` / `call_later`:** schedule the next tick at `loop.time() + period` (or the computed deadline).
The wait between ticks is the time remaining until `next_fire` (or the next `call_at` / `call_later` fire), so the **period** stays anchored to the event-loop clock regardless of `post_cmd` duration.
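The deadline-based variant can be sketched as a self-contained loop (illustrative shape only, not the actual `Command` internals; `work` stands in for `post_cmd`):

```python
# Deadline-based repeat loop per §5.3: the period is anchored to loop.time(),
# so work duration does not push later fires back (as long as work < period).
import asyncio

async def repeat(work, period, count):
    loop = asyncio.get_running_loop()
    for _ in range(count):
        next_fire = loop.time() + period          # deadline fixed before work
        await work()
        # sleep only for the remainder of the period, never a negative time
        await asyncio.sleep(max(0.0, next_fire - loop.time()))

async def main():
    ticks = []
    loop = asyncio.get_running_loop()

    async def work():
        ticks.append(loop.time())
        await asyncio.sleep(0.03)                 # simulated post_cmd duration

    await repeat(work, period=0.05, count=4)
    return [b - a for a, b in zip(ticks, ticks[1:])]

gaps = asyncio.run(main())
print([round(g, 2) for g in gaps])                # each gap ≈ period (0.05 s)
```

Had the loop instead slept a full `period` *after* the work finished, each gap would drift to roughly `period + work_duration`.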
### 5.4 `Command` vs bare `Task`
A coroutine `run_command_loop(...)` with deadline scheduling plus `asyncio.create_task` is enough for callers who want minimal API surface. `Command` is optional sugar for `start`/`stop`, named fields, and encapsulation — add it when that ergonomics is worth the type.
### 5.5 API sketch
```text
cmd = Command(node, cmd="...", interval=5.0, line_sink=...)
cmd = Command(node, cmd="...", interval=None, line_sink=...)
await cmd.start()
await cmd.stop()
```
### 5.6 Command manager
`CommandManager` supports dynamic add/remove of commands while the system is running:
```text
manager = CommandManager()
cid = await manager.add_command(node=node, cmd="uname -r", interval=1.0, count=10)
await manager.stop_command(cid)
await manager.stop_all()
active = manager.list_active()
history = manager.list_abnormal_history()
```
Self-cleaning behavior:
- completed commands are removed from active state automatically.
- abnormal stop records (exception/timeout) are retained in history.
---
## 6. Concurrent execution (`TaskGroup` / `gather`)
**Requires Python 3.11+** for `TaskGroup` as documented here.
- **Preparation:** build coroutines (e.g. `session.post_cmd("cmd", ...)` for several sessions).
- **Execution (preferred):** `async with asyncio.TaskGroup() as tg:` then `tg.create_task(coro)` for each coroutine. `TaskGroup` cancels siblings on first failure (structured concurrency).
- **Alternative:** `await asyncio.gather(*coros, return_exceptions=True)` when callers want all outcomes without `TaskGroup` fail-fast behavior.
One top-level `await` runs the whole set concurrently.
---
## 7. Functional requirements
| ID | Requirement |
| --- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| R1 | Line-oriented streaming: decode, buffer, split on `\n`, optional final fragment on EOF (§10.3). |
| R2 | Central logging via **logger and/or callable**; supports shared and per-session targets. |
| R3 | Completion: process ended, stdin closed if used, pipes EOF'd, line buffers flushed (§9.2, §10). |
| R4 | Prepared coroutines run **concurrently** via `asyncio.TaskGroup` (or `gather` when that shape fits); **Python 3.11+**. |
| R5 | `ush`, `ssh`, relay, ControlMaster consoles, connect / wall / IO-idle timeouts — all compose with streaming and completion as described here. |
---
## 8. Non-functional requirements
| ID | Requirement |
| --- | --------------------------------------------------------------------------------------------------------------------------------- |
| N1 | Library I/O runs under `asyncio.get_running_loop()`; programs start work with `asyncio.run` or an application event loop. |
| N2 | **Python 3.11+** for implementation and documentation examples. |
| N3 | Backpressure: document whether logging is assumed fast or use bounded queues / warnings. |
| N4 | Cancelling a command `Task` tears down the subprocess and protocol cleanly. |
---
## 9. Line sink and completion
### 9.1 Per-line output
Use `logging.Logger`, a **callable** `(stream, line) -> None`, or both. Normalize `\r\n` and strip `\r` for display. A dedicated `LineSink` **protocol** is optional sugar.
### 9.2 Communicate-like completion
Completion means:
1. Subprocess has exited (return code available).
2. Stdout and stderr reached EOF (`pipe_connection_lost` for fds 1 and 2 as used).
3. Protocol line buffers flushed (**§10.3**); `process_exited` handled; completion event or future set.
---
## 10. Streaming: `SubprocessProtocol` and `pipe_data_received`
The SSH streaming path uses `loop.subprocess_exec` with a `SubprocessProtocol`. **Line assembly happens in `pipe_data_received(fd, data)`** on arbitrary byte chunks from the transport.
### 10.1 Pipeline
| Stage | Mechanism |
| --------------- | ----------------------------------------------------------------------------------- |
| Ingress | `pipe_data_received(fd, data)`; `fd` 1 = stdout, 2 = stderr |
| Buffer / decode | Per-stream byte buffer; UTF-8 (replace or strict), configurable |
| Line split | On `\n`, invoke `on_line` → logger / callable |
| EOF | `pipe_connection_lost`: flush trailing fragment per §10.3 |
| Done | `process_exited` + both pipes closed + internal finished state → complete await |
### 10.2 Transport vs logical lines
**Ingress:** `pipe_data_received` delivers **byte chunks** (fragments and coalesced data). **Line boundaries** are applied in the protocol buffer: accumulate, split on `\n`, emit one logical line per `on_line` (Tcl `gets`-style delivery to parsers and loggers).
### 10.3 EOF / trailing fragment
| Mode | Behavior |
| ----------- | -------------------------------------------------------------------------------- |
| **Default** | If buffer non-empty at EOF, emit one final line without requiring trailing `\n`. |
| **Strict** | Drop or tag partial trailing line — configurable. |
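The buffering, split, and default EOF policy above can be sketched as a small standalone class (illustrative names, not the actual `SSHReaderProtocol` internals):

```python
# Per-stream line assembly per §10.2/§10.3: accumulate arbitrary byte chunks,
# emit complete lines on '\n' (stripping '\r'), flush a trailing fragment at EOF.
class LineBuffer:
    def __init__(self, on_line):
        self._buf = b""
        self._on_line = on_line

    def feed(self, data: bytes):
        """Called with whatever chunk pipe_data_received hands over."""
        self._buf += data
        while (i := self._buf.find(b"\n")) != -1:
            line, self._buf = self._buf[:i], self._buf[i + 1:]
            self._on_line(line.rstrip(b"\r").decode("utf-8", "replace"))

    def eof(self):
        """Default policy: a non-empty buffer at EOF is one final line."""
        if self._buf:
            self._on_line(self._buf.decode("utf-8", "replace"))
            self._buf = b""

lines = []
lb = LineBuffer(lines.append)
lb.feed(b"hel")        # fragment only: no line emitted yet
lb.feed(b"lo\r\nwor")  # completes "hello" (CRLF normalized), buffers "wor"
lb.eof()               # flushes trailing "wor" without a newline
print(lines)           # ['hello', 'wor']
```

One such buffer per stream (fd 1 and fd 2) keeps stdout and stderr line assembly independent.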
### 10.4 Tcl-aligned behavior
- **Input:** after internal buffering, `on_line` receives **complete lines** (same idea as Tcl `gets` on a readable channel).
- **Output (Tcl `fconfigure -buffering line`):** In Tcl, **line-buffered** mode on a channel **flushes outgoing data when a newline is written** (and when the buffer fills), which matters for interactive use, sockets, and pipes so output appears promptly. For this module, when **writing to the subprocess stdin** (if used), apply the same idea: **flush after each line** (writes ending with `\n`, or explicit flush) so the peer sees output without waiting for a full stdio buffer.
---
## 11. Timeouts
| Timeout | Semantics |
| ------------------ | -------------------------------------------------------------------- |
| **Connect** | From spawn until `connection_made`; cancel the timer when connected. |
| **Command (wall)** | Maximum lifetime; cancel watchdogs and `terminate` the transport. |
| **IO idle** | Reset on each `pipe_data_received`; on fire, `terminate`. |
If IO idle fires, completion still runs after terminate + drain (output may be partial).
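The IO-idle reset semantics can be sketched with `loop.call_later` (an assumed shape for the watchdog, not the actual `SSHReaderProtocol` code; a plain callback stands in for `transport.terminate()`):

```python
# IO-idle watchdog per §11: the timer is re-armed on every received chunk;
# if no data arrives within the window, the fire callback runs.
import asyncio

class IdleWatchdog:
    def __init__(self, timeout, on_fire):
        self._timeout = timeout
        self._on_fire = on_fire    # real code would terminate the transport
        self._handle = None

    def reset(self):
        """Call from pipe_data_received: cancel and re-arm the timer."""
        if self._handle is not None:
            self._handle.cancel()
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._timeout, self._on_fire)

    def cancel(self):
        if self._handle is not None:
            self._handle.cancel()

async def main():
    fired = asyncio.Event()
    wd = IdleWatchdog(0.2, fired.set)
    wd.reset()                      # armed at spawn
    await asyncio.sleep(0.05)
    wd.reset()                      # data arrived: window restarts
    await asyncio.sleep(0.05)       # still inside the new window
    early = fired.is_set()          # expected: False
    await asyncio.sleep(0.3)        # no more data: watchdog fires
    wd.cancel()
    return early, fired.is_set()

early, late = asyncio.run(main())
print("fired early:", early, "| fired after idle:", late)
```

The connect and wall-clock timers follow the same `call_later` pattern, but are armed once and cancelled on `connection_made` / process exit rather than reset per chunk.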
---
## 12. Central log file
Use `logging` with a `FileHandler`, or a **callable** that writes timestamped lines. Prefer writes from the event loop thread; for heavy I/O, an optional `asyncio.Queue` plus a single writer task.
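The queue-plus-writer option can be sketched as follows (illustrative shape; a list stands in for the real file write, and the bounded queue is the backpressure point mentioned in §8 N3):

```python
# Single-writer pattern per §12: many producers (protocol on_line callbacks)
# put lines on a bounded asyncio.Queue; one task drains it in order.
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1000)   # bounded: slow sink applies backpressure
    out = []

    async def writer():
        while True:
            line = await q.get()
            if line is None:          # sentinel: shut the writer down cleanly
                break
            out.append(line)          # real code: file handler / logger write

    w = asyncio.create_task(writer())
    for i in range(3):
        await q.put(f"line {i}")      # producers never touch the file directly
    await q.put(None)
    await w
    return out

print(asyncio.run(demo()))            # ['line 0', 'line 1', 'line 2']
```

A single writer also serializes output from concurrent sessions, so interleaved lines from different rigs never corrupt each other mid-line.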
---
## 13. API sketch (illustrative names)
```text
async with asyncio.TaskGroup() as tg:
    tg.create_task(session_a.post_cmd("cmd1", ...))
    tg.create_task(session_b.post_cmd("cmd2", ...))

cmd = Command(node, cmd="...", interval=10.0, line_sink=...)
await cmd.start()
await cmd.stop()
```
---
## 14. Open decisions
1. Formal `LineSink` protocol vs **logger + callable** only.
2. **stderr:** merged to stdout vs separate channels.
3. Public name `Command` vs `SshCommand`.
4. Whether `Command` ships in v1 or callers use `create_task(run_command_loop(...))` only.
---
## 15. Specification checklist
- Line emission, EOF, and CR handling documented in implementation.
- Concurrent execution documented for **Python 3.11+** with `TaskGroup` (and `gather` where noted).
- Completion = process exit + pipe EOF + buffer flush.
- Central logging path documented.
- Timeout and cancellation behavior documented.
- Repeating `Command` scheduling uses **deadline-based** waits or `call_at`/`call_later` (§5.3).
- Automated tests cover the scenarios in **§16**.
---
## 16. Test cases
This section lists **behaviors to verify** in automated tests (unit, functional, or integration as appropriate). Implementation may use mocks, local subprocess stubs, or real SSH depending on CI constraints.
### 16.1 `ssh_session` / `post_cmd` / completion
- **Happy path:** Remote command exits 0; completion fires after subprocess exit, stdout/stderr EOF, and protocol teardown.
- **Non-zero exit:** Process exits with failure; completion still reached; return code or error path is observable as designed.
- **Communicate-like barrier:** Completion does not report done until `process_exited` and `pipe_connection_lost` semantics are satisfied (§9.2).
### 16.2 Streaming and line buffering (`pipe_data_received`)
- **Chunked input:** Data arriving as multiple arbitrary chunks still yields **correct line splits** on `\n`.
- **CR / CRLF:** `\r\n` and trailing `\r` handling matches §10.2 / §10.3.
- **EOF partial line:** Buffer non-empty at EOF emits **one final line** under default policy (§10.3).
- **Stdout vs stderr:** If stderr is a separate channel, both streams are decoded and routed per sink/logger configuration.
### 16.3 Timeouts
- **Connect timeout:** Fires when transport does not reach connected state in time; subprocess is terminated; completion reached.
- **Command (wall / `CMD_TIMEOUT`):** Long-running command is terminated at wall limit; completion reached.
- **IO idle (`IO_TIMEOUT`):** No `pipe_data_received` for the idle interval triggers terminate; completion reached (output may be partial) (§11).
### 16.4 `Command` (scheduled execution)
- **One-shot (`interval` unset / zero):** `start` runs at most one `post_cmd` cycle per lifecycle (per defined semantics); `stop` ends cleanly.
- **Repeating (`interval` > 0):** Multiple invocations occur; the **period** between **scheduled** fires follows **deadline** or `call_at` rules (§5.3), not “sleep N after work” drift.
- **Finite count:** `count=N` executes exactly N runs.
- **Never-ending:** `count=Command.COUNT_FOREVER` runs until explicit stop/cancel.
- **Timers on `Command`:** `IO_TIMEOUT` and `CMD_TIMEOUT` passed through to each `post_cmd` invocation behave as in §16.3.
- **Stop / cancel:** `stop()` or `Task` cancellation ends the loop; no leaked tasks or open log handles.
### 16.4.1 `CommandManager`
- **Add + self-clean:** Added finite commands appear in `list_active()` then auto-remove on normal completion.
- **Stop one / stop all:** Explicit stop removes active entries.
- **Abnormal history:** exception/timeout stops are retained in `list_abnormal_history()`.
### 16.5 Concurrent execution (Python 3.11+)
- **`TaskGroup`:** Multiple `post_cmd` (or equivalent) tasks run **overlapped**; the first failure cancels siblings under default `TaskGroup` semantics.
- **`gather` (when used):** The `return_exceptions=True` path collects all results without aborting early, if that pattern is supported in helpers.
### 16.6 `ssh_node` / registry
- **`WeakSet`:** After the application drops its last strong reference to a node, the node **disappears** from `get_instances()` following collection (§3.3).
- **Strong reference:** While the app holds a node, it appears in `get_instances()` when enumeration runs.
### 16.7 Transport configuration
- **`ssh` vs `ush`:** Argv shape and `ssh_controlmaster` interaction match §3 / §4 (as far as test doubles allow).
- **Relay:** The jump-host argv prefix is built correctly when `relay` is set.
### 16.8 Logging / sink
- **Per-line delivery:** Logger or callable receives **one line at a time** after buffering (§9.1, §10.2).
- **Central vs per-session:** If both modes exist, each receives the expected lines (no cross-contamination unless designed).
### 16.9 Consoles / ControlMaster (if exercised in test suite)
- **`open_consoles` / `close_consoles`:** Lifecycle can be covered with integration tests or marked **optional** when real hardware or SSH fixtures are unavailable.
---
Implemented coverage reference: `tests/test_node_control.py`
*End of design document.*