"""
Central diagnostic timeline: append-only, wall-clock-stamped events for post-mortem debugging.

Async helpers snapshot **dmesg** and **lspci** (locally or over SSH). Use :func:`get_diag_log` for a
process-wide buffer, or construct :class:`DiagLog` for tests and isolated sessions.

Extension — kernel debugging / crash dumps (collectors TBD)
------------------------------------------------------------

Planned **kdump** / **vmcore** / **pstore** / netconsole workflows will append :class:`KernelDumpEvent`
rows (host, phase, human-readable summary, artifact paths, optional tool output, scalar metadata
pairs). That keeps crash artifacts in the same JSONL timeline as :class:`DmesgEvent` and
:class:`PcieEvent`. Until those collectors exist, :func:`kernel_dump_event` and
:func:`alog_kernel_dump` record such rows manually.

Future async helpers (e.g. ``acapture_kernel_dump`` / SSH pull of ``/var/crash``) should build
:class:`KernelDumpEvent` instances and :meth:`DiagLog.append` them — no change to :class:`DiagLog`
storage is needed.
"""
|
|
|
|
from __future__ import annotations

import asyncio
import json
import threading
import time
from dataclasses import asdict, dataclass, replace
from pathlib import Path
from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union

from fiwi.ssh_node import SshNode
|
|
|
|
|
|
@dataclass(frozen=True)
class NoteEvent:
    """Free-form annotation (written by an operator or by code) placed on the diagnostic timeline."""

    # Discriminator tag; typed as the literal "note" (not enforced at runtime).
    kind: Literal["note"]
    # Wall-clock timestamp; this module's builders fill it from time.time().
    ts_wall: float
    # Human-readable annotation text.
    message: str
|
|
|
|
|
|
@dataclass(frozen=True)
class DmesgEvent:
    """Output of one ``dmesg`` invocation (kernel ring buffer).

    Useful when hunting link resets, PCIe AER reports, USB errors and similar kernel messages.
    """

    # Discriminator tag; typed as the literal "dmesg".
    kind: Literal["dmesg"]
    # Wall-clock timestamp taken when the capture started.
    ts_wall: float
    # Which host the output came from ("local" or an SSH target by default).
    host_label: str
    # Exit status of the dmesg command that produced stdout/stderr.
    exit_code: int
    # Decoded standard output of dmesg.
    stdout: str
    # Decoded standard error of dmesg.
    stderr: str
    # The argv that actually ran (capture helpers fall back from `dmesg -T` to plain `dmesg`).
    argv_used: Tuple[str, ...]
|
|
|
|
|
|
@dataclass(frozen=True)
class PcieEvent:
    """PCI device listing captured via ``lspci``, for topology and driver-binding investigations."""

    # Discriminator tag; typed as the literal "pcie".
    kind: Literal["pcie"]
    # Wall-clock timestamp taken when the capture started.
    ts_wall: float
    # Which host the listing came from ("local" or an SSH target by default).
    host_label: str
    # Aggregate exit status across the lspci runs; nonzero means at least one run failed.
    exit_code: int
    # Concatenated lspci outputs, each section headed by an "=== lspci ... ===" banner.
    stdout: str
    # Non-blank stderr chunks of the lspci runs, newline-joined.
    stderr: str
|
|
|
|
|
|
@dataclass(frozen=True)
class KernelDumpEvent:
    """
    One step in a kernel crash-dump lifecycle (kdump/vmcore, pstore, copy steps, ``crash`` notes).

    Intended to be produced by future collectors; today build one with :func:`kernel_dump_event`.
    Example ``phase`` values: ``"kdump_enabled"``, ``"vmcore_ready"``, ``"artifact_copied"``,
    ``"pstore"``.
    """

    # Discriminator tag; typed as the literal "kernel_dump".
    kind: Literal["kernel_dump"]
    # Wall-clock timestamp of the step.
    ts_wall: float
    # Host the dump step relates to.
    host_label: str
    # Free-form lifecycle phase name (see examples above).
    phase: str
    # Short human-readable description of the step.
    summary: str
    # Paths of related artifacts; a tuple keeps the frozen event hashable.
    artifact_paths: Tuple[str, ...] = ()
    # Optional captured tool output.
    capture_log: str = ""
    # (key, value) pairs; tuples rather than a dict so the frozen event stays hashable.
    metadata: Tuple[Tuple[str, str], ...] = ()
