# Design: `fiwicontrol.commands` — asyncio execution (SSH / `ush`)

**Status:** Current
**Package:** `fiwicontrol.commands` (FiWiControl repository). **Implementation file:** `src/fiwicontrol/commands/node_control.py`. The main entry type is `ssh_node` — SSH and `ush` transports to a rig.
**Scope:** Asyncio-based SSH/`ush` rig control: `ssh_node`, `ssh_session`, `Command`, `CommandManager`, concurrent execution via `asyncio`, line-oriented streaming, and logging.
**Language:** Python 3.11+ (stdlib `asyncio.TaskGroup`, `asyncio.timeout`, and related APIs assumed available).
**Remote access:** For `sshtype="ssh"`, **passwordless SSH is required**. Commands are executed via `ssh` without an interactive TTY, so **key-based authentication** (or any non-interactive auth that does not prompt) must already work for `root@` (or your chosen user if you extend argv). Password prompts, interactive MFA, or unlocking passphrase-protected keys in this non-interactive path will cause failures or indefinite waits.

---

## Running tests

Integration tests for `node_control.py` live in `tests/test_node_control.py`. They open real `ssh` sessions to `root@$FIWI_REMOTE_IP` (default `192.168.1.39`). **Passwordless SSH must work** for that host. Tests are **opt-in**: set `FIWI_RUN_REMOTE_TESTS=1` or pytest **skips** the module (so default CI / laptop runs do not require a rig).

**From a terminal** (repository root — e.g. `~/Code/FiWiControl`):

```bash
cd ~/Code/FiWiControl
export FIWI_RUN_REMOTE_TESTS=1
# Optional: target a different host (default is 192.168.1.39 if unset)
export FIWI_REMOTE_IP=192.168.1.39

# Verbose pytest (recommended)
python3 -m pytest tests/test_node_control.py -v

# Quiet summary only
python3 -m pytest tests/test_node_control.py -q
```

**Without pytest**, the file is also a `unittest` script (it sets `FIWI_RUN_REMOTE_TESTS=1` if unset, so you do not need that variable for a direct script run):

```bash
cd ~/Code/FiWiControl
python tests/test_node_control.py --remote-ip 192.168.1.39
# More asyncio logging:
python tests/test_node_control.py --remote-ip 192.168.1.39 --debug
```

`pytest` picks up `pythonpath = ["src"]` from `pyproject.toml`, so you do **not** need `PYTHONPATH=src` when you run from the repo root as above. After `pip install -e .`, imports resolve the same way.

**Example successful output** (line order and timings vary; captured with `python3 -m pytest tests/test_node_control.py -v` on Linux):

```text
============================= test session starts ==============================
platform linux -- Python 3.13.11, pytest-9.0.3, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /home/.../FiWiControl
configfile: pyproject.toml
plugins: anyio-4.8.0
collecting ... collected 6 items

tests/test_node_control.py::TestRemoteSingleRunCommands::test_dmesg_tail PASSED [ 16%]
tests/test_node_control.py::TestRemoteSingleRunCommands::test_ls_home_directory PASSED [ 33%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_command_manager_add_and_self_clean PASSED [ 50%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_01_uname_r_every_1s_10_times PASSED [ 66%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_02_pwd_and_ls_every_1s_5_times PASSED [ 83%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_03_forever_can_be_stopped PASSED [100%]

============================== 6 passed in 16.47s ==============================
```

**Fedora / `ssh_config.d`:** pytest loads `tests/conftest.py`, which `setdefault`s `FIWI_SSH_CONFIG` to `tests/fixtures/ssh_config_minimal` when that file exists, so `ssh` is invoked as `ssh -F …` and does not read broken system drop-ins under `/etc/ssh/ssh_config.d/`. Override with your own `FIWI_SSH_CONFIG` if needed. For manual `ssh`, export the same variable or fix system file permissions (see `docs/install.md`). If SSH is not configured, you will see failures (connection refused, auth errors, or timeouts) instead of `PASSED`.

**Package import smoke** (no network; optional):

```bash
cd ~/Code/FiWiControl
python3 -m pytest tests/test_package_layout.py -q
```

---

## How to use (examples)

Implementation lives in `src/fiwicontrol/commands/node_control.py`. All remote I/O is **async** — call from a coroutine under `asyncio.run()` or your application event loop.

### Prerequisites (remote targets)

1. **Passwordless SSH** — From the machine running this code, `ssh root@ true` (or equivalent) must succeed **without** typing a password. Configure `~/.ssh/authorized_keys` on the target and use an unencrypted agent key or `ssh-agent` as appropriate for your environment.
   For `sshtype="ssh"`, `ssh_session.post_cmd` always passes an explicit `root@` (`ssh_session.user` defaults to `root`), not a bare host, so the remote user is never your workstation login name.
2. Network reachability and correct `ipaddr` (or hostname) on the node.
3. Optional: `BatchMode=yes` in `ssh_config` for fail-fast behavior when keys are missing (recommended for automation).

### Remote host: Python on the rig (power / discovery)

`ssh_node.rexec` runs shell commands on the rig over the **same** non-interactive `ssh root@` path as the prerequisites above. Anything you run remotely — including `python3 -m fiwicontrol.power --discovery-json` for inventory or USB discovery — needs that **Python environment installed on the node**, not only on your laptop.

**Full checklist** (SSH keys, `git clone` on the Pi, `python3 -m fiwicontrol.commands …`, PEP 668 / `--break-system-packages`, verification `pytest`): → `docs/install.md`, section **“Remote host setup”**.

Power-specific behaviour (what discovery returns, INI format): `docs/power-control-and-inventory.md`.

### Imports

Preferred (re-exported from `fiwicontrol.commands`):

```python
import asyncio

from fiwicontrol.commands import Command, CommandManager, ssh_node
```

Equivalent submodule import (same symbols):

```python
from fiwicontrol.commands.node_control import Command, CommandManager, ssh_node
```

Install the package (editable: `pip install -e .` from the **FiWiControl** repo root; distribution name `fiwicontrol`). For `pytest` without installing, `pythonpath = ["src"]` is set in `pyproject.toml`.

### Create an `ssh_node` (rig / execution target)

`ssh_node` is a **class** in `fiwicontrol.commands`: one instance represents **one** reachable rig (OpenSSH or `ush`). It is not the package name; the package is `fiwicontrol.commands`.
**SSH (default)** — targets `root@ipaddr` with a ControlMaster socket under `/tmp/controlmasters_` when `ssh_controlmaster=True` (default for `rexec` / `Command` sessions).

```python
node = ssh_node(
    name="rack-pi",
    ipaddr="192.168.1.39",
    ssh_controlmaster=False,  # optional; tests often disable for simplicity
    silent_mode=False,        # False: stream stdout/stderr lines to logging
)
```

`ush` instead of plain `ssh`:

```python
node = ssh_node(name="rig", ipaddr="10.0.0.2", sshtype="ush")
```

**Jump host (`relay`)** — argv becomes `ssh root@` then inner `ush` or `ssh`.

```python
node = ssh_node(name="behind-jump", ipaddr="10.0.0.5", relay="bastion.example.com")
```

Keep a **strong reference** to each node you care about; the class registry (a `WeakSet`) only lists nodes still alive from the rest of your program (§3.3).

### One-shot remote command (`rexec`)

`rexec` builds an `ssh_session`, runs `post_cmd`, and returns the session. Output accumulates in `session.results` (bytes); line-oriented output also goes to the logger unless `silent_mode=True`.

```python
async def run_once():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    session = await node.rexec(
        cmd="ls ~",
        IO_TIMEOUT=15.0,   # idle I/O watchdog (reset on each received chunk)
        CMD_TIMEOUT=30,    # wall-clock cap for the remote command
        CONNECT_TIMEOUT=20.0,
        repeat=None,       # omit or False: single subprocess lifecycle
    )
    text = session.results.decode("utf-8", errors="replace").strip()
    print(text)

asyncio.run(run_once())
```

### Repeating commands (`Command`)

`Command` drives **separate** `ssh_session.post_cmd` invocations on a schedule (`interval` in seconds, event-loop deadline-based — §5.3).

- `interval=None` or `<= 0`: run **once**, then stop.
- `interval > 0`: repeat until `count` is reached or `stop()` is called.
- `count`: finite integer, or `Command.COUNT_FOREVER` (`-1`) to repeat until `stop()`.

```python
async def poll_uname():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
    await cmd.start()
    await asyncio.wait_for(cmd.task, timeout=25.0)
    print("runs:", cmd.run_count)  # expect 10

asyncio.run(poll_uname())
```

Stop a long-running / infinite command:

```python
async def stop_forever():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    cmd = Command(node=node, cmd="date", interval=2.0, count=Command.COUNT_FOREVER)
    await cmd.start()
    await asyncio.sleep(5)
    await cmd.stop()

asyncio.run(stop_forever())
```

Optional `log_file`: each run’s output is appended under a banner. Per-run timeouts: `io_timeout`, `cmd_timeout`, `connect_timeout` (override node defaults).

### Many commands (`CommandManager`)

`CommandManager` assigns IDs, starts tasks, **removes finished commands** from `list_active()`, and records **exception/timeout** stops in `list_abnormal_history()`.

```python
async def managed():
    node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
    mgr = CommandManager()
    cid_a = await mgr.add_command(node=node, cmd="uname -r", interval=0.5, count=4)
    cid_b = await mgr.add_command(node=node, cmd="pwd", interval=0.5, count=4)
    await asyncio.sleep(3)
    print("active:", mgr.list_active())
    print("abnormal:", mgr.list_abnormal_history())
    await mgr.stop_all()

asyncio.run(managed())
```

### Parallel one-shots (`TaskGroup` / `gather`)

For several **independent** `rexec`-style calls, schedule them together:

```python
async def parallel():
    a = ssh_node(name="a", ipaddr="192.168.1.10", ssh_controlmaster=False)
    b = ssh_node(name="b", ipaddr="192.168.1.11", ssh_controlmaster=False)

    async def one(node, cmd):
        s = await node.rexec(cmd=cmd, IO_TIMEOUT=15.0, CMD_TIMEOUT=30, CONNECT_TIMEOUT=20.0)
        return s.results.decode("utf-8", errors="replace")

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(one(a, "hostname"))
        t2 = tg.create_task(one(b, "hostname"))
    print(t1.result(), t2.result())

asyncio.run(parallel())
```

Use `asyncio.gather(..., return_exceptions=True)` when you want every outcome without `TaskGroup` fail-fast behavior.

### Consoles (`open_consoles` / `close_consoles`)

Class methods walk `ssh_node.get_instances()`, clean `dmesg` on SSH nodes, and optionally open long-lived ControlMaster `dmesg -w` streams (§3.4). Call them only when your rig expects this pattern.

```python
await ssh_node.open_consoles(silent_mode=False)
# ... use nodes ...
await ssh_node.close_consoles()
```

**Tests:** see [Running tests](#running-tests) at the top of this document.

---

## 1. Goals

1. **Streaming with line buffering** — Decode subprocess output, split on line boundaries, and deliver each line to logging (including a shared central log) without waiting for the process to finish.
2. **Run-to-completion** — Await a single completion that means: the subprocess exited (or was terminated on timeout), and stdout/stderr are drained with line buffers flushed per the EOF policy (§10.3).
3. **Concurrent execution from a prepared set** — Build coroutines (e.g. several `post_cmd` calls), then run them **concurrently** under `asyncio.TaskGroup` (Python 3.11+). Use `asyncio.gather` when a simple “wait for all results” shape fits better.
4. **Scheduled / periodic execution** — Support one-shot and repeating commands through a single `Command` type with an explicit lifecycle (`start`/`stop`) and fixed-period scheduling semantics.

---

## 2. Architectural overview

| Type | Responsibility |
| --- | --- |
| `ssh_node` (class) | Per-host configuration: identity, `ssh`/`ush` argv prefix, relay, ControlMaster socket path, console bookkeeping. Registry of instances. **Does not** own subprocess I/O or interval scheduling. |
| `ssh_session` (class) | One SSH child process for a command or stream: `post_cmd`, `close`, `SubprocessProtocol` with `pipe_data_received`, timeouts, communicate-like completion. Reads settings from `ssh_node` (e.g. via attribute delegation). |
| `SSHReaderProtocol` (nested class on `ssh_session`) | Subprocess protocol callbacks: `connection_made`, `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timer hooks. |
| `Command` (class) | Managed lifecycle for repeated or one-shot runs driven by `ssh_node` / `ssh_session`: `interval`, `count`, `start`/`stop`, log/sink attachment. Each execution uses `ssh_session.post_cmd`. |
| `CommandManager` (class) | Dynamic command registry: add running commands, stop one/all, self-clean completed commands, keep abnormal-stop history (exception / timeout). |

**Concurrent runs:** Prefer `asyncio.TaskGroup` for structured concurrency; `asyncio.gather` when appropriate.

**Per-line logging:** Use `logging.Logger` and/or a **callable** sink; a formal `LineSink` protocol type is optional and only worth adding if multiple implementations need a shared interface.

**Library asyncio rule:** Public APIs that do I/O are `async`. Coroutines and protocols use `asyncio.get_running_loop()`. The application (or `asyncio.run`) starts the event loop before calling into the library.

---

## 3. `ssh_node`

### 3.1 Role

| Concern | `ssh_node` |
| --- | --- |
| `name`, `ipaddr`, optional `device`, `devip` | Yes |
| `sshtype` (`ssh` / `ush`), `relay`, `self.ssh` argv prefix | Yes |
| `ssh_controlmaster`, `controlmasters` (SSH only; off for `ush`) | Yes |
| `ssh_console_session`, console task references | Yes (bookkeeping) |
| Subprocess streaming / `pipe_data_received` | **No** — `ssh_session` |
| Interval scheduling, periodic task lists | **No** — `Command` (callers hold `Command` instances and `Task` handles) |

### 3.2 Constructor parameters

- `name`, `ipaddr`: label and target for argv assembly (`root@…` applied when building the remote command).
- `relay`: optional jump host; argv begins with `ssh root@` then inner `ush` or `ssh`.
- `ssh_controlmaster`: enable the ControlMaster path for `ssh`; forced off for `ush`.
- `silent_mode`: passed into sessions for line logging behavior.
- `console`: optional rig-facing flag.

### 3.3 Registry

Instances register in a class-level `WeakSet` (`instances`).
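The collection behaviour this registry relies on can be demonstrated with the stdlib alone. A standalone sketch (the `Node` class below is illustrative, not the package's `ssh_node`):

```python
# Standalone demonstration of WeakSet registry semantics: members vanish
# once the last strong reference is dropped and the object is collected.
import gc
import weakref


class Node:
    instances = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        Node.instances.add(self)


kept = Node("kept")   # strong reference held by the caller
Node("dropped")       # no strong reference survives this statement
gc.collect()          # on CPython refcounting already collected it; this
                      # just makes the demo deterministic on other runtimes

alive = {n.name for n in Node.instances}
# alive == {"kept"}: only strongly referenced nodes remain enumerable
```

This is exactly why callers must hold their own references: enumeration reflects liveness, it does not create it.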
`get_instances()` returns a snapshot of nodes that are **still alive** from the rest of the program’s perspective.

`WeakSet` properties:

- The registry supports **enumeration** (e.g. `open_consoles` over nodes the application is using). `WeakSet` stores **weak references** only: node **lifetime** follows **strong references** held by the application; when those go away, the node may be **collected** and **disappears from `instances`** automatically.
- `get_instances()` therefore reflects nodes that are **still strongly referenced** elsewhere (variables, rig object, collections).
- Callers keep a **strong reference** to each node they care about for as long as that node should appear in global enumeration.

**Alternative pattern:** an explicit **list of nodes** passed into orchestration functions also works; `WeakSet` fits a **central index** without that list becoming the sole long-lived owner of every node.

### 3.4 Class-level orchestration

`open_consoles`, `close_consoles`: multi-node cleanup (`pkill dmesg`), ControlMaster + long-lived console streams. Both are `async` and require a running loop.

Stopping scheduled work: callers `await cmd.stop()` and/or `task.cancel()` on the `Task` from `Command.start()` (exact API TBD).

### 3.5 Instance methods

| Method | Behavior |
| --- | --- |
| `rexec` | Constructs an `ssh_session` and runs `post_cmd` with IO / command / connect timeouts. To run many `rexec`-style operations later in parallel, use `asyncio.TaskGroup` (or `gather` if that shape fits). |
| `clean` | `async`: remote cleanup (e.g. `pkill dmesg`) via a one-off subprocess. |
| `close_console` | `async`: tear down `ssh_console_session`. |

Scheduled / interval execution belongs to `Command` (§5). `ssh_node` covers identity, transport, registry, and console bookkeeping.

### 3.6 Optional utilities

Thin `async` helpers (e.g. sleep delays) may exist as **module-level functions** when useful.

---

## 4. `ssh_session`

### 4.1 Role

One logical SSH subprocess: spawn via `loop.subprocess_exec` with a `SubprocessProtocol`, stream stdout/stderr through `pipe_data_received`, enforce connect / wall / IO-idle timeouts, and expose communicate-like completion.

### 4.2 Relationship to `ssh_node`

The session holds a reference to `ssh_node` and delegates missing attributes (e.g. `ipaddr`, `ssh`, `controlmasters`) for argv and ControlMaster paths.

### 4.3 Operations

- `post_cmd(cmd, IO_TIMEOUT, CMD_TIMEOUT, …)` — Build the full `ssh` argv (including `-o ControlPath` / master setup when `control_master`). A `repeat` flag that loops **inside one subprocess lifecycle** is distinct from `Command`’s separate invocations on a schedule (§5).
- `close` — Terminate the console / master as required (`async`).

### 4.4 Inner protocol

`SSHReaderProtocol` (name may match implementation): implements `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timers via `get_running_loop().call_later`, and an optional `LoggerAdapter` for prefixed logs.

---

## 5. `Command`

### 5.1 Name and scope

The type is `Command` (or `SshCommand` if `Command` collides in consumer code). SSH is the default transport; **repetition** is modeled with an `interval` field on the same type.
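The `call_later` watchdog hooks from §4.4 can be sketched in isolation (all names illustrative; per the design, the equivalent timers live inside `SSHReaderProtocol` and are re-armed from `pipe_data_received`):

```python
import asyncio


class IdleWatchdog:
    """IO-idle watchdog sketch: arms with call_later, re-arms on each
    received chunk, fires a callback when no data arrives in time."""

    def __init__(self, timeout: float, on_fire) -> None:
        self._timeout = timeout
        self._on_fire = on_fire
        self._handle = None

    def arm(self) -> None:
        # Cancel any pending timer, then schedule a fresh one.
        loop = asyncio.get_running_loop()
        if self._handle is not None:
            self._handle.cancel()
        self._handle = loop.call_later(self._timeout, self._on_fire)

    def cancel(self) -> None:
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


async def _demo() -> bool:
    fired_event = asyncio.Event()
    dog = IdleWatchdog(0.05, fired_event.set)
    dog.arm()
    for _ in range(3):              # steady "data" keeps resetting the timer
        await asyncio.sleep(0.02)
        dog.arm()                   # as pipe_data_received would
    await asyncio.sleep(0.1)        # silence longer than the idle timeout
    return fired_event.is_set()

fired = asyncio.run(_demo())
# fired is True: the idle timer expired only after data stopped arriving
```

The same shape, with `terminate()` as the callback, gives the IO-idle kill described in §11.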
### 5.2 Responsibilities

| Concern | Owner |
| --- | --- |
| Remote command string / argv fragment | `Command` |
| `interval`: `None` or `0` → one execution per start/stop (or defined one-shot semantics); `> 0` → period between scheduled fires | `Command` |
| `count`: finite run count, or `Command.COUNT_FOREVER` for never-ending | `Command` |
| Optional `initial_delay`, jitter | `Command` |
| `start()` / `stop()`, background `Task`, cancellation, log flush | `Command` |
| Log / line output target | `Command` — typically a `logging.Logger` or a **callable**; a formal `LineSink` protocol is optional |
| **Completion timer** (wall-clock cap per run) | `Command` — default passed into each `post_cmd` as `CMD_TIMEOUT` (name may match implementation): max time from the start of that invocation until subprocess exit (or kill). Catches hung remote commands that never finish. |
| **I/O idle timer** | `Command` — default passed into each `post_cmd` as `IO_TIMEOUT`: max time **without** stdout/stderr bytes; the timer resets on `pipe_data_received`. Catches “stuck but still running” cases where the process produces no output. |
| Subprocess + protocol | `ssh_session` only — each tick is `await post_cmd(..., IO_TIMEOUT=…, CMD_TIMEOUT=…)`; timers are enforced inside `SSHReaderProtocol` |

**Timers on `Command`:** `IO_TIMEOUT` and `CMD_TIMEOUT` address **idle output** vs **overall run length**. `Command` supplies defaults and passes them into each `post_cmd`; `SSHReaderProtocol` implements the watchdogs.
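A hedged sketch of how `Command` might hand per-run timer values to each invocation while keeping the period anchored to the event-loop clock (the §5.3 deadline option); `run_command_loop` and `fake_post_cmd` are illustrative stand-ins, not the package API:

```python
import asyncio


async def run_command_loop(post_cmd, *, interval: float, count: int,
                           io_timeout: float = 15.0,
                           cmd_timeout: float = 30.0) -> int:
    """Fire post_cmd every `interval` seconds, measured between scheduled
    fire times (event-loop clock), for `count` runs; -1 means forever."""
    loop = asyncio.get_running_loop()
    runs = 0
    while count < 0 or runs < count:
        next_fire = loop.time() + interval      # deadline for the next tick
        await post_cmd(IO_TIMEOUT=io_timeout, CMD_TIMEOUT=cmd_timeout)
        runs += 1
        if count < 0 or runs < count:
            # Wait only the remainder of the period; 0 if post_cmd overran
            # (here: overrun means the next run fires immediately).
            await asyncio.sleep(max(0.0, next_fire - loop.time()))
    return runs


# Usage sketch with a fake post_cmd (no SSH involved):
async def _demo() -> int:
    async def fake_post_cmd(**_kw):
        await asyncio.sleep(0.01)               # pretend remote work

    return await run_command_loop(fake_post_cmd, interval=0.05, count=3)

total_runs = asyncio.run(_demo())
# total_runs == 3
```

Because the deadline is taken *before* the run, the period is independent of `post_cmd` duration, which is the drift-free behaviour §16.4 tests for.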
**Split of roles:** `Command` schedules runs and timer values; `ssh_session` runs `post_cmd` and owns `pipe_data_received` streaming.

### 5.3 Scheduling repeats

A repeating `Command` uses **event-loop time**, so each **period** is measured **between scheduled fire times**, independent of how long `post_cmd` took:

1. **Deadline-based:** at tick start, `next_fire = loop.time() + period`. After `await post_cmd(...)`, wait `max(0.0, next_fire - loop.time())` (or an equivalent `Event.wait` with timeout). Document the behavior when `post_cmd` exceeds `period` (immediate next run vs align to the next `next_fire`).
2. **`loop.call_at` / `call_later`:** schedule the next tick at `loop.time() + period` (or the computed deadline). The wait between ticks is the time remaining until `next_fire` (or the next `call_at` / `call_later` fire), so the **period** stays anchored to the event-loop clock regardless of `post_cmd` duration.

### 5.4 `Command` vs bare `Task`

A coroutine `run_command_loop(...)` with deadline scheduling plus `asyncio.create_task` is enough for callers who want a minimal API surface. `Command` is optional sugar for `start`/`stop`, named fields, and encapsulation — add it when that ergonomics is worth the type.

### 5.5 API sketch

```text
cmd = Command(node, cmd="...", interval=5.0, line_sink=...)
cmd = Command(node, cmd="...", interval=None, line_sink=...)
await cmd.start()
await cmd.stop()
```

### 5.6 Command manager

`CommandManager` supports dynamic add/remove of commands while the system is running:

```text
manager = CommandManager()
cid = await manager.add_command(node=node, cmd="uname -r", interval=1.0, count=10)
await manager.stop_command(cid)
await manager.stop_all()
active = manager.list_active()
history = manager.list_abnormal_history()
```

Self-cleaning behavior:

- completed commands are removed from active state automatically.
- abnormal stop records (exception/timeout) are retained in history.

---

## 6. Concurrent execution (`TaskGroup` / `gather`)

**Requires Python 3.11+** for `TaskGroup` as documented here.

- **Preparation:** build coroutines (e.g. `session.post_cmd("cmd", ...)` for several sessions).
- **Execution (preferred):** `async with asyncio.TaskGroup() as tg:` then `tg.create_task(coro)` for each coroutine. `TaskGroup` cancels siblings on first failure (structured concurrency).
- **Alternative:** `await asyncio.gather(*coros, return_exceptions=True)` when callers want all outcomes without `TaskGroup` fail-fast behavior.

One top-level `await` runs the whole set concurrently.

---

## 7. Functional requirements

| ID | Requirement |
| --- | --- |
| R1 | Line-oriented streaming: decode, buffer, split on `\n`, optional final fragment on EOF (§10.3). |
| R2 | Central logging via **logger and/or callable**; supports shared and per-session targets. |
| R3 | Completion: process ended, stdin closed if used, pipes EOF’d, line buffers flushed (§9.2, §10). |
| R4 | Prepared coroutines run **concurrently** via `asyncio.TaskGroup` (or `gather` when that shape fits); **Python 3.11+**. |
| R5 | `ush`, `ssh`, relay, ControlMaster consoles, connect / wall / IO-idle timeouts — all compose with streaming and completion as described here. |

---

## 8. Non-functional requirements

| ID | Requirement |
| --- | --- |
| N1 | Library I/O runs under `asyncio.get_running_loop()`; programs start work with `asyncio.run` or an application event loop. |
| N2 | **Python 3.11+** for implementation and documentation examples. |
| N3 | Backpressure: document whether logging is assumed fast, or use bounded queues / warnings. |
| N4 | Cancelling a command `Task` tears down the subprocess and protocol cleanly. |

---

## 9. Line sink and completion

### 9.1 Per-line output

Use a `logging.Logger`, a **callable** `(stream, line) -> None`, or both. Normalize `\r\n`; strip `\r` for display. A dedicated `LineSink` **protocol** is optional sugar.

### 9.2 Communicate-like completion

Completion means:

1. The subprocess has exited (return code available).
2. Stdout and stderr reached EOF (`pipe_connection_lost` for fds 1 and 2 as used).
3. Protocol line buffers are flushed (§10.3); `process_exited` handled; completion event or future set.

---

## 10. Streaming: `SubprocessProtocol` and `pipe_data_received`

The SSH streaming path uses `loop.subprocess_exec` with a `SubprocessProtocol`. **Line assembly happens in `pipe_data_received(fd, data)`** on arbitrary byte chunks from the transport.

### 10.1 Pipeline

| Stage | Mechanism |
| --- | --- |
| Ingress | `pipe_data_received(fd, data)` — `fd` 1 = stdout, 2 = stderr |
| Buffer / decode | Per-stream byte buffer; UTF-8 (replace or strict), configurable |
| Line split | On `\n`, invoke `on_line` → logger / callable |
| EOF | `pipe_connection_lost`: flush trailing fragment per §10.3 |
| Done | `process_exited` + both pipes closed + internal finished state → complete the await |

### 10.2 Transport vs logical lines

**Ingress:** `pipe_data_received` delivers **byte chunks** (fragments and coalesced data). **Line boundaries** are applied in the protocol buffer: accumulate, split on `\n`, and emit one logical line per `on_line` (Tcl `gets`-style delivery to parsers and loggers).
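A minimal per-stream splitter sketch of this buffering (illustrative; the real protocol wires the equivalent logic into `pipe_data_received` and flushes from `pipe_connection_lost`):

```python
class LineBuffer:
    """Accumulate arbitrary byte chunks and emit complete logical lines:
    buffer, split on b"\\n", strip trailing b"\\r" for display, and hand
    each line to a callable sink (fd, line) -> None."""

    def __init__(self, on_line):
        self._buf = bytearray()
        self._on_line = on_line

    def feed(self, fd: int, data: bytes) -> None:
        # Called per chunk, as pipe_data_received would be.
        self._buf += data
        while (nl := self._buf.find(b"\n")) != -1:
            raw = bytes(self._buf[:nl]).rstrip(b"\r")   # normalize CRLF
            del self._buf[:nl + 1]
            self._on_line(fd, raw.decode("utf-8", errors="replace"))

    def eof(self, fd: int) -> None:
        # Default EOF policy (§10.3): flush a trailing partial line.
        if self._buf:
            raw = bytes(self._buf).rstrip(b"\r")
            self._buf.clear()
            self._on_line(fd, raw.decode("utf-8", errors="replace"))


lines: list[tuple[int, str]] = []
buf = LineBuffer(lambda fd, line: lines.append((fd, line)))
buf.feed(1, b"hel")          # fragment, no complete line yet
buf.feed(1, b"lo\r\nwor")    # completes "hello", buffers "wor"
buf.eof(1)                   # default policy flushes "wor"
# lines == [(1, "hello"), (1, "wor")]
```

One such buffer per stream (stdout, stderr) keeps fd routing intact while the sink sees only whole lines.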
### 10.3 EOF / trailing fragment

| Mode | Behavior |
| --- | --- |
| **Default** | If the buffer is non-empty at EOF, emit one final line without requiring a trailing `\n`. |
| **Strict** | Drop or tag the partial trailing line — configurable. |

### 10.4 Tcl-aligned behavior

- **Input:** after internal buffering, `on_line` receives **complete lines** (the same idea as Tcl `gets` on a readable channel).
- **Output (Tcl `fconfigure -buffering line`):** In Tcl, **line-buffered** mode on a channel **flushes outgoing data when a newline is written** (and when the buffer fills), which matters for interactive use, sockets, and pipes so output appears promptly. For this module, when **writing to the subprocess stdin** (if used), apply the same idea: **flush after each line** (writes ending with `\n`, or explicit flush) so the peer sees output without waiting for a full stdio buffer.

---

## 11. Timeouts

| Timeout | Semantics |
| --- | --- |
| **Connect** | From spawn until `connection_made`; cancel the timer when connected. |
| **Command (wall)** | Maximum lifetime; cancel watchdogs and `terminate` the transport. |
| **IO idle** | Reset on each `pipe_data_received`; on fire, `terminate`. |

If IO idle fires, completion still runs after terminate + drain (output may be partial).

---

## 12. Central log file

Use `logging` with a `FileHandler`, or a **callable** that writes timestamped lines. Prefer writes from the event loop thread; for heavy I/O, an optional `asyncio.Queue` plus a single writer task.

---

## 13. API sketch (illustrative names)

```text
async with asyncio.TaskGroup() as tg:
    tg.create_task(session_a.post_cmd("cmd1", ...))
    tg.create_task(session_b.post_cmd("cmd2", ...))

cmd = Command(node, cmd="...", interval=10.0, line_sink=...)
await cmd.start()
await cmd.stop()
```

---

## 14. Open decisions

1. A formal `LineSink` protocol vs **logger + callable** only.
2. **stderr:** merged to stdout vs separate channels.
3. Public name `Command` vs `SshCommand`.
4. Whether `Command` ships in v1, or callers use `create_task(run_command_loop(...))` only.

---

## 15. Specification checklist

- Line emission, EOF, and CR handling documented in the implementation.
- Concurrent execution documented for **Python 3.11+** with `TaskGroup` (and `gather` where noted).
- Completion = process exit + pipe EOF + buffer flush.
- Central logging path documented.
- Timeout and cancellation behavior documented.
- Repeating `Command` scheduling uses **deadline-based** waits or `call_at`/`call_later` (§5.3).
- Automated tests cover the scenarios in §16.

---

## 16. Test cases

This section lists **behaviors to verify** in automated tests (unit, functional, or integration as appropriate). Implementations may use mocks, local subprocess stubs, or real SSH depending on CI constraints.

### 16.1 `ssh_session` / `post_cmd` / completion

- **Happy path:** Remote command exits 0; completion fires after subprocess exit, stdout/stderr EOF, and protocol teardown.
- **Non-zero exit:** Process exits with failure; completion is still reached; the return code or error path is observable as designed.
- **Communicate-like barrier:** Completion does not report done until `process_exited` and the pipe `pipe_connection_lost` semantics are satisfied (§9.2).

### 16.2 Streaming and line buffering (`pipe_data_received`)

- **Chunked input:** Data arriving as multiple arbitrary chunks still yields **correct line splits** on `\n`.
- **CR / CRLF:** `\r\n` and trailing `\r` handling matches §10.2 / §10.3.
- **EOF partial line:** A buffer non-empty at EOF emits **one final line** under the default policy (§10.3).
- **Stdout vs stderr:** If stderr is a separate channel, both streams are decoded and routed per sink/logger configuration.
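The first three bullets can be exercised without any SSH fixture. A self-contained sketch (the splitter is restated inline so the snippet stands alone; the real suite would target the protocol class instead):

```python
def split_lines(chunks: list[bytes]) -> list[str]:
    """Stand-in for the protocol's line assembly: buffer arbitrary chunks,
    split on b"\\n", strip trailing b"\\r", flush a partial line at EOF."""
    buf = bytearray()
    out: list[str] = []
    for chunk in chunks:
        buf += chunk
        while (nl := buf.find(b"\n")) != -1:
            out.append(bytes(buf[:nl]).rstrip(b"\r").decode())
            del buf[:nl + 1]
    if buf:  # default EOF policy (§10.3): emit the trailing fragment
        out.append(bytes(buf).rstrip(b"\r").decode())
    return out


# "Chunked input": arbitrary chunk boundaries yield the same logical lines
chunked = split_lines([b"a\nb", b"c\n"])         # ["a", "bc"]
rechunked = split_lines([b"a", b"\nbc", b"\n"])  # ["a", "bc"]
# "CR / CRLF" plus "EOF partial line"
crlf_eof = split_lines([b"x\r\n", b"tail"])      # ["x", "tail"]
```

Parameterizing a real test over many chunkings of the same byte stream is a cheap way to prove boundary independence.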
### 16.3 Timeouts

- **Connect timeout:** Fires when the transport does not reach the connected state in time; the subprocess is terminated; completion is reached.
- **Command (wall / `CMD_TIMEOUT`):** A long-running command is terminated at the wall limit; completion is reached.
- **IO idle (`IO_TIMEOUT`):** No `pipe_data_received` for the idle interval triggers terminate; completion is reached (output may be partial) (§11).

### 16.4 `Command` (scheduled execution)

- **One-shot (`interval` unset / zero):** `start` runs at most one `post_cmd` cycle per lifecycle (per the defined semantics); `stop` ends cleanly.
- **Repeating (`interval` > 0):** Multiple invocations occur; the **period** between **scheduled** fires follows **deadline** or `call_at` rules (§5.3), not “sleep N after work” drift.
- **Finite count:** `count=N` executes exactly N runs.
- **Never-ending:** `count=Command.COUNT_FOREVER` runs until explicit stop/cancel.
- **Timers on `Command`:** `IO_TIMEOUT` and `CMD_TIMEOUT` passed through to each `post_cmd` invocation behave as in §16.3.
- **Stop / cancel:** `stop()` or `Task` cancellation ends the loop; no leaked tasks or open log handles.

### 16.4.1 `CommandManager`

- **Add + self-clean:** Added finite commands appear in `list_active()`, then auto-remove on normal completion.
- **Stop one / stop all:** Explicit stop removes active entries.
- **Abnormal history:** exception/timeout stops are retained in `list_abnormal_history()`.

### 16.5 Concurrent execution (Python 3.11+)

- **`TaskGroup`:** Multiple `post_cmd` (or equivalent) tasks run **overlapped**; the first failure cancels siblings under default `TaskGroup` semantics.
- **`gather` (when used):** The `return_exceptions=True` path collects all results without aborting early, if that pattern is supported in helpers.
### 16.6 `ssh_node` / registry

- **`WeakSet`:** After the application drops its last strong reference to a node, the node **disappears** from `get_instances()` following collection (§3.3).
- **Strong reference:** While the app holds a node, it appears in `get_instances()` when enumeration runs.

### 16.7 Transport configuration

- **`ssh` vs `ush`:** Argv shape and `ssh_controlmaster` interaction match §3 / §4 (as far as test doubles allow).
- **Relay:** The jump-host argv prefix is built correctly when `relay` is set.

### 16.8 Logging / sink

- **Per-line delivery:** The logger or callable receives **one line at a time** after buffering (§9.1, §10.2).
- **Central vs per-session:** If both modes exist, each receives the expected lines (no cross-contamination unless designed).

### 16.9 Consoles / ControlMaster (if exercised in the test suite)

- **`open_consoles` / `close_consoles`:** The lifecycle can be covered with integration tests, or marked **optional** when real hardware or SSH fixtures are unavailable.

---

Implemented coverage reference: `tests/test_node_control.py`

*End of design document.*