ESP32/parse_csi.py

248 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
parse_csi.py

Standalone parser and plotter for ESP32 CSI dumps of the form:

    CSI_DUMP_BEGIN <something>
    R <idx> <timestamp> <rssi> <len> <hexdata>
    R ...
    CSI_DUMP_END

Usage examples:

    # Plot magnitude vs subcarrier for the first record
    python3 parse_csi.py -i csi.log

    # Plot magnitude and phase for record index 5
    # (0-based position in the parsed list, not the <idx> field)
    python3 parse_csi.py -i csi.log -n 5 --phase

    # Plot a heatmap of magnitude vs subcarrier vs record index
    python3 parse_csi.py -i csi.log --heatmap
"""
import argparse
import math
import re
import sys
import matplotlib.pyplot as plt
# Matches one record line: "R <idx> <timestamp> <rssi> <len> <hexdata>".
# Capture groups, in order: idx, timestamp, rssi (optionally negative),
# declared length, and the run of hex digits forming the payload.
# The pattern is anchored only at the start, so any text that follows the
# hex run is ignored when used with match().
CSI_LINE_RE = re.compile(
    r'^R\s+(\d+)\s+(\d+)\s+(-?\d+)\s+(\d+)\s+([0-9A-Fa-f]+)'
)
def parse_csi_line(line):
    """
    Decode a single ``R`` record line from a CSI dump.

    Expected layout (whitespace separated):
        R <idx> <timestamp> <rssi> <length> <hexdata>

    The hex payload is read as consecutive 4-hex-digit words, most
    significant digits first, interpreted as signed 16-bit values in the
    order I0, Q0, I1, Q1, ...  Each (I, Q) pair becomes one complex sample.

    Returns a dict with the keys ``idx``, ``timestamp``, ``rssi``,
    ``length_bytes`` (the length field exactly as written in the line),
    ``hex_len`` (number of payload bytes actually decoded) and ``csi``
    (list of complex values), or ``None`` if the line is not a record.
    """
    match = CSI_LINE_RE.match(line.strip())
    if match is None:
        return None

    idx_txt, stamp_txt, rssi_txt, length_txt, payload = match.groups()

    # Only whole 16-bit words can be decoded: drop any trailing hex digits
    # that do not fill a complete 4-digit word.
    payload = payload[: len(payload) - len(payload) % 4]
    raw = bytes.fromhex(payload)

    # Two bytes per word, big-endian, two's complement.
    words = [
        int.from_bytes(raw[pos:pos + 2], "big", signed=True)
        for pos in range(0, len(raw), 2)
    ]

    # zip() stops at the shorter slice, so an unpaired final I word is ignored.
    samples = [
        complex(in_phase, quadrature)
        for in_phase, quadrature in zip(words[0::2], words[1::2])
    ]

    return {
        "idx": int(idx_txt),
        "timestamp": int(stamp_txt),
        "rssi": int(rssi_txt),
        "length_bytes": int(length_txt),
        "hex_len": len(raw),
        "csi": samples,
    }
def parse_csi_stream(file_obj):
    """
    Collect every CSI record found inside CSI_DUMP_BEGIN / CSI_DUMP_END blocks.

    ``file_obj`` is any iterable of text lines. Marker lines are recognised
    by prefix. Lines outside a block, blank lines, and lines inside a block
    that do not start with "R " or fail to parse are ignored.

    Returns the list of record dicts in the order they were encountered.
    """
    parsed = []
    inside_dump = False

    for raw_line in file_obj:
        text = raw_line.strip()

        if text.startswith("CSI_DUMP_BEGIN"):
            inside_dump = True
        elif text.startswith("CSI_DUMP_END"):
            inside_dump = False
        elif inside_dump and text.startswith("R "):
            record = parse_csi_line(text)
            if record is not None:
                parsed.append(record)

    return parsed
def plot_single_record(rec, show_phase=False):
    """
    Show the magnitude of one record's CSI samples against subcarrier index.

    When ``show_phase`` is true, a second figure with the per-subcarrier
    phase (radians, as returned by atan2) is opened as well. The call blocks
    in ``plt.show()`` until the figure windows are closed.
    """

    def _line_plot(values, y_label, title):
        # One figure per call: a single line series over the subcarrier index.
        _, axes = plt.subplots()
        axes.plot(values)
        axes.set_xlabel("Subcarrier index")
        axes.set_ylabel(y_label)
        axes.set_title(title)
        axes.grid(True)

    samples = rec["csi"]

    _line_plot(
        list(map(abs, samples)),
        "|H(k)| (magnitude)",
        f"CSI magnitude, record idx={rec['idx']}, RSSI={rec['rssi']} dBm",
    )

    if show_phase:
        _line_plot(
            [math.atan2(s.imag, s.real) for s in samples],
            "Phase (radians)",
            f"CSI phase, record idx={rec['idx']}, RSSI={rec['rssi']} dBm",
        )

    plt.show()
def plot_heatmap(records):
    """
    Show |H(k)| for all records as an image.

    x-axis: subcarrier index
    y-axis: row of the record in ``records`` (time-like)

    Every record is cut to the length of the shortest CSI array so the rows
    line up. Prints a message and returns without plotting when ``records``
    is empty or the shortest CSI array has no samples.
    """
    if not records:
        print("No records to plot heatmap.")
        return

    # Common width: the shortest CSI array among all records.
    width = min(len(entry["csi"]) for entry in records)
    if width == 0:
        print("Records have zero-length CSI arrays, cannot plot heatmap.")
        return

    import numpy as np  # only needed for the heatmap

    # Matrix of magnitudes, shape [num_records x width].
    grid = np.array(
        [[abs(sample) for sample in entry["csi"][:width]] for entry in records],
        dtype=float,
    )

    fig, ax = plt.subplots()
    image = ax.imshow(
        grid,
        aspect="auto",
        origin="lower",
        interpolation="nearest",
    )
    ax.set_xlabel("Subcarrier index")
    ax.set_ylabel("Record index")
    ax.set_title("|H(k)| magnitude heatmap")
    fig.colorbar(image, ax=ax, label="Magnitude")
    plt.show()
def main():
    """
    Command-line entry point: read a CSI dump and display the requested plot.

    Options:
        -i / --input         Log file to read; "-" (the default) reads stdin.
        -n / --record-index  Position of the record to plot within the parsed
                             list (0-based; not the <idx> field of the line).
        --phase              Additionally plot phase for the chosen record.
        --heatmap            Plot a magnitude heatmap over all records instead
                             of a single-record plot.

    Exits with status 1 when the input contains no CSI records. An
    out-of-range record index is reported on stderr and replaced by 0.
    """
    parser = argparse.ArgumentParser(
        description="Parse and plot ESP32 CSI dumps."
    )
    parser.add_argument(
        "-i",
        "--input",
        type=str,
        default="-",
        help="Input log file (default: stdin)",
    )
    parser.add_argument(
        "-n",
        "--record-index",
        type=int,
        default=0,
        help="Record index to plot for single-record plots (default: 0)",
    )
    parser.add_argument(
        "--phase",
        action="store_true",
        help="Also plot phase vs subcarrier for the chosen record.",
    )
    parser.add_argument(
        "--heatmap",
        action="store_true",
        help="Plot a heatmap of |H| over all records instead of single record.",
    )
    args = parser.parse_args()

    # Logs captured from a serial port can contain stray bytes that are not
    # valid in the text encoding. Decode leniently so that a single bad byte
    # cannot abort the whole parse with UnicodeDecodeError.
    if args.input == "-" or args.input is None:
        # reconfigure() exists on the standard text stdin; skip it quietly if
        # stdin has been replaced by an object that does not provide it.
        reconfigure = getattr(sys.stdin, "reconfigure", None)
        if reconfigure is not None:
            reconfigure(errors="replace")
        records = parse_csi_stream(sys.stdin)
    else:
        with open(args.input, "r", errors="replace") as f:
            records = parse_csi_stream(f)

    if not records:
        print("No CSI records found in input.", file=sys.stderr)
        sys.exit(1)

    print(f"Parsed {len(records)} CSI records.")

    if args.heatmap:
        plot_heatmap(records)
        return

    idx = args.record_index
    if not 0 <= idx < len(records):
        # Diagnostics go to stderr, like the "no records" message above.
        print(
            f"Record index {idx} out of range (0..{len(records)-1}). "
            "Using 0 instead.",
            file=sys.stderr,
        )
        idx = 0

    rec = records[idx]
    print(
        f"Plotting record idx={rec['idx']} (array index {idx}), "
        f"RSSI={rec['rssi']} dBm, subcarriers={len(rec['csi'])}"
    )
    plot_single_record(rec, show_phase=args.phase)
# Run the command-line interface only when this file is executed as a script;
# importing it as a module does not trigger parsing or plotting.
if __name__ == "__main__":
    main()