#!/usr/bin/env python3
"""
Parse tshark pcap file and extract 802.11 frame information.

This script handles missing fields gracefully and provides detailed statistics.

Usage:
    parse_tshark_pcap.py <pcap_file> [duration_seconds] [raw_packet_count] [server_mac]
"""

import sys
import subprocess
import collections
from typing import List, Dict, Tuple, Optional

# Field indices (0-based); they must stay in the same order as TSHARK_FIELDS.
FRAME_NUMBER = 0
FRAME_TIME = 1
WLAN_RA = 2
WLAN_TA = 3
WLAN_FC_TYPE = 4
WLAN_FC_SUBTYPE = 5
WLAN_FC_TYPE_SUBTYPE = 6
WLAN_FC_PROTECTED = 7
WLAN_FC_RETRY = 8
WLAN_DURATION = 9
RADIOTAP_PRESENT = 10
RADIOTAP_DATARATE = 11
RADIOTAP_MCS_INDEX = 12
WLAN_RADIO_DATA_RATE = 13
WLAN_RADIO_MCS_INDEX = 14

# tshark field names requested with "-e", in column order.
TSHARK_FIELDS = (
    "frame.number",
    "frame.time",
    "wlan.ra",
    "wlan.ta",
    "wlan.fc.type",
    "wlan.fc.subtype",
    "wlan.fc.type_subtype",
    "wlan.fc.protected",
    "wlan.fc.retry",
    "wlan.duration",
    "radiotap.present",
    "radiotap.datarate",
    "radiotap.mcs.index",
    "wlan_radio.data_rate",
    "wlan_radio.mcs.index",
)

# MAC address reported in the "server" section when none is given on the
# command line.
DEFAULT_SERVER_MAC = "80:84:89:93:c4:b6"

USAGE = (
    "Usage: parse_tshark_pcap.py <pcap_file> [duration_seconds] "
    "[raw_packet_count] [server_mac]"
)

# Spellings accepted as a "set" flag value.
# NOTE(review): some tshark releases appear to print boolean fields as
# True/False instead of 1/0 - confirm against the deployed tshark version.
_TRUE_VALUES = frozenset({"1", "1.0", "true"})


def safe_get_field(fields: List[str], index: int, default: str = "N/A") -> str:
    """Safely get a field value, handling missing or empty fields.

    Args:
        fields: One packet's columns as split from a tshark output line.
        index: 0-based column index to read.
        default: Value returned when the column is absent, empty or "-".

    Returns:
        The stripped column text, or ``default``.
    """
    if 0 <= index < len(fields):
        value = fields[index].strip()
        if value and value != "-":
            return value
    return default


def _is_flag_set(value: str) -> bool:
    """Return True when a flag column holds one of the accepted 'set' spellings."""
    return value.strip().lower() in _TRUE_VALUES


def _has_radiotap(packet: List[str]) -> bool:
    """Return True when the radiotap.present column holds a non-zero value."""
    value = safe_get_field(packet, RADIOTAP_PRESENT, "")
    return bool(value) and value not in ("0", "N/A")


def _is_qos_data(packet: List[str]) -> bool:
    """Return True for QoS Data frames (type 2, subtype 8)."""
    return (
        safe_get_field(packet, WLAN_FC_TYPE, "") == "2"
        and safe_get_field(packet, WLAN_FC_SUBTYPE, "") == "8"
    )


def _numeric_sort_key(value: str) -> float:
    """Sort key for numeric strings; values that do not parse sort as 0."""
    try:
        return float(value)
    except ValueError:
        return 0.0


def parse_tshark_output(pcap_file: str) -> List[List[str]]:
    """Extract fields from pcap file using tshark.

    Runs ``tshark -T fields`` over ``pcap_file`` and splits each output line
    on tabs. Only lines starting with a digit (the frame number) are kept.

    Args:
        pcap_file: Path of the capture file handed to ``tshark -r``.

    Returns:
        One list of column strings per packet; an empty list when tshark
        could not be started.
    """
    cmd = ["tshark", "-q", "-r", pcap_file, "-n", "-T", "fields"]
    for name in TSHARK_FIELDS:
        cmd.extend(["-e", name])

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,  # Don't fail on errors; partial output is still useful
        )
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError) as e:
        print(f"Error running tshark: {e}", file=sys.stderr)
        return []

    # Surface tshark failures instead of silently reporting "no packets".
    if result.returncode != 0:
        detail = result.stderr.strip()
        message = f"Warning: tshark exited with status {result.returncode}"
        if detail:
            message += f": {detail}"
        print(message, file=sys.stderr)

    rows = []
    for raw_line in result.stdout.splitlines():
        line = raw_line.strip()
        # Only process lines that start with a number (frame number); this
        # also drops empty lines and textual status messages.
        if not line or not line[0].isdigit():
            continue
        lowered = line.lower()
        if "tshark:" in lowered or "packets captured" in lowered:
            continue
        rows.append(line.split("\t"))
    return rows


