1077 lines
40 KiB
Python
1077 lines
40 KiB
Python
"""
|
||
SSH transport for the Fi-Wi CLI on a remote host (``user@host``).
|
||
|
||
SSH transport in :mod:`fiwi.ssh` (imported by ``fiwi`` concentrator, CLI, paths).
|
||
|
||
* :class:`SshNodeConfig` — resolved ``FIWI_*`` values and optional ``remote_ssh.env`` /
|
||
``.fiwi_remote`` next to the install (call :meth:`SshNodeConfig.load` before runs).
|
||
* :class:`SshNode` — sync calls either **run to completion** (default) or return a **deferred
|
||
handle** when ``defer=True`` / ``FIWI_REMOTE_DEFER`` / CLI ``--async``. Handles wrap a spawned
|
||
``ssh`` child process (no Python threads — overlap comes from OS processes). Call
|
||
:meth:`RemoteCallHandle.result` (or the handle’s ``result``) to wait. Async methods mirror this
|
||
with :class:`asyncio.Task` when deferred.
|
||
|
||
Parsing helpers for Fi-Wi CLI tables live at module level where the concentrator needs them.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||
|
||
from fiwi.paths import base_dir
|
||
|
||
_POPEN_TEXT_KW = {"text": True, "encoding": "utf-8", "errors": "replace"}
|
||
|
||
# Keys read from remote_ssh.env / .fiwi_remote (setdefault; real environment wins).
|
||
_SSH_ENV_FILE_KEYS = frozenset(
|
||
{
|
||
"FIWI_REMOTE_PYTHON",
|
||
"FIWI_REMOTE_SCRIPT",
|
||
"FIWI_SSH_BIN",
|
||
"FIWI_SSH_OPTS",
|
||
"FIWI_CALIBRATE_REMOTES",
|
||
"FIWI_REMOTE_HUBS",
|
||
"FIWI_REMOTE_HUB_LABEL",
|
||
"FIWI_REMOTE_DEFER",
|
||
"FIWI_FIBER_MAP",
|
||
"FIWI_DEFAULT_PANEL_PORTS",
|
||
"FIWI_DEFAULT_PANEL_SLOTS",
|
||
"FIWI_CONFIG",
|
||
}
|
||
)
|
||
|
||
|
||
async def _async_subprocess_communicate(
|
||
argv: List[str],
|
||
*,
|
||
timeout: float,
|
||
on_timeout: Tuple[int, str, str],
|
||
on_spawn_error: Tuple[int, str, str] = (1, "", ""),
|
||
catch_communicate_oserror: bool = False,
|
||
) -> Tuple[int, str, str]:
|
||
"""
|
||
Run ``argv`` with stdin closed and pipes; read full stdout/stderr via :meth:`asyncio.subprocess.Process.communicate`.
|
||
|
||
Use this when the caller only needs the final buffers (JSON, tables, short text). For **realtime** handling
|
||
(line parsing, progress, backpressure), use :meth:`asyncio.loop.subprocess_exec` with a custom
|
||
:class:`asyncio.SubprocessProtocol` and buffer or split lines inside :meth:`~asyncio.SubprocessProtocol.pipe_data_received`.
|
||
"""
|
||
try:
|
||
proc = await asyncio.create_subprocess_exec(
|
||
*argv,
|
||
stdin=asyncio.subprocess.DEVNULL,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
except OSError:
|
||
return on_spawn_error
|
||
try:
|
||
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||
except asyncio.TimeoutExpired:
|
||
proc.kill()
|
||
try:
|
||
await proc.wait()
|
||
except OSError:
|
||
pass
|
||
return on_timeout
|
||
except OSError:
|
||
if not catch_communicate_oserror:
|
||
raise
|
||
proc.kill()
|
||
try:
|
||
await proc.wait()
|
||
except OSError:
|
||
pass
|
||
return on_timeout
|
||
code = proc.returncode if proc.returncode is not None else 1
|
||
return (
|
||
code,
|
||
(out_b or b"").decode(errors="replace"),
|
||
(err_b or b"").decode(errors="replace"),
|
||
)
|
||
|
||
|
||
class RemoteCallHandle:
|
||
"""
|
||
Deferred Fi-Wi **capture** call: ``ssh`` is already running as a child process.
|
||
|
||
Start several handles to overlap SSH sessions; :meth:`result` waits on this process only
|
||
(no Python threads).
|
||
"""
|
||
|
||
__slots__ = ("_default_timeout", "_finished", "_proc", "_triple")
|
||
|
||
def __init__(self, cmd: List[str], *, timeout: float) -> None:
|
||
self._default_timeout = timeout
|
||
self._finished = False
|
||
self._triple: Optional[Tuple[int, str, str]] = None
|
||
self._proc = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.DEVNULL,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
**_POPEN_TEXT_KW,
|
||
)
|
||
|
||
def result(self, timeout: Optional[float] = None) -> Tuple[int, str, str]:
|
||
"""``communicate`` with the child; return ``(exit_code, stdout, stderr)``."""
|
||
if self._finished:
|
||
assert self._triple is not None
|
||
return self._triple
|
||
t = self._default_timeout if timeout is None else timeout
|
||
try:
|
||
out, err = self._proc.communicate(timeout=t)
|
||
except subprocess.TimeoutExpired:
|
||
self._proc.kill()
|
||
try:
|
||
self._proc.communicate(timeout=10)
|
||
except (subprocess.TimeoutExpired, OSError):
|
||
pass
|
||
self._finished = True
|
||
self._triple = (124, "", "ssh/fiwi timed out")
|
||
return self._triple
|
||
code = self._proc.returncode if self._proc.returncode is not None else 1
|
||
self._finished = True
|
||
self._triple = (code, out or "", err or "")
|
||
return self._triple
|
||
|
||
|
||
class RemoteInvokeHandle:
|
||
"""Deferred interactive Fi-Wi run: child ``ssh`` uses this process’s stdin."""
|
||
|
||
__slots__ = ("_proc",)
|
||
|
||
def __init__(self, cmd: List[str], *, log_line: str) -> None:
|
||
print(log_line, file=sys.stderr, flush=True)
|
||
self._proc = subprocess.Popen(cmd, stdin=sys.stdin)
|
||
|
||
def result(self, timeout: Optional[float] = None) -> int:
|
||
try:
|
||
code = self._proc.wait(timeout=timeout)
|
||
except subprocess.TimeoutExpired:
|
||
self._proc.kill()
|
||
self._proc.wait()
|
||
return 124
|
||
return code if code is not None else 1
|
||
|
||
|
||
class FetchCalibratePortsHandle:
|
||
"""Deferred :meth:`SshNode.fetch_calibrate_ports_json` — first SSH starts in ``__init__``."""
|
||
|
||
__slots__ = ("_finished", "_node", "_p1", "_value")
|
||
|
||
def __init__(self, node: "SshNode") -> None:
|
||
self._node = node
|
||
self._finished = False
|
||
self._value: Optional[List[Tuple[int, int]]] = None
|
||
cfg = SshNodeConfig.load()
|
||
cmd = node._fiwi_cmd_argv(cfg, ["calibrate-ports-json"])
|
||
self._p1 = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.DEVNULL,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
**_POPEN_TEXT_KW,
|
||
)
|
||
|
||
def result(self, timeout: Optional[float] = None) -> List[Tuple[int, int]]:
|
||
if self._finished:
|
||
assert self._value is not None
|
||
return self._value
|
||
t1 = 90.0 if timeout is None else timeout
|
||
try:
|
||
out, err = self._p1.communicate(timeout=t1)
|
||
except subprocess.TimeoutExpired:
|
||
self._p1.kill()
|
||
try:
|
||
self._p1.communicate(timeout=10)
|
||
except (subprocess.TimeoutExpired, OSError):
|
||
pass
|
||
self._finished = True
|
||
self._value = []
|
||
return []
|
||
code = self._p1.returncode if self._p1.returncode is not None else 1
|
||
out = out or ""
|
||
err = err or ""
|
||
json_pairs = None
|
||
if code == 0 and out.strip():
|
||
json_pairs = _pairs_from_calibrate_json_stdout(out)
|
||
if json_pairs is not None:
|
||
self._finished = True
|
||
self._value = json_pairs
|
||
return json_pairs
|
||
if "Unknown command" in (out + err):
|
||
print(
|
||
f"Remote {self._node.target!r} has no calibrate-ports-json; using discover fallback "
|
||
"(update fiwi on the remote host to skip the extra SSH round trip).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
cfg = SshNodeConfig.load()
|
||
cmd2 = self._node._fiwi_cmd_argv(cfg, ["discover"])
|
||
try:
|
||
p2 = subprocess.run(
|
||
cmd2,
|
||
capture_output=True,
|
||
timeout=120,
|
||
stdin=subprocess.DEVNULL,
|
||
**_POPEN_TEXT_KW,
|
||
)
|
||
except subprocess.TimeoutExpired:
|
||
self._finished = True
|
||
self._value = []
|
||
return []
|
||
except OSError:
|
||
self._finished = True
|
||
self._value = []
|
||
return []
|
||
code2 = p2.returncode if p2.returncode is not None else 1
|
||
out2 = (p2.stdout or "") if p2.stdout is not None else ""
|
||
err2 = (p2.stderr or "") if p2.stderr is not None else ""
|
||
if code2 != 0:
|
||
print(
|
||
f"discover on {self._node.target!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
_maybe_hint_remote_python(code2, out2, err2)
|
||
if code != 0 or (out + err).strip():
|
||
print(
|
||
f"calibrate-ports-json on {self._node.target!r} (exit {code}): {(err or out).strip()[:400]}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
_maybe_hint_remote_python(code, out, err)
|
||
self._finished = True
|
||
self._value = []
|
||
return []
|
||
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
|
||
if not pairs:
|
||
print(
|
||
f"Could not parse hub/port list from discover on {self._node.target!r}. "
|
||
"Ensure the host sees its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
self._finished = True
|
||
self._value = []
|
||
return []
|
||
print(
|
||
f"Using discover output for {self._node.target!r} ({len(pairs)} hub.port step(s)).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
self._finished = True
|
||
self._value = pairs
|
||
return pairs
|
||
|
||
|
||
class _HubPowerHandle:
|
||
__slots__ = ("_inner",)
|
||
|
||
def __init__(self, node: "SshNode", hub_1: int, port_0: int, enable: bool) -> None:
|
||
sub = "on" if enable else "off"
|
||
cfg = SshNodeConfig.load()
|
||
cmd = node._fiwi_cmd_argv(cfg, [sub, f"{hub_1}.{port_0}"])
|
||
self._inner = RemoteCallHandle(cmd, timeout=90.0)
|
||
|
||
def result(self, timeout: Optional[float] = None) -> Tuple[int, str]:
|
||
code, out, err = self._inner.result(timeout=timeout)
|
||
blob = "\n".join(x.strip() for x in (out or "", err or "") if x and x.strip()).strip()
|
||
return code, blob
|
||
|
||
|
||
class _StrFeedbackHandle:
|
||
__slots__ = ("_hub_1", "_inner", "_port_0")
|
||
|
||
def __init__(self, node: "SshNode", hub_1: int, port_0: int) -> None:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = node._fiwi_cmd_argv(cfg, ["status", f"{hub_1}.{port_0}"])
|
||
self._inner = RemoteCallHandle(cmd, timeout=90.0)
|
||
self._hub_1 = hub_1
|
||
self._port_0 = port_0
|
||
|
||
def result(self, timeout: Optional[float] = None) -> str:
|
||
code, out, err = self._inner.result(timeout=timeout)
|
||
if code != 0:
|
||
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
|
||
pwr, cur = parse_status_line_for_hub_port(out, self._hub_1, self._port_0)
|
||
return f"remote hub reports {pwr}, {cur} mA"
|
||
|
||
|
||
class _WlanJsonHandle:
|
||
__slots__ = ("_inner",)
|
||
|
||
def __init__(self, node: "SshNode", timeout: float) -> None:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = node._fiwi_cmd_argv(cfg, ["wlan-info-json"])
|
||
self._inner = RemoteCallHandle(cmd, timeout=timeout)
|
||
|
||
def result(self, timeout: Optional[float] = None) -> Optional[Dict[str, Any]]:
|
||
code, out, err = self._inner.result(timeout=timeout)
|
||
if code != 0 or not (out or "").strip():
|
||
return None
|
||
try:
|
||
data = json.loads(out.strip())
|
||
except json.JSONDecodeError:
|
||
return None
|
||
return data if isinstance(data, dict) else None
|
||
|
||
|
||
class _LsusbLinesHandle:
|
||
"""First ``lsusb-lines-json`` SSH may still be running; optional second raw ``lsusb``."""
|
||
|
||
__slots__ = ("_inner", "_node")
|
||
|
||
def __init__(self, node: "SshNode") -> None:
|
||
self._node = node
|
||
cfg = SshNodeConfig.load()
|
||
cmd = node._fiwi_cmd_argv(cfg, ["lsusb-lines-json"])
|
||
self._inner = RemoteCallHandle(cmd, timeout=45.0)
|
||
|
||
def result(self, timeout: Optional[float] = None) -> List[str]:
|
||
code, out, err = self._inner.result(timeout=timeout)
|
||
if code == 0 and out.strip():
|
||
try:
|
||
data = json.loads(out.strip())
|
||
if isinstance(data, list) and all(isinstance(x, str) for x in data):
|
||
return data
|
||
except json.JSONDecodeError:
|
||
pass
|
||
cfg = SshNodeConfig.load()
|
||
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self._node.target, "lsusb"]
|
||
try:
|
||
p2 = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
timeout=45,
|
||
stdin=subprocess.DEVNULL,
|
||
**_POPEN_TEXT_KW,
|
||
)
|
||
except (OSError, subprocess.TimeoutExpired):
|
||
return []
|
||
if p2.returncode != 0 or not p2.stdout:
|
||
return []
|
||
return p2.stdout.splitlines()
|
||
|
||
|
||
def resolve_remote_defer(defer: Optional[bool]) -> bool:
|
||
"""
|
||
Effective “deferred / concurrent” flag.
|
||
|
||
``defer`` explicit ``True``/``False`` wins; otherwise ``FIWI_REMOTE_DEFER`` from the
|
||
environment (including after :func:`apply_fiwi_ssh_env`) — set by CLI ``--async`` or
|
||
``remote_ssh.env``.
|
||
"""
|
||
if defer is not None:
|
||
return defer
|
||
apply_fiwi_ssh_env()
|
||
v = os.environ.get("FIWI_REMOTE_DEFER", "").strip().lower()
|
||
return v in ("1", "true", "yes")
|
||
|
||
|
||
def apply_fiwi_ssh_env() -> None:
|
||
"""
|
||
Load ``remote_ssh.env`` or ``.fiwi_remote`` from the install directory into
|
||
``os.environ`` (``FIWI_*`` keys used for SSH transport only).
|
||
"""
|
||
b = base_dir()
|
||
for fname in ("remote_ssh.env", ".fiwi_remote"):
|
||
path = os.path.join(b, fname)
|
||
if not os.path.isfile(path):
|
||
continue
|
||
try:
|
||
with open(path, encoding="utf-8") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
if "=" not in line:
|
||
continue
|
||
key, _, val = line.partition("=")
|
||
key, val = key.strip(), val.strip()
|
||
if len(val) >= 2 and val[0] == val[-1] and val[0] in "'\"":
|
||
val = val[1:-1]
|
||
if key in _SSH_ENV_FILE_KEYS:
|
||
os.environ.setdefault(key, val)
|
||
except OSError:
|
||
continue
|
||
break
|
||
|
||
|
||
def _merged_calibrate_remote_hosts() -> str:
|
||
"""Join ``FIWI_CALIBRATE_REMOTES`` and ``FIWI_REMOTE_HUBS`` (comma-separated ``user@host``)."""
|
||
seen: list[str] = []
|
||
for key in ("FIWI_CALIBRATE_REMOTES", "FIWI_REMOTE_HUBS"):
|
||
raw = (os.environ.get(key) or "").strip()
|
||
if not raw:
|
||
continue
|
||
for part in raw.split(","):
|
||
s = part.strip()
|
||
if s and s not in seen:
|
||
seen.append(s)
|
||
return ",".join(seen)
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class SshNodeConfig:
|
||
"""SSH client argv + remote ``python`` / ``fiwi.py`` paths after env + dotfiles."""
|
||
|
||
python: str
|
||
script: str
|
||
ssh_bin: str
|
||
ssh_extra_argv: tuple[str, ...]
|
||
calibrate_remotes: str
|
||
|
||
@classmethod
|
||
def load(cls) -> SshNodeConfig:
|
||
apply_fiwi_ssh_env()
|
||
raw_opts = os.environ.get("FIWI_SSH_OPTS") or ""
|
||
return cls(
|
||
python=os.environ.get("FIWI_REMOTE_PYTHON") or "python3",
|
||
script=os.environ.get("FIWI_REMOTE_SCRIPT") or "/usr/local/bin/fiwi.py",
|
||
ssh_bin=os.environ.get("FIWI_SSH_BIN") or "ssh",
|
||
ssh_extra_argv=tuple(shlex.split(raw_opts)),
|
||
calibrate_remotes=_merged_calibrate_remote_hosts(),
|
||
)
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class SshNode:
|
||
"""One SSH destination for Fi-Wi (BrainStem hubs, calibration, etc.)."""
|
||
|
||
target: str
|
||
|
||
def __post_init__(self) -> None:
|
||
t = (self.target or "").strip()
|
||
if not t or "@" not in t:
|
||
raise ValueError("SshNode.target must be non-empty user@host")
|
||
object.__setattr__(self, "target", t)
|
||
|
||
@classmethod
|
||
def parse(cls, user_host: str) -> SshNode:
|
||
return cls(target=(user_host or "").strip())
|
||
|
||
def _invoke_blocking(self, remote_args: List[str]) -> int:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = self._fiwi_cmd_argv_tty(cfg, remote_args)
|
||
print(
|
||
f"fiwi: ssh {self.target} → {cfg.python} {cfg.script} {' '.join(remote_args)}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
proc = subprocess.run(cmd, stdin=sys.stdin)
|
||
return proc.returncode if proc.returncode is not None else 1
|
||
|
||
def invoke(
|
||
self, remote_args: List[str], *, defer: Optional[bool] = None
|
||
) -> Union[int, RemoteInvokeHandle]:
|
||
"""
|
||
TTY-capable Fi-Wi CLI (stdin forwarded).
|
||
Adds ``ssh -t`` for ``panel calibrate`` when ``FIWI_SSH_OPTS`` has no ``-t``.
|
||
|
||
When deferred, returns :class:`RemoteInvokeHandle`; child ``ssh`` already runs and shares
|
||
this process’s stdin until :meth:`~RemoteInvokeHandle.result`.
|
||
"""
|
||
if resolve_remote_defer(defer):
|
||
cfg = SshNodeConfig.load()
|
||
cmd = self._fiwi_cmd_argv_tty(cfg, remote_args)
|
||
log = (
|
||
f"fiwi: ssh {self.target} → {cfg.python} {cfg.script} {' '.join(remote_args)}"
|
||
)
|
||
return RemoteInvokeHandle(cmd, log_line=log)
|
||
return self._invoke_blocking(remote_args)
|
||
|
||
def _invoke_capture_blocking(
|
||
self, remote_args: List[str], timeout: float = 90
|
||
) -> Tuple[int, str, str]:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = self._fiwi_cmd_argv(cfg, remote_args)
|
||
try:
|
||
proc = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout,
|
||
stdin=subprocess.DEVNULL,
|
||
)
|
||
except subprocess.TimeoutExpired:
|
||
return 124, "", "ssh/fiwi timed out"
|
||
return (
|
||
proc.returncode if proc.returncode is not None else 1,
|
||
proc.stdout or "",
|
||
proc.stderr or "",
|
||
)
|
||
|
||
def invoke_capture(
|
||
self,
|
||
remote_args: List[str],
|
||
*,
|
||
timeout: float = 90,
|
||
defer: Optional[bool] = None,
|
||
) -> Union[Tuple[int, str, str], RemoteCallHandle]:
|
||
"""Fi-Wi CLI with captured stdout/stderr. No TTY."""
|
||
if resolve_remote_defer(defer):
|
||
cfg = SshNodeConfig.load()
|
||
cmd = self._fiwi_cmd_argv(cfg, remote_args)
|
||
return RemoteCallHandle(cmd, timeout=timeout)
|
||
return self._invoke_capture_blocking(remote_args, timeout=timeout)
|
||
|
||
@staticmethod
|
||
def _bash_escape_double(s: str) -> str:
|
||
"""Escape for inclusion inside bash double quotes (``$ ` \\ "``)."""
|
||
return "".join("\\" + c if c in '\\$"`' else c for c in s)
|
||
|
||
_REMOTE_HOME_TAIL_SAFE = re.compile(r"^[a-zA-Z0-9._+/@-]+$")
|
||
|
||
@classmethod
|
||
def remote_path_bash_word(cls, path: str) -> str:
|
||
"""
|
||
One bash word for a filesystem path run on the *remote* host via ``bash -lc``.
|
||
|
||
``shlex.quote('~/foo')`` yields a single-quoted literal with a tilde, which bash does not
|
||
expand, so ``~/venv/bin/python3`` would not run. Use ``$HOME/…`` for ``~/`` prefixes.
|
||
"""
|
||
if path.startswith("~/"):
|
||
rest = path[2:]
|
||
if cls._REMOTE_HOME_TAIL_SAFE.fullmatch(rest):
|
||
return "$HOME/" + rest
|
||
return '"$HOME/' + cls._bash_escape_double(rest) + '"'
|
||
return shlex.quote(path)
|
||
|
||
@classmethod
|
||
def _fiwi_remote_shell_command(cls, cfg: SshNodeConfig, remote_args: List[str]) -> str:
|
||
"""
|
||
Single remote command string for ``bash -lc`` so ``~/`` in ``FIWI_REMOTE_*`` paths expand
|
||
on the *remote* host (quoted tilde literals would not).
|
||
|
||
``-u`` makes the interpreter unbuffered. When ``stdbuf`` from GNU coreutils is on
|
||
``PATH`` (typical on Pi / Fedora), ``stdbuf -oL -eL`` also sets **line-buffered**
|
||
libc streams so mixed Python + native (BrainStem) output flushes on newlines over
|
||
SSH pipes (no TTY). If ``stdbuf`` is missing, the command falls back to ``-u`` only.
|
||
"""
|
||
py = cls.remote_path_bash_word(cfg.python)
|
||
sc = cls.remote_path_bash_word(cfg.script)
|
||
tail = " ".join(shlex.quote(p) for p in remote_args)
|
||
core = f"{py} -u {sc}" + (f" {tail}" if tail else "")
|
||
return (
|
||
"if command -v stdbuf >/dev/null 2>&1; then "
|
||
f"stdbuf -oL -eL {core}; "
|
||
f"else {core}; fi"
|
||
)
|
||
|
||
@staticmethod
|
||
def remote_bash_lc_ssh_token(inner_script: str) -> str:
|
||
"""
|
||
One argv element after ``user@host`` for ``ssh``.
|
||
|
||
OpenSSH runs the remote command as ``sh -c 'arg1 arg2 …'``. Passing
|
||
``bash``, ``-lc``, and a multi-word script as **separate** argv words
|
||
breaks ``if …; then …; fi`` (``then`` becomes a top-level token). Wrap
|
||
the whole script in ``bash -lc …`` with :func:`shlex.quote`.
|
||
"""
|
||
return f"bash -lc {shlex.quote(inner_script)}"
|
||
|
||
def _fiwi_cmd_argv(self, cfg: SshNodeConfig, remote_args: List[str]) -> List[str]:
|
||
inner = self._fiwi_remote_shell_command(cfg, remote_args)
|
||
return [
|
||
cfg.ssh_bin,
|
||
*cfg.ssh_extra_argv,
|
||
self.target,
|
||
self.remote_bash_lc_ssh_token(inner),
|
||
]
|
||
|
||
def _fiwi_cmd_argv_tty(self, cfg: SshNodeConfig, remote_args: List[str]) -> List[str]:
|
||
extra = list(cfg.ssh_extra_argv)
|
||
if (
|
||
len(remote_args) >= 2
|
||
and remote_args[0].lower() == "panel"
|
||
and remote_args[1].lower() == "calibrate"
|
||
and not any(x in ("-t", "-tt") for x in extra)
|
||
):
|
||
extra = ["-t", *extra]
|
||
inner = self._fiwi_remote_shell_command(cfg, remote_args)
|
||
return [cfg.ssh_bin, *extra, self.target, self.remote_bash_lc_ssh_token(inner)]
|
||
|
||
async def _ainvoke_coro(self, remote_args: List[str]) -> int:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = self._fiwi_cmd_argv_tty(cfg, remote_args)
|
||
print(
|
||
f"fiwi: ssh {self.target} → {cfg.python} {cfg.script} {' '.join(remote_args)}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
try:
|
||
proc = await asyncio.create_subprocess_exec(*cmd)
|
||
except OSError:
|
||
return 1
|
||
return await proc.wait()
|
||
|
||
async def ainvoke(
|
||
self, remote_args: List[str], *, defer: Optional[bool] = None
|
||
) -> Union[int, asyncio.Task[int]]:
|
||
"""
|
||
Async interactive Fi-Wi CLI (inherits stdin — no ``asyncio.to_thread``).
|
||
|
||
When deferred, returns an :class:`asyncio.Task`.
|
||
"""
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(self._ainvoke_coro(remote_args))
|
||
return await self._ainvoke_coro(remote_args)
|
||
|
||
async def _ainvoke_capture_coro(
|
||
self, remote_args: List[str], timeout: float = 90
|
||
) -> Tuple[int, str, str]:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = self._fiwi_cmd_argv(cfg, remote_args)
|
||
return await _async_subprocess_communicate(
|
||
cmd,
|
||
timeout=timeout,
|
||
on_timeout=(124, "", "ssh/fiwi timed out"),
|
||
on_spawn_error=(1, "", ""),
|
||
)
|
||
|
||
async def ainvoke_capture(
|
||
self,
|
||
remote_args: List[str],
|
||
*,
|
||
timeout: float = 90,
|
||
defer: Optional[bool] = None,
|
||
) -> Union[Tuple[int, str, str], asyncio.Task[Tuple[int, str, str]]]:
|
||
"""Fi-Wi CLI with captured stdout/stderr; concurrent-safe when deferred (returns Task)."""
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(
|
||
self._ainvoke_capture_coro(remote_args, timeout=timeout)
|
||
)
|
||
return await self._ainvoke_capture_coro(remote_args, timeout=timeout)
|
||
|
||
async def _araw_ssh_coro(
|
||
self, remote_argv: List[str], *, timeout: float = 45
|
||
) -> Tuple[int, str, str]:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self.target, *remote_argv]
|
||
return await _async_subprocess_communicate(
|
||
cmd,
|
||
timeout=timeout,
|
||
on_timeout=(1, "", ""),
|
||
on_spawn_error=(1, "", ""),
|
||
catch_communicate_oserror=True,
|
||
)
|
||
|
||
async def araw_ssh(
|
||
self,
|
||
remote_argv: List[str],
|
||
*,
|
||
timeout: float = 45,
|
||
defer: Optional[bool] = None,
|
||
) -> Union[Tuple[int, str, str], asyncio.Task[Tuple[int, str, str]]]:
|
||
"""Async raw remote command (no ``fiwi.py`` wrapper)."""
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(self._araw_ssh_coro(remote_argv, timeout=timeout))
|
||
return await self._araw_ssh_coro(remote_argv, timeout=timeout)
|
||
|
||
def _raw_ssh_blocking(
|
||
self, remote_argv: List[str], *, timeout: float = 45
|
||
) -> Tuple[int, str, str]:
|
||
cfg = SshNodeConfig.load()
|
||
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self.target, *remote_argv]
|
||
try:
|
||
proc = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout,
|
||
stdin=subprocess.DEVNULL,
|
||
)
|
||
except (OSError, subprocess.TimeoutExpired):
|
||
return 1, "", ""
|
||
return (
|
||
proc.returncode if proc.returncode is not None else 1,
|
||
proc.stdout or "",
|
||
proc.stderr or "",
|
||
)
|
||
|
||
def raw_ssh(
|
||
self,
|
||
remote_argv: List[str],
|
||
*,
|
||
timeout: float = 45,
|
||
defer: Optional[bool] = None,
|
||
) -> Union[Tuple[int, str, str], RemoteCallHandle]:
|
||
"""Run ``remote_argv`` on the host (no ``python fiwi.py`` prefix)."""
|
||
if resolve_remote_defer(defer):
|
||
cfg = SshNodeConfig.load()
|
||
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self.target, *remote_argv]
|
||
return RemoteCallHandle(cmd, timeout=timeout)
|
||
return self._raw_ssh_blocking(remote_argv, timeout=timeout)
|
||
|
||
def _fetch_calibrate_ports_json_blocking(self) -> List[Tuple[int, int]]:
|
||
"""``[[hub, port], …]`` from remote ``calibrate-ports-json``, else parse ``discover``."""
|
||
code, out, err = self._invoke_capture_blocking(["calibrate-ports-json"], timeout=90)
|
||
out = out or ""
|
||
err = err or ""
|
||
json_pairs = None
|
||
if code == 0 and out.strip():
|
||
json_pairs = _pairs_from_calibrate_json_stdout(out)
|
||
|
||
if json_pairs is not None:
|
||
return json_pairs
|
||
|
||
if "Unknown command" in (out + err):
|
||
print(
|
||
f"Remote {self.target!r} has no calibrate-ports-json; using discover fallback "
|
||
"(update fiwi on the remote host to skip the extra SSH round trip).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
|
||
code2, out2, err2 = self._invoke_capture_blocking(["discover"], timeout=120)
|
||
out2 = out2 or ""
|
||
if code2 != 0:
|
||
print(
|
||
f"discover on {self.target!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
_maybe_hint_remote_python(code2, out2, err2)
|
||
if code != 0 or (out + err).strip():
|
||
print(
|
||
f"calibrate-ports-json on {self.target!r} (exit {code}): {(err or out).strip()[:400]}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
_maybe_hint_remote_python(code, out, err)
|
||
return []
|
||
|
||
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
|
||
if not pairs:
|
||
print(
|
||
f"Could not parse hub/port list from discover on {self.target!r}. "
|
||
"Ensure the host sees its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
return []
|
||
|
||
print(
|
||
f"Using discover output for {self.target!r} ({len(pairs)} hub.port step(s)).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
return pairs
|
||
|
||
def fetch_calibrate_ports_json(
|
||
self, *, defer: Optional[bool] = None
|
||
) -> Union[List[Tuple[int, int]], FetchCalibratePortsHandle]:
|
||
if resolve_remote_defer(defer):
|
||
return FetchCalibratePortsHandle(self)
|
||
return self._fetch_calibrate_ports_json_blocking()
|
||
|
||
async def _afetch_calibrate_ports_json_coro(self) -> List[Tuple[int, int]]:
|
||
code, out, err = await self._ainvoke_capture_coro(["calibrate-ports-json"], timeout=90)
|
||
out = out or ""
|
||
err = err or ""
|
||
json_pairs = None
|
||
if code == 0 and out.strip():
|
||
json_pairs = _pairs_from_calibrate_json_stdout(out)
|
||
|
||
if json_pairs is not None:
|
||
return json_pairs
|
||
|
||
if "Unknown command" in (out + err):
|
||
print(
|
||
f"Remote {self.target!r} has no calibrate-ports-json; using discover fallback "
|
||
"(update fiwi on the remote host to skip the extra SSH round trip).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
|
||
code2, out2, err2 = await self._ainvoke_capture_coro(["discover"], timeout=120)
|
||
out2 = out2 or ""
|
||
if code2 != 0:
|
||
print(
|
||
f"discover on {self.target!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
_maybe_hint_remote_python(code2, out2, err2)
|
||
if code != 0 or (out + err).strip():
|
||
print(
|
||
f"calibrate-ports-json on {self.target!r} (exit {code}): {(err or out).strip()[:400]}",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
_maybe_hint_remote_python(code, out, err)
|
||
return []
|
||
|
||
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
|
||
if not pairs:
|
||
print(
|
||
f"Could not parse hub/port list from discover on {self.target!r}. "
|
||
"Ensure the host sees its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
return []
|
||
|
||
print(
|
||
f"Using discover output for {self.target!r} ({len(pairs)} hub.port step(s)).",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
return pairs
|
||
|
||
async def afetch_calibrate_ports_json(
|
||
self, *, defer: Optional[bool] = None
|
||
) -> Union[List[Tuple[int, int]], asyncio.Task[List[Tuple[int, int]]]]:
|
||
"""Async :meth:`fetch_calibrate_ports_json`; use ``asyncio.gather`` on tasks when deferred."""
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(self._afetch_calibrate_ports_json_coro())
|
||
return await self._afetch_calibrate_ports_json_coro()
|
||
|
||
def _remote_hub_port_power_blocking(
|
||
self, hub_1: int, port_0: int, enable: bool
|
||
) -> Tuple[int, str]:
|
||
sub = "on" if enable else "off"
|
||
code, out, err = self._invoke_capture_blocking([sub, f"{hub_1}.{port_0}"])
|
||
blob = "\n".join(x.strip() for x in (out or "", err or "") if x and x.strip()).strip()
|
||
return code, blob
|
||
|
||
def remote_hub_port_power(
|
||
self,
|
||
hub_1: int,
|
||
port_0: int,
|
||
enable: bool,
|
||
*,
|
||
defer: Optional[bool] = None,
|
||
) -> Union[Tuple[int, str], _HubPowerHandle]:
|
||
if resolve_remote_defer(defer):
|
||
return _HubPowerHandle(self, hub_1, port_0, enable)
|
||
return self._remote_hub_port_power_blocking(hub_1, port_0, enable)
|
||
|
||
async def _aremote_hub_port_power_coro(
|
||
self, hub_1: int, port_0: int, enable: bool
|
||
) -> Tuple[int, str]:
|
||
sub = "on" if enable else "off"
|
||
code, out, err = await self._ainvoke_capture_coro([sub, f"{hub_1}.{port_0}"])
|
||
blob = "\n".join(x.strip() for x in (out or "", err or "") if x and x.strip()).strip()
|
||
return code, blob
|
||
|
||
async def aremote_hub_port_power(
|
||
self,
|
||
hub_1: int,
|
||
port_0: int,
|
||
enable: bool,
|
||
*,
|
||
defer: Optional[bool] = None,
|
||
) -> Union[Tuple[int, str], asyncio.Task[Tuple[int, str]]]:
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(
|
||
self._aremote_hub_port_power_coro(hub_1, port_0, enable)
|
||
)
|
||
return await self._aremote_hub_port_power_coro(hub_1, port_0, enable)
|
||
|
||
def _remote_port_power_feedback_blocking(self, hub_1: int, port_0: int) -> str:
|
||
code, out, err = self._invoke_capture_blocking(["status", f"{hub_1}.{port_0}"])
|
||
if code != 0:
|
||
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
|
||
pwr, cur = parse_status_line_for_hub_port(out, hub_1, port_0)
|
||
return f"remote hub reports {pwr}, {cur} mA"
|
||
|
||
def remote_port_power_feedback(
|
||
self, hub_1: int, port_0: int, *, defer: Optional[bool] = None
|
||
) -> Union[str, _StrFeedbackHandle]:
|
||
if resolve_remote_defer(defer):
|
||
return _StrFeedbackHandle(self, hub_1, port_0)
|
||
return self._remote_port_power_feedback_blocking(hub_1, port_0)
|
||
|
||
async def _aremote_port_power_feedback_coro(self, hub_1: int, port_0: int) -> str:
|
||
code, out, err = await self._ainvoke_capture_coro(["status", f"{hub_1}.{port_0}"])
|
||
if code != 0:
|
||
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
|
||
pwr, cur = parse_status_line_for_hub_port(out, hub_1, port_0)
|
||
return f"remote hub reports {pwr}, {cur} mA"
|
||
|
||
async def aremote_port_power_feedback(
|
||
self, hub_1: int, port_0: int, *, defer: Optional[bool] = None
|
||
) -> Union[str, asyncio.Task[str]]:
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(
|
||
self._aremote_port_power_feedback_coro(hub_1, port_0)
|
||
)
|
||
return await self._aremote_port_power_feedback_coro(hub_1, port_0)
|
||
|
||
def _remote_wlan_info_json_blocking(self, timeout: float = 60) -> Optional[Dict[str, Any]]:
|
||
code, out, err = self._invoke_capture_blocking(
|
||
["wlan-info-json"], timeout=timeout
|
||
)
|
||
if code != 0 or not (out or "").strip():
|
||
return None
|
||
try:
|
||
data = json.loads(out.strip())
|
||
except json.JSONDecodeError:
|
||
return None
|
||
return data if isinstance(data, dict) else None
|
||
|
||
def remote_wlan_info_json(
|
||
self, timeout: float = 60, *, defer: Optional[bool] = None
|
||
) -> Union[Optional[Dict[str, Any]], _WlanJsonHandle]:
|
||
if resolve_remote_defer(defer):
|
||
return _WlanJsonHandle(self, timeout)
|
||
return self._remote_wlan_info_json_blocking(timeout=timeout)
|
||
|
||
async def _aremote_wlan_info_json_coro(self, timeout: float = 60) -> Optional[Dict[str, Any]]:
|
||
code, out, err = await self._ainvoke_capture_coro(
|
||
["wlan-info-json"], timeout=timeout
|
||
)
|
||
if code != 0 or not (out or "").strip():
|
||
return None
|
||
try:
|
||
data = json.loads(out.strip())
|
||
except json.JSONDecodeError:
|
||
return None
|
||
return data if isinstance(data, dict) else None
|
||
|
||
async def aremote_wlan_info_json(
|
||
self, timeout: float = 60, *, defer: Optional[bool] = None
|
||
) -> Union[Optional[Dict[str, Any]], asyncio.Task[Optional[Dict[str, Any]]]]:
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(self._aremote_wlan_info_json_coro(timeout=timeout))
|
||
return await self._aremote_wlan_info_json_coro(timeout=timeout)
|
||
|
||
def _remote_lsusb_lines_blocking(self) -> List[str]:
|
||
code, out, err = self._invoke_capture_blocking(["lsusb-lines-json"], timeout=45)
|
||
if code == 0 and out.strip():
|
||
try:
|
||
data = json.loads(out.strip())
|
||
if isinstance(data, list) and all(isinstance(x, str) for x in data):
|
||
return data
|
||
except json.JSONDecodeError:
|
||
pass
|
||
code2, out2, err2 = self._raw_ssh_blocking(["lsusb"], timeout=45)
|
||
if code2 != 0 or not out2:
|
||
return []
|
||
return out2.splitlines()
|
||
|
||
def remote_lsusb_lines(
|
||
self, *, defer: Optional[bool] = None
|
||
) -> Union[List[str], _LsusbLinesHandle]:
|
||
if resolve_remote_defer(defer):
|
||
return _LsusbLinesHandle(self)
|
||
return self._remote_lsusb_lines_blocking()
|
||
|
||
async def _aremote_lsusb_lines_coro(self) -> List[str]:
|
||
code, out, err = await self._ainvoke_capture_coro(["lsusb-lines-json"], timeout=45)
|
||
if code == 0 and out.strip():
|
||
try:
|
||
data = json.loads(out.strip())
|
||
if isinstance(data, list) and all(isinstance(x, str) for x in data):
|
||
return data
|
||
except json.JSONDecodeError:
|
||
pass
|
||
code2, out2, err2 = await self._araw_ssh_coro(["lsusb"], timeout=45)
|
||
if code2 != 0 or not out2:
|
||
return []
|
||
return out2.splitlines()
|
||
|
||
async def aremote_lsusb_lines(
|
||
self, *, defer: Optional[bool] = None
|
||
) -> Union[List[str], asyncio.Task[List[str]]]:
|
||
if resolve_remote_defer(defer):
|
||
return asyncio.create_task(self._aremote_lsusb_lines_coro())
|
||
return await self._aremote_lsusb_lines_coro()
|
||
|
||
|
||
def _pairs_from_calibrate_json_stdout(out: str) -> Optional[List[Tuple[int, int]]]:
|
||
try:
|
||
data = json.loads(out.strip())
|
||
except (json.JSONDecodeError, AttributeError):
|
||
return None
|
||
if not isinstance(data, list):
|
||
return None
|
||
pairs: List[Tuple[int, int]] = []
|
||
for item in data:
|
||
if isinstance(item, (list, tuple)) and len(item) == 2:
|
||
try:
|
||
pairs.append((int(item[0]), int(item[1])))
|
||
except (TypeError, ValueError):
|
||
continue
|
||
return pairs
|
||
|
||
|
||
def _maybe_hint_remote_python(exit_code: int, out: str, err: str) -> None:
|
||
"""Exit 126/127 often means bad ``FIWI_REMOTE_PYTHON`` on the remote host."""
|
||
cfg = SshNodeConfig.load()
|
||
if exit_code not in (126, 127):
|
||
return
|
||
blob = f"{out or ''} {err or ''}".lower()
|
||
if "no such file" not in blob and "not found" not in blob:
|
||
return
|
||
print(
|
||
" SSH ran this on the remote host (paths must exist *there*, not on your workstation):\n"
|
||
f" interpreter: {cfg.python}\n"
|
||
f" script: {cfg.script}\n"
|
||
" Fix: SSH in, activate the venv with brainstem, run:\n"
|
||
" which python3\n"
|
||
" realpath /path/to/fiwi.py\n"
|
||
" Put those absolute paths in remote_ssh.env next to fiwi.py, or export:\n"
|
||
" FIWI_REMOTE_PYTHON=… FIWI_REMOTE_SCRIPT=…",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
|
||
|
||
def _parse_discover_stdout_for_calibrate_ports(stdout: str) -> List[Tuple[int, int]]:
|
||
"""Parse ``discover`` table lines → ``(hub, port0)`` pairs."""
|
||
pairs = []
|
||
for line in stdout.splitlines():
|
||
raw = line.rstrip()
|
||
if "|" not in raw:
|
||
continue
|
||
if set(raw.strip()) <= {"-", " "}:
|
||
continue
|
||
parts = [p.strip() for p in raw.split("|")]
|
||
if len(parts) < 3:
|
||
continue
|
||
hub_s, _, n_s = parts[0], parts[1], parts[2]
|
||
if not hub_s.isdigit():
|
||
continue
|
||
try:
|
||
hub = int(hub_s)
|
||
nports = int(n_s)
|
||
except ValueError:
|
||
continue
|
||
if hub < 1 or nports < 1:
|
||
continue
|
||
for p in range(nports):
|
||
pairs.append((hub, p))
|
||
return pairs
|
||
|
||
|
||
def parse_status_line_for_hub_port(stdout: str, hub_1: int, port_0: int) -> Tuple[str, str]:
|
||
"""Parse ``status hub.port`` table output for one port; return ``(power, current_str)``."""
|
||
for line in stdout.splitlines():
|
||
ln = line.strip()
|
||
if not ln or ln.startswith("-") or "Identity" in ln or "Hub" not in ln:
|
||
continue
|
||
parts = [p.strip() for p in line.split("|")]
|
||
if len(parts) < 4:
|
||
continue
|
||
hub_part = parts[0].replace("Hub", "", 1).strip()
|
||
try:
|
||
h = int(hub_part)
|
||
p = int(parts[1])
|
||
except ValueError:
|
||
continue
|
||
if h == hub_1 and p == port_0:
|
||
return parts[2], parts[3]
|
||
return "?", "?"
|