#!/usr/bin/env python3 """ parse_csi.py Standalone parser and plotter for ESP32 CSI dumps of the form: CSI_DUMP_BEGIN R R ... CSI_DUMP_END Usage examples: # Plot magnitude vs subcarrier for the first record python3 parse_csi.py -i csi.log # Plot magnitude and phase for record index 5 python3 parse_csi.py -i csi.log -n 5 --phase # Plot a heatmap of magnitude vs subcarrier vs record index python3 parse_csi.py -i csi.log --heatmap """ import argparse import math import re import sys import matplotlib.pyplot as plt CSI_LINE_RE = re.compile( r'^R\s+(\d+)\s+(\d+)\s+(-?\d+)\s+(\d+)\s+([0-9A-Fa-f]+)' ) def parse_csi_line(line): """ Parse one CSI line of the form: R Return dict or None if not a match. """ m = CSI_LINE_RE.match(line.strip()) if not m: return None idx = int(m.group(1)) timestamp = int(m.group(2)) rssi = int(m.group(3)) length_bytes = int(m.group(4)) hex_data = m.group(5) # Convert hex to signed 16-bit ints (big-endian words in hex string) # hex_data is a sequence of 4-hex-digit words: I0, Q0, I1, Q1, ... if len(hex_data) % 4 != 0: # Truncate any odd tail, just in case hex_data = hex_data[: len(hex_data) // 4 * 4] vals = [] for i in range(0, len(hex_data), 4): w = int(hex_data[i:i + 4], 16) if w >= 0x8000: w -= 0x10000 vals.append(w) # Interleave into complex CSI values csi = [] for i in range(0, len(vals), 2): if i + 1 >= len(vals): break I = vals[i] Q = vals[i + 1] csi.append(complex(I, Q)) rec = { "idx": idx, "timestamp": timestamp, "rssi": rssi, "length_bytes": length_bytes, "hex_len": len(hex_data) // 2, "csi": csi, } return rec def parse_csi_stream(file_obj): """ Parse all CSI records between CSI_DUMP_BEGIN / CSI_DUMP_END markers. Returns a list of record dicts. """ in_block = False records = [] for line in file_obj: line = line.strip() if not line: continue if line.startswith("CSI_DUMP_BEGIN"): in_block = True continue if line.startswith("CSI_DUMP_END"): in_block = False continue if not in_block: continue if line.startswith("R "): rec = parse_csi_line(line) if rec is not None: records.append(rec) return records def plot_single_record(rec, show_phase=False): """ Plot magnitude (and optionally phase) vs subcarrier index for one record. """ csi = rec["csi"] mags = [abs(v) for v in csi] fig, ax = plt.subplots() ax.plot(mags) ax.set_xlabel("Subcarrier index") ax.set_ylabel("|H(k)| (magnitude)") ax.set_title( f"CSI magnitude, record idx={rec['idx']}, RSSI={rec['rssi']} dBm" ) ax.grid(True) if show_phase: phases = [math.atan2(v.imag, v.real) for v in csi] fig2, ax2 = plt.subplots() ax2.plot(phases) ax2.set_xlabel("Subcarrier index") ax2.set_ylabel("Phase (radians)") ax2.set_title( f"CSI phase, record idx={rec['idx']}, RSSI={rec['rssi']} dBm" ) ax2.grid(True) plt.show() def plot_heatmap(records): """ Plot a heatmap of |H(k)| over: x-axis: subcarrier index y-axis: record index (time-like) """ if not records: print("No records to plot heatmap.") return # Use the minimum number of subcarriers across all records min_len = min(len(r["csi"]) for r in records) if min_len == 0: print("Records have zero-length CSI arrays, cannot plot heatmap.") return # Build 2D list [num_records x min_len] import numpy as np # only needed for the heatmap data = np.zeros((len(records), min_len), dtype=float) for i, r in enumerate(records): mags = [abs(v) for v in r["csi"][:min_len]] data[i, :] = mags fig, ax = plt.subplots() im = ax.imshow( data, aspect="auto", origin="lower", interpolation="nearest", ) ax.set_xlabel("Subcarrier index") ax.set_ylabel("Record index") ax.set_title("|H(k)| magnitude heatmap") fig.colorbar(im, ax=ax, label="Magnitude") plt.show() def main(): parser = argparse.ArgumentParser( description="Parse and plot ESP32 CSI dumps." ) parser.add_argument( "-i", "--input", type=str, default="-", help="Input log file (default: stdin)", ) parser.add_argument( "-n", "--record-index", type=int, default=0, help="Record index to plot for single-record plots (default: 0)", ) parser.add_argument( "--phase", action="store_true", help="Also plot phase vs subcarrier for the chosen record.", ) parser.add_argument( "--heatmap", action="store_true", help="Plot a heatmap of |H| over all records instead of single record.", ) args = parser.parse_args() if args.input == "-" or args.input is None: records = parse_csi_stream(sys.stdin) else: with open(args.input, "r") as f: records = parse_csi_stream(f) if not records: print("No CSI records found in input.", file=sys.stderr) sys.exit(1) print(f"Parsed {len(records)} CSI records.") if args.heatmap: plot_heatmap(records) return idx = args.record_index if idx < 0 or idx >= len(records): print( f"Record index {idx} out of range (0..{len(records)-1}). " "Using 0 instead." ) idx = 0 rec = records[idx] print( f"Plotting record idx={rec['idx']} (array index {idx}), " f"RSSI={rec['rssi']} dBm, subcarriers={len(rec['csi'])}" ) plot_single_record(rec, show_phase=args.phase) if __name__ == "__main__": main()