def count_packets(packets: List[List[str]]) -> int:
    """Count total valid packets."""
    return len(packets)


def count_plcp_headers(packets: List[List[str]]) -> int:
    """Count packets with PLCP headers (radiotap present)."""
    return sum(1 for packet in packets if _has_radiotap(packet))


def get_unique_ra_ta_pairs(packets: List[List[str]]) -> Dict[str, int]:
    """Count unique RA/TA pairs.

    Packets with neither address are skipped; a missing single address is
    shown as "N/A". Keys have the form ``"<ra> -> <ta>"``.
    """
    pairs = collections.Counter()
    for packet in packets:
        ra = safe_get_field(packet, WLAN_RA, "N/A")
        ta = safe_get_field(packet, WLAN_TA, "N/A")
        if ra != "N/A" or ta != "N/A":
            pairs[f"{ra} -> {ta}"] += 1
    return dict(pairs)


def get_phy_rate(packet: List[str]) -> Optional[str]:
    """Extract PHY rate from packet (try radiotap first, then wlan_radio).

    Returns:
        The rate string, or None when both columns are missing or "0".
    """
    for index in (RADIOTAP_DATARATE, WLAN_RADIO_DATA_RATE):
        rate = safe_get_field(packet, index, "")
        if rate and rate not in ("0", "N/A"):
            return rate
    return None


def get_mcs_index(packet: List[str]) -> Optional[str]:
    """Extract MCS index from packet (try radiotap first, then wlan_radio).

    MCS 0 is a legitimate index and is returned like any other value; only
    a missing column or the value "255" is treated as "no MCS".

    Returns:
        The MCS index string, or None when no usable value is present.
    """
    for index in (RADIOTAP_MCS_INDEX, WLAN_RADIO_MCS_INDEX):
        mcs = safe_get_field(packet, index, "")
        if mcs and mcs not in ("255", "N/A"):
            return mcs
    return None


def get_histograms_per_pair(packets: List[List[str]]) -> Dict[str, Dict[str, Dict[str, int]]]:
    """Generate PHY rate and MCS histograms per RA/TA pair.

    Only data frames (type 2) contribute.

    Returns:
        ``{pair: {"rate": {rate: count}, "mcs": {mcs: count}}}`` where the
        inner mappings are ``collections.Counter`` instances.
    """
    histograms = {}
    for packet in packets:
        ra = safe_get_field(packet, WLAN_RA, "N/A")
        ta = safe_get_field(packet, WLAN_TA, "N/A")
        if ra == "N/A" and ta == "N/A":
            continue
        # Only process data frames (type 2) for histograms
        if safe_get_field(packet, WLAN_FC_TYPE, "") != "2":
            continue

        entry = histograms.setdefault(
            f"{ra} -> {ta}",
            {"rate": collections.Counter(), "mcs": collections.Counter()},
        )
        rate = get_phy_rate(packet)
        if rate:
            entry["rate"][rate] += 1
        mcs = get_mcs_index(packet)
        if mcs:
            entry["mcs"][mcs] += 1
    return histograms


def get_frame_type_breakdown(packets: List[List[str]]) -> Dict[str, int]:
    """Count frames by type (Management / Control / Data / Unknown)."""
    type_names = {"0": "Management", "1": "Control", "2": "Data"}
    breakdown = collections.Counter()
    for packet in packets:
        frame_type = safe_get_field(packet, WLAN_FC_TYPE, "unknown")
        breakdown[type_names.get(frame_type, "Unknown")] += 1
    return dict(breakdown)


