FiWiManager/fiwi/patch_panel.py

121 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Physical patch panel: front-panel position count for the rack (field workflow).
Stored in fiber_map.json as ``patch_panel``: ``{ "slots": N, "label": "", "location": "" }``.
USB hub calibrate still walks hub ports; map keys 1…N align with panel positions.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from fiwi.constants import PANEL_SLOTS
if TYPE_CHECKING:
from fiwi.concentrator import FiWiConcentrator
from fiwi.radiohead import RadioHead, RadioHeadEntry
_MAX_SLOTS = 256
def default_panel_ports() -> int:
    """
    Return the fallback front-panel **port** count.

    Reads ``FIWI_DEFAULT_PANEL_PORTS`` (``config/*.ini`` ``[patch_panel] default_ports``),
    or legacy ``FIWI_DEFAULT_PANEL_SLOTS`` when the former is unset or empty.
    The value is honoured only when it is a digit string between 1 and
    ``_MAX_SLOTS``; anything else yields ``PANEL_SLOTS``.
    """
    raw = (
        os.environ.get("FIWI_DEFAULT_PANEL_PORTS")
        or os.environ.get("FIWI_DEFAULT_PANEL_SLOTS")
        or ""
    ).strip()
    if raw.isdigit():
        # str.isdigit() is also true for characters such as "²" that int()
        # rejects (and int() refuses absurdly long digit runs); a malformed
        # environment value must fall back to the default, not crash.
        try:
            n = int(raw)
        except ValueError:
            return PANEL_SLOTS
        if 1 <= n <= _MAX_SLOTS:
            return n
    return PANEL_SLOTS
@dataclass(frozen=True)
class PatchPanel:
    """Instantiated panel: ``slots`` front-panel positions (numbered 1 to ``slots``)."""

    slots: int
    label: str = ""
    location: str = ""

    def __post_init__(self) -> None:
        """Validate the slot count.

        Raises:
            ValueError: if ``slots`` is below 1 or above ``_MAX_SLOTS``.
        """
        if self.slots < 1:
            raise ValueError("patch panel slots must be >= 1")
        if self.slots > _MAX_SLOTS:
            raise ValueError(f"patch panel slots must be <= {_MAX_SLOTS}")

    def to_map_blob(self) -> Dict[str, Any]:
        """Return the dict form of this panel.

        ``slots`` is always present; ``label`` and ``location`` are included
        (stripped) only when they are non-blank.
        """
        blob: Dict[str, Any] = {"slots": self.slots}
        label = self.label.strip()
        if label:
            blob["label"] = label
        location = self.location.strip()
        if location:
            blob["location"] = location
        return blob

    @classmethod
    def from_map_blob(cls, blob: Any) -> Optional["PatchPanel"]:
        """Build a panel from a ``patch_panel`` map entry.

        Returns ``None`` (never raises) when ``blob`` is not a dict or its
        ``slots`` value is not an integer, or digit string, in the range
        1 to ``_MAX_SLOTS``. Non-string ``label`` / ``location`` values are
        treated as empty.
        """
        if not isinstance(blob, dict):
            return None

        slots = blob.get("slots")
        if isinstance(slots, str):
            text = slots.strip()
            if not text.isdigit():
                return None
            # str.isdigit() is also true for characters such as "²" that
            # int() rejects; bad map data must yield None, not an exception.
            try:
                slots = int(text)
            except ValueError:
                return None
        # bool is a subclass of int: a JSON ``true`` is not a slot count.
        if isinstance(slots, bool) or not isinstance(slots, int):
            return None
        if slots < 1 or slots > _MAX_SLOTS:
            return None

        raw_label = blob.get("label")
        raw_location = blob.get("location")
        return cls(
            slots=slots,
            label=raw_label.strip() if isinstance(raw_label, str) else "",
            location=raw_location.strip() if isinstance(raw_location, str) else "",
        )

    def bound(self, concentrator: "FiWiConcentrator", doc: Dict[str, Any]) -> "BoundPatchPanel":
        """Attach a :class:`FiWiConcentrator` and loaded fiber map for :class:`~fiwi.radiohead.RadioHead` access."""
        return BoundPatchPanel(panel=self, concentrator=concentrator, doc=doc)
@dataclass
class BoundPatchPanel:
    """
    Patch panel (slot count) plus fiber map document and concentrator: the supported way to get a
    :class:`~fiwi.radiohead.RadioHead` for a front-panel position.
    """

    panel: "PatchPanel"
    concentrator: "FiWiConcentrator"
    doc: Dict[str, Any]

    def head(self, position_1based: int) -> Optional["RadioHead"]:
        """Return the radio head for a panel position, or ``None`` if unmapped.

        Args:
            position_1based: front-panel position, 1 to ``panel.slots`` inclusive.

        Raises:
            ValueError: if ``position_1based`` is outside that range.
        """
        slots = self.panel.slots
        if not 1 <= position_1based <= slots:
            # The range separator was missing, so 8 slots printed as "18".
            raise ValueError(
                f"panel position must be 1..{slots}, got {position_1based}"
            )
        # NOTE(review): RadioHead / RadioHeadEntry are needed at runtime here;
        # confirm their import is not guarded by TYPE_CHECKING.
        entry = RadioHeadEntry.from_port_id(self.doc, position_1based)
        if not entry.is_mapped():
            return None
        return RadioHead(entry, self.concentrator, patch_panel_port=position_1based)

    def heads(self) -> List["RadioHead"]:
        """Return radio heads for mapped positions only, in panel order."""
        candidates = (self.head(n) for n in range(1, self.panel.slots + 1))
        return [rh for rh in candidates if rh is not None]
def effective_panel_slots(doc: Optional[Dict[str, Any]]) -> int:
    """
    Resolve the panel position count for a fiber map document.

    Uses the ``slots`` of a valid ``doc['patch_panel']`` entry when there is one;
    otherwise (``doc`` is not a dict, or the entry does not parse) returns
    :func:`default_panel_ports`.
    """
    panel = (
        PatchPanel.from_map_blob(doc.get("patch_panel"))
        if isinstance(doc, dict)
        else None
    )
    return default_panel_ports() if panel is None else panel.slots