From d3a4937d0a43c68618423ab34084d5a3675ce9ac Mon Sep 17 00:00:00 2001 From: Robert McMahon Date: Fri, 13 Feb 2026 15:09:23 -0800 Subject: [PATCH] Add PCAP file reading support and packet size limiting option - Support reading from PCAP files: python3 test_monitor.py file.pcap - Add --full-packet flag to capture entire packets (default: header-only, 256 bytes) - Default snaplen=256 bytes captures 802.11 header + some payload - Mutually exclusive modes: PCAP file reading vs live interface capture - Improves memory efficiency by default (header-only capture) Co-authored-by: Cursor --- test_monitor.py | 222 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 148 insertions(+), 74 deletions(-) diff --git a/test_monitor.py b/test_monitor.py index 07f0bda..47a0295 100755 --- a/test_monitor.py +++ b/test_monitor.py @@ -4,9 +4,15 @@ Monitor mode WiFi packet capture and analysis using scapy. Replaces the bash script with a pure Python solution. Usage: - sudo python3 test_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] + # Live capture from interface: + sudo python3 test_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet] sudo python3 test_monitor.py wlan0 36 10 sudo python3 test_monitor.py wlan0 36 10 --keep-pcap + sudo python3 test_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only) + + # Read from pcap file: + python3 test_monitor.py + python3 test_monitor.py /tmp/capture.pcap """ import sys @@ -24,19 +30,51 @@ from scapy.layers.dot11 import Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Deau # Configuration WIFI_INTERFACE = os.environ.get("WIFI_INTERFACE", "wlan0") KEEP_PCAP = False +FULL_PACKET = False +# Default snaplen: 256 bytes (enough for 802.11 header ~30-40 bytes + some payload) +# 802.11 header can be up to ~40 bytes, plus radiotap header ~20-30 bytes +# 256 bytes gives us header + ~200 bytes of payload, which is usually enough for analysis +DEFAULT_SNAPLEN = 256 def parse_args(): - """Parse command line arguments.""" - global WIFI_INTERFACE, KEEP_PCAP + """Parse command line arguments. + + Returns: + tuple: (mode, interface_or_pcap, channel, duration) + mode: 'live' or 'pcap' + interface_or_pcap: interface name (for live) or pcap file path (for pcap) + channel: channel number (for live mode only) + duration: capture duration in seconds (for live mode only) + """ + global WIFI_INTERFACE, KEEP_PCAP, FULL_PACKET args = sys.argv[1:] + if len(args) == 0: + print("Usage:") + print(" Live capture: sudo python3 test_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]") + print(" Read pcap: python3 test_monitor.py ") + sys.exit(1) + # Check for --keep-pcap flag if "--keep-pcap" in args or "-k" in args: KEEP_PCAP = True args = [a for a in args if a not in ["--keep-pcap", "-k"]] - # Parse interface, channel, duration + # Check for --full-packet flag + if "--full-packet" in args or "-f" in args: + FULL_PACKET = True + args = [a for a in args if a not in ["--full-packet", "-f"]] + + # Check if first argument is a pcap file (ends with .pcap or .cap) + if len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap')): + pcap_file = args[0] + if not os.path.isfile(pcap_file): + print(f"Error: PCAP file not found: {pcap_file}") + sys.exit(1) + return ('pcap', pcap_file, None, None) + + # Otherwise, parse as live capture arguments if len(args) > 0: # Check if first arg is an interface name if args[0].startswith("wl") and len(args[0]) <= 6: @@ -44,13 +82,14 @@ def parse_args(): CHANNEL = int(args[1]) if len(args) > 1 else 36 DURATION = int(args[2]) if len(args) > 2 else 10 else: + # First arg is channel (using default interface) CHANNEL = int(args[0]) if len(args) > 0 else 36 DURATION = int(args[1]) if len(args) > 1 else 10 else: CHANNEL = 36 DURATION = 10 - return WIFI_INTERFACE, CHANNEL, DURATION + return ('live', WIFI_INTERFACE, CHANNEL, DURATION) def setup_monitor_mode(interface, channel): """Set WiFi interface to monitor mode.""" @@ -435,78 +474,113 @@ def packet_handler(pkt, packets_list): def main(): """Main function.""" - interface, channel, duration = parse_args() + mode, interface_or_pcap, channel, duration = parse_args() - print("=== Testing Monitor Mode with scapy ===") - print(f"Interface: {interface}") - print(f"Channel: {channel}") - print(f"Duration: {duration} seconds") - print() + if mode == 'pcap': + # Read from pcap file + pcap_file = interface_or_pcap + print("=== Reading packets from PCAP file ===") + print(f"PCAP file: {pcap_file}") + print() + + try: + packets = rdpcap(pcap_file) + print(f"Loaded {len(packets)} packets from {pcap_file}") + + # Get file size + file_size = os.path.getsize(pcap_file) + print(f"File size: {file_size} bytes") + print() + + # Analyze packets (duration doesn't matter for pcap mode) + analyze_packets(packets, 1.0) # Use 1.0 as placeholder duration + + except Exception as e: + print(f"Error reading PCAP file: {e}") + import traceback + traceback.print_exc() + sys.exit(1) - # Setup monitor mode - setup_monitor_mode(interface, channel) - - # Test capture (1 second) - print("\nChecking Data Link Type (1 second test capture)...") - print("(This may take up to 2 seconds if no packets are present)") - - test_packets = [] - try: - sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False) - except Exception as e: - print(f"Error during test capture: {e}") - - test_count = len(test_packets) - test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap)) - - if test_count > 0: - print("Sample packets:") - for i, pkt in enumerate(test_packets[:5]): - ra, ta = get_ra_ta(pkt) - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - plcp = "yes" if pkt.haslayer(RadioTap) else "no" - print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}") - - print(f"\nTest capture results:") - print(f" Packets captured: {test_count}") - print(f" PLCP headers: {test_plcp}") - if test_plcp == 0 and test_count > 0: - print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") - - # Main capture - print(f"\n=== Starting scapy capture ({duration} seconds) ===") - print("Press Ctrl+C to stop early\n") - - print(f"Capturing packets for {duration} seconds...") - - packets = [] - pcap_file = None - - if KEEP_PCAP: - pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_') - pcap_path = pcap_file.name - pcap_file.close() - print(f"Capturing to file: {pcap_path}") - - try: - if KEEP_PCAP: - sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True) - wrpcap(pcap_path, packets) - print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes") - print(f"Keeping pcap file: {pcap_path}") - print(f" (Use: scapy -r {pcap_path} to analyze)") + else: + # Live capture from interface + interface = interface_or_pcap + + print("=== Testing Monitor Mode with scapy ===") + print(f"Interface: {interface}") + print(f"Channel: {channel}") + print(f"Duration: {duration} seconds") + print() + + # Setup monitor mode + setup_monitor_mode(interface, channel) + + # Test capture (1 second) + print("\nChecking Data Link Type (1 second test capture)...") + print("(This may take up to 2 seconds if no packets are present)") + + test_packets = [] + snaplen = 0 if FULL_PACKET else DEFAULT_SNAPLEN + try: + sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=snaplen) + except Exception as e: + print(f"Error during test capture: {e}") + + test_count = len(test_packets) + test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap)) + + if test_count > 0: + print("Sample packets:") + for i, pkt in enumerate(test_packets[:5]): + ra, ta = get_ra_ta(pkt) + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + plcp = "yes" if pkt.haslayer(RadioTap) else "no" + print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}") + + print(f"\nTest capture results:") + print(f" Packets captured: {test_count}") + print(f" PLCP headers: {test_plcp}") + if test_plcp == 0 and test_count > 0: + print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") + + # Main capture + print(f"\n=== Starting scapy capture ({duration} seconds) ===") + print("Press Ctrl+C to stop early\n") + + # Set snaplen based on FULL_PACKET flag + snaplen = 0 if FULL_PACKET else DEFAULT_SNAPLEN + if FULL_PACKET: + print("Capturing full packets...") else: - sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True) - except KeyboardInterrupt: - print("\nCapture interrupted by user") - except Exception as e: - print(f"\nError during capture: {e}") - import traceback - traceback.print_exc() - - # Analyze packets - analyze_packets(packets, duration) + print(f"Capturing packets (snaplen={snaplen} bytes, header + some payload)...") + + packets = [] + pcap_file = None + + if KEEP_PCAP: + pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_') + pcap_path = pcap_file.name + pcap_file.close() + print(f"Capturing to file: {pcap_path}") + + try: + if KEEP_PCAP: + sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=snaplen) + wrpcap(pcap_path, packets) + print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes") + print(f"Keeping pcap file: {pcap_path}") + print(f" (Use: python3 test_monitor.py {pcap_path} to analyze)") + else: + sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=snaplen) + except KeyboardInterrupt: + print("\nCapture interrupted by user") + except Exception as e: + print(f"\nError during capture: {e}") + import traceback + traceback.print_exc() + + # Analyze packets + analyze_packets(packets, duration) if __name__ == "__main__": main()