#!/usr/bin/env python3
"""
Monitor mode WiFi packet capture and analysis using scapy.
Replaces the bash script with a pure Python solution.

Usage:
    sudo python3 test_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap]
    sudo python3 test_monitor.py wlan0 36 10
    sudo python3 test_monitor.py wlan0 36 10 --keep-pcap

Environment:
    WIFI_INTERFACE  default interface name (default: wlan0)
    SERVER_MAC      MAC address highlighted in the analysis
"""

import sys
import os
import shutil
import subprocess
import signal
import time
import tempfile
import traceback
from datetime import datetime
from collections import Counter, defaultdict

try:
    from scapy.all import *
    from scapy.layers.dot11 import Dot11, Dot11QoS, Dot11Beacon, Dot11ProbeReq, Dot11ProbeResp
    from scapy.layers.dot11 import Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Deauth, Dot11Disas
    SCAPY_IMPORT_ERROR = None
except ImportError as exc:
    # Keep the module importable (the pure helpers below do not need scapy);
    # main() refuses to run and reports this error instead of a raw traceback.
    SCAPY_IMPORT_ERROR = exc
    Dot11 = RadioTap = None
    sniff = wrpcap = None

# Configuration
WIFI_INTERFACE = os.environ.get("WIFI_INTERFACE", "wlan0")
SERVER_MAC = os.environ.get("SERVER_MAC", "80:84:89:93:c4:b6")
KEEP_PCAP = False
DEFAULT_CHANNEL = 36
DEFAULT_DURATION = 10

USAGE = (
    "Usage: sudo python3 test_monitor.py "
    "[interface] [channel] [duration_seconds] [--keep-pcap]"
)

# 802.11 frame-control bits tested against Dot11.FCfield.
FC_RETRY = 0x08
FC_PROTECTED = 0x40

# Management (type 0) subtypes that get a dedicated name.
_MGMT_SUBTYPE_NAMES = {
    0: "Association Request",
    1: "Association Response",
    4: "Probe Request",
    5: "Probe Response",
    8: "Beacon",
    10: "Disassociation",
    11: "Authentication",
    12: "Deauthentication",
}

# Data (type 2) subtypes that get a dedicated name.
_DATA_SUBTYPE_NAMES = {
    0: "Data",
    8: "QoS Data",
}

# Coarse category per 802.11 frame type, used for the breakdown tables.
_TYPE_CATEGORIES = {0: "Management", 1: "Control", 2: "Data"}


def _is_int(text):
    """Return True if *text* parses as a base-10 integer."""
    try:
        int(text)
    except ValueError:
        return False
    return True


def _parse_positive_int(text, what):
    """Parse *text* as an integer > 0, or print usage and exit with status 2."""
    if not _is_int(text) or int(text) <= 0:
        print(f"Error: {what} must be a positive integer, got {text!r}", file=sys.stderr)
        print(USAGE, file=sys.stderr)
        sys.exit(2)
    return int(text)


def parse_args():
    """Parse command line arguments.

    Positional arguments are ``[interface] [channel] [duration_seconds]``.
    The first positional is taken as the interface name unless it is a
    number, in which case it is the channel and the interface keeps its
    current value (``WIFI_INTERFACE``). ``--keep-pcap`` / ``-k`` may appear
    anywhere and sets the module-level ``KEEP_PCAP`` flag.

    Returns:
        Tuple ``(interface, channel, duration)``.

    Exits with status 2 when channel or duration is not a positive integer.
    """
    global WIFI_INTERFACE, KEEP_PCAP

    keep_flags = ("--keep-pcap", "-k")
    raw_args = sys.argv[1:]
    if any(arg in keep_flags for arg in raw_args):
        KEEP_PCAP = True
    positional = [arg for arg in raw_args if arg not in keep_flags]

    # Anything that is not a number is an interface name; this accepts
    # names such as "wlan0mon" or "mon0" as well as the usual "wl*" ones.
    if positional and not _is_int(positional[0]):
        WIFI_INTERFACE = positional[0]
        positional = positional[1:]

    channel = DEFAULT_CHANNEL
    duration = DEFAULT_DURATION
    if len(positional) > 0:
        channel = _parse_positive_int(positional[0], "channel")
    if len(positional) > 1:
        duration = _parse_positive_int(positional[1], "duration")

    return WIFI_INTERFACE, channel, duration


