From 2fd1b58e6692d8d8eceeef32bec91f6f1eddb8ba Mon Sep 17 00:00:00 2001 From: Bob Date: Fri, 21 Nov 2025 13:34:56 -0800 Subject: [PATCH] add csi parser --- parse_csi.py | 247 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100755 parse_csi.py diff --git a/parse_csi.py b/parse_csi.py new file mode 100755 index 0000000..9a34f20 --- /dev/null +++ b/parse_csi.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +""" +parse_csi.py + +Standalone parser and plotter for ESP32 CSI dumps of the form: + + CSI_DUMP_BEGIN + R + R ... + CSI_DUMP_END + +Usage examples: + + # Plot magnitude vs subcarrier for the first record + python3 parse_csi.py -i csi.log + + # Plot magnitude and phase for record index 5 + python3 parse_csi.py -i csi.log -n 5 --phase + + # Plot a heatmap of magnitude vs subcarrier vs record index + python3 parse_csi.py -i csi.log --heatmap + +""" + +import argparse +import math +import re +import sys + +import matplotlib.pyplot as plt + + +CSI_LINE_RE = re.compile( + r'^R\s+(\d+)\s+(\d+)\s+(-?\d+)\s+(\d+)\s+([0-9A-Fa-f]+)' +) + + +def parse_csi_line(line): + """ + Parse one CSI line of the form: + R + Return dict or None if not a match. + """ + m = CSI_LINE_RE.match(line.strip()) + if not m: + return None + + idx = int(m.group(1)) + timestamp = int(m.group(2)) + rssi = int(m.group(3)) + length_bytes = int(m.group(4)) + hex_data = m.group(5) + + # Convert hex to signed 16-bit ints (big-endian words in hex string) + # hex_data is a sequence of 4-hex-digit words: I0, Q0, I1, Q1, ... + if len(hex_data) % 4 != 0: + # Truncate any odd tail, just in case + hex_data = hex_data[: len(hex_data) // 4 * 4] + + vals = [] + for i in range(0, len(hex_data), 4): + w = int(hex_data[i:i + 4], 16) + if w >= 0x8000: + w -= 0x10000 + vals.append(w) + + # Interleave into complex CSI values + csi = [] + for i in range(0, len(vals), 2): + if i + 1 >= len(vals): + break + I = vals[i] + Q = vals[i + 1] + csi.append(complex(I, Q)) + + rec = { + "idx": idx, + "timestamp": timestamp, + "rssi": rssi, + "length_bytes": length_bytes, + "hex_len": len(hex_data) // 2, + "csi": csi, + } + return rec + + +def parse_csi_stream(file_obj): + """ + Parse all CSI records between CSI_DUMP_BEGIN / CSI_DUMP_END markers. + Returns a list of record dicts. + """ + in_block = False + records = [] + + for line in file_obj: + line = line.strip() + if not line: + continue + if line.startswith("CSI_DUMP_BEGIN"): + in_block = True + continue + if line.startswith("CSI_DUMP_END"): + in_block = False + continue + if not in_block: + continue + if line.startswith("R "): + rec = parse_csi_line(line) + if rec is not None: + records.append(rec) + + return records + + +def plot_single_record(rec, show_phase=False): + """ + Plot magnitude (and optionally phase) vs subcarrier index for one record. + """ + csi = rec["csi"] + mags = [abs(v) for v in csi] + + fig, ax = plt.subplots() + ax.plot(mags) + ax.set_xlabel("Subcarrier index") + ax.set_ylabel("|H(k)| (magnitude)") + ax.set_title( + f"CSI magnitude, record idx={rec['idx']}, RSSI={rec['rssi']} dBm" + ) + ax.grid(True) + + if show_phase: + phases = [math.atan2(v.imag, v.real) for v in csi] + fig2, ax2 = plt.subplots() + ax2.plot(phases) + ax2.set_xlabel("Subcarrier index") + ax2.set_ylabel("Phase (radians)") + ax2.set_title( + f"CSI phase, record idx={rec['idx']}, RSSI={rec['rssi']} dBm" + ) + ax2.grid(True) + + plt.show() + + +def plot_heatmap(records): + """ + Plot a heatmap of |H(k)| over: + x-axis: subcarrier index + y-axis: record index (time-like) + """ + if not records: + print("No records to plot heatmap.") + return + + # Use the minimum number of subcarriers across all records + min_len = min(len(r["csi"]) for r in records) + if min_len == 0: + print("Records have zero-length CSI arrays, cannot plot heatmap.") + return + + # Build 2D list [num_records x min_len] + import numpy as np # only needed for the heatmap + + data = np.zeros((len(records), min_len), dtype=float) + for i, r in enumerate(records): + mags = [abs(v) for v in r["csi"][:min_len]] + data[i, :] = mags + + fig, ax = plt.subplots() + im = ax.imshow( + data, + aspect="auto", + origin="lower", + interpolation="nearest", + ) + ax.set_xlabel("Subcarrier index") + ax.set_ylabel("Record index") + ax.set_title("|H(k)| magnitude heatmap") + fig.colorbar(im, ax=ax, label="Magnitude") + plt.show() + + +def main(): + parser = argparse.ArgumentParser( + description="Parse and plot ESP32 CSI dumps." + ) + parser.add_argument( + "-i", + "--input", + type=str, + default="-", + help="Input log file (default: stdin)", + ) + parser.add_argument( + "-n", + "--record-index", + type=int, + default=0, + help="Record index to plot for single-record plots (default: 0)", + ) + parser.add_argument( + "--phase", + action="store_true", + help="Also plot phase vs subcarrier for the chosen record.", + ) + parser.add_argument( + "--heatmap", + action="store_true", + help="Plot a heatmap of |H| over all records instead of single record.", + ) + + args = parser.parse_args() + + if args.input == "-" or args.input is None: + records = parse_csi_stream(sys.stdin) + else: + with open(args.input, "r") as f: + records = parse_csi_stream(f) + + if not records: + print("No CSI records found in input.", file=sys.stderr) + sys.exit(1) + + print(f"Parsed {len(records)} CSI records.") + + if args.heatmap: + plot_heatmap(records) + return + + idx = args.record_index + if idx < 0 or idx >= len(records): + print( + f"Record index {idx} out of range (0..{len(records)-1}). " + "Using 0 instead." + ) + idx = 0 + + rec = records[idx] + print( + f"Plotting record idx={rec['idx']} (array index {idx}), " + f"RSSI={rec['rssi']} dBm, subcarriers={len(rec['csi'])}" + ) + plot_single_record(rec, show_phase=args.phase) + + +if __name__ == "__main__": + main()