121 lines
4.1 KiB
Python
121 lines
4.1 KiB
Python
"""
|
||
Physical patch panel: front-panel position count for the rack (field workflow).
|
||
|
||
Stored in fiber_map.json as ``patch_panel``: ``{ "slots": N, "label": "…", "location": "…" }``.
|
||
USB hub calibrate still walks hub ports; map keys 1…N align with panel positions.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
from dataclasses import dataclass
|
||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||
|
||
from fiwi.constants import PANEL_SLOTS
|
||
|
||
if TYPE_CHECKING:
|
||
from fiwi.concentrator import FiWiConcentrator
|
||
|
||
from fiwi.radiohead import RadioHead, RadioHeadEntry
|
||
|
||
_MAX_SLOTS = 256
|
||
|
||
|
||
def default_panel_ports() -> int:
    """
    Fallback front-panel **port** count from ``FIWI_DEFAULT_PANEL_PORTS`` (``config/*.ini`` ``[patch_panel] default_ports``),
    or legacy ``FIWI_DEFAULT_PANEL_SLOTS``, else ``PANEL_SLOTS``.

    The legacy variable is only consulted when the primary one is unset or empty.
    Any value that is not a plain decimal integer in ``1…_MAX_SLOTS`` is ignored
    and ``PANEL_SLOTS`` is returned; this function never raises on bad input.
    """
    raw = (
        os.environ.get("FIWI_DEFAULT_PANEL_PORTS")
        or os.environ.get("FIWI_DEFAULT_PANEL_SLOTS")
        or ""
    ).strip()
    # isdecimal(), not isdigit(): isdigit() is also true for characters such as
    # superscript "²" that int() refuses, which would turn a bad env value into
    # an unhandled ValueError instead of the documented fallback.
    if raw.isdecimal():
        try:
            n = int(raw)
        except ValueError:
            # int() may still reject absurdly long digit strings on interpreters
            # that cap int/str conversion length; treat that as "not configured".
            return PANEL_SLOTS
        if 1 <= n <= _MAX_SLOTS:
            return n
    return PANEL_SLOTS
|
||
|
||
|
||
@dataclass(frozen=True)
class PatchPanel:
    """Instantiated panel: ``slots`` front-panel positions (numbered 1…slots).

    Immutable value object. ``label`` and ``location`` are optional free-text
    fields; empty or whitespace-only values are omitted when serialising.
    """

    # Number of front-panel positions; validated in __post_init__.
    slots: int
    # Optional free-text name for the panel.
    label: str = ""
    # Optional free-text location of the panel.
    location: str = ""

    def __post_init__(self) -> None:
        """Validate the slot count.

        Raises:
            ValueError: if ``slots`` is below 1 or above ``_MAX_SLOTS``.
        """
        if self.slots < 1:
            raise ValueError("patch panel slots must be >= 1")
        if self.slots > _MAX_SLOTS:
            raise ValueError(f"patch panel slots must be <= {_MAX_SLOTS}")

    def to_map_blob(self) -> Dict[str, Any]:
        """Return the dict form of this panel.

        ``"slots"`` is always present; ``"label"`` and ``"location"`` are
        included (stripped) only when they contain non-whitespace text.
        """
        d: Dict[str, Any] = {"slots": self.slots}
        label = self.label.strip()
        if label:
            d["label"] = label
        location = self.location.strip()
        if location:
            d["location"] = location
        return d

    @classmethod
    def from_map_blob(cls, blob: Any) -> Optional[PatchPanel]:
        """Build a panel from a ``patch_panel`` blob, or return ``None`` if unusable.

        ``blob["slots"]`` may be an int or a string of decimal digits
        (surrounding whitespace allowed). ``None`` is returned when ``blob`` is
        not a dict, or when the slot count is missing, not an integer, a bool,
        below 1, or above ``_MAX_SLOTS``. Non-string ``label``/``location``
        values are treated as empty.
        """
        if not isinstance(blob, dict):
            return None
        s = blob.get("slots")
        if isinstance(s, str):
            text = s.strip()
            # isdecimal(), not isdigit(): isdigit() also accepts characters such
            # as superscript "²" that int() rejects, which made this method
            # raise ValueError instead of returning None for a bad blob.
            if not text.isdecimal():
                return None
            try:
                s = int(text)
            except ValueError:
                # Extremely long digit strings can still be refused by int().
                return None
        # bool is a subclass of int; a JSON ``true`` is not a slot count.
        if isinstance(s, bool) or not isinstance(s, int) or s < 1:
            return None
        if s > _MAX_SLOTS:
            return None
        lab = blob.get("label")
        label = lab.strip() if isinstance(lab, str) else ""
        loc = blob.get("location")
        location = loc.strip() if isinstance(loc, str) else ""
        return cls(slots=s, label=label, location=location)

    def bound(self, concentrator: FiWiConcentrator, doc: Dict[str, Any]) -> BoundPatchPanel:
        """Attach a :class:`FiWiConcentrator` and loaded fiber map for :class:`~fiwi.radiohead.RadioHead` access."""
        return BoundPatchPanel(panel=self, concentrator=concentrator, doc=doc)
|
||
|
||
|
||
@dataclass
class BoundPatchPanel:
    """
    A :class:`PatchPanel` paired with a fiber map document and a concentrator.

    This is the supported route to a :class:`~fiwi.radiohead.RadioHead` for a
    given front-panel position.
    """

    # Panel whose ``slots`` value bounds the valid positions.
    panel: PatchPanel
    # Concentrator handed to every RadioHead created here.
    concentrator: FiWiConcentrator
    # Fiber map document used to look up the entry for a position.
    doc: Dict[str, Any]

    def head(self, position_1based: int) -> Optional[RadioHead]:
        """Return the radio head at a panel position, or ``None`` if unmapped.

        Raises:
            ValueError: if ``position_1based`` is outside ``1…panel.slots``.
        """
        last = self.panel.slots
        if position_1based < 1 or position_1based > last:
            raise ValueError(
                f"panel position must be 1–{self.panel.slots}, got {position_1based}"
            )
        entry = RadioHeadEntry.from_port_id(self.doc, position_1based)
        if entry.is_mapped():
            return RadioHead(entry, self.concentrator, patch_panel_port=position_1based)
        return None

    def heads(self) -> List[RadioHead]:
        """Return radio heads for the mapped positions only, in panel order (1…slots)."""
        # Lazy generator: positions are still visited one at a time, in order.
        candidates = (self.head(pos) for pos in range(1, self.panel.slots + 1))
        return [rh for rh in candidates if rh is not None]
|
||
|
||
|
||
def effective_panel_slots(doc: Optional[Dict[str, Any]]) -> int:
    """Return the panel position count for a fiber map document.

    Uses the slot count from ``doc['patch_panel']`` when ``doc`` is a dict and
    that entry parses via :meth:`PatchPanel.from_map_blob`; otherwise falls back
    to :func:`default_panel_ports`.
    """
    panel = (
        PatchPanel.from_map_blob(doc.get("patch_panel"))
        if isinstance(doc, dict)
        else None
    )
    return default_panel_ports() if panel is None else panel.slots
|