def get_data_frame_analysis(packets: List[List[str]]) -> Tuple[int, int, int]:
    """Analyze data frames (QoS Data frames, subtype 8).

    Returns:
        ``(qos_data_count, encrypted, unencrypted)``. Frames whose protected
        flag is missing count toward neither of the last two.
    """
    data_frames = [packet for packet in packets if _is_qos_data(packet)]

    encrypted = 0
    unencrypted = 0
    for packet in data_frames:
        protected = safe_get_field(packet, WLAN_FC_PROTECTED, "")
        if _is_flag_set(protected):
            encrypted += 1
        elif protected and protected != "N/A":
            unencrypted += 1
    return len(data_frames), encrypted, unencrypted


def format_sample_packet(packet: List[str], index: int) -> str:
    """Format a packet for display.

    Args:
        packet: One packet's columns.
        index: 0-based position in the sample; ``index + 1`` is shown when
            the frame number column is missing.
    """
    frame_num = safe_get_field(packet, FRAME_NUMBER, str(index + 1))
    ra = safe_get_field(packet, WLAN_RA, "N/A")
    ta = safe_get_field(packet, WLAN_TA, "N/A")
    frame_type = safe_get_field(packet, WLAN_FC_TYPE, "N/A")
    frame_subtype = safe_get_field(packet, WLAN_FC_SUBTYPE, "N/A")
    protected = safe_get_field(packet, WLAN_FC_PROTECTED, "")
    retry = safe_get_field(packet, WLAN_FC_RETRY, "")
    duration = safe_get_field(packet, WLAN_DURATION, "N/A")

    protected_str = "encrypted" if _is_flag_set(protected) else "unencrypted"
    retry_str = " [retry]" if _is_flag_set(retry) else ""
    # Same presence rule as count_plcp_headers so the two always agree.
    plcp_str = "yes" if _has_radiotap(packet) else "no"

    return f" Frame {frame_num}: RA={ra}, TA={ta}, type={frame_type}/{frame_subtype}, {protected_str}, dur={duration}, PLCP={plcp_str}{retry_str}"


def _format_qos_sample(packet: List[str], index: int) -> str:
    """Format one QoS Data frame for the data-frame sample listing."""
    frame_num = safe_get_field(packet, FRAME_NUMBER, str(index + 1))
    ra = safe_get_field(packet, WLAN_RA, "N/A")
    ta = safe_get_field(packet, WLAN_TA, "N/A")
    protected = safe_get_field(packet, WLAN_FC_PROTECTED, "")
    retry = safe_get_field(packet, WLAN_FC_RETRY, "")
    frame_duration = safe_get_field(packet, WLAN_DURATION, "N/A")
    protected_str = "encrypted" if _is_flag_set(protected) else "unencrypted"
    retry_str = " [retry]" if _is_flag_set(retry) else ""
    return f" Frame {frame_num}: RA={ra}, TA={ta}, {protected_str}, dur={frame_duration}{retry_str}"


def _print_breakdown(breakdown: Dict[str, int]) -> None:
    """Print a frame-type breakdown, most frequent type first."""
    for frame_type, count in sorted(breakdown.items(), key=lambda x: x[1], reverse=True):
        print(f" {frame_type}: {count} frame(s)")


def _print_histograms(packets: List[List[str]]) -> None:
    """Print the per-pair PHY rate and MCS histograms."""
    print("PHY Rate and MCS Histograms per RA/TA pair:")
    histograms = get_histograms_per_pair(packets)
    for pair in sorted(histograms):
        print(f"\n {pair}:")

        print(" PHY Rate (Mbps):")
        rate_hist = histograms[pair]["rate"]
        if rate_hist:
            for rate in sorted(rate_hist, key=_numeric_sort_key):
                print(f" {rate} Mbps: {rate_hist[rate]} frame(s)")
        else:
            print(" (no PHY rate data)")

        print(" MCS Index:")
        mcs_hist = histograms[pair]["mcs"]
        if mcs_hist:
            for mcs in sorted(mcs_hist, key=_numeric_sort_key):
                print(f" MCS {mcs}: {mcs_hist[mcs]} frame(s)")
        else:
            print(" (no MCS data)")
    print()


