Add PCAP file reading support and packet size limiting option

- Support reading from PCAP files: python3 test_monitor.py file.pcap
- Add --full-packet flag to capture entire packets (default: header-only, 256 bytes)
- Default snaplen=256 bytes captures 802.11 header + some payload
- Mutually exclusive modes: PCAP file reading vs live interface capture
- Improves memory efficiency by default (header-only capture)

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Robert McMahon 2026-02-13 15:09:23 -08:00
parent 04e05c409a
commit d3a4937d0a
1 changed files with 148 additions and 74 deletions

View File

@ -4,9 +4,15 @@ Monitor mode WiFi packet capture and analysis using scapy.
Replaces the bash script with a pure Python solution.
Usage:
sudo python3 test_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap]
# Live capture from interface:
sudo python3 test_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet]
sudo python3 test_monitor.py wlan0 36 10
sudo python3 test_monitor.py wlan0 36 10 --keep-pcap
sudo python3 test_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only)
# Read from pcap file:
python3 test_monitor.py <pcap_file>
python3 test_monitor.py /tmp/capture.pcap
"""
import sys
@ -24,19 +30,51 @@ from scapy.layers.dot11 import Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Deau
# Configuration
WIFI_INTERFACE = os.environ.get("WIFI_INTERFACE", "wlan0")
KEEP_PCAP = False
FULL_PACKET = False
# Default snaplen: 256 bytes (enough for 802.11 header ~30-40 bytes + some payload)
# 802.11 header can be up to ~40 bytes, plus radiotap header ~20-30 bytes
# 256 bytes gives us header + ~200 bytes of payload, which is usually enough for analysis
DEFAULT_SNAPLEN = 256
def parse_args():
"""Parse command line arguments."""
global WIFI_INTERFACE, KEEP_PCAP
"""Parse command line arguments.
Returns:
tuple: (mode, interface_or_pcap, channel, duration)
mode: 'live' or 'pcap'
interface_or_pcap: interface name (for live) or pcap file path (for pcap)
channel: channel number (for live mode only)
duration: capture duration in seconds (for live mode only)
"""
global WIFI_INTERFACE, KEEP_PCAP, FULL_PACKET
args = sys.argv[1:]
if len(args) == 0:
print("Usage:")
print(" Live capture: sudo python3 test_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]")
print(" Read pcap: python3 test_monitor.py <pcap_file>")
sys.exit(1)
# Check for --keep-pcap flag
if "--keep-pcap" in args or "-k" in args:
KEEP_PCAP = True
args = [a for a in args if a not in ["--keep-pcap", "-k"]]
# Parse interface, channel, duration
# Check for --full-packet flag
if "--full-packet" in args or "-f" in args:
FULL_PACKET = True
args = [a for a in args if a not in ["--full-packet", "-f"]]
# Check if first argument is a pcap file (ends with .pcap or .cap)
if len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap')):
pcap_file = args[0]
if not os.path.isfile(pcap_file):
print(f"Error: PCAP file not found: {pcap_file}")
sys.exit(1)
return ('pcap', pcap_file, None, None)
# Otherwise, parse as live capture arguments
if len(args) > 0:
# Check if first arg is an interface name
if args[0].startswith("wl") and len(args[0]) <= 6:
@ -44,13 +82,14 @@ def parse_args():
CHANNEL = int(args[1]) if len(args) > 1 else 36
DURATION = int(args[2]) if len(args) > 2 else 10
else:
# First arg is channel (using default interface)
CHANNEL = int(args[0]) if len(args) > 0 else 36
DURATION = int(args[1]) if len(args) > 1 else 10
else:
CHANNEL = 36
DURATION = 10
return WIFI_INTERFACE, CHANNEL, DURATION
return ('live', WIFI_INTERFACE, CHANNEL, DURATION)
def setup_monitor_mode(interface, channel):
"""Set WiFi interface to monitor mode."""
@ -435,78 +474,113 @@ def packet_handler(pkt, packets_list):
def main():
"""Main function."""
interface, channel, duration = parse_args()
mode, interface_or_pcap, channel, duration = parse_args()
print("=== Testing Monitor Mode with scapy ===")
print(f"Interface: {interface}")
print(f"Channel: {channel}")
print(f"Duration: {duration} seconds")
print()
if mode == 'pcap':
# Read from pcap file
pcap_file = interface_or_pcap
print("=== Reading packets from PCAP file ===")
print(f"PCAP file: {pcap_file}")
print()
try:
packets = rdpcap(pcap_file)
print(f"Loaded {len(packets)} packets from {pcap_file}")
# Get file size
file_size = os.path.getsize(pcap_file)
print(f"File size: {file_size} bytes")
print()
# Analyze packets (duration doesn't matter for pcap mode)
analyze_packets(packets, 1.0) # Use 1.0 as placeholder duration
except Exception as e:
print(f"Error reading PCAP file: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# Setup monitor mode
setup_monitor_mode(interface, channel)
# Test capture (1 second)
print("\nChecking Data Link Type (1 second test capture)...")
print("(This may take up to 2 seconds if no packets are present)")
test_packets = []
try:
sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False)
except Exception as e:
print(f"Error during test capture: {e}")
test_count = len(test_packets)
test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))
if test_count > 0:
print("Sample packets:")
for i, pkt in enumerate(test_packets[:5]):
ra, ta = get_ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
plcp = "yes" if pkt.haslayer(RadioTap) else "no"
print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}")
print(f"\nTest capture results:")
print(f" Packets captured: {test_count}")
print(f" PLCP headers: {test_plcp}")
if test_plcp == 0 and test_count > 0:
print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")
# Main capture
print(f"\n=== Starting scapy capture ({duration} seconds) ===")
print("Press Ctrl+C to stop early\n")
print(f"Capturing packets for {duration} seconds...")
packets = []
pcap_file = None
if KEEP_PCAP:
pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_')
pcap_path = pcap_file.name
pcap_file.close()
print(f"Capturing to file: {pcap_path}")
try:
if KEEP_PCAP:
sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True)
wrpcap(pcap_path, packets)
print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
print(f"Keeping pcap file: {pcap_path}")
print(f" (Use: scapy -r {pcap_path} to analyze)")
else:
# Live capture from interface
interface = interface_or_pcap
print("=== Testing Monitor Mode with scapy ===")
print(f"Interface: {interface}")
print(f"Channel: {channel}")
print(f"Duration: {duration} seconds")
print()
# Setup monitor mode
setup_monitor_mode(interface, channel)
# Test capture (1 second)
print("\nChecking Data Link Type (1 second test capture)...")
print("(This may take up to 2 seconds if no packets are present)")
test_packets = []
snaplen = 0 if FULL_PACKET else DEFAULT_SNAPLEN
try:
sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=snaplen)
except Exception as e:
print(f"Error during test capture: {e}")
test_count = len(test_packets)
test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))
if test_count > 0:
print("Sample packets:")
for i, pkt in enumerate(test_packets[:5]):
ra, ta = get_ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
plcp = "yes" if pkt.haslayer(RadioTap) else "no"
print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}")
print(f"\nTest capture results:")
print(f" Packets captured: {test_count}")
print(f" PLCP headers: {test_plcp}")
if test_plcp == 0 and test_count > 0:
print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")
# Main capture
print(f"\n=== Starting scapy capture ({duration} seconds) ===")
print("Press Ctrl+C to stop early\n")
# Set snaplen based on FULL_PACKET flag
snaplen = 0 if FULL_PACKET else DEFAULT_SNAPLEN
if FULL_PACKET:
print("Capturing full packets...")
else:
sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True)
except KeyboardInterrupt:
print("\nCapture interrupted by user")
except Exception as e:
print(f"\nError during capture: {e}")
import traceback
traceback.print_exc()
# Analyze packets
analyze_packets(packets, duration)
print(f"Capturing packets (snaplen={snaplen} bytes, header + some payload)...")
packets = []
pcap_file = None
if KEEP_PCAP:
pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_')
pcap_path = pcap_file.name
pcap_file.close()
print(f"Capturing to file: {pcap_path}")
try:
if KEEP_PCAP:
sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=snaplen)
wrpcap(pcap_path, packets)
print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
print(f"Keeping pcap file: {pcap_path}")
print(f" (Use: python3 test_monitor.py {pcap_path} to analyze)")
else:
sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=snaplen)
except KeyboardInterrupt:
print("\nCapture interrupted by user")
except Exception as e:
print(f"\nError during capture: {e}")
import traceback
traceback.print_exc()
# Analyze packets
analyze_packets(packets, duration)
if __name__ == "__main__":
main()