# FiWiControl/tests/test_node_control.py
"""Integration tests for orchestrator.commands.node_control (passwordless SSH required)."""

import argparse
import asyncio
import logging
import os
import sys
import unittest
from orchestrator.commands.node_control import Command, CommandManager, ssh_node
class _RemoteNodeMixin:
    """Shared node factory for test cases that target a remote host."""

    def _make_node(self, silent_mode=False):
        """Build an ``ssh_node`` pointed at the host under test.

        The address is read from the ``FIWI_REMOTE_IP`` environment variable
        and falls back to ``192.168.1.39`` when that variable is unset.

        Args:
            silent_mode: Passed through unchanged to ``ssh_node``.

        Returns:
            Whatever ``ssh_node(...)`` returns for these settings.
        """
        target_ip = os.environ.get("FIWI_REMOTE_IP", "192.168.1.39")
        node_options = {
            "name": None,
            "ipaddr": target_ip,
            "ssh_controlmaster": False,
            "silent_mode": silent_mode,
        }
        return ssh_node(**node_options)
class TestRemoteSingleRunCommands(_RemoteNodeMixin, unittest.IsolatedAsyncioTestCase):
    """One-shot commands issued through the node's ``rexec`` coroutine."""

    @staticmethod
    def _decoded_output(session):
        """Decode ``session.results`` as UTF-8 (replacing bad bytes) and strip it."""
        text = session.results.decode("utf-8", errors="replace")
        return text.strip()

    async def test_ls_home_directory(self):
        """``ls ~`` must produce non-empty output."""
        remote = self._make_node()
        rexec_kwargs = {
            "cmd": "ls ~",
            "IO_TIMEOUT": 15.0,
            "CMD_TIMEOUT": 30,
            "CONNECT_TIMEOUT": 20.0,
            "repeat": None,
        }
        finished = await remote.rexec(**rexec_kwargs)
        listing = self._decoded_output(finished)
        self.assertTrue(listing, "Expected non-empty output from `ls ~` on remote host.")

    async def test_dmesg_tail(self):
        """``dmesg | tail -n 25`` must produce non-empty output."""
        remote = self._make_node(silent_mode=False)
        rexec_kwargs = {
            "cmd": "dmesg | tail -n 25",
            "IO_TIMEOUT": 20.0,
            "CMD_TIMEOUT": 45,
            "CONNECT_TIMEOUT": 20.0,
            "repeat": None,
        }
        finished = await remote.rexec(**rexec_kwargs)
        kernel_log = self._decoded_output(finished)
        self.assertTrue(kernel_log, "Expected non-empty output from `dmesg | tail -n 25` on remote host.")
class TestRemoteRepeatingCommands(_RemoteNodeMixin, unittest.IsolatedAsyncioTestCase):
    """Repeating ``Command`` objects and the ``CommandManager`` bookkeeping."""

    async def test_repeat_01_uname_r_every_1s_10_times(self):
        """A ``uname -r`` command with interval=1.0, count=10 ends with run_count == 10."""
        remote = self._make_node()
        uname_cmd = Command(node=remote, cmd="uname -r", interval=1.0, count=10)
        await uname_cmd.start()
        # Bound the wait so a stuck command fails the test instead of hanging it.
        await asyncio.wait_for(uname_cmd.task, timeout=20.0)
        self.assertEqual(uname_cmd.run_count, 10, "Expected 10 uname -r runs.")

    async def test_repeat_02_pwd_and_ls_every_1s_5_times(self):
        """Two commands started together on one node each reach run_count == 5."""
        remote = self._make_node()
        pwd_cmd = Command(node=remote, cmd="pwd", interval=1.0, count=5)
        ls_cmd = Command(node=remote, cmd="ls", interval=1.0, count=5)
        both = (pwd_cmd, ls_cmd)

        await asyncio.gather(*(command.start() for command in both))
        all_done = asyncio.gather(*(command.task for command in both))
        await asyncio.wait_for(all_done, timeout=15.0)

        expectations = (
            (pwd_cmd, "Expected 5 pwd runs."),
            (ls_cmd, "Expected 5 ls runs."),
        )
        for command, message in expectations:
            self.assertEqual(command.run_count, 5, message)

    async def test_repeat_03_forever_can_be_stopped(self):
        """A ``COUNT_FOREVER`` command has run_count >= 2 once stopped after 2.2 s."""
        remote = self._make_node()
        endless = Command(
            node=remote,
            cmd="uname -r",
            interval=1.0,
            count=Command.COUNT_FOREVER,
        )
        await endless.start()
        await asyncio.sleep(2.2)
        await endless.stop()
        self.assertGreaterEqual(endless.run_count, 2, "Expected at least 2 runs before stop.")

    async def test_command_manager_add_and_self_clean(self):
        """Finite commands appear in the active map, then are gone 1.2 s later."""
        remote = self._make_node()
        mgr = CommandManager()
        fast_id = await mgr.add_command(node=remote, cmd="uname -r", interval=0.2, count=2)
        slow_id = await mgr.add_command(node=remote, cmd="pwd", interval=0.2, count=3)

        for command_id in (fast_id, slow_id):
            self.assertIn(command_id, mgr.list_active())

        # Fixed wait before checking that both finite commands have been dropped.
        await asyncio.sleep(1.2)
        self.assertEqual(mgr.list_active(), {}, "Expected active map to self-clean after finite runs.")
        self.assertEqual(mgr.list_abnormal_history(), [], "Expected no abnormal history on normal completion.")
def parse_cli_args(argv=None):
    """Parse this script's own options and leave the rest for ``unittest``.

    ``--remote-ip`` defaults to the current ``FIWI_REMOTE_IP`` environment
    value (or ``192.168.1.39`` when unset), so an address exported by the
    caller is kept unless the flag is given explicitly.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        A ``(namespace, remaining)`` pair as produced by
        ``ArgumentParser.parse_known_args``; ``remaining`` holds the
        arguments this parser did not recognise.
    """
    parser = argparse.ArgumentParser(
        description="Integration tests for orchestrator.commands.node_control (passwordless SSH required)."
    )
    parser.add_argument(
        "--remote-ip",
        default=os.getenv("FIWI_REMOTE_IP", "192.168.1.39"),
        help="Target host for remote tests (passwordless SSH to root@<ip> must work).",
    )
    parser.add_argument("--debug", action="store_true", help="Enable asyncio/transport debug logging.")
    return parser.parse_known_args(argv)


if __name__ == "__main__":
    args, remaining = parse_cli_args()
    # The test cases read the target address from the environment.
    os.environ["FIWI_REMOTE_IP"] = args.remote_ip
    if args.debug:
        # NOTE(review): --debug raises verbosity to INFO, not DEBUG — confirm intended.
        logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s:%(message)s")
        logging.getLogger("asyncio").setLevel(logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING, format="%(message)s")
        logging.getLogger("asyncio").setLevel(logging.WARNING)
        # Root is WARNING in this branch; set the module under test to INFO.
        logging.getLogger("orchestrator.commands.node_control").setLevel(logging.INFO)
    # Hand only the unrecognised arguments to unittest's own CLI.
    unittest.main(argv=[sys.argv[0], *remaining])