def setup_monitor_mode(interface, channel):
    """Set WiFi interface to monitor mode and tune it to *channel*.

    Steps: best-effort release from NetworkManager and rfkill, switch the
    interface type to monitor via ``iw``/``ip`` if it is not already, set the
    channel, then print the resulting ``iw dev ... info`` type/channel lines.

    Exits with status 1 if monitor mode cannot be set (command failure or
    missing ``iw``/``ip``). A failure to set the channel is only reported.
    """
    print(f"=== Setting up monitor mode on {interface} ===")

    # Try to unmanage interface from NetworkManager (best effort).
    if shutil.which("nmcli"):
        print("Unmanaging interface from NetworkManager...")
        try:
            # Discard both streams explicitly: capture_output=True cannot be
            # combined with an explicit stderr= argument (raises ValueError).
            subprocess.run(
                ["nmcli", "device", "set", interface, "managed", "no"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        except OSError:
            pass

    # Unblock WiFi if blocked by rfkill (best effort; rfkill may be absent).
    try:
        subprocess.run(["rfkill", "unblock", "wifi"], check=False)
    except OSError:
        pass

    # Check current mode and switch if needed.
    try:
        info = subprocess.run(
            ["iw", "dev", interface, "info"],
            capture_output=True,
            text=True,
            check=True,
        )
        if "type monitor" in info.stdout:
            print("Already in monitor mode")
        else:
            print(f"Setting {interface} to monitor mode...")
            # Bring down interface; failure here is tolerated because the
            # following "set type" call reports the real problem.
            subprocess.run(
                ["ip", "link", "set", interface, "down"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            time.sleep(0.5)
            # Set monitor mode
            subprocess.run(
                ["iw", "dev", interface, "set", "type", "monitor"],
                check=True,
                capture_output=True,
                text=True,
            )
            time.sleep(0.5)
            # Bring up interface
            subprocess.run(
                ["ip", "link", "set", interface, "up"],
                capture_output=True,
                text=True,
                check=True,
            )
            print("Monitor mode activated")
    except subprocess.CalledProcessError as e:
        print(f"Error setting monitor mode: {e}")
        if e.stderr:
            print(f"Error details: {e.stderr}")
        sys.exit(1)
    except OSError as e:
        # Typically FileNotFoundError when iw/ip is not installed.
        print(f"Error setting monitor mode: {e}")
        sys.exit(1)

    # Set channel
    try:
        subprocess.run(
            ["iw", "dev", interface, "set", "channel", str(channel)],
            check=True,
            capture_output=True,
        )
        print(f"Channel set to {channel}")
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error setting channel: {e}")
        print("Continuing anyway - channel may not be set correctly")

    # Verify monitor mode (informational only, so failures are ignored).
    print("\nVerifying monitor mode...")
    try:
        info = subprocess.run(
            ["iw", "dev", interface, "info"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        return
    for line in info.stdout.splitlines():
        if "type" in line or "channel" in line:
            print(f"\t{line.strip()}")


def get_frame_type_name(pkt):
    """Get human-readable frame type name.

    Returns "Unknown" for packets without a Dot11 layer; otherwise a name
    based on the Dot11 ``type``/``subtype`` fields, falling back to a
    ``"<Category> (<subtype>)"`` label for subtypes without a dedicated name.
    """
    if not pkt.haslayer(Dot11):
        return "Unknown"

    dot11 = pkt[Dot11]
    frame_type = dot11.type
    subtype = dot11.subtype

    if frame_type == 0:  # Management
        return _MGMT_SUBTYPE_NAMES.get(subtype, f"Management ({subtype})")
    if frame_type == 1:  # Control
        return f"Control ({subtype})"
    if frame_type == 2:  # Data
        return _DATA_SUBTYPE_NAMES.get(subtype, f"Data ({subtype})")
    return f"Unknown ({frame_type}/{subtype})"


def _format_mac(addr):
    """Return *addr* as a lowercase colon-separated string; falsy passes through."""
    if not addr:
        return addr
    if isinstance(addr, str):
        return addr.lower()
    return ':'.join(f'{b:02x}' for b in addr)


def get_ra_ta(pkt):
    """Extract RA and TA from 802.11 frame.

    RA is taken from ``addr1`` and TA from ``addr2``.

    Returns:
        Tuple ``(ra, ta)`` of lowercase MAC strings; either may be None when
        the frame does not carry that address, and both are None when the
        packet has no Dot11 layer.
    """
    if not pkt.haslayer(Dot11):
        return None, None

    dot11 = pkt[Dot11]
    ra = _format_mac(getattr(dot11, 'addr1', None))
    ta = _format_mac(getattr(dot11, 'addr2', None))
    return ra, ta


def get_phy_info(pkt):
    """Extract PHY rate and MCS from packet (if available in radiotap).

    Returns:
        Tuple ``(phy_rate, mcs)``; each is None when the radiotap header is
        missing or does not carry that field.
    """
    phy_rate = None
    mcs = None

    if not pkt.haslayer(RadioTap):
        return phy_rate, mcs
    radiotap = pkt[RadioTap]

    # Rate is treated as 0.5 Mbps units.
    # NOTE(review): some scapy releases may already scale Rate to Mbps -
    # confirm against the installed version before trusting absolute values.
    rate_val = getattr(radiotap, 'Rate', None)
    if rate_val:
        phy_rate = rate_val * 0.5  # Convert to Mbps

    # Prefer an "MCS" sub-object when present, otherwise fall back to the
    # flat "MCS_index" field that scapy's RadioTap layer exposes.
    mcs_data = getattr(radiotap, 'MCS', None)
    if mcs_data and hasattr(mcs_data, 'index'):
        mcs = mcs_data.index
    else:
        mcs = getattr(radiotap, 'MCS_index', None)

    return phy_rate, mcs


def _pair_label(ra, ta):
    """Build the "RA -> TA" label used as key in the per-pair tables."""
    ra_str = ra if ra else "N/A"
    ta_str = ta if ta else "N/A"
    return f"{ra_str} -> {ta_str}"


def _frame_duration(pkt):
    """Return the Dot11 Duration/ID value of *pkt*, or "N/A" if unavailable."""
    if not pkt.haslayer(Dot11):
        return "N/A"
    dot11 = pkt[Dot11]
    # scapy names the Duration/ID field "ID"; "Duration" is tried first so a
    # layer exposing that attribute keeps working.
    for field_name in ("Duration", "ID"):
        value = getattr(dot11, field_name, None)
        if value is not None:
            return value
    return "N/A"


def _is_protected(pkt):
    """Return True if the frame-control Protected bit is set."""
    return bool(pkt.haslayer(Dot11) and pkt[Dot11].FCfield & FC_PROTECTED)


def _describe_frame(pkt):
    """Collect the printable per-frame fields shared by all sample listings.

    Returns:
        Tuple ``(ra_str, ta_str, frame_type, encrypted, retry, duration_val)``.
    """
    ra, ta = get_ra_ta(pkt)
    ra_str = ra if ra else "N/A"
    ta_str = ta if ta else "N/A"
    has_dot11 = pkt.haslayer(Dot11)
    encrypted = "encrypted" if _is_protected(pkt) else "unencrypted"
    retry = " [retry]" if (has_dot11 and pkt[Dot11].FCfield & FC_RETRY) else ""
    return ra_str, ta_str, get_frame_type_name(pkt), encrypted, retry, _frame_duration(pkt)


def _count_type_categories(packets):
    """Count Dot11 frames per coarse category (Management/Control/Data/Unknown)."""
    counts = Counter()
    for pkt in packets:
        if pkt.haslayer(Dot11):
            counts[_TYPE_CATEGORIES.get(pkt[Dot11].type, "Unknown")] += 1
    return counts


def _print_pair_counts(packets):
    """Print how many frames were seen for each RA/TA pair."""
    print("Unique RA/TA pairs (with counts):")
    pair_counts = Counter()
    for pkt in packets:
        ra, ta = get_ra_ta(pkt)
        if ra or ta:
            pair_counts[_pair_label(ra, ta)] += 1

    if pair_counts:
        for pair, count in pair_counts.most_common():
            print(f" {pair}: {count} frame(s)")
    else:
        print(" (no valid RA/TA pairs found)")
    print()


def _print_phy_histograms(packets):
    """Print PHY rate and MCS histograms of data frames, per RA/TA pair."""
    print("PHY Rate and MCS Histograms per RA/TA pair:")
    rate_histograms = defaultdict(Counter)
    mcs_histograms = defaultdict(Counter)

    for pkt in packets:
        ra, ta = get_ra_ta(pkt)
        if not (ra or ta):
            continue
        # Only process data frames (type 2)
        if not pkt.haslayer(Dot11) or pkt[Dot11].type != 2:
            continue
        pair = _pair_label(ra, ta)
        phy_rate, mcs = get_phy_info(pkt)
        if phy_rate:
            rate_histograms[pair][phy_rate] += 1
        if mcs is not None:
            mcs_histograms[pair][mcs] += 1

    for pair in sorted(rate_histograms.keys() | mcs_histograms.keys()):
        print(f"\n {pair}:")

        print(" PHY Rate (Mbps):")
        if pair in rate_histograms:
            for rate in sorted(rate_histograms[pair]):
                print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)")
        else:
            print(" (no PHY rate data)")

        print(" MCS Index:")
        if pair in mcs_histograms:
            for mcs_val in sorted(mcs_histograms[pair]):
                print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)")
        else:
            print(" (no MCS data)")
    print()


def _print_qos_analysis(packets):
    """Print counts and samples of QoS Data frames (type 2, subtype 8)."""
    print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):")
    qos_data_frames = [
        pkt for pkt in packets
        if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8
    ]
    qos_count = len(qos_data_frames)
    print(f" QoS Data frames (type 2, subtype 8): {qos_count}")

    encrypted_count = sum(1 for pkt in qos_data_frames if _is_protected(pkt))
    print(f" Encrypted: {encrypted_count}")
    print(f" Unencrypted: {qos_count - encrypted_count}")

    if qos_count > 0:
        print(" Sample QoS Data frames (likely iperf traffic):")
        for i, pkt in enumerate(qos_data_frames[:5]):
            ra_str, ta_str, _, encrypted, retry, duration_val = _describe_frame(pkt)
            print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}")
    print()


def _print_server_analysis(packets, server_mac):
    """Print counts and samples of frames whose RA or TA equals *server_mac*."""
    print(f"Frames involving server MAC ({server_mac}):")
    wanted = server_mac.lower()
    server_frames = []
    for pkt in packets:
        ra, ta = get_ra_ta(pkt)
        if (ra and ra.lower() == wanted) or (ta and ta.lower() == wanted):
            server_frames.append(pkt)

    server_count = len(server_frames)
    print(f" Total frames with server MAC: {server_count}")

    if server_count > 0:
        print(" Frame type breakdown:")
        for frame_type, count in _count_type_categories(server_frames).most_common():
            print(f" {frame_type}: {count} frame(s)")

        print(" Sample frames:")
        for i, pkt in enumerate(server_frames[:5]):
            ra_str, ta_str, frame_type, encrypted, retry, duration_val = _describe_frame(pkt)
            print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}")
    print()


def analyze_packets(packets, duration, server_mac=None):
    """Analyze captured packets and generate statistics.

    Prints, in order: totals and packet rate, the first 10 frames, RA/TA pair
    counts, PHY rate/MCS histograms, frame type breakdown, QoS Data analysis,
    frames involving the server MAC, and a summary.

    Args:
        packets: list of captured scapy packets.
        duration: capture length in seconds, used for the packet rate; the
            rate line is omitted when it is not positive.
        server_mac: MAC address to highlight; defaults to ``SERVER_MAC``.
    """
    if server_mac is None:
        server_mac = SERVER_MAC

    print("\n=== Capture Statistics ===")
    total_count = len(packets)
    print(f"Total packets captured: {total_count}")

    if total_count == 0:
        print("\n(No packets captured)")
        print("\n=== Summary ===")
        print("✗ No packets captured. Check:")
        print(" 1. Is there WiFi traffic on the channel?")
        print(" 2. Is the interface actually in monitor mode?")
        print(" 3. Try a different channel or longer duration")
        return

    # Count PLCP headers (radiotap present)
    plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap))
    print(f"PLCP headers: {plcp_count}")

    # Guard against a zero-length capture window.
    if duration > 0:
        rate = total_count / duration
        print(f"Packet rate: {rate:.1f} packets/second")
    print()

    # Sample packets
    print("Sample packets (first 10):")
    for i, pkt in enumerate(packets[:10]):
        ra_str, ta_str, frame_type, encrypted, retry, duration_val = _describe_frame(pkt)
        plcp = "yes" if pkt.haslayer(RadioTap) else "no"
        print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}")
    print()

    _print_pair_counts(packets)
    _print_phy_histograms(packets)

    # Frame type breakdown
    print("Frame type breakdown:")
    for frame_type, count in _count_type_categories(packets).most_common():
        print(f" {frame_type}: {count} frame(s)")
    print()

    _print_qos_analysis(packets)
    _print_server_analysis(packets, server_mac)

    # Summary
    print("=== Summary ===")
    print(f"✓ Monitor mode is working! Captured {total_count} packet(s)")
    if plcp_count > 0:
        print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
    else:
        print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")