|
|
|
|
|
|
# Closed union of every event type a DiagLog stores; new diagnostic categories are added here.
DiagEvent = Union[
    NoteEvent,
    DmesgEvent,
    PcieEvent,
    KernelDumpEvent,
]
|
|
|
|
|
|
class DiagLog:
    """
    Thread-safe, append-only list of diagnostic events.

    Events are stored in **append order**. Each event carries its own wall-clock ``ts_wall``; that is
    usually close to append order, but capture helpers stamp ``ts_wall`` when a capture *starts* and
    the event is appended when it *finishes*, so concurrent producers can interleave. Call
    ``snapshot(sort_by_time=True)`` for a view ordered strictly by ``ts_wall``.

    New diagnostic categories (e.g. :class:`KernelDumpEvent`) are added as union members of
    :obj:`DiagEvent`; :meth:`dump_jsonl` serializes each row with ``dataclasses.asdict``.
    """

    __slots__ = ("_events", "_lock")

    def __init__(self) -> None:
        self._events: List[DiagEvent] = []
        # Plain (non-reentrant) lock: never run caller-supplied code while holding it.
        self._lock = threading.Lock()

    def append(self, event: DiagEvent) -> None:
        """Add one event at the end of the log."""
        with self._lock:
            self._events.append(event)

    def extend(self, events: Iterable[DiagEvent]) -> None:
        """Add several events atomically, keeping them adjacent in the log.

        *events* is materialized **before** taking the lock, so a generator that itself logs (or is
        slow) cannot deadlock or stall other writers.
        """
        batch = list(events)
        with self._lock:
            self._events.extend(batch)

    def clear(self) -> None:
        """Drop every stored event."""
        with self._lock:
            self._events.clear()

    def snapshot(self, *, sort_by_time: bool = False) -> List[DiagEvent]:
        """Return a shallow copy of the events.

        :param sort_by_time: when true, order by ``ts_wall`` (stable, so equal timestamps keep
            append order); otherwise return append order.
        """
        with self._lock:
            events = list(self._events)
        if sort_by_time:
            events.sort(key=lambda e: e.ts_wall)
        return events

    def dump_jsonl(self, path: Union[str, Path], *, append: bool = False) -> None:
        """Write one JSON object per line (UTF-8), in append order.

        :param path: destination file; missing parent directories are created.
        :param append: append to an existing file instead of truncating it.
        """
        p = Path(path)
        mode = "a" if append else "w"
        # Serialize first so a serialization error cannot leave a truncated file behind.
        lines = [json.dumps(asdict(e), ensure_ascii=False) + "\n" for e in self.snapshot()]
        p.parent.mkdir(parents=True, exist_ok=True)
        with p.open(mode, encoding="utf-8") as f:
            f.writelines(lines)
|
|
|
|
|
|
# Lazily-created, process-wide DiagLog shared by get_diag_log(); guarded by the lock below.
_diag_singleton: Optional[DiagLog] = None
_diag_singleton_lock = threading.Lock()


def get_diag_log() -> DiagLog:
    """Return the process-wide :class:`DiagLog`, creating it on the first call (lazy)."""
    global _diag_singleton
    with _diag_singleton_lock:
        if _diag_singleton is None:
            _diag_singleton = DiagLog()
        shared = _diag_singleton
    return shared
|
|
|
|
|
|
def note_event(message: str, *, ts_wall: Optional[float] = None) -> NoteEvent:
    """Build (but do not log) a :class:`NoteEvent` carrying *message*.

    :param ts_wall: explicit timestamp; defaults to ``time.time()`` at call time.
    """
    stamp = ts_wall if ts_wall is not None else time.time()
    return NoteEvent(kind="note", ts_wall=stamp, message=message)
