# Copyright (c) 2026 Umber # # Licensed under the Apache License, Version 2.0; see LICENSE. from __future__ import annotations import json from pathlib import Path from fiwicontrol.concentrator import ( ConcentratorPlatform, ConcentratorPlatformSnapshot, PCI_DEVICE_LINK_JSON_COLS, PciDeviceLinkSnapshot, PciDeviceLinksCodec, collect_pci_sysfs_links, format_concentrator_platform_snapshot_human, ) def _intel_two_thread_cpuinfo() -> str: return """processor\t: 0 vendor_id\t: GenuineIntel cpu family\t: 6 model\t: 165 model name\t: Intel(R) Core(TM) i9-10900K CPU @ 3.70GHz physical id\t: 0 core id\t: 0 cpu cores\t: 10 processor\t: 1 vendor_id\t: GenuineIntel cpu family\t: 6 model\t: 165 model name\t: Intel(R) Core(TM) i9-10900K CPU @ 3.70GHz physical id\t: 0 core id\t: 0 cpu cores\t: 10 """ def test_snapshot_from_fake_cpuinfo_intel(tmp_path: Path) -> None: p = tmp_path / "cpuinfo" p.write_text(_intel_two_thread_cpuinfo(), encoding="utf-8") snap = ConcentratorPlatform(label="lab").snapshot(proc_cpuinfo=p) assert snap.vendor_id == "GenuineIntel" assert "Intel" in (snap.model_name or "") assert snap.logical_processors == 2 assert snap.physical_cores == 1 assert snap.cpu_family_label() == "Intel" def test_snapshot_amd_vendor(tmp_path: Path) -> None: p = tmp_path / "cpuinfo" p.write_text( "processor\t: 0\nvendor_id\t: AuthenticAMD\nmodel name\t: AMD Ryzen 9\n" "physical id\t: 0\ncore id\t: 0\ncpu cores\t: 12\n\n", encoding="utf-8", ) snap = ConcentratorPlatform().snapshot(proc_cpuinfo=p) assert snap.cpu_family_label() == "AMD" def test_format_human_wifi_only_and_chip() -> None: links = ( PciDeviceLinkSnapshot( bdf="0000:00:07.1", current_link_width=16, max_link_width=16, current_link_speed="32.0 GT/s PCIe", max_link_speed="32.0 GT/s PCIe", pci_class="0x060400", ), PciDeviceLinkSnapshot( bdf="0000:03:00.0", current_link_width=1, max_link_width=1, current_link_speed="5.0 GT/s PCIe", max_link_speed="5.0 GT/s PCIe", pci_class="0x028000", ), ) snap = ConcentratorPlatformSnapshot( vendor_id="AuthenticAMD", model_name="Test", logical_processors=4, physical_cores=2, architecture="x86_64", platform_string="Linux-test", pci_device_links=links, lspci_tree="line0\nline1\nline2", ) text = format_concentrator_platform_snapshot_human( snap, label="lab", pci_max_rows=20, lspci_max_lines=2, lspci_nn_by_bdf={"0000:03:00.0": "MEDIATEK Corp. MT7925 (test)"}, ) assert "PCIe sysfs link entries: 2" in text assert "Wi‑Fi / wireless PCI (1 of 2)" in text assert "0000:03:00.0" in text and "MT7925" in text assert "0000:00:07.1" not in text assert "line0" in text and "line1" in text and "line2" not in text def test_format_human_pci_extra_shows_bridge() -> None: links = ( PciDeviceLinkSnapshot( bdf="0000:00:07.1", current_link_width=16, max_link_width=16, current_link_speed="32.0 GT/s PCIe", max_link_speed="32.0 GT/s PCIe", pci_class="0x060400", ), PciDeviceLinkSnapshot( bdf="0000:03:00.0", current_link_width=1, max_link_width=1, current_link_speed="5.0 GT/s PCIe", max_link_speed="5.0 GT/s PCIe", pci_class="0x028000", ), ) snap = ConcentratorPlatformSnapshot( vendor_id="AuthenticAMD", model_name="Test", logical_processors=4, physical_cores=2, architecture="x86_64", platform_string="Linux-test", pci_device_links=links, lspci_tree=None, ) text = format_concentrator_platform_snapshot_human( snap, include_pci_extra=True, lspci_nn_by_bdf={"0000:03:00.0": "WiFi chip"}, lspci_max_lines=0, ) assert "0000:03:00.0" in text and "0000:00:07.1" in text assert "Other PCIe links" in text def test_collect_pci_sysfs_links_fake_tree(tmp_path: Path) -> None: root = tmp_path / "pci_devices" dev = root / "0000:00:01.0" dev.mkdir(parents=True) (dev / "current_link_width").write_text("8\n", encoding="utf-8") (dev / "max_link_width").write_text("16\n", encoding="utf-8") (dev / "current_link_speed").write_text("8.0 GT/s PCIe\n", encoding="utf-8") (dev / "max_link_speed").write_text("16.0 GT/s PCIe\n", encoding="utf-8") (dev / "class").write_text("0x060400\n", encoding="utf-8") rows = collect_pci_sysfs_links(root) assert len(rows) == 1 assert rows[0].bdf == "0000:00:01.0" assert rows[0].current_link_width == 8 and rows[0].max_link_width == 16 assert rows[0].current_link_speed == "8.0 GT/s PCIe" def test_pci_device_links_object_list_json_codec_roundtrip() -> None: raw_rows = [ { "bdf": "0000:01:00.0", "current_link_width": 16, "max_link_width": 16, "current_link_speed": "32.0 GT/s PCIe", "max_link_speed": "32.0 GT/s PCIe", "pci_class": "0x060400", } ] links = PciDeviceLinksCodec.decode(raw_rows) assert len(links) == 1 and links[0].bdf == "0000:01:00.0" snap = ConcentratorPlatformSnapshot( vendor_id=None, model_name=None, logical_processors=1, physical_cores=None, architecture="x86_64", platform_string="t", pci_device_links=links, ) d = snap.to_json_dict() assert "cols" in d["pci_device_links"] and d["pci_device_links"]["cols"] == list(PCI_DEVICE_LINK_JSON_COLS) assert d["pci_device_links"]["rows"][0][0] == "0000:01:00.0" assert d["pci_device_links"]["rows"][0][3] == 32 def test_snapshot_json_roundtrip() -> None: link = PciDeviceLinkSnapshot( bdf="0000:00:01.0", current_link_width=4, max_link_width=16, current_link_speed="8.0 GT/s PCIe", max_link_speed="32.0 GT/s PCIe", pci_class="0x060400", ) s = ConcentratorPlatformSnapshot( vendor_id="GenuineIntel", model_name="Test CPU", logical_processors=8, physical_cores=4, architecture="x86_64", platform_string="Linux-test", lspci_tree="(test tree)", pci_device_links=(link,), ) d = s.to_json_dict() s2 = ConcentratorPlatformSnapshot.from_json_dict(d) assert s2.model_name == "Test CPU" and s2.logical_processors == 8 assert "cpu_family_label" in json.dumps(d) assert s2.lspci_tree == "(test tree)" and len(s2.pci_device_links) == 1 assert s2.pci_device_links[0].bdf == "0000:00:01.0"