Design: fiwicontrol.commands — asyncio execution (SSH / ush)
Status: Current
Package: **fiwicontrol.commands** (FiWiControl repository).
Implementation file: **src/fiwicontrol/commands/node_control.py**.
The main entry type is **ssh_node**, which provides SSH and **ush** transports to a rig.
Scope: Asyncio-based SSH/ush rig control: **ssh_node**, **ssh_session**, **Command**, **CommandManager**, concurrent execution via **asyncio**, line-oriented streaming, and logging.
Language: Python 3.11+ (stdlib **asyncio.TaskGroup**, **asyncio.timeout**, and related APIs assumed available).
Remote access: For **sshtype="ssh"**, passwordless SSH is required. Commands are executed via **ssh** without an interactive TTY, so key-based authentication (or any non-interactive auth that does not prompt) must already work for **root@<ipaddr>** (or your chosen user if you extend argv). Password prompts, interactive MFA, or unlocking passphrase-protected keys in this non-interactive path will cause failures or indefinite waits.
Running tests
Integration tests for **node_control.py** live in **tests/test_node_control.py**. They open real **ssh** sessions to **root@$FIWI_REMOTE_IP** (default **192.168.1.39**). Passwordless SSH must work for that host. Tests are opt-in: set **FIWI_RUN_REMOTE_TESTS=1** or pytest skips the module (so default CI / laptop runs do not require a rig).
From a terminal (repository root — e.g. **~/Code/FiWiControl**):
cd ~/Code/FiWiControl
export FIWI_RUN_REMOTE_TESTS=1
# Optional: target a different host (default is 192.168.1.39 if unset)
export FIWI_REMOTE_IP=192.168.1.39
# Verbose pytest (recommended)
python3 -m pytest tests/test_node_control.py -v
# Quiet summary only
python3 -m pytest tests/test_node_control.py -q
Without pytest, the file is also a **unittest** script (it sets **FIWI_RUN_REMOTE_TESTS=1** if unset, so you do not need that variable for a direct script run):
cd ~/Code/FiWiControl
python tests/test_node_control.py --remote-ip 192.168.1.39
# More asyncio logging:
python tests/test_node_control.py --remote-ip 192.168.1.39 --debug
**pytest** picks up **pythonpath = ["src"]** from **pyproject.toml**, so you do not need **PYTHONPATH=src** when you run from the repo root as above. After **pip install -e .**, imports resolve the same way.
Example successful output (line order and timings vary; captured with **python3 -m pytest tests/test_node_control.py -v** on Linux):
============================= test session starts ==============================
platform linux -- Python 3.13.11, pytest-9.0.3, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /home/.../FiWiControl
configfile: pyproject.toml
plugins: anyio-4.8.0
collecting ... collected 6 items
tests/test_node_control.py::TestRemoteSingleRunCommands::test_dmesg_tail PASSED [ 16%]
tests/test_node_control.py::TestRemoteSingleRunCommands::test_ls_home_directory PASSED [ 33%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_command_manager_add_and_self_clean PASSED [ 50%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_01_uname_r_every_1s_10_times PASSED [ 66%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_02_pwd_and_ls_every_1s_5_times PASSED [ 83%]
tests/test_node_control.py::TestRemoteRepeatingCommands::test_repeat_03_forever_can_be_stopped PASSED [100%]
============================== 6 passed in 16.47s ==============================
Fedora / ssh_config.d: pytest loads **tests/conftest.py**, which **setdefault**s **FIWI_SSH_CONFIG** to **tests/fixtures/ssh_config_minimal** when that file exists, so **ssh** is invoked as **ssh -F …** and does not read broken system drop-ins under **/etc/ssh/ssh_config.d/**. Override with your own **FIWI_SSH_CONFIG** if needed. For manual **ssh**, export the same variable or fix system file permissions (see **docs/install.md**).
If SSH is not configured, you will see failures (connection refused, auth errors, or timeouts) instead of **PASSED**.
Package import smoke (no network; optional):
cd ~/Code/FiWiControl
python3 -m pytest tests/test_package_layout.py -q
How to use (examples)
Implementation lives in **src/fiwicontrol/commands/node_control.py**. All remote I/O is async — call from a coroutine under **asyncio.run()** or your application event loop.
Prerequisites (remote targets)
- Passwordless SSH — From the machine running this code, **ssh root@<ipaddr> true** (or equivalent) must succeed without typing a password. Configure **~/.ssh/authorized_keys** on the target and use an unencrypted key or **ssh-agent** as appropriate for your environment. For **sshtype="ssh"**, **ssh_session.post_cmd** always passes an explicit **root@<hostname>** (**ssh_session.user** defaults to **root**), not a bare host, so the remote user is never your workstation login name.
- Network reachability and correct **ipaddr** (or hostname) on the node.
- Optional: **BatchMode=yes** in **ssh_config** for fail-fast behavior when keys are missing (recommended for automation).
Remote host: Python on the rig (power / discovery)
**ssh_node.rexec** runs shell commands on the rig over the same non-interactive **ssh root@<ipaddr>** path as the prerequisites above. Anything you run remotely — including **python3 -m fiwicontrol.power --discovery-json** for inventory or USB discovery — needs that Python environment installed on the node, not only on your laptop.
Full checklist (SSH keys, **git clone** on the Pi, **python3 -m fiwicontrol.commands …**, PEP 668 / **--break-system-packages**, verification **pytest**):
→ **docs/install.md** — section “Remote host setup”
Power-specific behaviour (what discovery returns, INI format): **docs/power-control-and-inventory.md**.
Imports
Preferred (re-exported from **fiwicontrol.commands**):
import asyncio
from fiwicontrol.commands import Command, CommandManager, ssh_node
Equivalent submodule import (same symbols):
from fiwicontrol.commands.node_control import Command, CommandManager, ssh_node
Install the package (editable: **pip install -e .** from the FiWiControl repo root; distribution name **fiwicontrol**). For **pytest** without installing, **pythonpath = ["src"]** is set in **pyproject.toml**.
Create an ssh_node (rig / execution target)
**ssh_node** is a class in **fiwicontrol.commands**: one instance represents one reachable rig (OpenSSH or **ush**). It is not the package name; the package is **fiwicontrol.commands**.
SSH (default) — targets **root@ipaddr** with ControlMaster socket under **/tmp/controlmasters_<ip>** when **ssh_controlmaster=True** (default for **rexec** / **Command** sessions).
node = ssh_node(
name="rack-pi",
ipaddr="192.168.1.39",
ssh_controlmaster=False, # optional; tests often disable for simplicity
silent_mode=False, # False: stream stdout/stderr lines to logging
)
**ush** instead of plain **ssh**:
node = ssh_node(name="rig", ipaddr="10.0.0.2", sshtype="ush")
Jump host (relay) — argv becomes **ssh root@<relay>** then inner **ush** or **ssh**.
node = ssh_node(name="behind-jump", ipaddr="10.0.0.5", relay="bastion.example.com")
Keep a strong reference to each node you care about; the class registry (**WeakSet**) only lists nodes still alive from the rest of your program (§3.3).
One-shot remote command (rexec)
**rexec** builds an **ssh_session**, runs **post_cmd**, and returns the session. Output accumulates in **session.results** (bytes); line-oriented output also goes to the logger unless **silent_mode=True**.
async def run_once():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
session = await node.rexec(
cmd="ls ~",
IO_TIMEOUT=15.0, # idle I/O watchdog (reset on each received chunk)
CMD_TIMEOUT=30, # wall-clock cap for the remote command
CONNECT_TIMEOUT=20.0,
repeat=None, # omit or False: single subprocess lifecycle
)
text = session.results.decode("utf-8", errors="replace").strip()
print(text)
asyncio.run(run_once())
Repeating commands (Command)
**Command** drives separate **ssh_session.post_cmd** invocations on a schedule (**interval** in seconds, event-loop deadline-based — §5.3).
- **interval=None** or **<= 0**: run once, then stop.
- **interval > 0**: repeat until **count** is reached or **stop()** is called.
- **count**: a finite integer, or **Command.COUNT_FOREVER** (**-1**) to run until **stop()**.
async def poll_uname():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
await cmd.start()
await asyncio.wait_for(cmd.task, timeout=25.0)
print("runs:", cmd.run_count) # expect 10
asyncio.run(poll_uname())
Stop a long-running / infinite command:
async def stop_forever():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
cmd = Command(node=node, cmd="date", interval=2.0, count=Command.COUNT_FOREVER)
await cmd.start()
await asyncio.sleep(5)
await cmd.stop()
asyncio.run(stop_forever())
Optional **log_file**: each run’s output is appended under a banner. Per-run timeouts: **io_timeout**, **cmd_timeout**, **connect_timeout** (override node defaults).
Many commands (CommandManager)
**CommandManager** assigns IDs, starts tasks, removes finished commands from **list_active()**, and records exception/timeout stops in **list_abnormal_history()**.
async def managed():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
mgr = CommandManager()
cid_a = await mgr.add_command(node=node, cmd="uname -r", interval=0.5, count=4)
cid_b = await mgr.add_command(node=node, cmd="pwd", interval=0.5, count=4)
await asyncio.sleep(3)
print("active:", mgr.list_active())
print("abnormal:", mgr.list_abnormal_history())
await mgr.stop_all()
asyncio.run(managed())
Parallel one-shots (TaskGroup / gather)
For several independent **rexec**-style calls, schedule them together:
async def parallel():
a = ssh_node(name="a", ipaddr="192.168.1.10", ssh_controlmaster=False)
b = ssh_node(name="b", ipaddr="192.168.1.11", ssh_controlmaster=False)
async def one(node, cmd):
s = await node.rexec(cmd=cmd, IO_TIMEOUT=15.0, CMD_TIMEOUT=30, CONNECT_TIMEOUT=20.0)
return s.results.decode("utf-8", errors="replace")
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(one(a, "hostname"))
t2 = tg.create_task(one(b, "hostname"))
print(t1.result(), t2.result())
asyncio.run(parallel())
Use **asyncio.gather(..., return_exceptions=True)** when you want every outcome without **TaskGroup** fail-fast behavior.
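A minimal sketch of that shape, with a hypothetical **fake_rexec** standing in for real **node.rexec** calls:

```python
import asyncio

async def fake_rexec(host: str) -> str:
    # Hypothetical stand-in for node.rexec(); one host fails to show
    # that gather still returns every outcome.
    if host == "bad":
        raise ConnectionError(f"unreachable: {host}")
    await asyncio.sleep(0)
    return f"{host}: ok"

async def run_all(hosts):
    # return_exceptions=True: failures come back as values instead of
    # cancelling sibling coroutines (TaskGroup fail-fast behavior).
    return await asyncio.gather(
        *(fake_rexec(h) for h in hosts), return_exceptions=True
    )

results = asyncio.run(run_all(["a", "bad", "b"]))
# results[1] is a ConnectionError instance; the other entries are strings
```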
Consoles (open_consoles / close_consoles)
Class methods walk **ssh_node.get_instances()**, clean **dmesg** on SSH nodes, and optionally open long-lived ControlMaster **dmesg -w** streams (§3.4). Call only when your rig expects this pattern.
await ssh_node.open_consoles(silent_mode=False)
# ... use nodes ...
await ssh_node.close_consoles()
Tests: see Running tests at the top of this document.
1. Goals
- Streaming with line buffering — Decode subprocess output, split on line boundaries, and deliver each line to logging (including a shared central log) without waiting for the process to finish.
- Run-to-completion — Await a single completion that means: subprocess exited (or was terminated on timeout), and stdout/stderr are drained with line buffers flushed per the EOF policy (§10.3).
- Concurrent execution from a prepared set — Build coroutines (e.g. several **post_cmd** calls), then run them concurrently under **asyncio.TaskGroup** (Python 3.11+). Use **asyncio.gather** when a simple “wait for all results” shape fits better.
- Scheduled / periodic execution — Support one-shot and repeating commands through a single **Command** type with explicit lifecycle (start/stop) and fixed-period scheduling semantics.
2. Architectural overview
| Type | Responsibility |
|---|---|
| **ssh_node** (class) | Per-host configuration: identity, ssh/ush argv prefix, relay, ControlMaster socket path, console bookkeeping. Registry of instances. Does not own subprocess I/O or interval scheduling. |
| **ssh_session** (class) | One SSH child process for a command or stream: **post_cmd**, **close**, **SubprocessProtocol** with **pipe_data_received**, timeouts, communicate-like completion. Reads settings from **ssh_node** (e.g. via attribute delegation). |
| **SSHReaderProtocol** (nested class on ssh_session) | Subprocess protocol callbacks: connection_made, pipe_data_received, pipe_connection_lost, process_exited, watchdog timer hooks. |
| **Command** (class) | Managed lifecycle for repeated or one-shot runs driven by **ssh_node** / **ssh_session**: **interval**, **count**, **start/stop**, log/sink attachment. Each execution uses **ssh_session.post_cmd**. |
| **CommandManager** (class) | Dynamic command registry: add running commands, stop one/all, self-clean completed commands, keep abnormal-stop history (exception / timeout). |
Concurrent runs: Prefer **asyncio.TaskGroup** for structured concurrency; **asyncio.gather** when appropriate.
Per-line logging: Use **logging.Logger** and/or a callable sink; a formal **LineSink** protocol type is optional and only worth adding if multiple implementations need a shared interface.
Library asyncio rule: Public APIs that do I/O are **async**. Coroutines and protocols use **asyncio.get_running_loop()**. The application (or **asyncio.run**) starts the event loop before calling into the library.
3. ssh_node
3.1 Role
| Concern | ssh_node |
|---|---|
| **name**, **ipaddr**, optional **device**, **devip** | Yes |
| **sshtype** (ssh / ush), **relay**, **self.ssh** argv prefix | Yes |
| **ssh_controlmaster**, **controlmasters** (SSH only; off for ush) | Yes |
| **ssh_console_session**, console task references | Yes (bookkeeping) |
| Subprocess streaming / **pipe_data_received** | No — **ssh_session** |
| Interval scheduling, periodic task lists | No — **Command** (callers hold **Command** instances and **Task** handles) |
3.2 Constructor parameters
- **name**, **ipaddr**: label and target for argv assembly (**root@…** applied when building the remote command).
- **relay**: optional jump host; argv begins with **ssh root@<relay>** then inner **ush** or **ssh**.
- **ssh_controlmaster**: enable ControlMaster path for **ssh**; forced off for **ush**.
- **silent_mode**: passed into sessions for line logging behavior.
- **console**: optional rig-facing flag.
3.3 Registry
Instances register in a class-level **WeakSet** (instances). **get_instances()** returns a snapshot of nodes that are still alive from the rest of the program’s perspective.
**WeakSet properties:**
- The registry supports enumeration (e.g. **open_consoles** over nodes the application is using).
- **WeakSet** stores weak references only: node lifetime follows strong references held by the application; when those go away, the node may be collected and disappears from **instances** automatically.
- **get_instances()** therefore reflects nodes that are still strongly referenced elsewhere (variables, rig object, collections).
- Callers keep a strong reference to each node they care about for as long as that node should appear in global enumeration.
Alternative pattern: an explicit list of nodes passed into orchestration functions also works; **WeakSet** fits a central index without that list becoming the sole long-lived owner of every node.
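A minimal sketch of the registry behavior, using a generic **Node** class rather than the real **ssh_node**:

```python
import gc
import weakref

class Node:
    # Class-level WeakSet: holds weak references only, so registration
    # by itself does not keep a node alive.
    instances = weakref.WeakSet()

    def __init__(self, name: str):
        self.name = name
        Node.instances.add(self)

    @classmethod
    def get_instances(cls):
        return list(cls.instances)  # snapshot of still-alive nodes

kept = Node("kept")
temp = Node("temp")
names = {n.name for n in Node.get_instances()}        # {"kept", "temp"}

del temp      # drop the only strong reference to "temp"
gc.collect()  # force collection (immediate under CPython refcounting)
names_after = {n.name for n in Node.get_instances()}  # {"kept"}
```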
3.4 Class-level orchestration
**open_consoles**, **close_consoles**: multi-node cleanup (pkill dmesg), ControlMaster + long-lived console streams. **async**; require a running loop.
Stopping scheduled work: callers **await cmd.stop()** and/or **task.cancel()** on the **Task** from **Command.start()** (exact API TBD).
3.5 Instance methods
| Method | Behavior |
|---|---|
| **rexec** | Constructs **ssh_session**, runs **post_cmd** with IO / command / connect timeouts. To run many **rexec**-style operations in parallel, use **asyncio.TaskGroup** (or **gather** if that shape fits). |
| **clean** | **async**: remote cleanup (e.g. pkill dmesg) via a one-off subprocess. |
| **close_console** | **async**: tear down **ssh_console_session**. |
Scheduled / interval execution: **Command** (§5). **ssh_node**: identity, transport, registry, console bookkeeping.
3.6 Optional utilities
Thin **async** helpers (e.g. sleep delays) may exist as module-level functions when useful.
4. ssh_session
4.1 Role
One logical SSH subprocess: spawn via **loop.subprocess_exec** with a **SubprocessProtocol**, stream stdout/stderr through **pipe_data_received**, enforce connect / wall / IO-idle timeouts, and expose communicate-like completion.
4.2 Relationship to ssh_node
The session holds a reference to **ssh_node** and delegates missing attributes (e.g. **ipaddr**, **ssh**, **controlmasters**) for argv and ControlMaster paths.
4.3 Operations
- **post_cmd(cmd, IO_TIMEOUT, CMD_TIMEOUT, …)** — Build the full **ssh** argv (including **-o ControlPath** / master setup when **control_master** is set). A **repeat** flag that loops inside one subprocess lifecycle is distinct from **Command**’s separate invocations on a schedule (§5).
- **close** — Terminate console / master as required (**async**).
4.4 Inner protocol
**SSHReaderProtocol** (name may match implementation): implements **pipe_data_received**, **pipe_connection_lost**, **process_exited**, watchdog timers via **get_running_loop().call_later**, and optional **LoggerAdapter** for prefixed logs.
5. Command
5.1 Name and scope
The type is **Command** (or **SshCommand** if Command collides in consumer code). SSH is the default transport; repetition is modeled with an **interval** field on the same type.
5.2 Responsibilities
| Concern | Owner |
|---|---|
| Remote command string / argv fragment | **Command** |
| **interval**: None or 0 → one execution per start/stop (or defined one-shot semantics); > 0 → period between scheduled fires | **Command** |
| **count**: finite run count or **Command.COUNT_FOREVER** for never-ending | **Command** |
| Optional **initial_delay**, jitter | **Command** |
| **start() / stop()**, background **Task**, cancellation, log flush | **Command** |
| Log / line output target | **Command** — typically **logging.Logger** or a callable; formal **LineSink** protocol optional |
| Completion timer (wall-clock cap per run) | **Command** — default passed into each **post_cmd** as **CMD_TIMEOUT** (name may match implementation): max time from start of that invocation until subprocess exit (or kill). Catches hung remote commands that never finish. |
| I/O idle timer | **Command** — default passed into each **post_cmd** as **IO_TIMEOUT**: max time without stdout/stderr bytes; timer resets on **pipe_data_received**. Catches “stuck but still running” cases where the process produces no output. |
| Subprocess + protocol | **ssh_session** only — each tick **await post_cmd(..., IO_TIMEOUT=…, CMD_TIMEOUT=…)**; timers enforced inside **SSHReaderProtocol** |
Timers on Command: **IO_TIMEOUT** and **CMD_TIMEOUT** address idle output vs overall run length. **Command** supplies defaults and passes them into each **post_cmd**; **SSHReaderProtocol** implements the watchdogs.
Split of roles: **Command** schedules runs and timer values; **ssh_session** runs **post_cmd** and owns **pipe_data_received** streaming.
5.3 Scheduling repeats
Repeating **Command** uses event-loop time so each period is measured between scheduled fire times, independent of how long **post_cmd** took:
- Deadline-based: at tick start, **next_fire = loop.time() + period**. After **await post_cmd(...)**, wait **max(0.0, next_fire - loop.time())** (or an equivalent **Event.wait** with timeout). Document behavior when **post_cmd** exceeds **period** (immediate next run vs align to the next **next_fire**).
- **loop.call_at / call_later**: schedule the next tick at **loop.time() + period** (or the computed deadline).
The wait between ticks is the time remaining until **next_fire** (or the next **call_at** / **call_later** fire), so the period stays anchored to the event-loop clock regardless of **post_cmd** duration.
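A minimal sketch of the deadline-based policy, under the assumption that a run longer than the period starts the next tick immediately (**run_command_loop** and **do_run** are illustrative names, not the implementation's API):

```python
import asyncio

async def run_command_loop(do_run, period: float, count: int) -> None:
    # Each period is measured between scheduled fire times on the
    # event-loop clock, so a slow do_run() does not stretch the schedule.
    loop = asyncio.get_running_loop()
    for _ in range(count):
        next_fire = loop.time() + period  # deadline for the next tick
        await do_run()                    # stands in for post_cmd(...)
        await asyncio.sleep(max(0.0, next_fire - loop.time()))

async def main():
    loop_times = []

    async def work():
        loop_times.append(asyncio.get_running_loop().time())
        await asyncio.sleep(0.02)         # simulated post_cmd duration

    await run_command_loop(work, period=0.05, count=4)
    return loop_times

ticks = asyncio.run(main())
# Gaps between tick starts stay near the 0.05 s period, not 0.05 + 0.02
gaps = [b - a for a, b in zip(ticks, ticks[1:])]
```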
5.4 Command vs bare Task
A coroutine **run_command_loop(...)** with deadline scheduling plus **asyncio.create_task** is enough for callers who want minimal API surface. **Command** is optional sugar for **start/stop**, named fields, and encapsulation — add it when that ergonomics is worth the type.
5.5 API sketch
cmd = Command(node, cmd="...", interval=5.0, line_sink=...)
cmd = Command(node, cmd="...", interval=None, line_sink=...)
await cmd.start()
await cmd.stop()
5.6 Command manager
CommandManager supports dynamic add/remove of commands while the system is running:
manager = CommandManager()
cid = await manager.add_command(node=node, cmd="uname -r", interval=1.0, count=10)
await manager.stop_command(cid)
await manager.stop_all()
active = manager.list_active()
history = manager.list_abnormal_history()
Self-cleaning behavior:
- completed commands are removed from active state automatically.
- abnormal stop records (exception/timeout) are retained in history.
6. Concurrent execution (TaskGroup / gather)
Requires Python 3.11+ for **TaskGroup** as documented here.
- Preparation: build coroutines (e.g. **session.post_cmd("cmd", ...)** for several sessions).
- Execution (preferred): **async with asyncio.TaskGroup() as tg:** then **tg.create_task(coro)** for each coroutine. **TaskGroup** cancels siblings on first failure (structured concurrency).
- Alternative: **await asyncio.gather(*coros, return_exceptions=True)** when callers want all outcomes without **TaskGroup** fail-fast behavior.
One top-level **await** runs the whole set concurrently.
7. Functional requirements
| ID | Requirement |
|---|---|
| R1 | Line-oriented streaming: decode, buffer, split on \n, optional final fragment on EOF (§10.3). |
| R2 | Central logging via logger and/or callable; supports shared and per-session targets. |
| R3 | Completion: process ended, stdin closed if used, pipes EOF’d, line buffers flushed (§9.2, §10). |
| R4 | Prepared coroutines run concurrently via **asyncio.TaskGroup** (or **gather** when that shape fits); Python 3.11+. |
| R5 | **ush**, **ssh**, relay, ControlMaster consoles, connect / wall / IO-idle timeouts — all compose with streaming and completion as described here. |
8. Non-functional requirements
| ID | Requirement |
|---|---|
| N1 | Library I/O runs under **asyncio.get_running_loop()**; programs start work with **asyncio.run** or an application event loop. |
| N2 | Python 3.11+ for implementation and documentation examples. |
| N3 | Backpressure: document whether logging is assumed fast or use bounded queues / warnings. |
| N4 | Cancelling a command **Task** tears down subprocess and protocol cleanly. |
9. Line sink and completion
9.1 Per-line output
Use **logging.Logger**, a callable (stream, line) -> None, or both. Normalize **\r\n**, strip **\r** for display. A dedicated **LineSink** protocol is optional sugar.
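A minimal callable-sink sketch (names are illustrative; the real module may route lines differently):

```python
import logging

def make_line_sink(logger: logging.Logger, forward):
    # Returns a (stream, line) -> None callable: strip the trailing CR
    # so "\r\n" and "\n" input display identically, then deliver to
    # both a logger and an extra callable.
    def sink(stream: str, line: str) -> None:
        clean = line.rstrip("\r")
        logger.info("%s| %s", stream, clean)
        forward(stream, clean)
    return sink

seen = []
sink = make_line_sink(logging.getLogger("demo"),
                      lambda s, l: seen.append((s, l)))
sink("stdout", "hello\r")  # CR stripped before delivery
sink("stderr", "oops")
```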
9.2 Communicate-like completion
Completion means:
- Subprocess has exited (return code available).
- Stdout and stderr reached EOF (**pipe_connection_lost** for fds 1 and 2 as used).
- Protocol line buffers flushed (§10.3); **process_exited** handled; completion event or future set.
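The barrier can be sketched with a toy **SubprocessProtocol** (not the real **SSHReaderProtocol**). The future resolves only once the process has exited and stdout has reached EOF, since asyncio does not guarantee the relative order of **process_exited** and **pipe_connection_lost**:

```python
import asyncio
import sys

class CompletionProtocol(asyncio.SubprocessProtocol):
    """Toy protocol: complete only on (process exit AND stdout EOF)."""

    def __init__(self, done: asyncio.Future):
        self._done = done
        self._chunks = []
        self._exited = False
        self._stdout_eof = False

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        if fd == 1:                  # fd 1 = stdout
            self._chunks.append(data)

    def pipe_connection_lost(self, fd: int, exc) -> None:
        if fd == 1:
            self._stdout_eof = True
            self._maybe_finish()

    def process_exited(self) -> None:
        self._exited = True
        self._maybe_finish()

    def _maybe_finish(self) -> None:
        # Communicate-like barrier: both conditions, resolved exactly once.
        if self._exited and self._stdout_eof and not self._done.done():
            self._done.set_result(b"".join(self._chunks))

async def main() -> bytes:
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    transport, _ = await loop.subprocess_exec(
        lambda: CompletionProtocol(done),
        sys.executable, "-c", "print('hello')",
    )
    out = await done
    transport.close()
    return out

out = asyncio.run(main())
```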
10. Streaming: SubprocessProtocol and pipe_data_received
The SSH streaming path uses **loop.subprocess_exec** with a **SubprocessProtocol**. Line assembly happens in **pipe_data_received(fd, data)** on arbitrary byte chunks from the transport.
10.1 Pipeline
| Stage | Mechanism |
|---|---|
| Ingress | **pipe_data_received(fd, data)** — fd 1 = stdout, 2 = stderr |
| Buffer / decode | Per-stream byte buffer; UTF-8 (replace or strict), configurable |
| Line split | On \n, invoke **on_line** → logger / callable |
| EOF | **pipe_connection_lost**: flush trailing fragment per §10.3 |
| Done | **process_exited** + both pipes closed + internal finished state → complete await |
10.2 Transport vs logical lines
Ingress: **pipe_data_received** delivers byte chunks (fragments and coalesced data). Line boundaries are applied in the protocol buffer: accumulate, split on **\n**, emit one logical line per **on_line** (Tcl **gets**-style delivery to parsers and loggers).
10.3 EOF / trailing fragment
| Mode | Behavior |
|---|---|
| Default | If buffer non-empty at EOF, emit one final line without requiring trailing \n. |
| Strict | Drop or tag partial trailing line — configurable. |
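The default policy can be sketched with a small standalone buffer (illustrative, not the implementation's class):

```python
class LineBuffer:
    """Accumulate arbitrary byte chunks; emit one decoded line per '\\n'.

    Default EOF policy: a non-empty trailing fragment is emitted as one
    final line even without a trailing newline."""

    def __init__(self, on_line, encoding: str = "utf-8"):
        self._buf = b""
        self._on_line = on_line
        self._encoding = encoding

    def feed(self, data: bytes) -> None:
        # Chunk boundaries need not align with lines: buffer and split.
        self._buf += data
        while (i := self._buf.find(b"\n")) != -1:
            line, self._buf = self._buf[:i], self._buf[i + 1:]
            # Strip a CR so "\r\n" and "\n" both yield the same line.
            self._on_line(
                line.decode(self._encoding, errors="replace").rstrip("\r"))

    def eof(self) -> None:
        if self._buf:
            self._on_line(self._buf.decode(self._encoding, errors="replace"))
            self._buf = b""

lines = []
lb = LineBuffer(lines.append)
lb.feed(b"ab")        # fragment, no newline yet
lb.feed(b"c\r\nde")   # completes "abc", leaves "de" buffered
lb.eof()              # flushes the trailing fragment "de"
# lines == ["abc", "de"]
```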
10.4 Tcl-aligned behavior
- Input: after internal buffering, **on_line** receives complete lines (the same idea as Tcl **gets** on a readable channel).
- Output (Tcl **fconfigure -buffering line**): In Tcl, line-buffered mode on a channel flushes outgoing data when a newline is written (and when the buffer fills), which matters for interactive use, sockets, and pipes so output appears promptly. For this module, when writing to the subprocess stdin (if used), apply the same idea: flush after each line (writes ending with **\n**, or an explicit flush) so the peer sees output without waiting for a full stdio buffer.
11. Timeouts
| Timeout | Semantics |
|---|---|
| Connect | From spawn until **connection_made**; cancel timer when connected. |
| Command (wall) | Maximum lifetime; cancel watchdogs and **terminate** transport. |
| IO idle | Reset on each **pipe_data_received**; on fire, **terminate**. |
If IO idle fires, completion still runs after terminate + drain (output may be partial).
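The IO-idle reset behavior can be sketched with **call_later** (a toy; the real watchdog would terminate the transport when it fires):

```python
import asyncio

class IdleWatchdog:
    """Toy IO-idle timer: rearm on each data chunk, fire after silence."""

    def __init__(self, timeout: float, on_fire):
        self._timeout = timeout
        self._on_fire = on_fire
        self._handle = None

    def rearm(self) -> None:
        # Called from pipe_data_received in a real protocol: cancel the
        # pending timer and start a fresh one.
        if self._handle is not None:
            self._handle.cancel()
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._timeout, self._on_fire)

    def cancel(self) -> None:
        if self._handle is not None:
            self._handle.cancel()

async def main() -> bool:
    fired = asyncio.Event()
    wd = IdleWatchdog(0.05, fired.set)
    wd.rearm()
    for _ in range(3):              # steady traffic keeps the timer quiet
        await asyncio.sleep(0.02)
        wd.rearm()
    assert not fired.is_set()       # never idle long enough so far
    await asyncio.wait_for(fired.wait(), timeout=1.0)  # now go silent
    return fired.is_set()

fired = asyncio.run(main())
```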
12. Central log file
Use **logging** with a **FileHandler**, or a callable that writes timestamped lines. Prefer writes from the event loop thread; for heavy I/O, optional **asyncio.Queue** plus a single writer task.
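A bounded-queue sketch of that pattern (a list stands in for the file handler; names are illustrative):

```python
import asyncio

async def writer_task(queue: asyncio.Queue, out: list) -> None:
    # Single consumer: sessions enqueue lines cheaply on the event loop;
    # one task performs the (potentially slow) writes. None is the
    # shutdown sentinel.
    while (item := await queue.get()) is not None:
        out.append(item)

async def main() -> list:
    queue = asyncio.Queue(maxsize=100)  # bounded: backpressure, not growth
    sink: list = []
    task = asyncio.create_task(writer_task(queue, sink))
    for i in range(3):
        await queue.put(f"line {i}")    # put() waits if the queue is full
    await queue.put(None)               # tell the writer to stop
    await task
    return sink

lines = asyncio.run(main())
```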
13. API sketch (illustrative names)
async with asyncio.TaskGroup() as tg:
tg.create_task(session_a.post_cmd("cmd1", ...))
tg.create_task(session_b.post_cmd("cmd2", ...))
cmd = Command(node, cmd="...", interval=10.0, line_sink=...)
await cmd.start()
await cmd.stop()
14. Open decisions
- Formal **LineSink** protocol vs logger + callable only.
- stderr: merged to stdout vs separate channels.
- Public name **Command** vs **SshCommand**.
- Whether **Command** ships in v1 or callers use **create_task(run_command_loop(...))** only.
15. Specification checklist
- Line emission, EOF, and CR handling documented in implementation.
- Concurrent execution documented for Python 3.11+ with **TaskGroup** (and **gather** where noted).
- Completion = process exit + pipe EOF + buffer flush.
- Central logging path documented.
- Timeout and cancellation behavior documented.
- Repeating **Command** scheduling uses deadline-based waits or **call_at/call_later** (§5.3).
- Automated tests cover the scenarios in §16.
16. Test cases
This section lists behaviors to verify in automated tests (unit, functional, or integration as appropriate). Implementation may use mocks, local subprocess stubs, or real SSH depending on CI constraints.
16.1 ssh_session / post_cmd / completion
- Happy path: Remote command exits 0; completion fires after subprocess exit, stdout/stderr EOF, and protocol teardown.
- Non-zero exit: Process exits with failure; completion still reached; return code or error path is observable as designed.
- Communicate-like barrier: Completion does not report done until **process_exited** has fired and the **pipe_connection_lost** semantics are satisfied (§9.2).
16.2 Streaming and line buffering (pipe_data_received)
- Chunked input: Data arriving as multiple arbitrary chunks still yields correct line splits on **\n**.
- CR / CRLF: **\r\n** and trailing **\r** handling matches §10.2 / §10.3.
- EOF partial line: A non-empty buffer at EOF emits one final line under the default policy (§10.3).
- Stdout vs stderr: If stderr is a separate channel, both streams are decoded and routed per sink/logger configuration.
16.3 Timeouts
- Connect timeout: Fires when transport does not reach connected state in time; subprocess is terminated; completion reached.
- Command (wall / **CMD_TIMEOUT**): A long-running command is terminated at the wall limit; completion reached.
- IO idle (**IO_TIMEOUT**): No **pipe_data_received** for the idle interval triggers terminate; completion reached, output possibly partial (§11).
16.4 Command (scheduled execution)
- One-shot (**interval** unset / zero): **start** runs at most one **post_cmd** cycle per lifecycle (per defined semantics); **stop** ends cleanly.
- Repeating (**interval** > 0): Multiple invocations occur; the period between scheduled fires follows deadline or **call_at** rules (§5.3), not “sleep N after work” drift.
- Finite count: **count=N** executes exactly N runs.
- Never-ending: **count=Command.COUNT_FOREVER** runs until explicit stop/cancel.
- Timers on **Command**: **IO_TIMEOUT** and **CMD_TIMEOUT** passed through to each **post_cmd** invocation behave as in §16.3.
- Stop / cancel: **stop()** or **Task** cancellation ends the loop; no leaked tasks or open log handles.
16.4.1 CommandManager
- Add + self-clean: Added finite commands appear in **list_active()**, then auto-remove on normal completion.
- Stop one / stop all: Explicit stop removes active entries.
- Abnormal history: exception/timeout stops are retained in **list_abnormal_history()**.
16.5 Concurrent execution (Python 3.11+)
- **TaskGroup**: Multiple **post_cmd** (or equivalent) tasks run overlapped; the first failure cancels siblings under default **TaskGroup** semantics.
- **gather** (when used): the **return_exceptions=True** path collects all results without aborting early, if that pattern is supported in helpers.
16.6 ssh_node / registry
- **WeakSet**: After the application drops its last strong reference to a node, the node disappears from **get_instances()** following collection (§3.3).
- Strong reference: While the app holds a node, it appears in **get_instances()** when enumeration runs.
16.7 Transport configuration
- **ssh** vs **ush**: Argv shape and **ssh_controlmaster** interaction match §3 / §4 (as far as test doubles allow).
- Relay: Jump-host argv prefix is built correctly when **relay** is set.
16.8 Logging / sink
- Per-line delivery: Logger or callable receives one line at a time after buffering (§9.1, §10.2).
- Central vs per-session: If both modes exist, each receives the expected lines (no cross-contamination unless designed).
16.9 Consoles / ControlMaster (if exercised in test suite)
- **open_consoles / close_consoles**: Lifecycle can be covered with integration tests, or marked optional when real hardware or SSH fixtures are unavailable.
Implemented coverage reference: tests/test_node_control.py
End of design document.