|
|
|
|
|
|
def kernel_dump_event(
    *,
    host_label: str,
    phase: str,
    summary: str,
    artifact_paths: Sequence[str] = (),
    capture_log: str = "",
    metadata: Optional[Dict[str, str]] = None,
    ts_wall: Optional[float] = None,
) -> KernelDumpEvent:
    """Build a :class:`KernelDumpEvent` (for manual logging until async collectors exist).

    :param host_label: host the dump step relates to.
    :param phase: lifecycle phase name, e.g. ``"vmcore_ready"``.
    :param summary: short human-readable description.
    :param artifact_paths: related artifact paths. A single path given as a bare string is
        treated as one path, not split into characters.
    :param capture_log: optional tool output to keep with the event.
    :param metadata: extra key/value pairs; stored as key-sorted tuples so the event stays
        hashable and its serialized form is deterministic.
    :param ts_wall: explicit timestamp; defaults to ``time.time()``.
    """
    if isinstance(artifact_paths, str):
        # A str is itself a Sequence[str]; tuple("/var/crash/x") would yield single characters.
        paths: Tuple[str, ...] = (artifact_paths,) if artifact_paths else ()
    else:
        paths = tuple(artifact_paths)
    # Dict keys are unique, so sorting by key alone fully determines the order.
    pairs: Tuple[Tuple[str, str], ...] = tuple(
        sorted((metadata or {}).items(), key=lambda kv: kv[0])
    )
    return KernelDumpEvent(
        kind="kernel_dump",
        ts_wall=time.time() if ts_wall is None else ts_wall,
        host_label=host_label,
        phase=phase,
        summary=summary,
        artifact_paths=paths,
        capture_log=capture_log,
        metadata=pairs,
    )
|
|
|
|
|
|
async def _async_run_capture(
|
|
argv: Sequence[str], *, timeout: float
|
|
) -> Tuple[int, str, str]:
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*argv,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
except OSError as e:
|
|
return 1, "", str(e)
|
|
try:
|
|
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
|
except asyncio.TimeoutExpired:
|
|
proc.kill()
|
|
try:
|
|
await proc.wait()
|
|
except OSError:
|
|
pass
|
|
return 124, "", "timeout"
|
|
code = proc.returncode if proc.returncode is not None else 1
|
|
return (
|
|
code,
|
|
(out_b or b"").decode(errors="replace"),
|
|
(err_b or b"").decode(errors="replace"),
|
|
)
|
|
|
|
|
|
async def acapture_dmesg_local(
    *, host_label: str = "local", timeout: float = 45.0
) -> DmesgEvent:
    """Capture the local kernel ring buffer as a :class:`DmesgEvent`.

    Tries ``dmesg -T`` (human-readable timestamps) first and falls back to plain ``dmesg`` when that
    exits nonzero; the last attempt is recorded whatever its exit code.
    """
    started = time.time()
    attempts = (("dmesg", "-T"), ("dmesg",))
    for index, argv in enumerate(attempts):
        rc, stdout, stderr = await _async_run_capture(argv, timeout=timeout)
        is_last = index == len(attempts) - 1
        if rc != 0 and not is_last:
            continue
        return DmesgEvent(
            kind="dmesg",
            ts_wall=started,
            host_label=host_label,
            exit_code=rc,
            stdout=stdout,
            stderr=stderr,
            argv_used=tuple(argv),
        )
    raise RuntimeError("unreachable dmesg capture loop")
|
|
|
|
|
|
async def acapture_pcie_local(
    *,
    host_label: str = "local",
    timeout: float = 60.0,
    include_tree: bool = True,
) -> PcieEvent:
    """Capture local PCI topology as a :class:`PcieEvent`.

    Runs ``lspci -nn`` and, when *include_tree* is true, ``lspci -tv``; each command gets its own
    *timeout*. ``stdout`` holds both outputs under ``=== ... ===`` banners and ``stderr`` joins any
    non-blank error output.

    ``exit_code`` is 0 only when every run returned 0. Otherwise it is the code with the largest
    magnitude, so a signal-terminated run (negative code) is not hidden behind a success.
    """
    ts = time.time()
    parts: List[str] = []
    errs: List[str] = []
    worst = 0
    code, out, err = await _async_run_capture(("lspci", "-nn"), timeout=timeout)
    # key=abs: a plain max() would let 0 win over a negative (killed-by-signal) return code.
    worst = max(worst, code, key=abs)
    parts.append("=== lspci -nn ===\n" + out)
    if err.strip():
        errs.append(err)
    if include_tree:
        code2, out2, err2 = await _async_run_capture(("lspci", "-tv"), timeout=timeout)
        worst = max(worst, code2, key=abs)
        parts.append("\n=== lspci -tv ===\n" + out2)
        if err2.strip():
            errs.append(err2)
    return PcieEvent(
        kind="pcie",
        ts_wall=ts,
        host_label=host_label,
        exit_code=worst,
        stdout="".join(parts),
        stderr="\n".join(errs),
    )
