UmberHubManager/fiwi/ssh_node.py

999 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SSH transport for the Fi-Wi CLI on a remote host (``user@host``).
* :class:`SshNodeConfig` — resolved ``FIWI_*`` values and optional ``remote_ssh.env`` /
``.fiwi_remote`` next to the install (call :meth:`SshNodeConfig.load` before runs).
* :class:`SshNode` — sync calls either **run to completion** (default) or return a **deferred
handle** when ``defer=True`` / ``FIWI_REMOTE_DEFER`` / CLI ``--async``. Handles wrap a spawned
``ssh`` child process (no Python threads — overlap comes from OS processes). Call
:meth:`RemoteCallHandle.result` (or the handles ``result``) to wait. Async methods mirror this
with :class:`asyncio.Task` when deferred.
Parsing helpers for Fi-Wi CLI tables live at module level where the harness needs them.
"""
from __future__ import annotations
import asyncio
import json
import os
import shlex
import subprocess
import sys
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
from fiwi.paths import base_dir
_POPEN_TEXT_KW = {"text": True, "encoding": "utf-8", "errors": "replace"}
# Keys read from remote_ssh.env / .fiwi_remote (setdefault; real environment wins).
_SSH_ENV_FILE_KEYS = frozenset(
{
"FIWI_REMOTE_PYTHON",
"FIWI_REMOTE_SCRIPT",
"FIWI_SSH_BIN",
"FIWI_SSH_OPTS",
"FIWI_CALIBRATE_REMOTES",
"FIWI_REMOTE_DEFER",
}
)
async def _async_subprocess_communicate(
argv: List[str],
*,
timeout: float,
on_timeout: Tuple[int, str, str],
on_spawn_error: Tuple[int, str, str] = (1, "", ""),
catch_communicate_oserror: bool = False,
) -> Tuple[int, str, str]:
"""
Run ``argv`` with stdin closed and pipes; read full stdout/stderr via :meth:`asyncio.subprocess.Process.communicate`.
Use this when the caller only needs the final buffers (JSON, tables, short text). For **realtime** handling
(line parsing, progress, backpressure), use :meth:`asyncio.loop.subprocess_exec` with a custom
:class:`asyncio.SubprocessProtocol` and buffer or split lines inside :meth:`~asyncio.SubprocessProtocol.pipe_data_received`.
"""
try:
proc = await asyncio.create_subprocess_exec(
*argv,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except OSError:
return on_spawn_error
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutExpired:
proc.kill()
try:
await proc.wait()
except OSError:
pass
return on_timeout
except OSError:
if not catch_communicate_oserror:
raise
proc.kill()
try:
await proc.wait()
except OSError:
pass
return on_timeout
code = proc.returncode if proc.returncode is not None else 1
return (
code,
(out_b or b"").decode(errors="replace"),
(err_b or b"").decode(errors="replace"),
)
class RemoteCallHandle:
"""
Deferred Fi-Wi **capture** call: ``ssh`` is already running as a child process.
Start several handles to overlap SSH sessions; :meth:`result` waits on this process only
(no Python threads).
"""
__slots__ = ("_default_timeout", "_finished", "_proc", "_triple")
def __init__(self, cmd: List[str], *, timeout: float) -> None:
self._default_timeout = timeout
self._finished = False
self._triple: Optional[Tuple[int, str, str]] = None
self._proc = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**_POPEN_TEXT_KW,
)
def result(self, timeout: Optional[float] = None) -> Tuple[int, str, str]:
"""``communicate`` with the child; return ``(exit_code, stdout, stderr)``."""
if self._finished:
assert self._triple is not None
return self._triple
t = self._default_timeout if timeout is None else timeout
try:
out, err = self._proc.communicate(timeout=t)
except subprocess.TimeoutExpired:
self._proc.kill()
try:
self._proc.communicate(timeout=10)
except (subprocess.TimeoutExpired, OSError):
pass
self._finished = True
self._triple = (124, "", "ssh/fiwi timed out")
return self._triple
code = self._proc.returncode if self._proc.returncode is not None else 1
self._finished = True
self._triple = (code, out or "", err or "")
return self._triple
class RemoteInvokeHandle:
"""Deferred interactive Fi-Wi run: child ``ssh`` uses this processs stdin."""
__slots__ = ("_proc",)
def __init__(self, cmd: List[str], *, log_line: str) -> None:
print(log_line, file=sys.stderr, flush=True)
self._proc = subprocess.Popen(cmd, stdin=sys.stdin)
def result(self, timeout: Optional[float] = None) -> int:
try:
code = self._proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
self._proc.kill()
self._proc.wait()
return 124
return code if code is not None else 1
class FetchCalibratePortsHandle:
"""Deferred :meth:`SshNode.fetch_calibrate_ports_json` — first SSH starts in ``__init__``."""
__slots__ = ("_finished", "_node", "_p1", "_value")
def __init__(self, node: "SshNode") -> None:
self._node = node
self._finished = False
self._value: Optional[List[Tuple[int, int]]] = None
cfg = SshNodeConfig.load()
cmd = node._fiwi_cmd_argv(cfg, ["calibrate-ports-json"])
self._p1 = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**_POPEN_TEXT_KW,
)
def result(self, timeout: Optional[float] = None) -> List[Tuple[int, int]]:
if self._finished:
assert self._value is not None
return self._value
t1 = 90.0 if timeout is None else timeout
try:
out, err = self._p1.communicate(timeout=t1)
except subprocess.TimeoutExpired:
self._p1.kill()
try:
self._p1.communicate(timeout=10)
except (subprocess.TimeoutExpired, OSError):
pass
self._finished = True
self._value = []
return []
code = self._p1.returncode if self._p1.returncode is not None else 1
out = out or ""
err = err or ""
json_pairs = None
if code == 0 and out.strip():
json_pairs = _pairs_from_calibrate_json_stdout(out)
if json_pairs is not None:
self._finished = True
self._value = json_pairs
return json_pairs
if "Unknown command" in (out + err):
print(
f"Remote {self._node.target!r} has no calibrate-ports-json; using discover fallback "
"(update fiwi on the remote host to skip the extra SSH round trip).",
file=sys.stderr,
flush=True,
)
cfg = SshNodeConfig.load()
cmd2 = self._node._fiwi_cmd_argv(cfg, ["discover"])
try:
p2 = subprocess.run(
cmd2,
capture_output=True,
timeout=120,
stdin=subprocess.DEVNULL,
**_POPEN_TEXT_KW,
)
except subprocess.TimeoutExpired:
self._finished = True
self._value = []
return []
except OSError:
self._finished = True
self._value = []
return []
code2 = p2.returncode if p2.returncode is not None else 1
out2 = (p2.stdout or "") if p2.stdout is not None else ""
err2 = (p2.stderr or "") if p2.stderr is not None else ""
if code2 != 0:
print(
f"discover on {self._node.target!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
file=sys.stderr,
flush=True,
)
_maybe_hint_remote_python(code2, out2, err2)
if code != 0 or (out + err).strip():
print(
f"calibrate-ports-json on {self._node.target!r} (exit {code}): {(err or out).strip()[:400]}",
file=sys.stderr,
flush=True,
)
_maybe_hint_remote_python(code, out, err)
self._finished = True
self._value = []
return []
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
if not pairs:
print(
f"Could not parse hub/port list from discover on {self._node.target!r}. "
"Ensure the host sees its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
file=sys.stderr,
flush=True,
)
self._finished = True
self._value = []
return []
print(
f"Using discover output for {self._node.target!r} ({len(pairs)} hub.port step(s)).",
file=sys.stderr,
flush=True,
)
self._finished = True
self._value = pairs
return pairs
class _HubPowerHandle:
__slots__ = ("_inner",)
def __init__(self, node: "SshNode", hub_1: int, port_0: int, enable: bool) -> None:
sub = "on" if enable else "off"
cfg = SshNodeConfig.load()
cmd = node._fiwi_cmd_argv(cfg, [sub, f"{hub_1}.{port_0}"])
self._inner = RemoteCallHandle(cmd, timeout=90.0)
def result(self, timeout: Optional[float] = None) -> Tuple[int, str]:
code, out, err = self._inner.result(timeout=timeout)
blob = "\n".join(x.strip() for x in (out or "", err or "") if x and x.strip()).strip()
return code, blob
class _StrFeedbackHandle:
__slots__ = ("_hub_1", "_inner", "_port_0")
def __init__(self, node: "SshNode", hub_1: int, port_0: int) -> None:
cfg = SshNodeConfig.load()
cmd = node._fiwi_cmd_argv(cfg, ["status", f"{hub_1}.{port_0}"])
self._inner = RemoteCallHandle(cmd, timeout=90.0)
self._hub_1 = hub_1
self._port_0 = port_0
def result(self, timeout: Optional[float] = None) -> str:
code, out, err = self._inner.result(timeout=timeout)
if code != 0:
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
pwr, cur = parse_status_line_for_hub_port(out, self._hub_1, self._port_0)
return f"remote hub reports {pwr}, {cur} mA"
class _WlanJsonHandle:
__slots__ = ("_inner",)
def __init__(self, node: "SshNode", timeout: float) -> None:
cfg = SshNodeConfig.load()
cmd = node._fiwi_cmd_argv(cfg, ["wlan-info-json"])
self._inner = RemoteCallHandle(cmd, timeout=timeout)
def result(self, timeout: Optional[float] = None) -> Optional[Dict[str, Any]]:
code, out, err = self._inner.result(timeout=timeout)
if code != 0 or not (out or "").strip():
return None
try:
data = json.loads(out.strip())
except json.JSONDecodeError:
return None
return data if isinstance(data, dict) else None
class _LsusbLinesHandle:
"""First ``lsusb-lines-json`` SSH may still be running; optional second raw ``lsusb``."""
__slots__ = ("_inner", "_node")
def __init__(self, node: "SshNode") -> None:
self._node = node
cfg = SshNodeConfig.load()
cmd = node._fiwi_cmd_argv(cfg, ["lsusb-lines-json"])
self._inner = RemoteCallHandle(cmd, timeout=45.0)
def result(self, timeout: Optional[float] = None) -> List[str]:
code, out, err = self._inner.result(timeout=timeout)
if code == 0 and out.strip():
try:
data = json.loads(out.strip())
if isinstance(data, list) and all(isinstance(x, str) for x in data):
return data
except json.JSONDecodeError:
pass
cfg = SshNodeConfig.load()
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self._node.target, "lsusb"]
try:
p2 = subprocess.run(
cmd,
capture_output=True,
timeout=45,
stdin=subprocess.DEVNULL,
**_POPEN_TEXT_KW,
)
except (OSError, subprocess.TimeoutExpired):
return []
if p2.returncode != 0 or not p2.stdout:
return []
return p2.stdout.splitlines()
def resolve_remote_defer(defer: Optional[bool]) -> bool:
"""
Effective “deferred / concurrent” flag.
``defer`` explicit ``True``/``False`` wins; otherwise ``FIWI_REMOTE_DEFER`` from the
environment (including after :func:`apply_fiwi_ssh_env`) — set by CLI ``--async`` or
``remote_ssh.env``.
"""
if defer is not None:
return defer
apply_fiwi_ssh_env()
v = os.environ.get("FIWI_REMOTE_DEFER", "").strip().lower()
return v in ("1", "true", "yes")
def apply_fiwi_ssh_env() -> None:
"""
Load ``remote_ssh.env`` or ``.fiwi_remote`` from the install directory into
``os.environ`` (``FIWI_*`` keys used for SSH transport only).
"""
b = base_dir()
for fname in ("remote_ssh.env", ".fiwi_remote"):
path = os.path.join(b, fname)
if not os.path.isfile(path):
continue
try:
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, val = line.partition("=")
key, val = key.strip(), val.strip()
if len(val) >= 2 and val[0] == val[-1] and val[0] in "'\"":
val = val[1:-1]
if key in _SSH_ENV_FILE_KEYS:
os.environ.setdefault(key, val)
except OSError:
continue
break
@dataclass(frozen=True)
class SshNodeConfig:
"""SSH client argv + remote ``python`` / ``fiwi.py`` paths after env + dotfiles."""
python: str
script: str
ssh_bin: str
ssh_extra_argv: tuple[str, ...]
calibrate_remotes: str
@classmethod
def load(cls) -> SshNodeConfig:
apply_fiwi_ssh_env()
raw_opts = os.environ.get("FIWI_SSH_OPTS") or ""
return cls(
python=os.environ.get("FIWI_REMOTE_PYTHON") or "python3",
script=os.environ.get("FIWI_REMOTE_SCRIPT") or "/usr/local/bin/fiwi.py",
ssh_bin=os.environ.get("FIWI_SSH_BIN") or "ssh",
ssh_extra_argv=tuple(shlex.split(raw_opts)),
calibrate_remotes=(os.environ.get("FIWI_CALIBRATE_REMOTES") or "").strip(),
)
@dataclass(frozen=True)
class SshNode:
"""One SSH destination for Fi-Wi (BrainStem hubs, calibration, etc.)."""
target: str
def __post_init__(self) -> None:
t = (self.target or "").strip()
if not t or "@" not in t:
raise ValueError("SshNode.target must be non-empty user@host")
object.__setattr__(self, "target", t)
@classmethod
def parse(cls, user_host: str) -> SshNode:
return cls(target=(user_host or "").strip())
def _invoke_blocking(self, remote_args: List[str]) -> int:
cfg = SshNodeConfig.load()
cmd = self._fiwi_cmd_argv_tty(cfg, remote_args)
print(
f"fiwi: ssh {self.target}{cfg.python} {cfg.script} {' '.join(remote_args)}",
file=sys.stderr,
flush=True,
)
proc = subprocess.run(cmd, stdin=sys.stdin)
return proc.returncode if proc.returncode is not None else 1
def invoke(
self, remote_args: List[str], *, defer: Optional[bool] = None
) -> Union[int, RemoteInvokeHandle]:
"""
TTY-capable Fi-Wi CLI (stdin forwarded).
Adds ``ssh -t`` for ``panel calibrate`` when ``FIWI_SSH_OPTS`` has no ``-t``.
When deferred, returns :class:`RemoteInvokeHandle`; child ``ssh`` already runs and shares
this processs stdin until :meth:`~RemoteInvokeHandle.result`.
"""
if resolve_remote_defer(defer):
cfg = SshNodeConfig.load()
cmd = self._fiwi_cmd_argv_tty(cfg, remote_args)
log = (
f"fiwi: ssh {self.target}{cfg.python} {cfg.script} {' '.join(remote_args)}"
)
return RemoteInvokeHandle(cmd, log_line=log)
return self._invoke_blocking(remote_args)
def _invoke_capture_blocking(
self, remote_args: List[str], timeout: float = 90
) -> Tuple[int, str, str]:
cfg = SshNodeConfig.load()
cmd = self._fiwi_cmd_argv(cfg, remote_args)
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL,
)
except subprocess.TimeoutExpired:
return 124, "", "ssh/fiwi timed out"
return (
proc.returncode if proc.returncode is not None else 1,
proc.stdout or "",
proc.stderr or "",
)
def invoke_capture(
self,
remote_args: List[str],
*,
timeout: float = 90,
defer: Optional[bool] = None,
) -> Union[Tuple[int, str, str], RemoteCallHandle]:
"""Fi-Wi CLI with captured stdout/stderr. No TTY."""
if resolve_remote_defer(defer):
cfg = SshNodeConfig.load()
cmd = self._fiwi_cmd_argv(cfg, remote_args)
return RemoteCallHandle(cmd, timeout=timeout)
return self._invoke_capture_blocking(remote_args, timeout=timeout)
def _fiwi_cmd_argv(self, cfg: SshNodeConfig, remote_args: List[str]) -> List[str]:
return [
cfg.ssh_bin,
*cfg.ssh_extra_argv,
self.target,
cfg.python,
cfg.script,
*remote_args,
]
def _fiwi_cmd_argv_tty(self, cfg: SshNodeConfig, remote_args: List[str]) -> List[str]:
extra = list(cfg.ssh_extra_argv)
if (
len(remote_args) >= 2
and remote_args[0].lower() == "panel"
and remote_args[1].lower() == "calibrate"
and not any(x in ("-t", "-tt") for x in extra)
):
extra = ["-t", *extra]
return [cfg.ssh_bin, *extra, self.target, cfg.python, cfg.script, *remote_args]
async def _ainvoke_coro(self, remote_args: List[str]) -> int:
cfg = SshNodeConfig.load()
cmd = self._fiwi_cmd_argv_tty(cfg, remote_args)
print(
f"fiwi: ssh {self.target}{cfg.python} {cfg.script} {' '.join(remote_args)}",
file=sys.stderr,
flush=True,
)
try:
proc = await asyncio.create_subprocess_exec(*cmd)
except OSError:
return 1
return await proc.wait()
async def ainvoke(
self, remote_args: List[str], *, defer: Optional[bool] = None
) -> Union[int, asyncio.Task[int]]:
"""
Async interactive Fi-Wi CLI (inherits stdin — no ``asyncio.to_thread``).
When deferred, returns an :class:`asyncio.Task`.
"""
if resolve_remote_defer(defer):
return asyncio.create_task(self._ainvoke_coro(remote_args))
return await self._ainvoke_coro(remote_args)
async def _ainvoke_capture_coro(
self, remote_args: List[str], timeout: float = 90
) -> Tuple[int, str, str]:
cfg = SshNodeConfig.load()
cmd = self._fiwi_cmd_argv(cfg, remote_args)
return await _async_subprocess_communicate(
cmd,
timeout=timeout,
on_timeout=(124, "", "ssh/fiwi timed out"),
on_spawn_error=(1, "", ""),
)
async def ainvoke_capture(
self,
remote_args: List[str],
*,
timeout: float = 90,
defer: Optional[bool] = None,
) -> Union[Tuple[int, str, str], asyncio.Task[Tuple[int, str, str]]]:
"""Fi-Wi CLI with captured stdout/stderr; concurrent-safe when deferred (returns Task)."""
if resolve_remote_defer(defer):
return asyncio.create_task(
self._ainvoke_capture_coro(remote_args, timeout=timeout)
)
return await self._ainvoke_capture_coro(remote_args, timeout=timeout)
async def _araw_ssh_coro(
self, remote_argv: List[str], *, timeout: float = 45
) -> Tuple[int, str, str]:
cfg = SshNodeConfig.load()
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self.target, *remote_argv]
return await _async_subprocess_communicate(
cmd,
timeout=timeout,
on_timeout=(1, "", ""),
on_spawn_error=(1, "", ""),
catch_communicate_oserror=True,
)
async def araw_ssh(
self,
remote_argv: List[str],
*,
timeout: float = 45,
defer: Optional[bool] = None,
) -> Union[Tuple[int, str, str], asyncio.Task[Tuple[int, str, str]]]:
"""Async raw remote command (no ``fiwi.py`` wrapper)."""
if resolve_remote_defer(defer):
return asyncio.create_task(self._araw_ssh_coro(remote_argv, timeout=timeout))
return await self._araw_ssh_coro(remote_argv, timeout=timeout)
def _raw_ssh_blocking(
self, remote_argv: List[str], *, timeout: float = 45
) -> Tuple[int, str, str]:
cfg = SshNodeConfig.load()
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self.target, *remote_argv]
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL,
)
except (OSError, subprocess.TimeoutExpired):
return 1, "", ""
return (
proc.returncode if proc.returncode is not None else 1,
proc.stdout or "",
proc.stderr or "",
)
def raw_ssh(
self,
remote_argv: List[str],
*,
timeout: float = 45,
defer: Optional[bool] = None,
) -> Union[Tuple[int, str, str], RemoteCallHandle]:
"""Run ``remote_argv`` on the host (no ``python fiwi.py`` prefix)."""
if resolve_remote_defer(defer):
cfg = SshNodeConfig.load()
cmd = [cfg.ssh_bin, *cfg.ssh_extra_argv, self.target, *remote_argv]
return RemoteCallHandle(cmd, timeout=timeout)
return self._raw_ssh_blocking(remote_argv, timeout=timeout)
def _fetch_calibrate_ports_json_blocking(self) -> List[Tuple[int, int]]:
"""``[[hub, port], …]`` from remote ``calibrate-ports-json``, else parse ``discover``."""
code, out, err = self._invoke_capture_blocking(["calibrate-ports-json"], timeout=90)
out = out or ""
err = err or ""
json_pairs = None
if code == 0 and out.strip():
json_pairs = _pairs_from_calibrate_json_stdout(out)
if json_pairs is not None:
return json_pairs
if "Unknown command" in (out + err):
print(
f"Remote {self.target!r} has no calibrate-ports-json; using discover fallback "
"(update fiwi on the remote host to skip the extra SSH round trip).",
file=sys.stderr,
flush=True,
)
code2, out2, err2 = self._invoke_capture_blocking(["discover"], timeout=120)
out2 = out2 or ""
if code2 != 0:
print(
f"discover on {self.target!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
file=sys.stderr,
flush=True,
)
_maybe_hint_remote_python(code2, out2, err2)
if code != 0 or (out + err).strip():
print(
f"calibrate-ports-json on {self.target!r} (exit {code}): {(err or out).strip()[:400]}",
file=sys.stderr,
flush=True,
)
_maybe_hint_remote_python(code, out, err)
return []
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
if not pairs:
print(
f"Could not parse hub/port list from discover on {self.target!r}. "
"Ensure the host sees its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
file=sys.stderr,
flush=True,
)
return []
print(
f"Using discover output for {self.target!r} ({len(pairs)} hub.port step(s)).",
file=sys.stderr,
flush=True,
)
return pairs
def fetch_calibrate_ports_json(
self, *, defer: Optional[bool] = None
) -> Union[List[Tuple[int, int]], FetchCalibratePortsHandle]:
if resolve_remote_defer(defer):
return FetchCalibratePortsHandle(self)
return self._fetch_calibrate_ports_json_blocking()
async def _afetch_calibrate_ports_json_coro(self) -> List[Tuple[int, int]]:
code, out, err = await self._ainvoke_capture_coro(["calibrate-ports-json"], timeout=90)
out = out or ""
err = err or ""
json_pairs = None
if code == 0 and out.strip():
json_pairs = _pairs_from_calibrate_json_stdout(out)
if json_pairs is not None:
return json_pairs
if "Unknown command" in (out + err):
print(
f"Remote {self.target!r} has no calibrate-ports-json; using discover fallback "
"(update fiwi on the remote host to skip the extra SSH round trip).",
file=sys.stderr,
flush=True,
)
code2, out2, err2 = await self._ainvoke_capture_coro(["discover"], timeout=120)
out2 = out2 or ""
if code2 != 0:
print(
f"discover on {self.target!r} failed (exit {code2}): {(err2 or out2).strip()[:400]}",
file=sys.stderr,
flush=True,
)
_maybe_hint_remote_python(code2, out2, err2)
if code != 0 or (out + err).strip():
print(
f"calibrate-ports-json on {self.target!r} (exit {code}): {(err or out).strip()[:400]}",
file=sys.stderr,
flush=True,
)
_maybe_hint_remote_python(code, out, err)
return []
pairs = _parse_discover_stdout_for_calibrate_ports(out2)
if not pairs:
print(
f"Could not parse hub/port list from discover on {self.target!r}. "
"Ensure the host sees its hubs (udev 24ff) and discover prints the Hub|Serial|Ports table.",
file=sys.stderr,
flush=True,
)
return []
print(
f"Using discover output for {self.target!r} ({len(pairs)} hub.port step(s)).",
file=sys.stderr,
flush=True,
)
return pairs
async def afetch_calibrate_ports_json(
self, *, defer: Optional[bool] = None
) -> Union[List[Tuple[int, int]], asyncio.Task[List[Tuple[int, int]]]]:
"""Async :meth:`fetch_calibrate_ports_json`; use ``asyncio.gather`` on tasks when deferred."""
if resolve_remote_defer(defer):
return asyncio.create_task(self._afetch_calibrate_ports_json_coro())
return await self._afetch_calibrate_ports_json_coro()
def _remote_hub_port_power_blocking(
self, hub_1: int, port_0: int, enable: bool
) -> Tuple[int, str]:
sub = "on" if enable else "off"
code, out, err = self._invoke_capture_blocking([sub, f"{hub_1}.{port_0}"])
blob = "\n".join(x.strip() for x in (out or "", err or "") if x and x.strip()).strip()
return code, blob
def remote_hub_port_power(
self,
hub_1: int,
port_0: int,
enable: bool,
*,
defer: Optional[bool] = None,
) -> Union[Tuple[int, str], _HubPowerHandle]:
if resolve_remote_defer(defer):
return _HubPowerHandle(self, hub_1, port_0, enable)
return self._remote_hub_port_power_blocking(hub_1, port_0, enable)
async def _aremote_hub_port_power_coro(
self, hub_1: int, port_0: int, enable: bool
) -> Tuple[int, str]:
sub = "on" if enable else "off"
code, out, err = await self._ainvoke_capture_coro([sub, f"{hub_1}.{port_0}"])
blob = "\n".join(x.strip() for x in (out or "", err or "") if x and x.strip()).strip()
return code, blob
async def aremote_hub_port_power(
self,
hub_1: int,
port_0: int,
enable: bool,
*,
defer: Optional[bool] = None,
) -> Union[Tuple[int, str], asyncio.Task[Tuple[int, str]]]:
if resolve_remote_defer(defer):
return asyncio.create_task(
self._aremote_hub_port_power_coro(hub_1, port_0, enable)
)
return await self._aremote_hub_port_power_coro(hub_1, port_0, enable)
def _remote_port_power_feedback_blocking(self, hub_1: int, port_0: int) -> str:
code, out, err = self._invoke_capture_blocking(["status", f"{hub_1}.{port_0}"])
if code != 0:
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
pwr, cur = parse_status_line_for_hub_port(out, hub_1, port_0)
return f"remote hub reports {pwr}, {cur} mA"
def remote_port_power_feedback(
self, hub_1: int, port_0: int, *, defer: Optional[bool] = None
) -> Union[str, _StrFeedbackHandle]:
if resolve_remote_defer(defer):
return _StrFeedbackHandle(self, hub_1, port_0)
return self._remote_port_power_feedback_blocking(hub_1, port_0)
async def _aremote_port_power_feedback_coro(self, hub_1: int, port_0: int) -> str:
code, out, err = await self._ainvoke_capture_coro(["status", f"{hub_1}.{port_0}"])
if code != 0:
return f"remote status failed ({code}): {(err or out).strip()[:120]}"
pwr, cur = parse_status_line_for_hub_port(out, hub_1, port_0)
return f"remote hub reports {pwr}, {cur} mA"
async def aremote_port_power_feedback(
self, hub_1: int, port_0: int, *, defer: Optional[bool] = None
) -> Union[str, asyncio.Task[str]]:
if resolve_remote_defer(defer):
return asyncio.create_task(
self._aremote_port_power_feedback_coro(hub_1, port_0)
)
return await self._aremote_port_power_feedback_coro(hub_1, port_0)
def _remote_wlan_info_json_blocking(self, timeout: float = 60) -> Optional[Dict[str, Any]]:
code, out, err = self._invoke_capture_blocking(
["wlan-info-json"], timeout=timeout
)
if code != 0 or not (out or "").strip():
return None
try:
data = json.loads(out.strip())
except json.JSONDecodeError:
return None
return data if isinstance(data, dict) else None
def remote_wlan_info_json(
self, timeout: float = 60, *, defer: Optional[bool] = None
) -> Union[Optional[Dict[str, Any]], _WlanJsonHandle]:
if resolve_remote_defer(defer):
return _WlanJsonHandle(self, timeout)
return self._remote_wlan_info_json_blocking(timeout=timeout)
async def _aremote_wlan_info_json_coro(self, timeout: float = 60) -> Optional[Dict[str, Any]]:
code, out, err = await self._ainvoke_capture_coro(
["wlan-info-json"], timeout=timeout
)
if code != 0 or not (out or "").strip():
return None
try:
data = json.loads(out.strip())
except json.JSONDecodeError:
return None
return data if isinstance(data, dict) else None
async def aremote_wlan_info_json(
self, timeout: float = 60, *, defer: Optional[bool] = None
) -> Union[Optional[Dict[str, Any]], asyncio.Task[Optional[Dict[str, Any]]]]:
if resolve_remote_defer(defer):
return asyncio.create_task(self._aremote_wlan_info_json_coro(timeout=timeout))
return await self._aremote_wlan_info_json_coro(timeout=timeout)
def _remote_lsusb_lines_blocking(self) -> List[str]:
code, out, err = self._invoke_capture_blocking(["lsusb-lines-json"], timeout=45)
if code == 0 and out.strip():
try:
data = json.loads(out.strip())
if isinstance(data, list) and all(isinstance(x, str) for x in data):
return data
except json.JSONDecodeError:
pass
code2, out2, err2 = self._raw_ssh_blocking(["lsusb"], timeout=45)
if code2 != 0 or not out2:
return []
return out2.splitlines()
def remote_lsusb_lines(
self, *, defer: Optional[bool] = None
) -> Union[List[str], _LsusbLinesHandle]:
if resolve_remote_defer(defer):
return _LsusbLinesHandle(self)
return self._remote_lsusb_lines_blocking()
async def _aremote_lsusb_lines_coro(self) -> List[str]:
code, out, err = await self._ainvoke_capture_coro(["lsusb-lines-json"], timeout=45)
if code == 0 and out.strip():
try:
data = json.loads(out.strip())
if isinstance(data, list) and all(isinstance(x, str) for x in data):
return data
except json.JSONDecodeError:
pass
code2, out2, err2 = await self._araw_ssh_coro(["lsusb"], timeout=45)
if code2 != 0 or not out2:
return []
return out2.splitlines()
async def aremote_lsusb_lines(
self, *, defer: Optional[bool] = None
) -> Union[List[str], asyncio.Task[List[str]]]:
if resolve_remote_defer(defer):
return asyncio.create_task(self._aremote_lsusb_lines_coro())
return await self._aremote_lsusb_lines_coro()
def _pairs_from_calibrate_json_stdout(out: str) -> Optional[List[Tuple[int, int]]]:
try:
data = json.loads(out.strip())
except (json.JSONDecodeError, AttributeError):
return None
if not isinstance(data, list):
return None
pairs: List[Tuple[int, int]] = []
for item in data:
if isinstance(item, (list, tuple)) and len(item) == 2:
try:
pairs.append((int(item[0]), int(item[1])))
except (TypeError, ValueError):
continue
return pairs
def _maybe_hint_remote_python(exit_code: int, out: str, err: str) -> None:
"""Exit 126/127 often means bad ``FIWI_REMOTE_PYTHON`` on the remote host."""
cfg = SshNodeConfig.load()
if exit_code not in (126, 127):
return
blob = f"{out or ''} {err or ''}".lower()
if "no such file" not in blob and "not found" not in blob:
return
print(
" SSH ran this on the remote host (paths must exist *there*, not on your workstation):\n"
f" interpreter: {cfg.python}\n"
f" script: {cfg.script}\n"
" Fix: SSH in, activate the venv with brainstem, run:\n"
" which python3\n"
" realpath /path/to/fiwi.py\n"
" Put those absolute paths in remote_ssh.env next to fiwi.py, or export:\n"
" FIWI_REMOTE_PYTHON=… FIWI_REMOTE_SCRIPT=…",
file=sys.stderr,
flush=True,
)
def _parse_discover_stdout_for_calibrate_ports(stdout: str) -> List[Tuple[int, int]]:
"""Parse ``discover`` table lines → ``(hub, port0)`` pairs."""
pairs = []
for line in stdout.splitlines():
raw = line.rstrip()
if "|" not in raw:
continue
if set(raw.strip()) <= {"-", " "}:
continue
parts = [p.strip() for p in raw.split("|")]
if len(parts) < 3:
continue
hub_s, _, n_s = parts[0], parts[1], parts[2]
if not hub_s.isdigit():
continue
try:
hub = int(hub_s)
nports = int(n_s)
except ValueError:
continue
if hub < 1 or nports < 1:
continue
for p in range(nports):
pairs.append((hub, p))
return pairs
def parse_status_line_for_hub_port(stdout: str, hub_1: int, port_0: int) -> Tuple[str, str]:
"""Parse ``status hub.port`` table output for one port; return ``(power, current_str)``."""
for line in stdout.splitlines():
ln = line.strip()
if not ln or ln.startswith("-") or "Identity" in ln or "Hub" not in ln:
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) < 4:
continue
hub_part = parts[0].replace("Hub", "", 1).strip()
try:
h = int(hub_part)
p = int(parts[1])
except ValueError:
continue
if h == hub_1 and p == port_0:
return parts[2], parts[3]
return "?", "?"