FiWiControl/scripts/system/rrh_power_control.py

129 lines
4.2 KiB
Python

#!/usr/bin/env python3
# Copyright (c) 2026 Umber
#
# Licensed under the Apache License, Version 2.0; see LICENSE.
"""Power control for RRHs in a FabricDefinition via Acroname ports.
Default behavior keeps one target ``radio_id`` ON and powers OFF all others.
Use ``--all-off`` to power OFF every RRH port in the fabric JSON.
"""
from __future__ import annotations
import argparse
import asyncio
from pathlib import Path
from fiwicontrol.fabric.fabric import FabricDefinition
from fiwicontrol.lab.discovery import discover_acroname_modules
from fiwicontrol.power.acroname import AcronamePower
_REPO_ROOT = Path(__file__).resolve().parents[2]
async def _power_only_one(
*,
fabric_json: str,
keep_radio_id: str | None,
all_off: bool,
dry_run: bool,
) -> None:
json_path = Path(fabric_json).expanduser()
if not json_path.is_absolute():
json_path = (_REPO_ROOT / json_path).resolve()
if not json_path.is_file():
cfg_dir = (_REPO_ROOT / "configs").resolve()
candidates = sorted(cfg_dir.glob("*.json")) if cfg_dir.is_dir() else []
if len(candidates) == 1:
json_path = candidates[0]
print("Using fabric JSON: {}".format(json_path))
else:
msg = ["Fabric JSON not found: {}".format(json_path)]
if candidates:
msg.append("Found candidates under configs/:")
msg.extend(" - {}".format(p) for p in candidates)
else:
msg.append("No *.json files found under configs/.")
msg.append("Pass --fabric-json PATH")
raise SystemExit("\n".join(msg))
fd = FabricDefinition.load(json_path)
rrhs = {h.radio_id: h for h in fd.rrhs}
if not all_off and keep_radio_id not in rrhs:
raise SystemExit("radio_id {!r} not found in {}".format(keep_radio_id, json_path))
modules = discover_acroname_modules()
by_serial = {int(m.serial_number): m for m in modules}
keep_key: tuple[int | None, int] | None = None
if all_off:
print("Mode: all-off (powering OFF every RRH port in fabric JSON)")
else:
keep = rrhs[keep_radio_id]
keep_key = (keep.acroname_module_serial, keep.acroname_port)
print(
"Keeping ON: radio_id={} module={} port={}".format(
keep_radio_id, keep_key[0], keep_key[1]
)
)
for h in fd.rrhs:
key = (h.acroname_module_serial, h.acroname_port)
if keep_key is not None and key == keep_key:
continue
if h.acroname_module_serial is None:
raise RuntimeError(
"{}: missing acroname_module_serial in {}".format(h.radio_id, json_path)
)
serial = int(h.acroname_module_serial)
mod = by_serial.get(serial)
if mod is None:
raise RuntimeError(
"{}: module serial {} not found on local USB".format(h.radio_id, serial)
)
if dry_run:
print("DRY-RUN OFF: radio_id={} module={} port={}".format(h.radio_id, serial, h.acroname_port))
continue
ap = AcronamePower(mod)
await ap.port_off(h.acroname_port)
print("OFF: radio_id={} module={} port={}".format(h.radio_id, serial, h.acroname_port))
def main() -> int:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument(
"-f",
"--fabric-json",
default="configs/my-fabric.json",
help="Path to FabricDefinition JSON (default: configs/my-fabric.json)",
)
p.add_argument(
"--keep-radio-id",
default="7915",
help="radio_id to keep powered on (default: 7915, ignored with --all-off)",
)
p.add_argument(
"--all-off",
action="store_true",
help="Power off all RRH ports defined in the fabric JSON",
)
p.add_argument(
"--dry-run",
action="store_true",
help="Print actions without toggling Acroname ports",
)
args = p.parse_args()
asyncio.run(
_power_only_one(
fabric_json=args.fabric_json,
keep_radio_id=args.keep_radio_id if not args.all_off else None,
all_off=args.all_off,
dry_run=args.dry_run,
)
)
return 0
if __name__ == "__main__":
raise SystemExit(main())