def packet_handler(pkt, packets_list):
    """Callback function to handle captured packets: append *pkt* to *packets_list*."""
    packets_list.append(pkt)


def main():
    """Main function: parse arguments, set up monitor mode, capture, analyze."""
    if SCAPY_IMPORT_ERROR is not None:
        print(f"Error: scapy could not be imported: {SCAPY_IMPORT_ERROR}", file=sys.stderr)
        sys.exit(1)

    interface, channel, duration = parse_args()

    print("=== Testing Monitor Mode with scapy ===")
    print(f"Interface: {interface}")
    print(f"Channel: {channel}")
    print(f"Duration: {duration} seconds")
    print()

    # Setup monitor mode
    setup_monitor_mode(interface, channel)

    # Test capture (1 second)
    print("\nChecking Data Link Type (1 second test capture)...")
    print("(This may take up to 2 seconds if no packets are present)")

    test_packets = []
    try:
        sniff(iface=interface, prn=test_packets.append, timeout=1, store=False)
    except Exception as e:
        print(f"Error during test capture: {e}")

    test_count = len(test_packets)
    test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))

    if test_count > 0:
        print("Sample packets:")
        for i, pkt in enumerate(test_packets[:5]):
            ra, ta = get_ra_ta(pkt)
            ra_str = ra if ra else "N/A"
            ta_str = ta if ta else "N/A"
            plcp = "yes" if pkt.haslayer(RadioTap) else "no"
            print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, PLCP={plcp}")

    print(f"\nTest capture results:")
    print(f" Packets captured: {test_count}")
    print(f" PLCP headers: {test_plcp}")

    if test_plcp == 0 and test_count > 0:
        print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")

    # Main capture
    print(f"\n=== Starting scapy capture ({duration} seconds) ===")
    print("Press Ctrl+C to stop early\n")
    print(f"Capturing packets for {duration} seconds...")

    packets = []
    pcap_path = None

    if KEEP_PCAP:
        # Only the unique path is needed here; the file is written after capture.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_') as pcap_file:
            pcap_path = pcap_file.name
        print(f"Capturing to file: {pcap_path}")

    started = time.monotonic()
    try:
        # store=False: packets are already collected through prn, so sniff
        # does not need to keep a second copy of every packet.
        sniff(iface=interface, prn=packets.append, timeout=duration, store=False)
    except KeyboardInterrupt:
        print("\nCapture interrupted by user")
    except Exception as e:
        print(f"\nError during capture: {e}")
        traceback.print_exc()
    elapsed = time.monotonic() - started

    # Write the pcap after the capture block so an interrupted or failed
    # capture still keeps whatever was collected.
    if pcap_path is not None:
        try:
            wrpcap(pcap_path, packets)
        except Exception as e:
            print(f"Error writing pcap file {pcap_path}: {e}")
        else:
            print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
            print(f"Keeping pcap file: {pcap_path}")
            print(f" (Use: scapy -r {pcap_path} to analyze)")

    # The capture may stop early, so base the packet rate on the time that
    # actually elapsed, capped at the requested duration.
    capture_seconds = min(duration, elapsed)

    # Analyze packets
    analyze_packets(packets, capture_seconds)


if __name__ == "__main__":
    main()