# Design: `fiwicontrol.commands` — asyncio execution (SSH / `ush`)

**Status:** Current

**Package:** **`fiwicontrol.commands`** (FiWiControl repository).
**Implementation file:** **`src/fiwicontrol/commands/node_control.py`**.
The main entry type is still named **`ssh_node`** (historical name: SSH/`ush`-oriented rig connection).

**Scope:** Asyncio-based SSH/`ush` rig control: **`ssh_node`**, **`ssh_session`**, **`Command`**, **`CommandManager`**, concurrent execution via **`asyncio`**, line-oriented streaming, and logging.
**Language:** **Python 3.11+** (stdlib **`asyncio.TaskGroup`**, **`asyncio.timeout`**, and related APIs assumed available).

**Remote access:** For **`sshtype="ssh"`**, **passwordless SSH is required**. Commands are executed via **`ssh`** without an interactive TTY, so **key-based authentication** (or any non-interactive auth that does not prompt) must already work for **`root@<ipaddr>`** (or your chosen user if you extend argv). Password prompts, interactive MFA, or unlocking passphrase-protected keys in this non-interactive path will cause failures or indefinite waits.

---

## Running tests

Integration tests for **`node_control.py`** live in **`tests/test_node_control.py`**. They open real **`ssh`** sessions to **`root@$FIWI_REMOTE_IP`** (default **`192.168.1.39`**). **Passwordless SSH must work** for that host.

**From a terminal** (repository root — e.g. **`~/Code/FiWiControl`**):

```bash
cd ~/Code/FiWiControl

# Optional: target a different host (default is 192.168.1.39 if unset)
export FIWI_REMOTE_IP=192.168.1.39

# Verbose pytest (recommended)
python3 -m pytest tests/test_node_control.py -v

# Quiet summary only
python3 -m pytest tests/test_node_control.py -q
```

**Without pytest**, the file is also a **`unittest`** script:

```bash
cd ~/Code/FiWiControl
python tests/test_node_control.py --remote-ip 192.168.1.39
# More asyncio logging:
python tests/test_node_control.py --remote-ip 192.168.1.39 --debug
```

**`pytest`** picks up **`pythonpath = ["src"]`** from **`pyproject.toml`**, so you do **not** need **`PYTHONPATH=src`** when you run from the repo root as above. After **`pip install -e .`**, imports resolve the same way.

**Example successful output** (line order and timings vary; captured with **`python3 -m pytest tests/test_node_control.py -v`** on Linux):

```text
============================= test session starts ==============================
platform linux -- Python 3.13.11, pytest-9.0.3, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /home/.../FiWiControl
configfile: pyproject.toml
plugins: anyio-4.8.0
collecting ... collected 6 items

tests/test_node_control.py::TestRemoteSingleRunCommands::test_dmesg_tail PASSED [ 16%]
tests/test_node_control.py::TestRemoteSingleRunCommands::test_ls_home_directory PASSED [ 33%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_command_manager_add_and_self_clean PASSED [ 50%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_01_uname_r_every_1s_10_times PASSED [ 66%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_02_pwd_and_ls_every_1s_5_times PASSED [ 83%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_03_forever_can_be_stopped PASSED [100%]

============================== 6 passed in 16.47s ==============================
```

**Fedora / `ssh_config.d`:** pytest loads **`tests/conftest.py`**, which **`setdefault`s** **`FIWI_SSH_CONFIG`** to **`tests/fixtures/ssh_config_minimal`** when that file exists so **`ssh`** is invoked as **`ssh -F …`** and does not read broken system drop-ins under **`/etc/ssh/ssh_config.d/`**. Override with your own **`FIWI_SSH_CONFIG`** if needed. For manual **`ssh`**, export the same variable or fix system file permissions (see **`docs/install.md`**).

If SSH is not configured, you will see failures (connection refused, auth errors, or timeouts) instead of **`PASSED`**.

**Package import smoke test** (no network; optional):

```bash
cd ~/Code/FiWiControl
python3 -m pytest tests/test_package_layout.py -q
```

---

## How to use (examples)

Implementation lives in **`src/fiwicontrol/commands/node_control.py`**. All remote I/O is **async** — call from a coroutine under **`asyncio.run()`** or your application event loop.

### Prerequisites (remote targets)

1. **Passwordless SSH** — From the machine running this code, **`ssh root@<ipaddr> true`** (or equivalent) must succeed **without** typing a password. Configure **`~/.ssh/authorized_keys`** on the target and use either an unencrypted key or a key already loaded into **`ssh-agent`**, as appropriate for your environment. For **`sshtype="ssh"`**, **`ssh_session.post_cmd`** always passes an explicit **`root@<hostname>`** (**`ssh_session.user`** defaults to **`root`**), not a bare host, so the remote user is never your workstation login name.
2. Network reachability and correct **`ipaddr`** (or hostname) on the node.
3. Optional: **`BatchMode=yes`** in **`ssh_config`** for fail-fast behavior when keys are missing (recommended for automation).

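
Prerequisites 1 and 3 can be checked from Python before any rig code runs. The sketch below is an assumption about how such a check might look: `preflight_argv` and `preflight` are hypothetical names, not part of **`fiwicontrol.commands`**. It relies only on the OpenSSH options **`BatchMode`** and **`ConnectTimeout`**.

```python
import asyncio


def preflight_argv(host: str, user: str = "root", connect_timeout: int = 5) -> list[str]:
    """Build a non-interactive ssh argv that runs `true` on the target."""
    return [
        "ssh",
        "-o", "BatchMode=yes",                      # fail instead of prompting
        "-o", f"ConnectTimeout={connect_timeout}",  # seconds, enforced by ssh
        f"{user}@{host}",
        "true",
    ]


async def preflight(host: str) -> bool:
    """Return True when key-based login works; needs an `ssh` binary on PATH."""
    proc = await asyncio.create_subprocess_exec(
        *preflight_argv(host),
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    return await proc.wait() == 0
```

A caller would run `await preflight("192.168.1.39")` and skip or abort when it returns `False`.
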
### Remote host: Python on the rig (power / discovery)

**`ssh_node.rexec`** runs shell commands on the rig over the **same** non-interactive **`ssh root@<ipaddr>`** path as the prerequisites above. Anything you run remotely — including **`python3 -m fiwicontrol.power --discovery-json`** for inventory or USB discovery — needs that **Python environment installed on the node**, not only on your laptop.

**Full checklist** (SSH keys, **`git clone`** on the Pi, **`python3 -m fiwicontrol.commands …`**, PEP 668 / **`--break-system-packages`**, verification **`pytest`**):

→ **`docs/install.md`** — section **“Remote host setup”**

Power-specific behaviour (what discovery returns, INI format): **`docs/power-control-and-inventory.md`**.

### Imports

Preferred (re-exported from **`fiwicontrol.commands`**):

```python
import asyncio

from fiwicontrol.commands import Command, CommandManager, ssh_node
```

Equivalent submodule import (same symbols):

```python
from fiwicontrol.commands.node_control import Command, CommandManager, ssh_node
```

Install the package (editable: **`pip install -e .`** from the **FiWiControl** repo root; distribution name **`fiwicontrol`**). For **`pytest`** without installing, **`pythonpath = ["src"]`** is set in **`pyproject.toml`**.

### Create an `ssh_node` (rig / execution target)

**`ssh_node`** is a **class** in **`fiwicontrol.commands`**: one instance represents **one** reachable rig (OpenSSH or **`ush`**). It is not the package name; the package is **`fiwicontrol.commands`**.

**SSH (default)** — targets **`root@ipaddr`** with a ControlMaster socket under **`/tmp/controlmasters_<ip>`** when **`ssh_controlmaster=True`** (default for **`rexec`** / **`Command`** sessions).

```python
node = ssh_node(
    name="rack-pi",
    ipaddr="192.168.1.39",
    ssh_controlmaster=False,  # optional; tests often disable for simplicity
    silent_mode=False,        # False: stream stdout/stderr lines to logging
)
```

**`ush`** instead of plain **`ssh`**:

```python
node = ssh_node(name="rig", ipaddr="10.0.0.2", sshtype="ush")
```

**Jump host (`relay`)** — argv becomes **`ssh root@<relay>`** then inner **`ush`** or **`ssh`**.

```python
node = ssh_node(name="behind-jump", ipaddr="10.0.0.5", relay="bastion.example.com")
```

Keep a **strong reference** to each node you care about; the class registry (**`WeakSet`**) only lists nodes still alive from the rest of your program (§3.3).

### One-shot remote command (`rexec`)

**`rexec`** builds an **`ssh_session`**, runs **`post_cmd`**, and returns the session. Output accumulates in **`session.results`** (bytes); line-oriented output also goes to the logger unless **`silent_mode=True`**.

```python
async def run_once():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    session = await node.rexec(
        cmd="ls ~",
        IO_TIMEOUT=15.0,       # idle I/O watchdog (reset on each received chunk)
        CMD_TIMEOUT=30,        # wall-clock cap for the remote command
        CONNECT_TIMEOUT=20.0,
        repeat=None,           # omit or False: single subprocess lifecycle
    )
    text = session.results.decode("utf-8", errors="replace").strip()
    print(text)


asyncio.run(run_once())
```

### Repeating commands (`Command`)

**`Command`** drives **separate** **`ssh_session.post_cmd`** invocations on a schedule (**`interval`** in seconds, event-loop deadline-based — §5.3).

- **`interval=None`** or **`<= 0`**: run **once** then stop.
- **`interval > 0`**: repeat until **`count`** is reached or **`stop()`**.
- **`count`**: finite integer, or **`Command.COUNT_FOREVER`** (**`-1`**) until **`stop()`**.

```python
async def poll_uname():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
    await cmd.start()
    await asyncio.wait_for(cmd.task, timeout=25.0)
    print("runs:", cmd.run_count)  # expect 10


asyncio.run(poll_uname())
```

Stop a long-running / infinite command:

```python
async def stop_forever():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="date", interval=2.0, count=Command.COUNT_FOREVER)
    await cmd.start()
    await asyncio.sleep(5)
    await cmd.stop()


asyncio.run(stop_forever())
```

Optional **`log_file`**: each run’s output is appended under a banner. Per-run timeouts: **`io_timeout`**, **`cmd_timeout`**, **`connect_timeout`** (override node defaults).

### Many commands (`CommandManager`)

**`CommandManager`** assigns IDs, starts tasks, **removes finished commands** from **`list_active()`**, and records **exception/timeout** stops in **`list_abnormal_history()`**.

```python
async def managed():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    mgr = CommandManager()
    cid_a = await mgr.add_command(node=node, cmd="uname -r", interval=0.5, count=4)
    cid_b = await mgr.add_command(node=node, cmd="pwd", interval=0.5, count=4)
    await asyncio.sleep(3)
    print("active:", mgr.list_active())
    print("abnormal:", mgr.list_abnormal_history())
    await mgr.stop_all()


asyncio.run(managed())
```

### Parallel one-shots (`TaskGroup` / `gather`)

For several **independent** **`rexec`**-style calls, schedule them together:

```python
async def parallel():
    a = ssh_node(name="a", ipaddr="192.168.1.10", ssh_controlmaster=False)
    b = ssh_node(name="b", ipaddr="192.168.1.11", ssh_controlmaster=False)

    async def one(node, cmd):
        s = await node.rexec(cmd=cmd, IO_TIMEOUT=15.0, CMD_TIMEOUT=30, CONNECT_TIMEOUT=20.0)
        return s.results.decode("utf-8", errors="replace")

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(one(a, "hostname"))
        t2 = tg.create_task(one(b, "hostname"))
    print(t1.result(), t2.result())


asyncio.run(parallel())
```

Use **`asyncio.gather(..., return_exceptions=True)`** when you want every outcome without **`TaskGroup`** fail-fast behavior.

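
The **`gather`** variant can be tried without a rig. In this sketch `fake_rexec` is a hypothetical stand-in for **`node.rexec(...)`** (no SSH involved); the point is only how **`return_exceptions=True`** returns failures in place instead of raising.

```python
import asyncio


async def fake_rexec(host: str, fail: bool = False) -> str:
    """Stand-in for `node.rexec(...)`; no SSH is involved."""
    await asyncio.sleep(0)
    if fail:
        raise ConnectionError(f"{host}: unreachable")
    return f"{host}: ok"


async def collect() -> list:
    # Exceptions come back as list items, in argument order, instead of raising.
    return await asyncio.gather(
        fake_rexec("192.168.1.10"),
        fake_rexec("192.168.1.11", fail=True),
        return_exceptions=True,
    )


outcomes = asyncio.run(collect())
for outcome in outcomes:
    label = "error" if isinstance(outcome, BaseException) else "result"
    print(label, outcome)
```

Each entry must be checked with `isinstance(..., BaseException)` before use, since failures no longer propagate on their own.
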
### Consoles (`open_consoles` / `close_consoles`)

Class methods walk **`ssh_node.get_instances()`**, clean up stray **`dmesg`** processes on SSH nodes (`pkill dmesg`), and optionally open long-lived ControlMaster **`dmesg -w`** streams (§3.4). Call only when your rig expects this pattern.

```python
# Inside a coroutine, with the nodes already constructed:
await ssh_node.open_consoles(silent_mode=False)
# ... use nodes ...
await ssh_node.close_consoles()
```

**Tests:** see **[Running tests](#running-tests)** at the top of this document.

---

## 1. Goals

1. **Streaming with line buffering** — Decode subprocess output, split on line boundaries, and deliver each line to logging (including a shared central log) without waiting for the process to finish.

2. **Run-to-completion** — Await a single completion that means: subprocess exited (or was terminated on timeout), and stdout/stderr are drained with line buffers flushed per the EOF policy (§10.3).

3. **Concurrent execution from a prepared set** — Build coroutines (e.g. several **`post_cmd`** calls), then run them **concurrently** under **`asyncio.TaskGroup`** (Python 3.11+). Use **`asyncio.gather`** when a simple “wait for all results” shape fits better.

4. **Scheduled / periodic execution** — Support one-shot and repeating commands through a single **`Command`** type with explicit lifecycle (`start`/`stop`) and fixed-period scheduling semantics.

---

## 2. Architectural overview

| Type | Responsibility |
|------|----------------|
| **`ssh_node`** (class) | Per-host configuration: identity, `ssh`/`ush` argv prefix, relay, ControlMaster socket path, console bookkeeping. Registry of instances. **Does not** own subprocess I/O or interval scheduling. |
| **`ssh_session`** (class) | One SSH child process for a command or stream: **`post_cmd`**, **`close`**, **`SubprocessProtocol`** with **`pipe_data_received`**, timeouts, communicate-like completion. Reads settings from **`ssh_node`** (e.g. via attribute delegation). |
| **`SSHReaderProtocol`** (nested class on `ssh_session`) | Subprocess protocol callbacks: `connection_made`, `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timer hooks. |
| **`Command`** (class) | Managed lifecycle for repeated or one-shot runs driven by **`ssh_node`** / **`ssh_session`**: **`interval`**, **`count`**, **`start`/`stop`**, log/sink attachment. Each execution uses **`ssh_session.post_cmd`**. |
| **`CommandManager`** (class) | Dynamic command registry: add running commands, stop one/all, self-clean completed commands, keep abnormal-stop history (exception / timeout). |

**Concurrent runs:** Prefer **`asyncio.TaskGroup`** for structured concurrency; **`asyncio.gather`** when appropriate.

**Per-line logging:** Use **`logging.Logger`** and/or a **callable** sink; a formal **`LineSink`** protocol type is optional and only worth adding if multiple implementations need a shared interface.

**Library asyncio rule:** Public APIs that do I/O are **`async`**. Coroutines and protocols use **`asyncio.get_running_loop()`**. The application (or **`asyncio.run`**) starts the event loop before calling into the library.

---

## 3. `ssh_node`

### 3.1 Role

| Concern | `ssh_node` |
|---------|------------|
| **`name`**, **`ipaddr`**, optional **`device`**, **`devip`** | Yes |
| **`sshtype`** (`ssh` / `ush`), **`relay`**, **`self.ssh`** argv prefix | Yes |
| **`ssh_controlmaster`**, **`controlmasters`** (SSH only; off for `ush`) | Yes |
| **`ssh_console_session`**, console task references | Yes (bookkeeping) |
| Subprocess streaming / **`pipe_data_received`** | **No** — **`ssh_session`** |
| Interval scheduling, periodic task lists | **No** — **`Command`** (callers hold **`Command`** instances and **`Task`** handles) |

### 3.2 Constructor parameters

- **`name`**, **`ipaddr`**: label and target for argv assembly (`root@…` applied when building the remote command).
- **`relay`**: optional jump host; argv begins with `ssh root@<relay>` then inner `ush` or `ssh`.
- **`ssh_controlmaster`**: enable ControlMaster path for `ssh`; forced off for `ush`.
- **`silent_mode`**: passed into sessions for line logging behavior.
- **`console`**: optional rig-facing flag.

### 3.3 Registry

Instances register in a class-level **`WeakSet`** (`instances`). **`get_instances()`** returns a snapshot of nodes that are **still alive** from the rest of the program’s perspective.

**`WeakSet` properties:**

- The registry supports **enumeration** (e.g. **`open_consoles`** over nodes the application is using). **`WeakSet`** stores **weak references** only: node **lifetime** follows **strong references** held by the application; when those go away, the node may be **collected** and **disappears from `instances`** automatically.
- **`get_instances()`** therefore reflects nodes that are **still strongly referenced** elsewhere (variables, rig object, collections).
- Callers keep a **strong reference** to each node they care about for as long as that node should appear in global enumeration.

**Alternative pattern:** an explicit **list of nodes** passed into orchestration functions also works; **`WeakSet`** fits a **central index** without that list becoming the sole long-lived owner of every node.

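
The registry behaviour is easy to observe in isolation. The sketch below uses a hypothetical `DemoNode` class in place of **`ssh_node`** and models only the class-level **`WeakSet`** and the snapshot returned by **`get_instances()`**.

```python
import gc
import weakref


class DemoNode:
    """Stand-in for `ssh_node`; only the registry behaviour is modelled."""

    instances: "weakref.WeakSet[DemoNode]" = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        DemoNode.instances.add(self)  # weak reference only

    @classmethod
    def get_instances(cls) -> list["DemoNode"]:
        return list(cls.instances)  # snapshot of nodes that are still alive


kept = DemoNode("kept")
dropped = DemoNode("dropped")
names_before = sorted(n.name for n in DemoNode.get_instances())

del dropped   # last strong reference gone
gc.collect()  # CPython frees on refcount; collect() covers other runtimes
names_after = sorted(n.name for n in DemoNode.get_instances())
print(names_before, names_after)
```

The node bound to `kept` stays listed because the module still holds it; the other one vanishes without any explicit unregister call.
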
### 3.4 Class-level orchestration

**`open_consoles`**, **`close_consoles`**: multi-node cleanup (`pkill dmesg`), ControlMaster + long-lived console streams. **`async`**; require a running loop.

Stopping scheduled work: callers **`await cmd.stop()`** and/or **`task.cancel()`** on the **`Task`** from **`Command.start()`** (exact API TBD).

### 3.5 Instance methods

| Method | Behavior |
|--------|----------|
| **`rexec`** | Constructs **`ssh_session`**, runs **`post_cmd`** with IO / command / connect timeouts. To run many **`rexec`**-style operations later in parallel, use **`asyncio.TaskGroup`** (or **`gather`** if that shape fits). |
| **`clean`** | **`async`**: remote cleanup (e.g. `pkill dmesg`) via a one-off subprocess. |
| **`close_console`** | **`async`**: tear down **`ssh_console_session`**. |

Scheduled / interval execution: **`Command`** (§5). **`ssh_node`**: identity, transport, registry, console bookkeeping.

### 3.6 Optional utilities

Thin **`async` helpers** (e.g. sleep delays) may exist as **module-level functions** when useful.

---

## 4. `ssh_session`

### 4.1 Role

One logical SSH subprocess: spawn via **`loop.subprocess_exec`** with a **`SubprocessProtocol`**, stream stdout/stderr through **`pipe_data_received`**, enforce connect / wall / IO-idle timeouts, and expose communicate-like completion.

### 4.2 Relationship to `ssh_node`

The session holds a reference to **`ssh_node`** and delegates missing attributes (e.g. **`ipaddr`**, **`ssh`**, **`controlmasters`**) for argv and ControlMaster paths.

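
One common way to get this kind of delegation in Python is **`__getattr__`**, which runs only when normal attribute lookup fails. The sketch below assumes that mechanism; `FakeNode`, `FakeSession`, and `target()` are hypothetical names, and the real **`ssh_session`** may delegate differently.

```python
class FakeNode:
    """Stand-in for `ssh_node` with two of the attributes named above."""

    def __init__(self, ipaddr: str) -> None:
        self.ipaddr = ipaddr
        self.ssh = ["ssh"]


class FakeSession:
    """Stand-in for `ssh_session`: unknown attributes fall through to the node."""

    def __init__(self, node: FakeNode, user: str = "root") -> None:
        self.node = node
        self.user = user

    def __getattr__(self, name: str):
        # Runs only when normal lookup fails. The guard avoids infinite
        # recursion if `node` itself is not set yet (e.g. while unpickling).
        if name == "node":
            raise AttributeError(name)
        return getattr(self.node, name)

    def target(self) -> str:
        return f"{self.user}@{self.ipaddr}"  # ipaddr resolved on the node


session = FakeSession(FakeNode("192.168.1.39"))
print(session.target(), session.ssh)
```

Attributes set on the session (here `user`) shadow the node, so per-session overrides stay possible.
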
### 4.3 Operations

- **`post_cmd(cmd, IO_TIMEOUT, CMD_TIMEOUT, …)`** — Build full **`ssh`** argv (including **`-o ControlPath`** / master setup when **`control_master`**). A **`repeat`** flag that loops **inside one subprocess lifecycle** is distinct from **`Command`’s** separate invocations on a schedule (§5).
- **`close`** — Terminate console / master as required (**`async`**).

### 4.4 Inner protocol

**`SSHReaderProtocol`** (name may match implementation): implements **`pipe_data_received`**, **`pipe_connection_lost`**, **`process_exited`**, watchdog timers via **`get_running_loop().call_later`**, and optional **`LoggerAdapter`** for prefixed logs.

---

## 5. `Command`

### 5.1 Name and scope

The type is **`Command`** (or **`SshCommand`** if `Command` collides in consumer code). SSH is the default transport; **repetition** is modeled with an **`interval`** field on the same type.

### 5.2 Responsibilities

| Concern | Owner |
|--------|--------|
| Remote command string / argv fragment | **`Command`** |
| **`interval`**: `None` or `0` → one execution per start/stop (or defined one-shot semantics); `> 0` → period between scheduled fires | **`Command`** |
| **`count`**: finite run count or **`Command.COUNT_FOREVER`** for never-ending | **`Command`** |
| Optional **`initial_delay`**, jitter | **`Command`** |
| **`start()` / `stop()`**, background **`Task`**, cancellation, log flush | **`Command`** |
| Log / line output target | **`Command`** — typically **`logging.Logger`** or a **callable**; formal **`LineSink`** protocol optional |
| **Completion timer** (wall-clock cap per run) | **`Command`** — default passed into each **`post_cmd`** as **`CMD_TIMEOUT`** (name may match implementation): max time from start of that invocation until subprocess exit (or kill). Catches hung remote commands that never finish. |
| **I/O idle timer** | **`Command`** — default passed into each **`post_cmd`** as **`IO_TIMEOUT`**: max time **without** stdout/stderr bytes; timer resets on **`pipe_data_received`**. Catches “stuck but still running” cases where the process produces no output. |
| Subprocess + protocol | **`ssh_session`** only — each tick **`await post_cmd(..., IO_TIMEOUT=…, CMD_TIMEOUT=…)`**; timers enforced inside **`SSHReaderProtocol`** |

**Timers on `Command`:** **`IO_TIMEOUT`** and **`CMD_TIMEOUT`** address **idle output** vs **overall run length**. **`Command`** supplies defaults and passes them into each **`post_cmd`**; **`SSHReaderProtocol`** implements the watchdogs.

**Split of roles:** **`Command`** schedules runs and timer values; **`ssh_session`** runs **`post_cmd`** and owns **`pipe_data_received`** streaming.

### 5.3 Scheduling repeats

Repeating **`Command`** uses **event-loop time** so each **period** is measured **between scheduled fire times**, independent of how long **`post_cmd`** took:

1. **Deadline-based:** at tick start, **`next_fire = loop.time() + period`**. After **`await post_cmd(...)`**, wait **`max(0.0, next_fire - loop.time())`** (or equivalent **`Event.wait`** with timeout). Document behavior when **`post_cmd`** exceeds **`period`** (immediate next run vs align to next **`next_fire`**).
2. **`loop.call_at` / `call_later`:** schedule the next tick at **`loop.time() + period`** (or the computed deadline).

The wait between ticks is the time remaining until **`next_fire`** (or the next **`call_at`** / **`call_later`** fire), so the **period** stays anchored to the event-loop clock regardless of **`post_cmd`** duration.

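
The deadline-based variant (option 1) can be sketched as a plain coroutine. `run_periodic` and `slow_work` are hypothetical names; `slow_work` stands in for **`post_cmd`**. When the work overruns the period, this sketch starts the next run immediately, which is only one of the two policies mentioned above.

```python
import asyncio


async def run_periodic(work, period: float, count: int) -> list[float]:
    """Call `await work()` `count` times, one fire per `period` seconds.

    Returns the loop-clock offset at which each run started.
    """
    loop = asyncio.get_running_loop()
    t0 = loop.time()
    starts: list[float] = []
    for i in range(count):
        tick = loop.time()
        next_fire = tick + period          # deadline fixed at tick start
        starts.append(tick - t0)
        await work()
        if i + 1 < count:
            # Sleep only for what is left of the period; 0.0 if work overran.
            await asyncio.sleep(max(0.0, next_fire - loop.time()))
    return starts


async def slow_work() -> None:
    await asyncio.sleep(0.02)  # stand-in for a post_cmd that takes 20 ms


starts = asyncio.run(run_periodic(slow_work, period=0.05, count=3))
print([round(s, 2) for s in starts])
```

With a plain "sleep `period` after the work" loop the gap between starts would be about 70 ms here; with the deadline it stays close to the 50 ms period.
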
### 5.4 `Command` vs bare `Task`

A coroutine **`run_command_loop(...)`** with deadline scheduling plus **`asyncio.create_task`** is enough for callers who want minimal API surface. **`Command`** is optional sugar for **`start`/`stop`**, named fields, and encapsulation — add it when that ergonomics is worth the type.

### 5.5 API sketch

```text
cmd = Command(node, cmd="...", interval=5.0, line_sink=...)   # repeating, 5 s period
cmd = Command(node, cmd="...", interval=None, line_sink=...)  # one-shot
await cmd.start()
await cmd.stop()
```

### 5.6 Command manager

`CommandManager` supports dynamic add/remove of commands while the system is running:

```text
manager = CommandManager()
cid = await manager.add_command(node=node, cmd="uname -r", interval=1.0, count=10)
await manager.stop_command(cid)
await manager.stop_all()
active = manager.list_active()
history = manager.list_abnormal_history()
```

Self-cleaning behavior:

- completed commands are removed from active state automatically.
- abnormal stop records (exception/timeout) are retained in history.

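
Self-cleaning of this kind is often built on **`Task.add_done_callback`**. The sketch below assumes that approach and is not the **`CommandManager`** implementation: `MiniManager`, its `add` method, and the shape of its return values are hypothetical.

```python
import asyncio
import itertools


class MiniManager:
    """Hypothetical registry: tasks remove themselves when they finish."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._active: dict[int, asyncio.Task] = {}
        self._abnormal: list[tuple[int, str]] = []

    def add(self, coro) -> int:
        cid = next(self._ids)
        task = asyncio.get_running_loop().create_task(coro)
        self._active[cid] = task
        task.add_done_callback(lambda t, cid=cid: self._on_done(cid, t))
        return cid

    def _on_done(self, cid: int, task: asyncio.Task) -> None:
        self._active.pop(cid, None)  # self-clean on any kind of finish
        if not task.cancelled() and task.exception() is not None:
            self._abnormal.append((cid, repr(task.exception())))

    def list_active(self) -> list[int]:
        return sorted(self._active)

    def list_abnormal_history(self) -> list[tuple[int, str]]:
        return list(self._abnormal)


async def demo():
    async def ok():
        await asyncio.sleep(0)

    async def boom():
        raise TimeoutError("cmd timeout")

    mgr = MiniManager()
    mgr.add(ok())
    bad = mgr.add(boom())
    during = mgr.list_active()
    await asyncio.sleep(0.01)  # let both tasks finish and callbacks run
    return during, mgr.list_active(), mgr.list_abnormal_history(), bad


during, after, history, bad = asyncio.run(demo())
print(during, after, history)
```
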
---

## 6. Concurrent execution (`TaskGroup` / `gather`)

**Requires Python 3.11+** for **`TaskGroup`** as documented here.

- **Preparation:** build coroutines (e.g. **`session.post_cmd("cmd", ...)`** for several sessions).
- **Execution (preferred):** **`async with asyncio.TaskGroup() as tg:`** then **`tg.create_task(coro)`** for each coroutine. **`TaskGroup`** cancels siblings on first failure (structured concurrency).
- **Alternative:** **`await asyncio.gather(*coros, return_exceptions=True)`** when callers want all outcomes without **`TaskGroup`** fail-fast behavior.

One top-level **`await`** runs the whole set concurrently.

---

## 7. Functional requirements

| ID | Requirement |
|----|-------------|
| R1 | Line-oriented streaming: decode, buffer, split on `\n`, optional final fragment on EOF (§10.3). |
| R2 | Central logging via **logger and/or callable**; supports shared and per-session targets. |
| R3 | Completion: process ended, stdin closed if used, pipes EOF’d, line buffers flushed (§9.2, §10). |
| R4 | Prepared coroutines run **concurrently** via **`asyncio.TaskGroup`** (or **`gather`** when that shape fits); **Python 3.11+**. |
| R5 | **`ush`**, **`ssh`**, relay, ControlMaster consoles, connect / wall / IO-idle timeouts — all compose with streaming and completion as described here. |

---

## 8. Non-functional requirements

| ID | Requirement |
|----|-------------|
| N1 | Library I/O runs under **`asyncio.get_running_loop()`**; programs start work with **`asyncio.run`** or an application event loop. |
| N2 | **Python 3.11+** for implementation and documentation examples. |
| N3 | Backpressure: document whether logging is assumed fast or use bounded queues / warnings. |
| N4 | Cancelling a command **`Task`** tears down subprocess and protocol cleanly. |

---

## 9. Line sink and completion

### 9.1 Per-line output

Use **`logging.Logger`**, a **callable** `(stream, line) -> None`, or both. Normalize **`\r\n`**, strip **`\r`** for display. A dedicated **`LineSink`** **protocol** is optional sugar.

### 9.2 Communicate-like completion

Completion means:

1. Subprocess has exited (return code available).
2. Stdout and stderr reached EOF (**`pipe_connection_lost`** for fds 1 and 2 as used).
3. Protocol line buffers flushed (**§10.3**); **`process_exited`** handled; completion event or future set.

---

## 10. Streaming: `SubprocessProtocol` and `pipe_data_received`

The SSH streaming path uses **`loop.subprocess_exec`** with a **`SubprocessProtocol`**. **Line assembly happens in `pipe_data_received(fd, data)`** on arbitrary byte chunks from the transport.

### 10.1 Pipeline

| Stage | Mechanism |
|--------|-----------|
| Ingress | **`pipe_data_received(fd, data)`** — `fd` 1 = stdout, 2 = stderr |
| Buffer / decode | Per-stream byte buffer; UTF-8 (replace or strict), configurable |
| Line split | On `\n`, invoke **`on_line`** → logger / callable |
| EOF | **`pipe_connection_lost`**: flush trailing fragment per §10.3 |
| Done | **`process_exited`** + both pipes closed + internal finished state → complete await |

### 10.2 Transport vs logical lines

**Ingress:** **`pipe_data_received`** delivers **byte chunks** (fragments and coalesced data). **Line boundaries** are applied in the protocol buffer: accumulate, split on **`\n`**, emit one logical line per **`on_line`** (Tcl **`gets`**-style delivery to parsers and loggers).

### 10.3 EOF / trailing fragment

| Mode | Behavior |
|------|----------|
| **Default** | If buffer non-empty at EOF, emit one final line without requiring trailing `\n`. |
| **Strict** | Drop or tag partial trailing line — configurable. |

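
The buffer, split, and EOF rules from §10.1 to §10.3 fit in a small per-stream helper. `LineAssembler` is a hypothetical name and not the protocol class itself; in strict mode this sketch drops the trailing fragment, which is one of the two strict options listed above.

```python
class LineAssembler:
    """Per-stream buffer: feed byte chunks, get complete decoded lines."""

    def __init__(self, on_line, strict_eof: bool = False) -> None:
        self._buf = bytearray()
        self._on_line = on_line
        self._strict_eof = strict_eof

    def feed(self, data: bytes) -> None:
        """Call from pipe_data_received; chunks may split or merge lines."""
        self._buf += data
        while (nl := self._buf.find(b"\n")) != -1:
            line = bytes(self._buf[:nl])
            del self._buf[: nl + 1]
            self._emit(line)

    def eof(self) -> None:
        """Call from pipe_connection_lost."""
        if self._buf and not self._strict_eof:
            self._emit(bytes(self._buf))  # default: flush trailing fragment
        self._buf.clear()                 # strict: fragment is dropped

    def _emit(self, raw: bytes) -> None:
        # Decode per line, after splitting on bytes, so a multi-byte
        # character cut by a chunk boundary is never decoded in halves.
        self._on_line(raw.rstrip(b"\r").decode("utf-8", errors="replace"))


lines: list[str] = []
asm = LineAssembler(lines.append)
for chunk in (b"Linux ra", b"ck-pi\r\nup 3 ", b"days\npart"):
    asm.feed(chunk)
asm.eof()
print(lines)
```
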
### 10.4 Tcl-aligned behavior

- **Input:** after internal buffering, **`on_line`** receives **complete lines** (same idea as Tcl **`gets`** on a readable channel).
- **Output (Tcl `fconfigure -buffering line`):** In Tcl, **line-buffered** mode on a channel **flushes outgoing data when a newline is written** (and when the buffer fills), which matters for interactive use, sockets, and pipes so output appears promptly. For this module, when **writing to the subprocess stdin** (if used), apply the same idea: **flush after each line** (writes ending with **`\n`**, or explicit flush) so the peer sees output without waiting for a full stdio buffer.

---

## 11. Timeouts

| Timeout | Semantics |
|---------|-----------|
| **Connect** | From spawn until **`connection_made`**; cancel timer when connected. |
| **Command (wall)** | Maximum lifetime; cancel watchdogs and **`terminate`** transport. |
| **IO idle** | Reset on each **`pipe_data_received`**; on fire, **`terminate`**. |

If IO idle fires, completion still runs after terminate + drain (output may be partial).

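
The IO idle timer is a resettable **`call_later`** handle. In the sketch below `IdleWatchdog` is a hypothetical helper; in a real protocol the callback would terminate the transport, whereas here it only records an event so the behaviour can be observed.

```python
import asyncio


class IdleWatchdog:
    """Fire `on_timeout` when `touch()` has not been called for `timeout` seconds."""

    def __init__(self, timeout: float, on_timeout) -> None:
        self._timeout = timeout
        self._on_timeout = on_timeout
        self._handle: asyncio.TimerHandle | None = None

    def touch(self) -> None:
        """Arm or re-arm the timer; call at start and on every data chunk."""
        self.cancel()
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._timeout, self._on_timeout)

    def cancel(self) -> None:
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


async def demo() -> list[str]:
    events: list[str] = []
    dog = IdleWatchdog(0.1, lambda: events.append("terminate"))
    dog.touch()
    for _ in range(3):             # data keeps arriving: timer is reset
        await asyncio.sleep(0.01)
        events.append("chunk")
        dog.touch()
    await asyncio.sleep(0.3)       # silence: watchdog fires once
    return events


events = asyncio.run(demo())
print(events)
```
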
---

## 12. Central log file

Use **`logging`** with a **`FileHandler`**, or a **callable** that writes timestamped lines. Prefer writes from the event loop thread; for heavy I/O, optional **`asyncio.Queue`** plus a single writer task.

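
The queue-plus-writer option can be sketched as follows. `log_writer`, the `(source, line)` tuple shape, the `None` shutdown sentinel, and the timestamp format are all assumptions made for illustration; a bounded queue is used so producers wait instead of growing memory without limit (N3).

```python
import asyncio
import tempfile
import time
from pathlib import Path


async def log_writer(queue: asyncio.Queue, path: Path) -> None:
    """Single consumer: the only place that touches the log file."""
    with path.open("a", encoding="utf-8") as fh:
        while (item := await queue.get()) is not None:  # None = shut down
            source, line = item
            fh.write(f"{time.strftime('%H:%M:%S')} [{source}] {line}\n")
            fh.flush()


async def demo(path: Path) -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)  # put() waits when full
    writer = asyncio.create_task(log_writer(queue, path))
    await queue.put(("pi", "uname -r -> 6.1.0"))
    await queue.put(("rig", "link up"))
    await queue.put(None)
    await writer


log_path = Path(tempfile.mkdtemp()) / "central.log"
asyncio.run(demo(log_path))
print(log_path.read_text(encoding="utf-8"))
```
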
---

## 13. API sketch (illustrative names)

```text
async with asyncio.TaskGroup() as tg:
    tg.create_task(session_a.post_cmd("cmd1", ...))
    tg.create_task(session_b.post_cmd("cmd2", ...))

cmd = Command(node, cmd="...", interval=10.0, line_sink=...)
await cmd.start()
await cmd.stop()
```

---

## 14. Open decisions

1. Formal **`LineSink`** protocol vs **logger + callable** only.
2. **stderr:** merged to stdout vs separate channels.
3. Public name **`Command`** vs **`SshCommand`**.
4. Whether **`Command`** ships in v1 or callers use **`create_task(run_command_loop(...))`** only.

---

## 15. Specification checklist

- [ ] Line emission, EOF, and CR handling documented in implementation.
- [ ] Concurrent execution documented for **Python 3.11+** with **`TaskGroup`** (and **`gather`** where noted).
- [ ] Completion = process exit + pipe EOF + buffer flush.
- [ ] Central logging path documented.
- [ ] Timeout and cancellation behavior documented.
- [ ] Repeating **`Command`** scheduling uses **deadline-based** waits or **`call_at`/`call_later`** (§5.3).
- [ ] Automated tests cover the scenarios in **§16**.

---

## 16. Test cases

This section lists **behaviors to verify** in automated tests (unit, functional, or integration as appropriate). Implementation may use mocks, local subprocess stubs, or real SSH depending on CI constraints.

### 16.1 `ssh_session` / `post_cmd` / completion

- **Happy path:** Remote command exits 0; completion fires after subprocess exit, stdout/stderr EOF, and protocol teardown.
- **Non-zero exit:** Process exits with failure; completion still reached; return code or error path is observable as designed.
- **Communicate-like barrier:** Completion does not report done until **`process_exited`** and pipe **`pipe_connection_lost`** semantics are satisfied (§9.2).

### 16.2 Streaming and line buffering (`pipe_data_received`)

- **Chunked input:** Data arriving as multiple arbitrary chunks still yields **correct line splits** on **`\n`**.
- **CR / CRLF:** **`\r\n`** and trailing **`\r`** handling matches §9.1 (line splitting per §10.2).
- **EOF partial line:** Buffer non-empty at EOF emits **one final line** under default policy (§10.3).
- **Stdout vs stderr:** If stderr is a separate channel, both streams are decoded and routed per sink/logger configuration.

### 16.3 Timeouts

- **Connect timeout:** Fires when transport does not reach connected state in time; subprocess is terminated; completion reached.
- **Command (wall / `CMD_TIMEOUT`):** Long-running command is terminated at wall limit; completion reached.
- **IO idle (`IO_TIMEOUT`):** No **`pipe_data_received`** for the idle interval triggers terminate; completion reached (output may be partial) (§11).

### 16.4 `Command` (scheduled execution)

- **One-shot (`interval` unset / zero):** **`start`** runs at most one **`post_cmd`** cycle per lifecycle (per defined semantics); **`stop`** ends cleanly.
- **Repeating (`interval` > 0):** Multiple invocations occur; **period** between **scheduled** fires follows **deadline** or **`call_at`** rules (§5.3), not “sleep N after work” drift.
- **Finite count:** `count=N` executes exactly N runs.
- **Never-ending:** `count=Command.COUNT_FOREVER` runs until explicit stop/cancel.
- **Timers on `Command`:** **`IO_TIMEOUT`** and **`CMD_TIMEOUT`** passed through to each **`post_cmd`** invocation behave as in §16.3.
- **Stop / cancel:** **`stop()`** or **`Task`** cancellation ends the loop; no leaked tasks or open log handles.

### 16.4.1 `CommandManager`

- **Add + self-clean:** Added finite commands appear in `list_active()` then auto-remove on normal completion.
- **Stop one / stop all:** Explicit stop removes active entries.
- **Abnormal history:** exception/timeout stops are retained in `list_abnormal_history()`.

### 16.5 Concurrent execution (Python 3.11+)

- **`TaskGroup`:** Multiple **`post_cmd`** (or equivalent) tasks run **overlapped**; first failure cancels siblings if using default **`TaskGroup`** semantics.
- **`gather` (when used):** **`return_exceptions=True`** path collects all results without aborting early, if that pattern is supported in helpers.

### 16.6 `ssh_node` / registry

- **`WeakSet`:** After the application drops its last strong reference to a node, the node **disappears** from **`get_instances()`** following collection (§3.3).
- **Strong reference:** While the app holds a node, it appears in **`get_instances()`** when enumeration runs.

### 16.7 Transport configuration

- **`ssh` vs `ush`:** Argv shape and **`ssh_controlmaster`** interaction match §3 / §4 (as far as test doubles allow).
- **Relay:** Jump-host argv prefix is built correctly when **`relay`** is set.

### 16.8 Logging / sink

- **Per-line delivery:** Logger or callable receives **one line at a time** after buffering (§9.1, §10.2).
- **Central vs per-session:** If both modes exist, each receives the expected lines (no cross-contamination unless designed).

### 16.9 Consoles / ControlMaster (if exercised in test suite)

- **`open_consoles` / `close_consoles`:** Lifecycle can be covered with integration tests or marked **optional** when real hardware or SSH fixtures are unavailable.

---

Implemented coverage reference: `tests/test_node_control.py`

*End of design document.*