Initial import: fiwicontrol package (commands + power scaffold)

Made-with: Cursor
Robert McMahon 2026-04-10 11:08:30 -07:00
parent 0ffe139de7
commit 5874253359
12 changed files with 1555 additions and 42 deletions

.gitignore

@@ -0,0 +1,9 @@
__pycache__/
*.py[cod]
*$py.class
.pytest_cache/
.mypy_cache/
*.egg-info/
.eggs/
dist/
build/

LICENSE

@@ -6,68 +6,171 @@ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
Copyright 2026 rjmcmahon
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

README.md

@@ -1,3 +1,73 @@
# FiWiControl
Tools and libraries for managing and testing Umber FiWi networks.
This repository ships one Python distribution (**`fiwicontrol`** for `pip`) whose import root is **`orchestrator`**:
1. **`orchestrator.commands`** — run commands on remote rigs via OpenSSH or **`ush`**, with asyncio streaming, timeouts, repeats (`Command`), and a small registry (`CommandManager`). Implementation: `src/orchestrator/commands/remote_nodes.py`.
2. **`orchestrator.power`** — reserved for power switching, monitoring, and discovery (Acroname, Monsoon, …). Must depend on **`commands`** only, not the reverse.
**Layout**
```
FiWiControl/
├── LICENSE
├── README.md
├── pyproject.toml
├── docs/
│ └── remote-nodes-asyncio-design.md
├── src/
│ └── orchestrator/
│ ├── commands/
│ │ ├── __init__.py
│ │ └── remote_nodes.py
│ └── power/
│ └── __init__.py
└── tests/
```
## Requirements
- Python **3.11+**
- For **`sshtype="ssh"`**: **passwordless** SSH to **`root@<host>`** (non-interactive `ssh`; see `docs/remote-nodes-asyncio-design.md`).
## Install (editable)
```bash
cd ~/Code/FiWiControl
python -m pip install -e .
python -m pip install -e ".[dev]" # pytest
```
Imports:
```python
from orchestrator.commands import ssh_node, Command, CommandManager
```
## Tests
For test commands, example **`pytest`** output, and **`unittest`** entry points for **`remote_nodes`**, see **`docs/remote-nodes-asyncio-design.md`** → section **“Running tests”** (top of that file).
Layout / import smoke (no network):
```bash
cd ~/Code/FiWiControl
pytest -q tests/test_package_layout.py
```
Remote integration (needs **`FIWI_REMOTE_IP`** and key auth):
```bash
cd ~/Code/FiWiControl
FIWI_REMOTE_IP=192.168.1.39 pytest -q tests/test_remote_nodes.py
```
## Remote
```bash
git remote add origin "https://git.umbernetworks.com/rjmcmahon/FiWiControl.git"
git push -u origin main
```
Use a Gitea **personal access token** as the HTTPS password if prompted.

docs/REFACTOR-REVIEW.md

@@ -0,0 +1,29 @@
# Remote nodes module status
`src/orchestrator/commands/remote_nodes.py` is the active asyncio implementation (package **`orchestrator.commands`**, shipped in the `fiwicontrol` distribution).
## Current state
- Refactor is merged into `remote_nodes.py`; there is no separate `ssh_nodes_refactored.py`.
- Module runs under Python 3.11+ asyncio patterns (`get_running_loop`, async class methods).
- `ssh_node.open_consoles` and `ssh_node.close_consoles` are async class methods.
- `Command` owns scheduling with deadline-based timing and supports:
- finite count (`count=N`)
- never-ending count (`count=Command.COUNT_FOREVER`)
- `CommandManager` can add commands dynamically, self-cleans active commands when done, and keeps abnormal-stop history (exception/timeout).
- Stream logging format includes timestamp (ms), node/ip label, optional run count, command tag, and line text.
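The deadline-based timing noted above can be sketched as follows (a minimal illustration only; `run_every` and its signature are hypothetical, not the module's actual code):

```python
import asyncio

async def run_every(interval: float, count: int, work) -> int:
    # Deadline-based schedule: the next deadline advances by a fixed
    # interval, so a slow run does not accumulate drift over time.
    loop = asyncio.get_running_loop()
    deadline = loop.time()
    runs = 0
    while count < 0 or runs < count:  # count < 0 plays the COUNT_FOREVER role
        await work()
        runs += 1
        deadline += interval
        delay = deadline - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
    return runs

async def main() -> int:
    ticks: list[int] = []

    async def work() -> None:
        ticks.append(1)

    return await run_every(0.01, 5, work)

n = asyncio.run(main())
# n == 5
```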
## Tests
- Active test file: `tests/test_remote_nodes.py`
- Covers:
- single command runs
- dmesg command run
- repeat runs (`uname -r`, `pwd` + `ls`)
- never-ending repeat stop behavior
- `CommandManager` add + self-clean
## Notes
- **Passwordless SSH** to `root@<target>` (or your configured user) is required for non-interactive remote runs; see `remote-nodes-asyncio-design.md`.
- Any remaining SSH warning output caused by host config permissions (`/etc/ssh/ssh_config.d/...`) is environment-specific and not a module logic failure.

docs/remote-nodes-asyncio-design.md

@@ -0,0 +1,550 @@
# Design: Remote nodes — asyncio, streaming, and batched concurrency
**Status:** Current
**Scope:** `src/orchestrator/commands/remote_nodes.py` — asyncio-based SSH/`ush` rig control: **`ssh_node`**, **`ssh_session`**, **`Command`**, **`CommandManager`**, concurrent execution via **`asyncio`**, line-oriented streaming, and logging.
**Language:** **Python 3.11+** (stdlib **`asyncio.TaskGroup`**, **`asyncio.timeout`**, and related APIs assumed available).
**Remote access:** For **`sshtype="ssh"`**, **passwordless SSH is required**. Commands are executed via **`ssh`** without an interactive TTY, so **key-based authentication** (or any non-interactive auth that does not prompt) must already work for **`root@<ipaddr>`** (or your chosen user if you extend argv). Password prompts, interactive MFA, or unlocking passphrase-protected keys in this non-interactive path will cause failures or indefinite waits.
---
## How to use (examples)
Implementation lives in **`src/orchestrator/commands/remote_nodes.py`**. All remote I/O is **async** — call from a coroutine under **`asyncio.run()`** or your application event loop.
### Prerequisites (remote targets)
1. **Passwordless SSH** — From the machine running this code, **`ssh root@<ipaddr> true`** (or equivalent) must succeed **without** typing a password. Configure **`~/.ssh/authorized_keys`** on the target and use an unencrypted agent key or **`ssh-agent`** as appropriate for your environment.
2. Network reachability and correct **`ipaddr`** (or hostname) on the node.
3. Optional: **`BatchMode=yes`** in **`ssh_config`** for fail-fast behavior when keys are missing (recommended for automation).
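For item 3, a host block along these lines (the host pattern and timeout are illustrative) makes `ssh` fail fast instead of hanging on a password prompt:

```
# ~/.ssh/config
Host 192.168.1.*
    User root
    BatchMode yes
    ConnectTimeout 5
```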
### Import and layout
```python
import asyncio
from orchestrator.commands.remote_nodes import Command, CommandManager, ssh_node
```
Install the package (editable: `pip install -e .` from the **FiWiControl** repo root), or run tests with **`pytest`** using **`pythonpath = ["src"]`** in **`pyproject.toml`**.
### Create a node (`ssh_node`)
**SSH (default)** — targets **`root@ipaddr`** with ControlMaster socket under **`/tmp/controlmasters_<ip>`** when **`ssh_controlmaster=True`** (default for **`rexec`** / **`Command`** sessions).
```python
node = ssh_node(
name="rack-pi",
ipaddr="192.168.1.39",
ssh_controlmaster=False, # optional; tests often disable for simplicity
silent_mode=False, # False: stream stdout/stderr lines to logging
)
```
**`ush`** instead of plain **`ssh`**:
```python
node = ssh_node(name="rig", ipaddr="10.0.0.2", sshtype="ush")
```
**Jump host (`relay`)** — argv becomes **`ssh root@<relay>`** then inner **`ush`** or **`ssh`**.
```python
node = ssh_node(name="behind-jump", ipaddr="10.0.0.5", relay="bastion.example.com")
```
Keep a **strong reference** to each node you care about; the class registry (a **`WeakSet`**) only lists nodes still strongly referenced elsewhere in your program (§3.3).
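The `WeakSet` behavior can be seen in isolation (a standalone sketch using a stand-in class, not `ssh_node` itself):

```python
import gc
import weakref

class Node:
    # stand-in for the ssh_node class-level registry
    instances: "weakref.WeakSet[Node]" = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        Node.instances.add(self)

kept = Node("kept")
Node("dropped")          # no strong reference survives this statement
gc.collect()             # be explicit; CPython usually collects immediately
names = {n.name for n in Node.instances}
# on CPython: names == {"kept"} — the unreferenced node left the registry
```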
### One-shot remote command (`rexec`)
**`rexec`** builds an **`ssh_session`**, runs **`post_cmd`**, and returns the session. Output accumulates in **`session.results`** (bytes); line-oriented output also goes to the logger unless **`silent_mode=True`**.
```python
async def run_once():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
session = await node.rexec(
cmd="ls ~",
IO_TIMEOUT=15.0, # idle I/O watchdog (reset on each received chunk)
CMD_TIMEOUT=30, # wall-clock cap for the remote command
CONNECT_TIMEOUT=20.0,
repeat=None, # omit or False: single subprocess lifecycle
)
text = session.results.decode("utf-8", errors="replace").strip()
print(text)
asyncio.run(run_once())
```
### Repeating commands (`Command`)
**`Command`** drives **separate** **`ssh_session.post_cmd`** invocations on a schedule (**`interval`** in seconds, event-loop deadline-based — §5.3).
- **`interval=None`** or **`<= 0`**: run **once** then stop.
- **`interval > 0`**: repeat until **`count`** is reached or **`stop()`**.
- **`count`**: finite integer, or **`Command.COUNT_FOREVER`** (**`-1`**) until **`stop()`**.
```python
async def poll_uname():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
await cmd.start()
await asyncio.wait_for(cmd.task, timeout=25.0)
print("runs:", cmd.run_count) # expect 10
asyncio.run(poll_uname())
```
Stop a long-running / infinite command:
```python
async def stop_forever():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
cmd = Command(node=node, cmd="date", interval=2.0, count=Command.COUNT_FOREVER)
await cmd.start()
await asyncio.sleep(5)
await cmd.stop()
asyncio.run(stop_forever())
```
Optional **`log_file`**: each run's output is appended under a banner. Per-run timeouts: **`io_timeout`**, **`cmd_timeout`**, **`connect_timeout`** (override node defaults).
### Many commands (`CommandManager`)
**`CommandManager`** assigns IDs, starts tasks, **removes finished commands** from **`list_active()`**, and records **exception/timeout** stops in **`list_abnormal_history()`**.
```python
async def managed():
node = ssh_node(name="pi", ipaddr="192.168.1.39", ssh_controlmaster=False)
mgr = CommandManager()
cid_a = await mgr.add_command(node=node, cmd="uname -r", interval=0.5, count=4)
cid_b = await mgr.add_command(node=node, cmd="pwd", interval=0.5, count=4)
await asyncio.sleep(3)
print("active:", mgr.list_active())
print("abnormal:", mgr.list_abnormal_history())
await mgr.stop_all()
asyncio.run(managed())
```
### Parallel one-shots (`TaskGroup` / `gather`)
For several **independent** **`rexec`**-style calls, schedule them together:
```python
async def parallel():
a = ssh_node(name="a", ipaddr="192.168.1.10", ssh_controlmaster=False)
b = ssh_node(name="b", ipaddr="192.168.1.11", ssh_controlmaster=False)
async def one(node, cmd):
s = await node.rexec(cmd=cmd, IO_TIMEOUT=15.0, CMD_TIMEOUT=30, CONNECT_TIMEOUT=20.0)
return s.results.decode("utf-8", errors="replace")
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(one(a, "hostname"))
t2 = tg.create_task(one(b, "hostname"))
print(t1.result(), t2.result())
asyncio.run(parallel())
```
Use **`asyncio.gather(..., return_exceptions=True)`** when you want every outcome without **`TaskGroup`** fail-fast behavior.
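The difference is visible with plain coroutines (a self-contained sketch; `probe` stands in for an `rexec`-style call):

```python
import asyncio

async def probe(label: str, fail: bool) -> str:
    # stand-in for node.rexec(): raises to simulate an unreachable host
    if fail:
        raise ConnectionError(f"{label}: unreachable")
    return f"{label}: ok"

async def main():
    # return_exceptions=True: siblings keep running, and every outcome
    # (result or exception instance) comes back positionally
    return await asyncio.gather(
        probe("a", fail=False),
        probe("b", fail=True),
        return_exceptions=True,
    )

results = asyncio.run(main())
# results[0] == "a: ok"; results[1] is a ConnectionError instance
```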
### Consoles (`open_consoles` / `close_consoles`)
Class methods walk **`ssh_node.get_instances()`**, clear **`dmesg`** on SSH nodes, and optionally open long-lived ControlMaster **`dmesg -w`** streams (§3.4). Call them only when your rig expects this pattern.
```python
await ssh_node.open_consoles(silent_mode=False)
# ... use nodes ...
await ssh_node.close_consoles()
```
### Automated tests
**`tests/test_remote_nodes.py`** exercises remote shell access when **`FIWI_REMOTE_IP`** is reachable. Run it, overriding the default IP:
```bash
# From repository root (FiWiControl):
cd /path/to/FiWiControl
FIWI_REMOTE_IP=192.168.1.39 python -m pytest tests/test_remote_nodes.py -v
# Or run the unittest module directly:
python tests/test_remote_nodes.py --remote-ip 192.168.1.39
```
---
## 1. Goals
1. **Streaming with line buffering** — Decode subprocess output, split on line boundaries, and deliver each line to logging (including a shared central log) without waiting for the process to finish.
2. **Run-to-completion** — Await a single completion that means: subprocess exited (or was terminated on timeout), and stdout/stderr are drained with line buffers flushed per the EOF policy (§10.3).
3. **Concurrent execution from a prepared set** — Build coroutines (e.g. several **`post_cmd`** calls), then run them **concurrently** under **`asyncio.TaskGroup`** (Python 3.11+). Use **`asyncio.gather`** when a simple “wait for all results” shape fits better.
4. **Scheduled / periodic execution** — Support one-shot and repeating commands through a single **`Command`** type with explicit lifecycle (`start`/`stop`) and fixed-period scheduling semantics.
---
## 2. Architectural overview
| Type | Responsibility |
|------|----------------|
| **`ssh_node`** (class) | Per-host configuration: identity, `ssh`/`ush` argv prefix, relay, ControlMaster socket path, console bookkeeping. Registry of instances. **Does not** own subprocess I/O or interval scheduling. |
| **`ssh_session`** (class) | One SSH child process for a command or stream: **`post_cmd`**, **`close`**, **`SubprocessProtocol`** with **`pipe_data_received`**, timeouts, communicate-like completion. Reads settings from **`ssh_node`** (e.g. via attribute delegation). |
| **`SSHReaderProtocol`** (nested class on `ssh_session`) | Subprocess protocol callbacks: `connection_made`, `pipe_data_received`, `pipe_connection_lost`, `process_exited`, watchdog timer hooks. |
| **`Command`** (class) | Managed lifecycle for repeated or one-shot runs driven by **`ssh_node`** / **`ssh_session`**: **`interval`**, **`count`**, **`start`/`stop`**, log/sink attachment. Each execution uses **`ssh_session.post_cmd`**. |
| **`CommandManager`** (class) | Dynamic command registry: add running commands, stop one/all, self-clean completed commands, keep abnormal-stop history (exception / timeout). |
**Concurrent runs:** Prefer **`asyncio.TaskGroup`** for structured concurrency; **`asyncio.gather`** when appropriate.
**Per-line logging:** Use **`logging.Logger`** and/or a **callable** sink; a formal **`LineSink`** protocol type is optional and only worth adding if multiple implementations need a shared interface.
**Library asyncio rule:** Public APIs that do I/O are **`async`**. Coroutines and protocols use **`asyncio.get_running_loop()`**. The application (or **`asyncio.run`**) starts the event loop before calling into the library.
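As an illustration of the logger-plus-callable sink idea, a decoded line can be fanned out to both targets. The `emit` helper and the `(stream, line)` sink signature below are assumptions for this sketch, not the package API:

```python
import logging

logger = logging.getLogger("fiwicontrol.demo")
records = []

def line_sink(stream: str, line: str) -> None:
    # A per-line sink is any callable taking (stream, line).
    records.append((stream, line))

def emit(stream: str, line: str, sink=None) -> None:
    """Deliver one decoded line to a logger and an optional callable sink."""
    level = logging.INFO if stream == "stdout" else logging.WARNING
    logger.log(level, "%s", line)
    if sink is not None:
        sink(stream, line)

emit("stdout", "uname: Linux", sink=line_sink)
emit("stderr", "warning: slow link", sink=line_sink)
```

Both targets can be attached at once; the formal `LineSink` protocol stays optional because any callable with this shape already works.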
---
## 3. `ssh_node`
### 3.1 Role
| Concern | `ssh_node` |
|---------|------------|
| **`name`**, **`ipaddr`**, optional **`device`**, **`devip`** | Yes |
| **`sshtype`** (`ssh` / `ush`), **`relay`**, **`self.ssh`** argv prefix | Yes |
| **`ssh_controlmaster`**, **`controlmasters`** (SSH only; off for `ush`) | Yes |
| **`ssh_console_session`**, console task references | Yes (bookkeeping) |
| Subprocess streaming / **`pipe_data_received`** | **No** → **`ssh_session`** |
| Interval scheduling, periodic task lists | **No** → **`Command`** (callers hold **`Command`** instances and **`Task`** handles) |
### 3.2 Constructor parameters
- **`name`**, **`ipaddr`**: label and target for argv assembly (`root@…` applied when building the remote command).
- **`relay`**: optional jump host; argv begins with `ssh root@<relay>` then inner `ush` or `ssh`.
- **`ssh_controlmaster`**: enable ControlMaster path for `ssh`; forced off for `ush`.
- **`silent_mode`**: passed into sessions for line logging behavior.
- **`console`**: optional rig-facing flag.
### 3.3 Registry
Instances register in a class-level **`WeakSet`** (`instances`). **`get_instances()`** returns a snapshot of nodes that are **still alive** from the rest of the program's perspective.
**`WeakSet` properties:**
- The registry supports **enumeration** (e.g. **`open_consoles`** over nodes the application is using). **`WeakSet`** stores **weak references** only: node **lifetime** follows **strong references** held by the application; when those go away, the node may be **collected** and **disappears from `instances`** automatically.
- **`get_instances()`** therefore reflects nodes that are **still strongly referenced** elsewhere (variables, rig object, collections).
- Callers keep a **strong reference** to each node they care about for as long as that node should appear in global enumeration.
**Alternative pattern:** an explicit **list of nodes** passed into orchestration functions also works; **`WeakSet`** fits a **central index** without that list becoming the sole long-lived owner of every node.
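The lifetime behavior above can be demonstrated with a minimal stand-in class; `Node` here is illustrative, not the `ssh_node` implementation:

```python
import gc
import weakref

class Node:
    # Class-level registry of weak references, as in ssh_node.instances.
    instances = weakref.WeakSet()

    def __init__(self, name: str):
        self.name = name
        Node.instances.add(self)

    @classmethod
    def get_instances(cls):
        # Snapshot of nodes still strongly referenced elsewhere.
        return list(cls.instances)

a = Node("lan1")
b = Node("lan2")
assert {n.name for n in Node.get_instances()} == {"lan1", "lan2"}

# Dropping the last strong reference lets the node leave the registry.
del b
gc.collect()
names = {n.name for n in Node.get_instances()}
```

After `del b` and collection, only `lan1` remains enumerable, which is exactly the §3.3 contract: the registry indexes nodes without owning them.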
### 3.4 Class-level orchestration
**`open_consoles`**, **`close_consoles`**: multi-node cleanup (`pkill dmesg`), ControlMaster + long-lived console streams. **`async`**; require a running loop.
Stopping scheduled work: callers **`await cmd.stop()`** and/or **`task.cancel()`** on the **`Task`** from **`Command.start()`** (exact API TBD).
### 3.5 Instance methods
| Method | Behavior |
|--------|----------|
| **`rexec`** | Constructs **`ssh_session`**, runs **`post_cmd`** with IO / command / connect timeouts. To run many **`rexec`**-style operations later in parallel, use **`asyncio.TaskGroup`** (or **`gather`** if that shape fits). |
| **`clean`** | **`async`**: remote cleanup (e.g. `pkill dmesg`) via a one-off subprocess. |
| **`close_console`** | **`async`**: tear down **`ssh_console_session`**. |
Scheduled / interval execution: **`Command`** (§5). **`ssh_node`**: identity, transport, registry, console bookkeeping.
### 3.6 Optional utilities
Thin **`async` helpers** (e.g. sleep delays) may exist as **module-level functions** when useful.
---
## 4. `ssh_session`
### 4.1 Role
One logical SSH subprocess: spawn via **`loop.subprocess_exec`** with a **`SubprocessProtocol`**, stream stdout/stderr through **`pipe_data_received`**, enforce connect / wall / IO-idle timeouts, and expose communicate-like completion.
### 4.2 Relationship to `ssh_node`
The session holds a reference to **`ssh_node`** and delegates missing attributes (e.g. **`ipaddr`**, **`ssh`**, **`controlmasters`**) for argv and ControlMaster paths.
### 4.3 Operations
- **`post_cmd(cmd, IO_TIMEOUT, CMD_TIMEOUT, …)`** — Build full **`ssh`** argv (including **`-o ControlPath`** / master setup when **`control_master`**). A **`repeat`** flag that loops **inside one subprocess lifecycle** is distinct from **`Command`**'s separate invocations on a schedule (§5).
- **`close`** — Terminate console / master as required (**`async`**).
### 4.4 Inner protocol
**`SSHReaderProtocol`** (name may match implementation): implements **`pipe_data_received`**, **`pipe_connection_lost`**, **`process_exited`**, watchdog timers via **`get_running_loop().call_later`**, and optional **`LoggerAdapter`** for prefixed logs.
---
## 5. `Command`
### 5.1 Name and scope
The type is **`Command`** (or **`SshCommand`** if `Command` collides in consumer code). SSH is the default transport; **repetition** is modeled with an **`interval`** field on the same type.
### 5.2 Responsibilities
| Concern | Owner |
|--------|--------|
| Remote command string / argv fragment | **`Command`** |
| **`interval`**: `None` or `0` → one execution per start/stop (or defined one-shot semantics); `> 0` → period between scheduled fires | **`Command`** |
| **`count`**: finite run count or **`Command.COUNT_FOREVER`** for never-ending | **`Command`** |
| Optional **`initial_delay`**, jitter | **`Command`** |
| **`start()` / `stop()`**, background **`Task`**, cancellation, log flush | **`Command`** |
| Log / line output target | **`Command`** — typically **`logging.Logger`** or a **callable**; formal **`LineSink`** protocol optional |
| **Completion timer** (wall-clock cap per run) | **`Command`** — default passed into each **`post_cmd`** as **`CMD_TIMEOUT`** (name may match implementation): max time from start of that invocation until subprocess exit (or kill). Catches hung remote commands that never finish. |
| **I/O idle timer** | **`Command`** — default passed into each **`post_cmd`** as **`IO_TIMEOUT`**: max time **without** stdout/stderr bytes; timer resets on **`pipe_data_received`**. Catches “stuck but still running” cases where the process produces no output. |
| Subprocess + protocol | **`ssh_session`** only — each tick **`await post_cmd(..., IO_TIMEOUT=…, CMD_TIMEOUT=…)`**; timers enforced inside **`SSHReaderProtocol`** |
**Timers on `Command`:** **`IO_TIMEOUT`** and **`CMD_TIMEOUT`** address **idle output** vs **overall run length**. **`Command`** supplies defaults and passes them into each **`post_cmd`**; **`SSHReaderProtocol`** implements the watchdogs.
**Split of roles:** **`Command`** schedules runs and timer values; **`ssh_session`** runs **`post_cmd`** and owns **`pipe_data_received`** streaming.
### 5.3 Scheduling repeats
Repeating **`Command`** uses **event-loop time** so each **period** is measured **between scheduled fire times**, independent of how long **`post_cmd`** took:
1. **Deadline-based:** at tick start, **`next_fire = loop.time() + period`**. After **`await post_cmd(...)`**, wait **`max(0.0, next_fire - loop.time())`** (or equivalent **`Event.wait`** with timeout). Document behavior when **`post_cmd`** exceeds **`period`** (immediate next run vs align to next **`next_fire`**).
2. **`loop.call_at` / `call_later`:** schedule the next tick at **`loop.time() + period`** (or the computed deadline).
The wait between ticks is the time remaining until **`next_fire`** (or the next **`call_at`** / **`call_later`** fire), so the **period** stays anchored to the event-loop clock regardless of **`post_cmd`** duration.
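A deadline-based loop per option 1 might look like the following sketch; `run_repeats` and its `work` callable are hypothetical stand-ins for the `Command` internals and `post_cmd`:

```python
import asyncio

async def run_repeats(work, period: float, count: int) -> list[float]:
    """Deadline-based scheduling: each fire is anchored to loop.time(), so the
    period is measured between scheduled fires, not after work finishes.
    (Sketch only; the real Command also watches a stop event.)"""
    loop = asyncio.get_running_loop()
    fire_times: list[float] = []
    next_fire = loop.time()
    for _ in range(count):
        fire_times.append(loop.time())
        await work()                              # stands in for post_cmd
        next_fire += period                       # advance the deadline
        await asyncio.sleep(max(0.0, next_fire - loop.time()))
    return fire_times

async def main() -> list[float]:
    async def work():
        await asyncio.sleep(0.01)                 # simulated command duration
    times = await run_repeats(work, period=0.05, count=3)
    return [b - a for a, b in zip(times, times[1:])]

gaps = asyncio.run(main())
```

Because `next_fire` advances by `period` regardless of how long `work` took, the inter-fire gaps stay near the period rather than drifting by the work duration.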
### 5.4 `Command` vs bare `Task`
A coroutine **`run_command_loop(...)`** with deadline scheduling plus **`asyncio.create_task`** is enough for callers who want minimal API surface. **`Command`** is optional sugar for **`start`/`stop`**, named fields, and encapsulation — add it when those ergonomics justify the extra type.
### 5.5 API sketch
```text
cmd = Command(node, cmd="...", interval=5.0, line_sink=...)
cmd = Command(node, cmd="...", interval=None, line_sink=...)
await cmd.start()
await cmd.stop()
```
### 5.6 Command manager
`CommandManager` supports dynamic add/remove of commands while the system is running:
```text
manager = CommandManager()
cid = await manager.add_command(node=node, cmd="uname -r", interval=1.0, count=10)
await manager.stop_command(cid)
await manager.stop_all()
active = manager.list_active()
history = manager.list_abnormal_history()
```
Self-cleaning behavior:
- completed commands are removed from active state automatically.
- abnormal stop records (exception/timeout) are retained in history.
---
## 6. Concurrent execution (`TaskGroup` / `gather`)
**Requires Python 3.11+** for **`TaskGroup`** as documented here.
- **Preparation:** build coroutines (e.g. **`session.post_cmd("cmd", ...)`** for several sessions).
- **Execution (preferred):** **`async with asyncio.TaskGroup() as tg:`** then **`tg.create_task(coro)`** for each coroutine. **`TaskGroup`** cancels siblings on first failure (structured concurrency).
- **Alternative:** **`await asyncio.gather(*coros, return_exceptions=True)`** when callers want all outcomes without **`TaskGroup`** fail-fast behavior.
One top-level **`await`** runs the whole set concurrently.
---
## 7. Functional requirements
| ID | Requirement |
|----|-------------|
| R1 | Line-oriented streaming: decode, buffer, split on `\n`, optional final fragment on EOF (§10.3). |
| R2 | Central logging via **logger and/or callable**; supports shared and per-session targets. |
| R3 | Completion: process ended, stdin closed if used, pipes EOF'd, line buffers flushed (§9.2, §10). |
| R4 | Prepared coroutines run **concurrently** via **`asyncio.TaskGroup`** (or **`gather`** when that shape fits); **Python 3.11+**. |
| R5 | **`ush`**, **`ssh`**, relay, ControlMaster consoles, connect / wall / IO-idle timeouts — all compose with streaming and completion as described here. |
---
## 8. Non-functional requirements
| ID | Requirement |
|----|-------------|
| N1 | Library I/O runs under **`asyncio.get_running_loop()`**; programs start work with **`asyncio.run`** or an application event loop. |
| N2 | **Python 3.11+** for implementation and documentation examples. |
| N3 | Backpressure: document whether logging is assumed fast or use bounded queues / warnings. |
| N4 | Cancelling a command **`Task`** tears down subprocess and protocol cleanly. |
---
## 9. Line sink and completion
### 9.1 Per-line output
Use **`logging.Logger`**, a **callable** `(stream, line) -> None`, or both. Normalize **`\r\n`**, strip **`\r`** for display. A dedicated **`LineSink`** **protocol** is optional sugar.
### 9.2 Communicate-like completion
Completion means:
1. Subprocess has exited (return code available).
2. Stdout and stderr reached EOF (**`pipe_connection_lost`** for fds 1 and 2 as used).
3. Protocol line buffers flushed (**§10.3**); **`process_exited`** handled; completion event or future set.
---
## 10. Streaming: `SubprocessProtocol` and `pipe_data_received`
The SSH streaming path uses **`loop.subprocess_exec`** with a **`SubprocessProtocol`**. **Line assembly happens in `pipe_data_received(fd, data)`** on arbitrary byte chunks from the transport.
### 10.1 Pipeline
| Stage | Mechanism |
|--------|-----------|
| Ingress | **`pipe_data_received(fd, data)`** — `fd` 1 = stdout, 2 = stderr |
| Buffer / decode | Per-stream byte buffer; UTF-8 (replace or strict), configurable |
| Line split | On `\n`, invoke **`on_line`** → logger / callable |
| EOF | **`pipe_connection_lost`**: flush trailing fragment per §10.3 |
| Done | **`process_exited`** + both pipes closed + internal finished state → complete await |
### 10.2 Transport vs logical lines
**Ingress:** **`pipe_data_received`** delivers **byte chunks** (fragments and coalesced data). **Line boundaries** are applied in the protocol buffer: accumulate, split on **`\n`**, emit one logical line per **`on_line`** (Tcl **`gets`**-style delivery to parsers and loggers).
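The buffering described above can be sketched as a small line assembler; `LineBuffer` is illustrative, not the protocol class itself:

```python
class LineBuffer:
    """Assemble logical lines from arbitrary byte chunks, as
    pipe_data_received would receive them (illustrative sketch)."""

    def __init__(self, on_line):
        self._buf = ""
        self._on_line = on_line

    def feed(self, data: bytes) -> None:
        self._buf += data.decode("utf-8", errors="replace")
        while "\n" in self._buf:
            line, self._buf = self._buf.split("\n", 1)
            self._on_line(line.replace("\r", ""))  # strip CR for display

    def eof(self) -> None:
        # Default policy (§10.3): emit a trailing fragment as one final line.
        if self._buf:
            self._on_line(self._buf)
            self._buf = ""

lines = []
buf = LineBuffer(lines.append)
for chunk in (b"first li", b"ne\r\nsec", b"ond\nlast"):
    buf.feed(chunk)
buf.eof()
```

Chunk boundaries never align with line boundaries here, yet the sink still sees exactly one call per logical line plus the EOF fragment.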
### 10.3 EOF / trailing fragment
| Mode | Behavior |
|------|----------|
| **Default** | If buffer non-empty at EOF, emit one final line without requiring trailing `\n`. |
| **Strict** | Drop or tag partial trailing line — configurable. |
### 10.4 Tcl-aligned behavior
- **Input:** after internal buffering, **`on_line`** receives **complete lines** (same idea as Tcl **`gets`** on a readable channel).
- **Output (Tcl `fconfigure -buffering line`):** In Tcl, **line-buffered** mode on a channel **flushes outgoing data when a newline is written** (and when the buffer fills), which matters for interactive use, sockets, and pipes so output appears promptly. For this module, when **writing to the subprocess stdin** (if used), apply the same idea: **flush after each line** (writes ending with **`\n`**, or explicit flush) so the peer sees output without waiting for a full stdio buffer.
---
## 11. Timeouts
| Timeout | Semantics |
|---------|-----------|
| **Connect** | From spawn until **`connection_made`**; cancel timer when connected. |
| **Command (wall)** | Maximum lifetime; cancel watchdogs and **`terminate`** transport. |
| **IO idle** | Reset on each **`pipe_data_received`**; on fire, **`terminate`**. |
If IO idle fires, completion still runs after terminate + drain (output may be partial).
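The reset-on-data behavior of the IO-idle timer can be sketched with `call_later` handles; names are illustrative, and real code would terminate the subprocess transport where this sketch merely sets an event:

```python
import asyncio

async def main() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()
    fired = asyncio.Event()
    state = {"watchdog": None}

    def on_idle():
        fired.set()                       # real code: terminate the child

    def arm(io_timeout: float):
        # Cancel and re-arm, as pipe_data_received would on each chunk.
        if state["watchdog"] is not None:
            state["watchdog"].cancel()
        state["watchdog"] = loop.call_later(io_timeout, on_idle)

    arm(0.05)
    await asyncio.sleep(0.02)
    arm(0.05)                             # data arrived: idle timer resets
    await asyncio.sleep(0.02)
    early = fired.is_set()                # still inside the idle window
    await asyncio.sleep(0.10)             # no further data: watchdog fires
    return early, fired.is_set()

early, late = asyncio.run(main())
```

The connect and wall-clock watchdogs follow the same arm/cancel pattern, differing only in what event cancels them.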
---
## 12. Central log file
Use **`logging`** with a **`FileHandler`**, or a **callable** that writes timestamped lines. Prefer writes from the event loop thread; for heavy I/O, optional **`asyncio.Queue`** plus a single writer task.
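A sketch of the bounded-queue variant, draining to a list instead of a real `FileHandler` so it stays self-contained:

```python
import asyncio

async def main() -> list[str]:
    # Producers enqueue lines; one writer task drains them in order.
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    written: list[str] = []

    async def writer():
        while True:
            line = await queue.get()
            if line is None:              # sentinel: shut the writer down
                break
            written.append(line)          # real code: write + flush to file

    task = asyncio.create_task(writer())
    for i in range(3):
        await queue.put(f"line {i}")      # blocks if the queue is full
    await queue.put(None)
    await task
    return written

written = asyncio.run(main())
```

The bounded `maxsize` gives natural backpressure: fast producers block on `put` rather than growing memory without limit.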
---
## 13. API sketch (illustrative names)
```text
async with asyncio.TaskGroup() as tg:
tg.create_task(session_a.post_cmd("cmd1", ...))
tg.create_task(session_b.post_cmd("cmd2", ...))
cmd = Command(node, cmd="...", interval=10.0, line_sink=...)
await cmd.start()
await cmd.stop()
```
---
## 14. Open decisions
1. Formal **`LineSink`** protocol vs **logger + callable** only.
2. **stderr:** merged to stdout vs separate channels.
3. Public name **`Command`** vs **`SshCommand`**.
4. Whether **`Command`** ships in v1 or callers use **`create_task(run_command_loop(...))`** only.
---
## 15. Specification checklist
- [ ] Line emission, EOF, and CR handling documented in implementation.
- [ ] Concurrent execution documented for **Python 3.11+** with **`TaskGroup`** (and **`gather`** where noted).
- [ ] Completion = process exit + pipe EOF + buffer flush.
- [ ] Central logging path documented.
- [ ] Timeout and cancellation behavior documented.
- [ ] Repeating **`Command`** scheduling uses **deadline-based** waits or **`call_at`/`call_later`** (§5.3).
- [ ] Automated tests cover the scenarios in **§16**.
---
## 16. Test cases
This section lists **behaviors to verify** in automated tests (unit, functional, or integration as appropriate). Implementation may use mocks, local subprocess stubs, or real SSH depending on CI constraints.
### 16.1 `ssh_session` / `post_cmd` / completion
- **Happy path:** Remote command exits 0; completion fires after subprocess exit, stdout/stderr EOF, and protocol teardown.
- **Non-zero exit:** Process exits with failure; completion still reached; return code or error path is observable as designed.
- **Communicate-like barrier:** Completion does not report done until **`process_exited`** and pipe **`pipe_connection_lost`** semantics are satisfied (§9.2).
### 16.2 Streaming and line buffering (`pipe_data_received`)
- **Chunked input:** Data arriving as multiple arbitrary chunks still yields **correct line splits** on **`\n`**.
- **CR / CRLF:** **`\r\n`** and trailing **`\r`** handling matches §10.2 / §10.3.
- **EOF partial line:** Buffer non-empty at EOF emits **one final line** under default policy (§10.3).
- **Stdout vs stderr:** If stderr is a separate channel, both streams are decoded and routed per sink/logger configuration.
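A happy-path check along these lines can run against a local subprocess stub instead of real SSH; `run_and_collect` is a hypothetical test helper, not part of the package:

```python
import asyncio
import sys

async def run_and_collect(argv: list[str]) -> list[str]:
    # Local subprocess stub: verifies line-oriented delivery without SSH.
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE)
    assert proc.stdout is not None
    lines = []
    async for raw in proc.stdout:         # StreamReader yields one line each
        lines.append(raw.decode().rstrip("\r\n"))
    await proc.wait()                     # completion barrier: process exited
    return lines

argv = [sys.executable, "-c", "print('one'); print('two')"]
collected = asyncio.run(run_and_collect(argv))
```

Using `sys.executable` keeps the stub portable across CI hosts; the same pattern scales to multi-chunk and non-zero-exit cases.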
### 16.3 Timeouts
- **Connect timeout:** Fires when transport does not reach connected state in time; subprocess is terminated; completion reached.
- **Command (wall / `CMD_TIMEOUT`):** Long-running command is terminated at wall limit; completion reached.
- **IO idle (`IO_TIMEOUT`):** No **`pipe_data_received`** for the idle interval triggers terminate; completion reached (output may be partial) (§11).
### 16.4 `Command` (scheduled execution)
- **One-shot (`interval` unset / zero):** **`start`** runs at most one **`post_cmd`** cycle per lifecycle (per defined semantics); **`stop`** ends cleanly.
- **Repeating (`interval` > 0):** Multiple invocations occur; **period** between **scheduled** fires follows **deadline** or **`call_at`** rules (§5.3), not “sleep N after work” drift.
- **Finite count:** `count=N` executes exactly N runs.
- **Never-ending:** `count=Command.COUNT_FOREVER` runs until explicit stop/cancel.
- **Timers on `Command`:** **`IO_TIMEOUT`** and **`CMD_TIMEOUT`** passed through to each **`post_cmd`** invocation behave as in §16.3.
- **Stop / cancel:** **`stop()`** or **`Task`** cancellation ends the loop; no leaked tasks or open log handles.
### 16.4.1 `CommandManager`
- **Add + self-clean:** Added finite commands appear in `list_active()` then auto-remove on normal completion.
- **Stop one / stop all:** Explicit stop removes active entries.
- **Abnormal history:** exception/timeout stops are retained in `list_abnormal_history()`.
### 16.5 Concurrent execution (Python 3.11+)
- **`TaskGroup`:** Multiple **`post_cmd`** (or equivalent) tasks run **overlapped**; first failure cancels siblings if using default **`TaskGroup`** semantics.
- **`gather` (when used):** **`return_exceptions=True`** path collects all results without aborting early, if that pattern is supported in helpers.
### 16.6 `ssh_node` / registry
- **`WeakSet`:** After the application drops its last strong reference to a node, the node **disappears** from **`get_instances()`** following collection (§3.3).
- **Strong reference:** While the app holds a node, it appears in **`get_instances()`** when enumeration runs.
### 16.7 Transport configuration
- **`ssh` vs `ush`:** Argv shape and **`ssh_controlmaster`** interaction match §3 / §4 (as far as test doubles allow).
- **Relay:** Jump-host argv prefix is built correctly when **`relay`** is set.
### 16.8 Logging / sink
- **Per-line delivery:** Logger or callable receives **one line at a time** after buffering (§9.1, §10.2).
- **Central vs per-session:** If both modes exist, each receives the expected lines (no cross-contamination unless designed).
### 16.9 Consoles / ControlMaster (if exercised in test suite)
- **`open_consoles` / `close_consoles`:** Lifecycle can be covered with integration tests or marked **optional** when real hardware or SSH fixtures are unavailable.
---
Implemented coverage reference: `tests/test_remote_nodes.py`
*End of design document.*

pyproject.toml
[build-system]
requires = ["setuptools>=69", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "fiwicontrol"
version = "0.1.0"
description = "FiWiControl: command execution (SSH/ush) and power rig control in one package."
readme = "README.md"
requires-python = ">=3.11"
license = { file = "LICENSE" }
authors = [{ name = "Umber" }]
dependencies = []
[project.optional-dependencies]
dev = ["pytest>=8.0"]
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]


@ -0,0 +1,5 @@
"""FiWiControl: command execution and power rig helpers (single installable package)."""
__all__ = ["__version__"]
__version__ = "0.1.0"

"""
Command execution over SSH / ``ush``: sessions, scheduling, timeouts.
Must not import from ``fiwicontrol.power``.
"""
from fiwicontrol.commands.remote_nodes import (
Command,
CommandManager,
sleep_async,
ssh_node,
ssh_session,
)
__all__ = [
"Command",
"CommandManager",
"sleep_async",
"ssh_node",
"ssh_session",
]

# Copyright (c) 2026 Umber
#
# Licensed under the Apache License, Version 2.0; see LICENSE.
#
# Remote command helpers (SSH / ush) for test rig control.
#
# Async-first implementation for ssh_node.
#
# Typical usage:
# - One-shot: await ssh_node(...).rexec(cmd="...", IO_TIMEOUT=..., CMD_TIMEOUT=..., CONNECT_TIMEOUT=...)
# - Repeating: cmd = Command(node, cmd="...", interval=1.0, count=10); await cmd.start(); await cmd.task
# - Many: CommandManager().add_command(...) / stop_all() / list_active()
#
# Remote targets (sshtype "ssh"): passwordless SSH is required — see docs/remote-nodes-asyncio-design.md.
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
import weakref
from datetime import datetime
logger = logging.getLogger(__name__)
class ssh_node:
DEFAULT_IO_TIMEOUT = 30.0
DEFAULT_CMD_TIMEOUT = 30
DEFAULT_CONNECT_TIMEOUT = 60.0
instances = weakref.WeakSet()
@classmethod
def get_instances(cls):
return list(ssh_node.instances)
@classmethod
async def open_consoles(cls, silent_mode=False):
nodes = cls.get_instances()
node_names = []
cleans = [node.clean() for node in nodes if node.sshtype.lower() == "ssh"]
if cleans:
logging.info("Run consoles clean")
await asyncio.gather(*cleans, return_exceptions=True)
tasks, ipaddrs = [], []
for node in nodes:
if node.ssh_controlmaster and not node.ssh_console_session and node.ipaddr not in ipaddrs:
logging.info("Run consoles ControlMaster")
node.ssh_console_session = ssh_session(
name=node.name, hostname=node.ipaddr, node=node, control_master=True,
ssh_controlmaster=True, silent_mode=silent_mode,
)
coro = node.ssh_console_session.post_cmd(cmd="/usr/bin/dmesg -w", IO_TIMEOUT=None, CMD_TIMEOUT=None)
node.console_task = asyncio.create_task(coro)
tasks.append(node.console_task)
ipaddrs.append(node.ipaddr)
node_names.append(node.name)
if tasks:
s = " "
logging.info("Opening consoles: {}".format(s.join(node_names)))
try:
await asyncio.wait_for(asyncio.gather(*tasks), timeout=60)
except TimeoutError:
logging.error("open console timeout")
raise
await asyncio.sleep(1)
logging.info("open_consoles done")
@classmethod
async def close_consoles(cls):
nodes, tasks, node_names = cls.get_instances(), [], []
for node in nodes:
if node.ssh_console_session:
node.console_task = asyncio.create_task(node.ssh_console_session.close())
tasks.append(node.console_task)
node_names.append(node.name)
if tasks:
s = " "
logging.info("Closing consoles: {}".format(s.join(node_names)))
await asyncio.wait_for(asyncio.gather(*tasks), timeout=60)
logging.info("Closing consoles done: {}".format(s.join(node_names)))
def __init__(
self, name=None, ipaddr=None, devip=None, console=False, device=None,
ssh_controlmaster=False, silent_mode=False, sshtype="ssh", relay=None,
):
self.ipaddr, self.name, self.device, self.devip = ipaddr, name, device, devip
self.sshtype = sshtype.lower()
if self.sshtype == "ssh":
cm_path = "/tmp/controlmasters_{}".format(self.ipaddr)
self.ssh_controlmaster, self.controlmasters = ssh_controlmaster, cm_path
else:
self.ssh_controlmaster, self.controlmasters = False, None
self.ssh_console_session, self.console_task, self.silent_mode = None, None, silent_mode
        if relay:
            self.relay, self.ssh = relay, ["/usr/bin/ssh", "root@{}".format(relay)]
        else:
            self.relay, self.ssh = None, []
if self.sshtype == "ush":
self.ssh.extend(["/usr/local/bin/ush"])
elif self.sshtype == "ssh":
if not self.ssh:
logging.debug("node add /usr/bin/ssh")
self.ssh.extend(["/usr/bin/ssh"])
logging.debug("ssh={} ".format(self.ssh))
else:
raise ValueError("ssh type invalid")
ssh_node.instances.add(self)
async def rexec(
self, cmd="pwd", IO_TIMEOUT=DEFAULT_IO_TIMEOUT, CMD_TIMEOUT=DEFAULT_CMD_TIMEOUT,
CONNECT_TIMEOUT=DEFAULT_CONNECT_TIMEOUT, repeat=None,
):
this_session = ssh_session(
name=self.name, hostname=self.ipaddr, CONNECT_TIMEOUT=CONNECT_TIMEOUT,
node=self, ssh_controlmaster=True,
)
await this_session.post_cmd(cmd=cmd, IO_TIMEOUT=IO_TIMEOUT, CMD_TIMEOUT=CMD_TIMEOUT, repeat=repeat)
return this_session
async def clean(self):
childprocess = await asyncio.create_subprocess_exec(
"/usr/bin/ssh", "root@{}".format(self.ipaddr), "pkill", "dmesg",
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
)
stdout, stderr = await childprocess.communicate()
if stdout:
logging.info("{}".format(stdout))
if stderr:
logging.info("{}".format(stderr))
async def close_console(self):
if self.ssh_console_session:
await self.ssh_console_session.close()
self.ssh_console_session = None
async def sleep_async(delay=0, text=None, stoptext=None):
if text:
logging.info("Sleep {} ({})".format(delay, text))
await asyncio.sleep(delay)
if stoptext:
logging.info("Sleep done ({})".format(stoptext))
class Command:
"""Scheduled remote command: each tick uses ssh_session.post_cmd (deadline-based period)."""
COUNT_FOREVER = -1
def __init__(
self, node: ssh_node, cmd: str, interval: float | None = None, *,
io_timeout: float | None = None, cmd_timeout: int | None = None,
connect_timeout: float | None = None, log_file: str | None = None,
silent_mode: bool | None = None, count: int | None = None,
):
self._node, self._cmd, self._interval = node, cmd, interval
self._io_timeout = io_timeout if io_timeout is not None else ssh_node.DEFAULT_IO_TIMEOUT
self._cmd_timeout = cmd_timeout if cmd_timeout is not None else ssh_node.DEFAULT_CMD_TIMEOUT
self._connect_timeout = connect_timeout if connect_timeout is not None else ssh_node.DEFAULT_CONNECT_TIMEOUT
self._log_file, self._silent_mode = log_file, silent_mode if silent_mode is not None else node.silent_mode
if count is not None and count != self.COUNT_FOREVER and count < 1:
raise ValueError("count must be >= 1, None, or Command.COUNT_FOREVER")
self._count, self._run_count = count, 0
self._task, self._stop = None, asyncio.Event()
self._stop_reason, self._error = None, None
async def start(self):
self._stop.clear()
self._task = asyncio.create_task(self._run_loop())
async def stop(self):
self._stop.set()
if self._stop_reason is None:
self._stop_reason = "cancelled"
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
@property
def run_count(self):
return self._run_count
@property
def task(self):
return self._task
@property
def cmd_text(self):
return self._cmd
@property
def node_label(self):
return self._node.name if self._node.name else self._node.ipaddr
@property
def stop_reason(self):
return self._stop_reason
@property
def error(self):
return self._error
async def _run_loop(self):
loop, log_fh = asyncio.get_running_loop(), None
if self._log_file:
log_fh = open(self._log_file, "w", errors="ignore")
try:
while not self._stop.is_set():
period, next_fire = self._interval, loop.time()
if period is not None and period > 0:
next_fire = loop.time() + period
run_idx = self._run_count + 1
session = ssh_session(
name=self._node.name, hostname=self._node.ipaddr,
CONNECT_TIMEOUT=self._connect_timeout, node=self._node,
ssh_controlmaster=True, silent_mode=self._silent_mode,
run_idx=run_idx, run_target=self._count,
)
await session.post_cmd(cmd=self._cmd, IO_TIMEOUT=self._io_timeout, CMD_TIMEOUT=self._cmd_timeout)
if log_fh:
hdr = "********************** Command '{}' @ {} **********************\n"
log_fh.write(hdr.format(self._cmd, datetime.now()))
log_fh.write(session.results.decode("utf-8", errors="replace"))
log_fh.flush()
self._run_count += 1
if self._count not in (None, self.COUNT_FOREVER) and self._run_count >= self._count:
break
if period is None or period <= 0:
break
if next_fire > loop.time():
deadline_fut = loop.create_future()
def _at_deadline():
if not deadline_fut.done():
deadline_fut.set_result(None)
handle = loop.call_at(next_fire, _at_deadline)
try:
stop_task = asyncio.create_task(self._stop.wait())
_, pending = await asyncio.wait(
{stop_task, deadline_fut},
return_when=asyncio.FIRST_COMPLETED,
)
for p in pending:
p.cancel()
try:
await p
except asyncio.CancelledError:
pass
finally:
handle.cancel()
if self._stop.is_set():
break
if self._stop_reason is None:
self._stop_reason = "completed"
except asyncio.CancelledError:
if self._stop_reason is None:
self._stop_reason = "cancelled"
raise
except Exception as exc:
self._stop_reason = "timeout" if isinstance(exc, TimeoutError) else "exception"
self._error = exc
raise
finally:
if log_fh:
log_fh.close()
class CommandManager:
"""Manage multiple Command loops with self-cleaning active state."""
def __init__(self, history_limit=100):
self._active = {}
self._next_id = 1
self._abnormal_history = []
self._history_limit = history_limit
def _new_id(self):
cid = "c{:03d}".format(self._next_id)
self._next_id += 1
return cid
async def add_command(
self, *, node, cmd, interval=1.0, count=Command.COUNT_FOREVER, io_timeout=None,
cmd_timeout=None, connect_timeout=None, silent_mode=None, command_id=None,
):
cid = command_id if command_id else self._new_id()
if cid in self._active:
raise ValueError("command_id already exists: {}".format(cid))
command = Command(
node=node, cmd=cmd, interval=interval, count=count, io_timeout=io_timeout,
cmd_timeout=cmd_timeout, connect_timeout=connect_timeout, silent_mode=silent_mode,
)
self._active[cid] = command
await command.start()
command.task.add_done_callback(lambda task, command_id=cid: self._on_task_done(command_id, task))
return cid
def _on_task_done(self, command_id, task):
command = self._active.pop(command_id, None)
if not command:
return
reason = command.stop_reason
if task.cancelled():
reason = "cancelled"
else:
err = task.exception()
if err is not None:
reason = "timeout" if isinstance(err, TimeoutError) else "exception"
if reason in ("exception", "timeout"):
self._abnormal_history.append(
{
"command_id": command_id,
"node": command.node_label,
"cmd": command.cmd_text,
"run_count": command.run_count,
"finished_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
"reason": reason,
"error": str(command.error if command.error else task.exception()),
}
)
if len(self._abnormal_history) > self._history_limit:
self._abnormal_history = self._abnormal_history[-self._history_limit :]
async def stop_command(self, command_id):
command = self._active.get(command_id)
if command:
await command.stop()
async def stop_all(self):
active_ids = list(self._active.keys())
for command_id in active_ids:
await self.stop_command(command_id)
def list_active(self):
out = {}
for command_id, command in self._active.items():
out[command_id] = {"node": command.node_label, "cmd": command.cmd_text, "run_count": command.run_count}
return out
def list_abnormal_history(self):
return list(self._abnormal_history)
class ssh_session:
sessionid = 1
class SSHReaderProtocol(asyncio.SubprocessProtocol):
def __init__(self, session, silent_mode):
self._exited, self._closed_stdout, self._closed_stderr = False, False, False
self._mypid, self._stdoutbuffer, self._stderrbuffer = None, "", ""
self._session, self._silent_mode = session, silent_mode
self._loop = asyncio.get_running_loop()
if self._session.CONNECT_TIMEOUT is not None:
self.watchdog = self._loop.call_later(self._session.CONNECT_TIMEOUT, self.wd_timer)
self._session.closed.clear()
self.timeout_occurred = asyncio.Event()
self.timeout_occurred.clear()
@property
def finished(self):
return self._exited and self._closed_stdout and self._closed_stderr
def signal_exit(self):
if not self.finished:
return
self._session.closed.set()
def connection_made(self, transport):
if self._session.CONNECT_TIMEOUT is not None:
self.watchdog.cancel()
self._mypid, self._transport = transport.get_pid(), transport
self._session.sshpipe = self._transport.get_extra_info("subprocess")
self._session.adapter.debug("{} ssh node connection made pid=({})".format(self._session.name, self._mypid))
self._session.connected.set()
if self._session.IO_TIMEOUT is not None:
self.iowatchdog = self._loop.call_later(self._session.IO_TIMEOUT, self.io_timer)
if self._session.CMD_TIMEOUT is not None:
self.watchdog = self._loop.call_later(self._session.CMD_TIMEOUT, self.wd_timer)
def connection_lost(self, exc):
self._session.adapter.debug("{} node connection lost pid=({})".format(self._session.name, self._mypid))
self._session.connected.clear()
def pipe_data_received(self, fd, data):
if self._session.IO_TIMEOUT is not None:
self.iowatchdog.cancel()
self._session.results.extend(data)
text = data.decode("utf-8", errors="replace")
if fd == 1:
self._stdoutbuffer += text
while "\n" in self._stdoutbuffer:
line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1)
line = line.replace("\r", "")
if not self._silent_mode:
self._session.adapter.info("{}".format(line))
elif fd == 2:
self._stderrbuffer += text
while "\n" in self._stderrbuffer:
line, self._stderrbuffer = self._stderrbuffer.split("\n", 1)
line = line.replace("\r", "")
self._session.adapter.warning("{} {}".format(self._session.name, line))
if self._session.IO_TIMEOUT is not None:
self.iowatchdog = self._loop.call_later(self._session.IO_TIMEOUT, self.io_timer)
def pipe_connection_lost(self, fd, exc):
if self._session.IO_TIMEOUT is not None:
self.iowatchdog.cancel()
if fd == 1:
self._session.adapter.debug("{} stdout pipe closed (exception={})".format(self._session.name, exc))
self._closed_stdout = True
elif fd == 2:
self._session.adapter.debug("{} stderr pipe closed (exception={})".format(self._session.name, exc))
self._closed_stderr = True
self.signal_exit()
def process_exited(self):
if self._session.CMD_TIMEOUT is not None:
self.watchdog.cancel()
logging.debug("{} subprocess with pid={} closed".format(self._session.name, self._mypid))
self._exited, self._mypid = True, None
self.signal_exit()
def wd_timer(self, type=None):
logging.error("{}: timeout: pid={}".format(self._session.name, self._mypid))
self.timeout_occurred.set()
if self._session.sshpipe:
self._session.sshpipe.terminate()
def io_timer(self, type=None):
logging.error(
"{} IO timeout: cmd='{}' host(pid)={}({})".format(
self._session.name, self._session.cmd, self._session.hostname, self._mypid,
)
)
self.timeout_occurred.set()
if self._session.sshpipe:
self._session.sshpipe.terminate()
class CustomAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
node_label = self.extra.get("node_label") or self.extra.get("connid")
run_idx = self.extra.get("run_idx")
run_target = self.extra.get("run_target")
tag = self.extra.get("stream_tag")
parts = [ts]
if node_label:
parts.append(str(node_label))
if run_idx is not None:
target = "INF" if run_target in (None, Command.COUNT_FOREVER) else str(run_target)
parts.append("{}/{}".format(run_idx, target))
prefix_txt = "[{}]".format(" ".join(parts))
if tag:
msg = "{} {}".format(str(tag).upper(), msg)
return "{} {}".format(prefix_txt, msg), kwargs
@staticmethod
def _stream_tag_for_cmd(cmd):
# Guard against a whitespace-only cmd, which would make split()[0] raise IndexError.
if not cmd:
return None
parts = cmd.split()
return parts[0].upper() if parts else None
def __init__(
self, user="root", name=None, hostname="localhost", CONNECT_TIMEOUT=None,
control_master=False, node=None, silent_mode=False, ssh_controlmaster=True,
run_idx=None, run_target=None,
):
self.hostname, self.name, self.user = hostname, name, user
node_label = self.name if self.name else self.hostname
self.opened, self.closed, self.connected = asyncio.Event(), asyncio.Event(), asyncio.Event()
self.closed.set()
self.opened.clear()
self.connected.clear()
self.results, self.sshpipe, self.node = bytearray(), None, node
self.CONNECT_TIMEOUT, self.IO_TIMEOUT, self.CMD_TIMEOUT = CONNECT_TIMEOUT, None, None
self.control_master, self.ssh = control_master, node.ssh.copy()
self.silent_mode, self.ssh_controlmaster, self._protocol = silent_mode, ssh_controlmaster, None
log = logging.getLogger(__name__)
if control_master:
conn_id = "{}(console)".format(node_label)
else:
conn_id = "{}({})".format(node_label, ssh_session.sessionid)
ssh_session.sessionid += 1
self.adapter = self.CustomAdapter(
log,
{"connid": conn_id, "node_label": node_label, "run_idx": run_idx, "run_target": run_target},
)
def __getattr__(self, attr):
# Read node via __dict__ so a lookup before __init__ sets self.node
# cannot recurse back into __getattr__.
node = self.__dict__.get("node")
if node:
return getattr(node, attr)
raise AttributeError(attr)
@property
def is_established(self):
p = self._protocol
return p.finished if p is not None else False
async def close(self):
if self.control_master:
logging.info("control master close called {}".format(self.controlmasters))
childprocess = await asyncio.create_subprocess_exec(
"/usr/bin/ssh", "root@{}".format(self.ipaddr), "pkill", "dmesg",
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
)
# stderr is merged into stdout above, so communicate() returns (stdout, None).
stdout, _ = await childprocess.communicate()
if stdout:
logging.info("{}".format(stdout))
if self.sshpipe:
self.sshpipe.terminate()
await self.closed.wait()
if self._transport:
self._transport.close()
self._transport = None
logging.info("control master exit called {}".format(self.controlmasters))
argv = list(self.ssh)
argv.extend(["-o", "ControlPath={}".format(self.controlmasters), "-O", "exit"])
if self.node.ssh_controlmaster:
argv.append("{}@{}".format(self.user, self.hostname))
else:
argv.append("{}".format(self.hostname))
childprocess = await asyncio.create_subprocess_exec(*argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# stderr is merged into stdout above, so communicate() returns (stdout, None).
stdout, _ = await childprocess.communicate()
if stdout:
logging.info("{}".format(stdout))
elif self.sshpipe:
self.sshpipe.terminate()
await self.closed.wait()
if self._transport:
self._transport.close()
self._transport = None
async def post_cmd(self, cmd=None, IO_TIMEOUT=None, CMD_TIMEOUT=None, ssh_controlmaster=True, repeat=None):
# repeat: respawn the same ssh argv when the subprocess ends; no delay (not interval scheduling).
logging.debug("{} Post command {}".format(self.name, cmd))
self.opened.clear()
self.cmd, self.IO_TIMEOUT, self.CMD_TIMEOUT = cmd, IO_TIMEOUT, CMD_TIMEOUT
self.adapter.extra["stream_tag"] = self._stream_tag_for_cmd(cmd)
sshcmd = self.ssh.copy()
if self.control_master:
try:
os.remove(str(self.controlmasters))
except OSError:
pass
sshcmd.extend([
"-o", "ControlMaster=yes", "-o", "ControlPath={}".format(self.controlmasters),
"-o", "ControlPersist=1",
])
elif self.node.sshtype == "ssh":
sshcmd.extend(["-o", "ControlPath={}".format(self.controlmasters)])
if self.node.ssh_controlmaster:
sshcmd.extend(["{}@{}".format(self.user, self.hostname), cmd])
else:
sshcmd.extend(["{}".format(self.hostname), cmd])
logging.info("{} {}".format(self.name, " ".join(sshcmd)))
loop = asyncio.get_running_loop()
while True:
self._transport, self._protocol = await loop.subprocess_exec(
lambda: self.SSHReaderProtocol(self, self.silent_mode),
*sshcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=None,
)
await self.connected.wait()
logging.debug("post_cmd connected")
if not self.control_master:
await self.closed.wait()
if self._transport:
self._transport.close()
self._transport = None
if not repeat:
break
return self.results
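The stdout/stderr handling in `pipe_data_received` above accumulates partial chunks in a string buffer and only logs complete lines, stripping any `"\r"`. That core splitting step can be sketched standalone (hypothetical `feed_lines` helper, not part of the module):

```python
def feed_lines(buffer: str, chunk: bytes) -> tuple[str, list[str]]:
    """Return (new_buffer, complete_lines) after appending one pipe chunk."""
    buffer += chunk.decode("utf-8", errors="replace")
    lines = []
    while "\n" in buffer:
        line, buffer = buffer.split("\n", 1)
        lines.append(line.replace("\r", ""))  # mirror the protocol's "\r" strip
    return buffer, lines

buf = ""
buf, out = feed_lines(buf, b"hello wo")
assert out == []                # no newline yet, nothing emitted
buf, out = feed_lines(buf, b"rld\r\npar")
assert out == ["hello world"]   # one complete line, "\r" removed
assert buf == "par"             # remainder stays buffered for the next chunk
```

This is why a command whose final output lacks a trailing newline leaves its last line in the buffer rather than in the log.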


@@ -0,0 +1,7 @@
"""
Power switching, monitoring, and discovery (Acroname, Monsoon, ...).
May import ``fiwicontrol.commands``; the reverse is forbidden.
"""
__all__: list[str] = []
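The docstring above states a one-way dependency rule: `fiwicontrol.power` may import `fiwicontrol.commands`, never the reverse. One way to enforce that in CI is a static scan of each module's source; a minimal sketch using `ast` (hypothetical `forbidden_imports` helper, not part of this package):

```python
import ast

def forbidden_imports(source: str, banned_prefix: str) -> list[str]:
    """Return every import in `source` whose dotted name starts with banned_prefix."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            hits += [a.name for a in node.names if a.name.startswith(banned_prefix)]
        elif isinstance(node, ast.ImportFrom) and node.module:
            if node.module.startswith(banned_prefix):
                hits.append(node.module)
    return hits

# A commands-layer module importing the power layer would be flagged:
src = "from fiwicontrol.power import monitor\nimport os\n"
assert forbidden_imports(src, "fiwicontrol.power") == ["fiwicontrol.power"]
# The allowed direction is untouched:
assert forbidden_imports("import fiwicontrol.commands\n", "fiwicontrol.power") == []
```

Running this over every file under `fiwicontrol/commands/` in a test would turn an accidental layering violation into a test failure rather than an import cycle at runtime.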


@@ -0,0 +1,6 @@
def test_import_subpackages() -> None:
import fiwicontrol
import fiwicontrol.commands
import fiwicontrol.power
assert fiwicontrol.__version__

tests/test_remote_nodes.py Normal file

@@ -0,0 +1,104 @@
import argparse
import asyncio
import logging
import os
import sys
import unittest
from fiwicontrol.commands.remote_nodes import Command, CommandManager, ssh_node
class _RemoteNodeMixin:
def _make_node(self, silent_mode=False):
remote_ip = os.getenv("FIWI_REMOTE_IP", "192.168.1.39")
return ssh_node(
name=None,
ipaddr=remote_ip,
ssh_controlmaster=False,
silent_mode=silent_mode,
)
class TestRemoteSingleRunCommands(_RemoteNodeMixin, unittest.IsolatedAsyncioTestCase):
async def test_ls_home_directory(self):
node = self._make_node()
session = await node.rexec(
cmd="ls ~",
IO_TIMEOUT=15.0,
CMD_TIMEOUT=30,
CONNECT_TIMEOUT=20.0,
repeat=None,
)
output = session.results.decode("utf-8", errors="replace").strip()
self.assertTrue(output, "Expected non-empty output from `ls ~` on remote host.")
async def test_dmesg_tail(self):
node = self._make_node(silent_mode=False)
session = await node.rexec(
cmd="dmesg | tail -n 25",
IO_TIMEOUT=20.0,
CMD_TIMEOUT=45,
CONNECT_TIMEOUT=20.0,
repeat=None,
)
output = session.results.decode("utf-8", errors="replace").strip()
self.assertTrue(output, "Expected non-empty output from `dmesg | tail -n 25` on remote host.")
class TestRemoteRepeatingCommands(_RemoteNodeMixin, unittest.IsolatedAsyncioTestCase):
async def test_repeat_01_uname_r_every_1s_10_times(self):
node = self._make_node()
cmd = Command(node=node, cmd="uname -r", interval=1.0, count=10)
await cmd.start()
await asyncio.wait_for(cmd.task, timeout=20.0)
self.assertEqual(cmd.run_count, 10, "Expected 10 uname -r runs.")
async def test_repeat_02_pwd_and_ls_every_1s_5_times(self):
node = self._make_node()
pwd_cmd = Command(node=node, cmd="pwd", interval=1.0, count=5)
ls_cmd = Command(node=node, cmd="ls", interval=1.0, count=5)
await asyncio.gather(pwd_cmd.start(), ls_cmd.start())
await asyncio.wait_for(asyncio.gather(pwd_cmd.task, ls_cmd.task), timeout=15.0)
self.assertEqual(pwd_cmd.run_count, 5, "Expected 5 pwd runs.")
self.assertEqual(ls_cmd.run_count, 5, "Expected 5 ls runs.")
async def test_repeat_03_forever_can_be_stopped(self):
node = self._make_node()
cmd = Command(node=node, cmd="uname -r", interval=1.0, count=Command.COUNT_FOREVER)
await cmd.start()
await asyncio.sleep(2.2)
await cmd.stop()
self.assertGreaterEqual(cmd.run_count, 2, "Expected at least 2 runs before stop.")
async def test_command_manager_add_and_self_clean(self):
node = self._make_node()
manager = CommandManager()
cid_fast = await manager.add_command(node=node, cmd="uname -r", interval=0.2, count=2)
cid_slow = await manager.add_command(node=node, cmd="pwd", interval=0.2, count=3)
self.assertIn(cid_fast, manager.list_active())
self.assertIn(cid_slow, manager.list_active())
await asyncio.sleep(1.2)
self.assertEqual(manager.list_active(), {}, "Expected active map to self-clean after finite runs.")
self.assertEqual(manager.list_abnormal_history(), [], "Expected no abnormal history on normal completion.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Integration tests for fiwicontrol.commands.remote_nodes (passwordless SSH required)."
)
parser.add_argument(
"--remote-ip",
default="192.168.1.39",
help="Target host for remote tests (passwordless SSH to root@<ip> must work).",
)
parser.add_argument("--debug", action="store_true", help="Enable asyncio/transport debug logging.")
args, remaining = parser.parse_known_args()
os.environ["FIWI_REMOTE_IP"] = args.remote_ip
if args.debug:
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s:%(message)s")
logging.getLogger("asyncio").setLevel(logging.INFO)
else:
logging.basicConfig(level=logging.WARNING, format="%(message)s")
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("fiwicontrol.commands.remote_nodes").setLevel(logging.INFO)
unittest.main(argv=[sys.argv[0], *remaining])
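The repeat tests above assume a specific scheduling contract for `Command(interval=..., count=...)`: run the command, sleep `interval`, and stop after `count` runs. That contract can be exercised locally without SSH; a sketch with a hypothetical `run_periodically` loop standing in for `Command` (forever-style runs omitted):

```python
import asyncio

async def run_periodically(coro_fn, interval: float, count: int) -> int:
    """Run coro_fn `count` times with `interval` seconds between runs."""
    runs = 0
    for i in range(count):
        await coro_fn()
        runs += 1
        if i < count - 1:           # no trailing sleep after the final run
            await asyncio.sleep(interval)
    return runs

async def main() -> int:
    results = []
    async def fake_cmd():
        results.append("uname -r")  # stand-in for one remote invocation
    n = await run_periodically(fake_cmd, interval=0.01, count=5)
    assert results == ["uname -r"] * 5
    return n

assert asyncio.run(main()) == 5
```

Note this is interval scheduling, distinct from the `repeat` flag on `post_cmd`, which respawns the ssh subprocess immediately when it exits with no delay.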