|
|
|
|
|
|
async def acapture_dmesg_ssh(
    ssh: SshNode,
    *,
    timeout: float = 45.0,
) -> DmesgEvent:
    """Capture the remote kernel ring buffer over *ssh* as a :class:`DmesgEvent`.

    Tries ``dmesg -T`` first and falls back to plain ``dmesg`` when that exits nonzero. The event is
    labelled with ``ssh.target``.
    """
    started = time.time()
    target_label = ssh.target
    attempts = (["dmesg", "-T"], ["dmesg"])
    for index, argv in enumerate(attempts):
        rc, stdout, stderr = await ssh.araw_ssh(argv, timeout=timeout, defer=False)
        is_last = index == len(attempts) - 1
        if rc != 0 and not is_last:
            continue
        return DmesgEvent(
            kind="dmesg",
            ts_wall=started,
            host_label=target_label,
            exit_code=rc,
            stdout=stdout,
            stderr=stderr,
            argv_used=tuple(argv),
        )
    raise RuntimeError("unreachable dmesg SSH capture loop")
|
|
|
|
|
|
async def acapture_pcie_ssh(
    ssh: SshNode,
    *,
    timeout: float = 60.0,
    include_tree: bool = True,
) -> PcieEvent:
    """Capture PCI topology of the host behind *ssh* as a :class:`PcieEvent`.

    Runs ``lspci -nn`` and, when *include_tree* is true, ``lspci -tv`` via ``ssh.araw_ssh``; the
    event is labelled with ``ssh.target``. Output layout matches :func:`acapture_pcie_local`.

    ``exit_code`` is 0 only when every run returned 0. Otherwise it is the code with the largest
    magnitude, so a negative code is not hidden behind a success. NOTE(review): whether
    ``araw_ssh`` can return negative codes is not visible here; for non-negative codes this is
    identical to a plain ``max``.
    """
    ts = time.time()
    label = ssh.target
    parts: List[str] = []
    errs: List[str] = []
    worst = 0
    code, out, err = await ssh.araw_ssh(["lspci", "-nn"], timeout=timeout, defer=False)
    worst = max(worst, code, key=abs)
    parts.append("=== lspci -nn ===\n" + out)
    if err.strip():
        errs.append(err)
    if include_tree:
        code2, out2, err2 = await ssh.araw_ssh(["lspci", "-tv"], timeout=timeout, defer=False)
        worst = max(worst, code2, key=abs)
        parts.append("\n=== lspci -tv ===\n" + out2)
        if err2.strip():
            errs.append(err2)
    return PcieEvent(
        kind="pcie",
        ts_wall=ts,
        host_label=label,
        exit_code=worst,
        stdout="".join(parts),
        stderr="\n".join(errs),
    )
|
|
|
|
|
|
async def acapture_dmesg(
    *,
    ssh: Optional[SshNode] = None,
    host_label: Optional[str] = None,
    timeout: float = 45.0,
) -> DmesgEvent:
    """Local ``dmesg`` when ``ssh`` is ``None``, else remote via :class:`~fiwi.ssh_node.SshNode`.

    :param host_label: label override. Locally a falsy value means ``"local"``; for SSH, ``None``
        keeps ``ssh.target`` and any other value replaces it.
    :param timeout: per-command timeout in seconds.
    """
    if ssh is None:
        return await acapture_dmesg_local(
            host_label=host_label or "local", timeout=timeout
        )
    ev = await acapture_dmesg_ssh(ssh, timeout=timeout)
    if host_label is None:
        return ev
    # replace() copies every field, so fields added to DmesgEvent later cannot be dropped here.
    return replace(ev, host_label=host_label)