def _print_data_frame_analysis(packets: List[List[str]]) -> None:
    """Print QoS Data frame counts and up to five sample frames."""
    print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):")
    data_count, encrypted, unencrypted = get_data_frame_analysis(packets)
    print(f" QoS Data frames (type 2, subtype 8): {data_count}")
    print(f" Encrypted: {encrypted}")
    print(f" Unencrypted: {unencrypted}")
    if data_count > 0:
        print(" Sample QoS Data frames (likely iperf traffic):")
        data_frames = [p for p in packets if _is_qos_data(p)]
        for i, packet in enumerate(data_frames[:5]):
            print(_format_qos_sample(packet, i))
    print()


def _print_server_frames(packets: List[List[str]], server_mac: str) -> None:
    """Print statistics for frames whose RA or TA equals ``server_mac``."""
    print(f"Frames involving server MAC ({server_mac}):")
    wanted = server_mac.lower()
    server_frames = [
        packet
        for packet in packets
        if wanted
        in (
            safe_get_field(packet, WLAN_RA, "").lower(),
            safe_get_field(packet, WLAN_TA, "").lower(),
        )
    ]
    server_count = len(server_frames)
    print(f" Total frames with server MAC: {server_count}")
    if server_count > 0:
        print(" Frame type breakdown:")
        _print_breakdown(get_frame_type_breakdown(server_frames))
        print(" Sample frames:")
        for i, packet in enumerate(server_frames[:5]):
            print(format_sample_packet(packet, i))
    print()


def main():
    """Command-line entry point: parse the capture and print the report."""
    if len(sys.argv) < 2:
        print(USAGE, file=sys.stderr)
        sys.exit(1)

    pcap_file = sys.argv[1]
    try:
        duration = float(sys.argv[2]) if len(sys.argv) > 2 else 10.0
        raw_packet_count = int(sys.argv[3]) if len(sys.argv) > 3 else 0
    except ValueError as e:
        print(f"Invalid argument: {e}", file=sys.stderr)
        print(USAGE, file=sys.stderr)
        sys.exit(1)
    server_mac = sys.argv[4].strip() if len(sys.argv) > 4 else DEFAULT_SERVER_MAC

    print("=== Capture Statistics ===")

    # Parse packets
    packets = parse_tshark_output(pcap_file)
    final_count = count_packets(packets)
    plcp_count = count_plcp_headers(packets)

    # Check for parsing issues
    if final_count < raw_packet_count and raw_packet_count > 10:
        print(f"Warning: Parsed {final_count} packets but pcap file contains {raw_packet_count} packets")
        print(" This may indicate field extraction issues.")
        print()

    # Display basic stats
    print(f"Total packets captured: {final_count}")
    print(f"PLCP headers: {plcp_count}")
    # A zero or negative duration has no meaningful rate; skip the line
    # rather than dividing by zero.
    if final_count > 0 and duration > 0:
        rate = final_count / duration
        print(f"Packet rate: {rate:.1f} packets/second")
    print()

    if final_count == 0:
        print("(No packets captured)")
        print()
        print("=== Summary ===")
        print("✗ No packets captured. Check:")
        print(" 1. Is there WiFi traffic on the channel?")
        print(" 2. Is the interface actually in monitor mode?")
        print(" 3. Try a different channel or longer duration")
        return

    # Display sample packets
    print("Sample packets (first 10):")
    for i, packet in enumerate(packets[:10]):
        print(format_sample_packet(packet, i))
    print()

    # Unique RA/TA pairs
    print("Unique RA/TA pairs (with counts):")
    pairs = get_unique_ra_ta_pairs(packets)
    if pairs:
        for pair, count in sorted(pairs.items(), key=lambda x: x[1], reverse=True):
            print(f" {pair}: {count} frame(s)")
    else:
        print(" (no valid RA/TA pairs found)")
    print()

    # PHY rate and MCS histograms per RA/TA pair
    _print_histograms(packets)

    # Frame type breakdown
    print("Frame type breakdown:")
    _print_breakdown(get_frame_type_breakdown(packets))
    print()

    # Data frame analysis
    _print_data_frame_analysis(packets)

    # Frames involving server MAC
    _print_server_frames(packets, server_mac)

    # Summary
    print("=== Summary ===")
    print(f"✓ Monitor mode is working! Captured {final_count} packet(s)")
    if plcp_count > 0:
        print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
    else:
        print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")


if __name__ == "__main__":
    main()