|
|
|
|
|
|
async def acapture_pcie(
    *,
    ssh: Optional[SshNode] = None,
    host_label: Optional[str] = None,
    timeout: float = 60.0,
    include_tree: bool = True,
) -> PcieEvent:
    """Local ``lspci`` when ``ssh`` is ``None``, else remote via :class:`~fiwi.ssh_node.SshNode`.

    :param host_label: label override. Locally a falsy value means ``"local"``; for SSH, ``None``
        keeps ``ssh.target`` and any other value replaces it.
    :param timeout: per-command timeout in seconds.
    :param include_tree: also capture ``lspci -tv``.
    """
    if ssh is None:
        return await acapture_pcie_local(
            host_label=host_label or "local",
            timeout=timeout,
            include_tree=include_tree,
        )
    ev = await acapture_pcie_ssh(ssh, timeout=timeout, include_tree=include_tree)
    if host_label is None:
        return ev
    # replace() copies every field, so fields added to PcieEvent later cannot be dropped here.
    return replace(ev, host_label=host_label)
|
|
|
|
|
|
async def alog_note(log: DiagLog, message: str) -> NoteEvent:
    """Stamp *message* as a :class:`NoteEvent`, append it to *log*, and hand it back."""
    event = note_event(message)
    log.append(event)
    return event
|
|
|
|
|
|
async def alog_dmesg(
    log: DiagLog,
    *,
    ssh: Optional[SshNode] = None,
    host_label: Optional[str] = None,
    timeout: float = 45.0,
) -> DmesgEvent:
    """Capture ``dmesg`` (see :func:`acapture_dmesg`), append the event to *log*, and return it."""
    captured = await acapture_dmesg(ssh=ssh, host_label=host_label, timeout=timeout)
    log.append(captured)
    return captured
|
|
|
|
|
|
async def alog_pcie(
    log: DiagLog,
    *,
    ssh: Optional[SshNode] = None,
    host_label: Optional[str] = None,
    timeout: float = 60.0,
    include_tree: bool = True,
) -> PcieEvent:
    """Capture ``lspci`` (see :func:`acapture_pcie`), append the event to *log*, and return it."""
    captured = await acapture_pcie(
        ssh=ssh,
        host_label=host_label,
        timeout=timeout,
        include_tree=include_tree,
    )
    log.append(captured)
    return captured
|
|
|
|
|
|
async def alog_kernel_dump(
    log: DiagLog,
    *,
    host_label: str,
    phase: str,
    summary: str,
    artifact_paths: Sequence[str] = (),
    capture_log: str = "",
    metadata: Optional[Dict[str, str]] = None,
    ts_wall: Optional[float] = None,
) -> KernelDumpEvent:
    """Build a :class:`KernelDumpEvent` via :func:`kernel_dump_event`, append it to *log*, return it.

    Intended for async collectors once kdump/vmcore pull is implemented. All keyword arguments are
    forwarded unchanged; *ts_wall* (default: now) lets a collector record when the step actually
    happened rather than when it was logged.
    """
    ev = kernel_dump_event(
        host_label=host_label,
        phase=phase,
        summary=summary,
        artifact_paths=artifact_paths,
        capture_log=capture_log,
        metadata=metadata,
        ts_wall=ts_wall,
    )
    log.append(ev)
    return ev
|
|
|
|
|
|
async def alog_hardware_snapshot(
    log: DiagLog,
    *,
    ssh: Optional[SshNode] = None,
    caption: str = "dmesg + lspci",
    timeout_dmesg: float = 45.0,
    timeout_pcie: float = 60.0,
    include_tree: bool = True,
    host_label: Optional[str] = None,
) -> Tuple[NoteEvent, DmesgEvent, PcieEvent]:
    """
    One note plus concurrent dmesg and lspci captures (same host). Handy after a failure or
    between calibrate steps.

    Events are appended in a stable order: note, then dmesg, then PCIe. The two captures are
    appended together in a single ``extend`` call, so they stay adjacent even when other tasks or
    threads log concurrently. *host_label* overrides the capture labels exactly as in
    :func:`acapture_dmesg` / :func:`acapture_pcie`.
    """
    ev_note = note_event(caption)
    # Log the caption first so it is on the timeline even if a capture raises or hangs.
    log.append(ev_note)
    dmesg_ev, pcie_ev = await asyncio.gather(
        acapture_dmesg(ssh=ssh, host_label=host_label, timeout=timeout_dmesg),
        acapture_pcie(
            ssh=ssh,
            host_label=host_label,
            timeout=timeout_pcie,
            include_tree=include_tree,
        ),
    )
    log.extend((dmesg_ev, pcie_ev))
    return ev_note, dmesg_ev, pcie_ev
|