#!/usr/bin/env python3
"""
Monitor mode WiFi packet capture and analysis using scapy.

Replaces the bash script with a pure Python solution.

Usage:
    # Live capture from interface:
    sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --time 10
    sudo python3 wifi_monitor.py -i wlan0 -c 36 -t 10 --keep-pcap
    sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --time 10 --full-packet

    # Read from pcap file:
    python3 wifi_monitor.py --pcap /tmp/capture.pcap
    python3 wifi_monitor.py /tmp/capture.pcap  # Also supported for backward compatibility

    # High-rate traffic (e.g. iperf): capture with tcpdump to avoid scapy drops, then analyze:
    sudo python3 wifi_monitor.py -i wlan0 -c 11 -t 10 --tcpdump-capture
"""

# Standard library imports
import argparse
import asyncio
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from collections import Counter, defaultdict
from datetime import datetime

# Third-party imports
from scapy.all import RadioTap, rdpcap, sniff, wrpcap
from scapy.layers.dot11 import (
    Dot11, Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Beacon, Dot11Deauth,
    Dot11Disas, Dot11ProbeReq, Dot11ProbeResp, Dot11QoS
)


async def _run_command(*cmd, capture_stdout=False):
    """Run an external command and wait for it to finish.

    Args:
        *cmd: Program name followed by its arguments (no shell is involved).
        capture_stdout: When True, stdout is captured and returned; otherwise
            it is discarded.

    Returns:
        Tuple ``(returncode, stdout_bytes, stderr_bytes)``.

    Raises:
        OSError: If the program cannot be started (e.g. it is not installed).
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE if capture_stdout else asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() drains the pipes and waits for exit in a single step.
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout or b"", stderr or b""


def _decode_error(stderr):
    """Turn captured stderr bytes into a printable, non-empty message."""
    text = stderr.decode(errors="replace").strip() if stderr else ""
    return text or "Unknown error"


class Config:
    """Configuration class for capture settings."""

    def __init__(self):
        self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0")
        self.keep_pcap = False
        self.full_packet = False
        self.tcpdump_capture = False  # Use tcpdump for capture (recommended for high packet rates)
        self.default_snaplen = 256  # Header + some payload

    @property
    def snaplen(self):
        """Get snaplen value based on full_packet setting."""
        return 0 if self.full_packet else self.default_snaplen


class WiFiMonitor:
    """Handles WiFi interface monitor mode setup and teardown with async support."""

    def __init__(self, interface, channel):
        self.interface = interface
        self.channel = channel
        self._error_message = None
        self._original_mode = None  # Not read anywhere in this file; kept for compatibility
        self._is_started = False

    @property
    def error_message(self):
        """Get error message if setup failed."""
        return self._error_message

    @property
    def is_successful(self):
        """Check if setup was successful."""
        return self._error_message is None

    @property
    def is_started(self):
        """Check if monitor mode is currently active."""
        return self._is_started

    async def start(self):
        """Start monitor mode on the WiFi interface.

        Returns:
            True when the interface is in monitor mode. On failure returns
            False, leaves the reason in ``error_message`` and makes a
            best-effort attempt to put the interface back in managed mode.
        """
        self._error_message = None
        print(f"=== Setting up monitor mode on {self.interface} ===")

        await self._unmanage_networkmanager()
        await self._unblock_wifi()

        if not await self._set_monitor_mode():
            # stop() is a no-op unless start() succeeded, so undo the partial
            # setup here; otherwise the interface stays unmanaged/down.
            failure = self._error_message
            try:
                await self._restore_managed_mode()
            except OSError:
                pass  # Best effort only; the original failure is what matters.
            self._error_message = failure
            return False

        await self._set_channel()  # Non-fatal: always reports success
        await self._verify_monitor_mode()
        self._is_started = True
        return True

    async def stop(self):
        """Stop monitor mode and restore interface to managed mode.

        Returns:
            True if nothing had to be done or the restore succeeded, False if
            switching back to managed mode failed.
        """
        if not self._is_started:
            return True

        print(f"=== Restoring {self.interface} to managed mode ===")
        try:
            result = await self._restore_managed_mode()
            self._is_started = False
        except OSError as e:
            self._error_message = f"Error restoring managed mode: {e}"
            print(self._error_message)
            result = False
        return result

    async def _restore_managed_mode(self):
        """Bring the interface down, set type managed, re-manage it, bring it up.

        Returns:
            True if ``iw`` accepted the switch to managed mode.

        Raises:
            OSError: If ``ip`` or ``iw`` cannot be executed.
        """
        await _run_command("ip", "link", "set", self.interface, "down")
        await asyncio.sleep(0.5)

        returncode, _, stderr = await _run_command(
            "iw", "dev", self.interface, "set", "type", "managed"
        )
        restored = returncode == 0
        if restored:
            print("Restored to managed mode")
        else:
            print(f"Warning: Failed to set managed mode: {_decode_error(stderr)}")

        await self._manage_networkmanager()
        await _run_command("ip", "link", "set", self.interface, "up")
        return restored

    async def _set_nm_managed(self, state):
        """Tell NetworkManager (if nmcli is installed) to manage or release the interface."""
        try:
            await _run_command("nmcli", "device", "set", self.interface, "managed", state)
        except OSError:
            pass  # Best effort: NetworkManager is optional

    async def _unmanage_networkmanager(self):
        """Try to unmanage interface from NetworkManager."""
        if shutil.which("nmcli"):
            print("Unmanaging interface from NetworkManager...")
            await self._set_nm_managed("no")
        return True

    async def _manage_networkmanager(self):
        """Re-enable NetworkManager management of the interface."""
        if shutil.which("nmcli"):
            await self._set_nm_managed("yes")

    async def _unblock_wifi(self):
        """Unblock WiFi if blocked by rfkill."""
        try:
            await _run_command("rfkill", "unblock", "wifi")
        except OSError:
            pass  # rfkill may not be installed; nothing to unblock then

    def _fail(self, message):
        """Record and print a setup error; returns False for use in ``return``."""
        self._error_message = message
        print(self._error_message)
        return False

    async def _set_monitor_mode(self):
        """Set interface to monitor mode.

        Returns:
            True if the interface already was, or now is, in monitor mode.
        """
        try:
            returncode, stdout, _ = await _run_command(
                "iw", "dev", self.interface, "info", capture_stdout=True
            )
            if returncode == 0 and b"type monitor" in stdout:
                print("Already in monitor mode")
                return True

            print(f"Setting {self.interface} to monitor mode...")
            await _run_command("ip", "link", "set", self.interface, "down")
            await asyncio.sleep(0.5)

            returncode, _, stderr = await _run_command(
                "iw", "dev", self.interface, "set", "type", "monitor"
            )
            if returncode != 0:
                return self._fail(f"Error setting monitor mode: {_decode_error(stderr)}")
            await asyncio.sleep(0.5)

            returncode, _, stderr = await _run_command(
                "ip", "link", "set", self.interface, "up"
            )
            if returncode != 0:
                return self._fail(f"Error bringing interface up: {_decode_error(stderr)}")

            print("Monitor mode activated")
            return True
        except OSError as e:
            return self._fail(f"Error setting monitor mode: {e}")

    async def _set_channel(self):
        """Set channel for monitor mode interface.

        Failure is reported but deliberately non-fatal, so this always
        returns True.
        """
        try:
            returncode, _, stderr = await _run_command(
                "iw", "dev", self.interface, "set", "channel", str(self.channel)
            )
            if returncode == 0:
                print(f"Channel set to {self.channel}")
                return True
            print(f"Error setting channel: {_decode_error(stderr)}")
        except OSError as e:
            print(f"Error setting channel: {e}")
        print("Continuing anyway - channel may not be set correctly")
        return True  # Non-fatal

    async def _verify_monitor_mode(self):
        """Verify monitor mode is active by printing the type/channel lines of `iw info`."""
        print("\nVerifying monitor mode...")
        try:
            returncode, stdout, _ = await _run_command(
                "iw", "dev", self.interface, "info", capture_stdout=True
            )
        except (OSError, subprocess.SubprocessError):
            return
        if returncode == 0:
            for line in stdout.decode(errors="replace").splitlines():
                if "type" in line or "channel" in line:
                    print(f"\t{line.strip()}")


class PacketParser:
    """Handles parsing of 802.11 packet fields."""

    _MANAGEMENT_SUBTYPES = {
        0: "Association Request",
        1: "Association Response",
        4: "Probe Request",
        5: "Probe Response",
        8: "Beacon",
        10: "Disassociation",
        11: "Authentication",
        12: "Deauthentication",
    }

    @staticmethod
    def _get_subtype_from_fc(pkt):
        """Return the 802.11 subtype of ``pkt``, or None without a Dot11 layer.

        The subtype is read from the layer's ``subtype`` attribute. It is not
        derived from ``FCfield``: this file treats ``FCfield`` as the flag
        bits (0x40 protected, 0x08 retry), so shifting it does not yield the
        subtype.
        """
        if not pkt.haslayer(Dot11):
            return None
        return getattr(pkt[Dot11], 'subtype', None)

    @staticmethod
    def is_data(pkt):
        """Return True if ``pkt`` is an 802.11 data frame (type 2)."""
        return bool(pkt.haslayer(Dot11) and pkt[Dot11].type == 2)

    @staticmethod
    def is_protected(pkt):
        """Return True if the Protected Frame flag (0x40) is set."""
        return bool(pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40)

    @staticmethod
    def is_retry(pkt):
        """Return True if the Retry flag (0x08) is set."""
        return bool(pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08)

    @staticmethod
    def duration_id(pkt):
        """Return the Duration/ID field of ``pkt``, or "N/A" if unavailable.

        scapy names this Dot11 field ``ID``; ``Duration`` is tried as a
        fallback for other layer implementations.
        NOTE(review): confirm the byte order scapy applies to ``ID`` for the
        installed scapy version before relying on the numeric value.
        """
        if not pkt.haslayer(Dot11):
            return "N/A"
        dot11 = pkt[Dot11]
        for field in ("ID", "Duration"):
            value = getattr(dot11, field, None)
            if value is not None:
                return value
        return "N/A"

    @staticmethod
    def frame_type_name(pkt):
        """Get human-readable frame type name."""
        if not pkt.haslayer(Dot11):
            return "Unknown"

        frame_type = getattr(pkt[Dot11], 'type', None)
        subtype = PacketParser._get_subtype_from_fc(pkt)

        if frame_type == 0:  # Management
            return PacketParser._MANAGEMENT_SUBTYPES.get(subtype, f"Management ({subtype})")
        if frame_type == 1:  # Control
            return f"Control ({subtype})"
        if frame_type == 2:  # Data
            if subtype == 8:
                return "QoS Data"
            if subtype == 0:
                return "Data"
            return f"Data ({subtype})"
        return f"Unknown ({frame_type}/{subtype})"

    @staticmethod
    def _format_mac(addr):
        """Normalize a MAC given as str or as an iterable of byte values."""
        if not addr:
            return addr
        if isinstance(addr, str):
            return addr.lower()
        return ':'.join(f'{b:02x}' for b in addr)

    @staticmethod
    def ra_ta(pkt):
        """Extract RA and TA from 802.11 frame.

        Returns:
            ``(ra, ta)`` taken from addr1/addr2 as lower-case strings; either
            is None when missing or when the packet has no Dot11 layer.
        """
        if not pkt.haslayer(Dot11):
            return None, None
        dot11 = pkt[Dot11]
        ra = PacketParser._format_mac(getattr(dot11, 'addr1', None))
        ta = PacketParser._format_mac(getattr(dot11, 'addr2', None))
        return ra, ta

    @staticmethod
    def _mcs_index(radiotap):
        """Return the radiotap MCS index, or None when the MCS field is absent."""
        present = getattr(radiotap, 'present', None)
        # Only trust MCS_index when the radiotap "present" bitmap says the MCS
        # field exists; otherwise the attribute may just be a default value.
        if present is None or not getattr(present, 'MCS', False):
            return None
        return getattr(radiotap, 'MCS_index', None)

    @staticmethod
    def phy_info(pkt):
        """Extract PHY rate and MCS from packet (if available in radiotap).

        Returns:
            ``(phy_rate, mcs)``; each is None when not available.
        """
        phy_rate = None
        mcs = None
        if pkt.haslayer(RadioTap):
            radiotap = pkt[RadioTap]
            rate_val = getattr(radiotap, 'Rate', None)
            if rate_val:
                # NOTE(review): assumes Rate is the raw radiotap value in
                # 500 kbps units; newer scapy releases may already return
                # Mbps here - confirm against the installed version.
                phy_rate = rate_val * 0.5  # Convert to Mbps
            mcs = PacketParser._mcs_index(radiotap)
        return phy_rate, mcs


class CaptureAnalyzer:
    """Analyzes packet captures and generates statistics."""

    _DATA_SUBTYPE_NAMES = {
        0: "Data",
        1: "Data+CF-Ack",
        2: "Data+CF-Poll",
        3: "Data+CF-Ack+CF-Poll",
        4: "Null",
        5: "CF-Ack",
        6: "CF-Poll",
        7: "CF-Ack+CF-Poll",
        8: "QoS Data",
        9: "QoS Data+CF-Ack",
        10: "QoS Data+CF-Poll",
        11: "QoS Data+CF-Ack+CF-Poll",
        12: "QoS Null",
        13: "Reserved",
        14: "QoS CF-Poll",
        15: "QoS CF-Ack+CF-Poll",
    }

    def __init__(self, parser):
        self.parser = parser

    @staticmethod
    def _format_timestamp(pkt):
        """Format packet timestamp as HH:MM:SS.mmm (local time), or "N/A"."""
        if getattr(pkt, 'time', None) is None:
            return "N/A"
        try:
            ts = float(pkt.time)  # pcap can give EDecimal; fromtimestamp needs int/float
        except (TypeError, ValueError):
            return "N/A"
        return datetime.fromtimestamp(ts).strftime("%H:%M:%S.%f")[:-3]  # Truncate to milliseconds

    @staticmethod
    def _pair_label(ra, ta):
        """Build the "RA -> TA" label used as a histogram key."""
        return f"{ra if ra else 'N/A'} -> {ta if ta else 'N/A'}"

    @staticmethod
    def _flag_labels(pkt):
        """Return ``(encryption_label, retry_suffix)`` for a packet."""
        encrypted = "encrypted" if PacketParser.is_protected(pkt) else "unencrypted"
        retry = " [retry]" if PacketParser.is_retry(pkt) else ""
        return encrypted, retry

    def analyze(self, packets, duration, tcpdump_data_count=None):
        """Analyze captured packets and print statistics.

        Args:
            packets: Sequence of scapy packets.
            duration: Capture length in seconds, used for the packet rate.
            tcpdump_data_count: Data-frame count from the concurrent tcpdump
                counter, or None when that counter did not run.
        """
        print("\n=== Capture Statistics ===")
        total_count = len(packets)
        print(f"Total packets captured (scapy): {total_count}")

        scapy_data_count = sum(1 for pkt in packets if PacketParser.is_data(pkt))
        if tcpdump_data_count is None:
            print(f"Data frames: {scapy_data_count} (scapy) - tcpdump counter unavailable")
        elif tcpdump_data_count > 0:
            ratio = scapy_data_count / tcpdump_data_count
            print(f"Data frames: {scapy_data_count} (scapy) vs {tcpdump_data_count} (tcpdump), ratio: {ratio:.1%}")
        elif scapy_data_count > 0:
            print(f"Data frames: {scapy_data_count} (scapy) vs 0 (tcpdump) - tcpdump found 0 data frames")
        else:
            print(f"Data frames: {scapy_data_count} (scapy) vs {tcpdump_data_count} (tcpdump)")

        if total_count == 0:
            self._print_no_packets_message()
            return

        plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap))
        print(f"PLCP headers: {plcp_count}")

        # A zero/negative duration (e.g. --time 0) must not crash the report.
        if duration and duration > 0:
            print(f"Packet rate: {total_count / duration:.1f} packets/second")
        else:
            print("Packet rate: N/A (capture duration is zero)")
        print()

        self._print_sample_packets(packets)
        self._print_ra_ta_pairs(packets)
        self._print_phy_histograms(packets)
        self._print_frame_type_breakdown(packets)
        self._print_data_frame_analysis(packets)
        self._print_summary(total_count, plcp_count, tcpdump_data_count, scapy_data_count)

    def _print_no_packets_message(self):
        """Print message when no packets are captured."""
        print("\n(No packets captured)")
        print("\n=== Summary ===")
        print("✗ No packets captured. Check:")
        print(" 1. Is there WiFi traffic on the channel?")
        print(" 2. Is the interface actually in monitor mode?")
        print(" 3. Try a different channel or longer duration")

    def _print_sample_packets(self, packets):
        """Print the first ten packets with addresses, type and flags."""
        print("Sample packets (first 10):")
        for i, pkt in enumerate(packets[:10]):
            ra, ta = self.parser.ra_ta(pkt)
            ra_str = ra if ra else "N/A"
            ta_str = ta if ta else "N/A"
            frame_type = self.parser.frame_type_name(pkt)
            encrypted, retry = self._flag_labels(pkt)
            duration_val = PacketParser.duration_id(pkt)
            plcp = "yes" if pkt.haslayer(RadioTap) else "no"
            timestamp = self._format_timestamp(pkt)
            print(f" Frame {i+1}: time={timestamp}, RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}")
        print()

    def _print_ra_ta_pairs(self, packets):
        """Print unique RA/TA pairs with their frame counts, most common first."""
        print("Unique RA/TA pairs (with counts):")
        ra_ta_pairs = Counter()
        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if ra or ta:
                ra_ta_pairs[self._pair_label(ra, ta)] += 1

        if ra_ta_pairs:
            for pair, count in ra_ta_pairs.most_common():
                print(f" {pair}: {count} frame(s)")
        else:
            print(" (no valid RA/TA pairs found)")
        print()

    def _print_phy_histograms(self, packets):
        """Print PHY rate and MCS histograms per RA/TA pair (data frames only)."""
        print("PHY Rate and MCS Histograms per RA/TA pair:")
        rate_histograms = defaultdict(Counter)
        mcs_histograms = defaultdict(Counter)

        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if not (ra or ta) or not PacketParser.is_data(pkt):
                continue
            pair = self._pair_label(ra, ta)
            phy_rate, mcs = self.parser.phy_info(pkt)
            if phy_rate:
                rate_histograms[pair][phy_rate] += 1
            if mcs is not None:
                mcs_histograms[pair][mcs] += 1

        for pair in sorted(set(rate_histograms) | set(mcs_histograms)):
            print(f"\n {pair}:")
            print(" PHY Rate (Mbps):")
            if pair in rate_histograms:
                for rate in sorted(rate_histograms[pair]):
                    print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)")
            else:
                print(" (no PHY rate data)")
            print(" MCS Index:")
            if pair in mcs_histograms:
                for mcs_val in sorted(mcs_histograms[pair]):
                    print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)")
            else:
                print(" (no MCS data)")
        print()

    def _print_frame_type_breakdown(self, packets):
        """Print how many frames are Management / Control / Data / Unknown."""
        print("Frame type breakdown:")
        type_names = {0: "Management", 1: "Control", 2: "Data"}
        frame_types = Counter(
            type_names.get(pkt[Dot11].type, "Unknown")
            for pkt in packets
            if pkt.haslayer(Dot11)
        )
        for frame_type, count in frame_types.most_common():
            print(f" {frame_type}: {count} frame(s)")
        print()

    def _print_data_frame_analysis(self, packets):
        """Print data-frame subtype counts, QoS Data statistics and samples."""
        print("Data frame analysis (QoS Data frames, subtype 8):")

        data_frames = [pkt for pkt in packets if PacketParser.is_data(pkt)]
        data_count = len(data_frames)
        print(f" Total data frames: {data_count}")

        subtype_counts = Counter()
        for pkt in data_frames:
            subtype = self.parser._get_subtype_from_fc(pkt)
            subtype_counts[subtype if subtype is not None else 'unknown'] += 1

        if subtype_counts:
            print(" Data frame subtypes:")
            # Numeric subtypes first (ascending), the 'unknown' bucket last.
            ordered = sorted(
                subtype_counts.items(),
                key=lambda item: (1 if item[0] == 'unknown' else 0, item[0]),
            )
            for subtype, count in ordered:
                subtype_name = self._get_data_subtype_name(subtype)
                print(f" Subtype {subtype} ({subtype_name}): {count} frame(s)")

        qos_data_frames = [
            pkt for pkt in data_frames
            if self.parser._get_subtype_from_fc(pkt) == 8
        ]
        qos_count = len(qos_data_frames)
        print(f" QoS Data frames (type 2, subtype 8): {qos_count}")

        encrypted_count = sum(1 for pkt in qos_data_frames if PacketParser.is_protected(pkt))
        print(f" Encrypted: {encrypted_count}")
        print(f" Unencrypted: {qos_count - encrypted_count}")

        if qos_count > 0:
            print(" Sample QoS Data frames:")
            for i, pkt in enumerate(qos_data_frames[:5]):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                encrypted, retry = self._flag_labels(pkt)
                duration_val = PacketParser.duration_id(pkt)
                print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}")
        elif data_count > 0:
            print(" Sample data frames (all subtypes):")
            for i, pkt in enumerate(data_frames[:5]):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                subtype = self.parser._get_subtype_from_fc(pkt)
                subtype_name = self._get_data_subtype_name(subtype) if subtype is not None else "Unknown"
                encrypted, retry = self._flag_labels(pkt)
                duration_val = PacketParser.duration_id(pkt)
                print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, subtype={subtype} ({subtype_name}), {encrypted}, dur={duration_val}{retry}")
        print()

    def _get_data_subtype_name(self, subtype):
        """Get human-readable data frame subtype name."""
        return self._DATA_SUBTYPE_NAMES.get(subtype, f"Unknown ({subtype})")

    def _print_summary(self, total_count, plcp_count, tcpdump_data_count=None, scapy_data_count=0):
        """Print summary statistics."""
        print("=== Summary ===")
        if total_count > 0:
            print(f"✓ Monitor mode is working! Captured {total_count} packet(s) (scapy)")
        if tcpdump_data_count is not None and tcpdump_data_count > 0:
            print(f"✓ Data frames: {scapy_data_count} (scapy) vs {tcpdump_data_count} (tcpdump)")
        if plcp_count > 0:
            print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
        else:
            print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")


class PacketCapture:
    """Handles packet capture operations."""

    def __init__(self, config):
        self.config = config
        self.parser = PacketParser()
        self.analyzer = CaptureAnalyzer(self.parser)

    @staticmethod
    def _capture_span_seconds(packets):
        """Return last-minus-first packet timestamp in seconds, or None.

        None is returned when fewer than two packets carry a usable
        timestamp or when the span is not positive.
        """
        try:
            times = [float(pkt.time) for pkt in packets]
        except (AttributeError, TypeError, ValueError):
            return None
        if len(times) < 2:
            return None
        span = max(times) - min(times)
        return span if span > 0 else None

    def capture_from_pcap(self, pcap_file, duration_sec=None):
        """Read and analyze packets from a PCAP file.

        Args:
            pcap_file: Path of the capture file to load with scapy.
            duration_sec: If set (e.g. when file was captured with -t N), used
                for packet rate. If None, the span between the first and last
                packet timestamps is used, falling back to 1.0 second.

        Returns:
            0 on success, 1 if the file could not be read or analyzed.
        """
        print("=== Reading packets from PCAP file ===")
        print(f"PCAP file: {pcap_file}")
        print()
        try:
            packets = rdpcap(pcap_file)
            print(f"Loaded {len(packets)} packets from {pcap_file}")
            file_size = os.path.getsize(pcap_file)
            print(f"File size: {file_size} bytes")
            print()

            if duration_sec is not None and duration_sec > 0:
                duration = duration_sec
            else:
                duration = self._capture_span_seconds(packets) or 1.0
            self.analyzer.analyze(packets, duration, None)
            return 0
        except OSError as e:
            print(f"Error reading PCAP file: {e}")
            return 1
        except Exception as e:
            print(f"Error reading PCAP file: {e}")
            traceback.print_exc()
            return 1

    def capture_live(self, interface, channel, duration):
        """Capture packets from a live interface."""
        return asyncio.run(self._capture_live_async(interface, channel, duration))

    async def _capture_live_async(self, interface, channel, duration):
        """Async implementation of live capture.

        Returns:
            Process exit code: 0 on success, 1 on any failure.
        """
        print("=== Testing Monitor Mode with scapy ===")
        print(f"Interface: {interface}")
        print(f"Channel: {channel}")
        print(f"Time: {duration} seconds")
        print()

        monitor = WiFiMonitor(interface, channel)
        if not await monitor.start():
            # WiFiMonitor already printed the reason and rolled back.
            return 1

        try:
            if not await self._test_capture_async(interface):
                return 1
            if self.config.tcpdump_capture:
                return await self._capture_with_tcpdump_async(interface, duration)
            return await self._main_capture_async(interface, duration)
        finally:
            await monitor.stop()

    async def _test_capture_async(self, interface):
        """Perform a test capture to verify monitor mode (async).

        Returns:
            True if the one-second sniff ran (even with zero packets), False
            if sniffing raised.
        """
        print("\nChecking Data Link Type (1 second test capture)...")
        print("(This may take up to 2 seconds if no packets are present)")

        test_packets = []
        try:
            # Run blocking sniff in executor
            # Note: snaplen is not supported for live interface capture in scapy
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: sniff(iface=interface, prn=test_packets.append, timeout=1, store=False),
            )
        except OSError as e:
            print(f"Error during test capture (system error): {e}")
            return False
        except Exception as e:
            print(f"Error during test capture: {e}")
            return False

        test_count = len(test_packets)
        test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))

        if test_count > 0:
            print("Sample packets:")
            for i, pkt in enumerate(test_packets[:5]):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                plcp = "yes" if pkt.haslayer(RadioTap) else "no"
                timestamp = self.analyzer._format_timestamp(pkt)
                print(f" Frame {i+1}: time={timestamp}, RA={ra_str}, TA={ta_str}, PLCP={plcp}")

        print("\nTest capture results:")
        print(f" Packets captured: {test_count}")
        print(f" PLCP headers: {test_plcp}")
        if test_plcp == 0 and test_count > 0:
            print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")
        return True

    @staticmethod
    async def _stop_tcpdump(proc):
        """Terminate a tcpdump process and return its decoded stderr.

        The process may already have exited (bad interface, permissions, -c
        limit reached); signalling it then raises ProcessLookupError, which is
        ignored so the real stderr message can still be reported.
        """
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass
        try:
            # communicate() both drains stderr and waits for exit.
            _, stderr = await asyncio.wait_for(proc.communicate(), timeout=2.0)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            _, stderr = await proc.communicate()
        return stderr.decode(errors="replace") if stderr else ""

    @staticmethod
    def _parse_tcpdump_count(stderr_text):
        """Extract the packet count from tcpdump's stderr summary.

        Looks for "X packets captured" / "X packets received by filter"; the
        first such line wins. Other non-"dropped" "N packets" lines seen
        before it contribute their maximum as a fallback. Returns 0 when
        nothing matches.
        """
        data_frame_count = 0
        for line in stderr_text.splitlines():
            match = re.search(r'(\d+)\s+packets?\s+(captured|received by filter)', line, re.IGNORECASE)
            if match:
                return int(match.group(1))
            # Also try simpler pattern
            match = re.search(r'(\d+)\s+packets?', line, re.IGNORECASE)
            if match and "dropped" not in line.lower():
                data_frame_count = max(data_frame_count, int(match.group(1)))
        return data_frame_count

    async def _run_tcpdump_counter(self, interface, duration):
        """Run tcpdump concurrently to count data frames.

        Returns:
            The number of data frames tcpdump reported, or None when tcpdump
            is unavailable, failed, or produced no output.
        """
        if shutil.which("tcpdump") is None:
            print("Warning: tcpdump not found, skipping concurrent count")
            return None

        try:
            # Use tcpdump with BPF filter for data frames
            # For 802.11 frames: wlan[0] contains Frame Control field
            # Bits 2-3 are the type field: 00=Management, 01=Control, 10=Data
            # Type 2 (Data) = 0x08 when masked with 0x0C
            proc = await asyncio.create_subprocess_exec(
                "tcpdump",
                "-i", interface,
                "-n",  # Don't resolve addresses
                "-q",  # Quiet mode (less verbose)
                "-c", "1000000",  # Large count limit
                "wlan[0] & 0x0C == 0x08",  # BPF filter: type == 2 (data frames)
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE
            )

            await asyncio.sleep(duration)
            # tcpdump prints its packet-count summary on stderr at exit
            stderr_text = await self._stop_tcpdump(proc)

            if proc.returncode not in (0, -15):  # -15 is SIGTERM, which is expected
                print(f"Warning: tcpdump exited with code {proc.returncode}")
                if stderr_text:
                    print(f"tcpdump stderr: {stderr_text[:300]}")
                return None

            if not stderr_text.strip():
                print("Warning: tcpdump produced no output")
                return None

            data_frame_count = self._parse_tcpdump_count(stderr_text)
            if data_frame_count == 0:
                # Debug output - but still return 0 so we can show it
                print(f"Warning: tcpdump found 0 data frames. tcpdump stderr: {stderr_text[:200]}")
            return data_frame_count
        except FileNotFoundError:
            print("Warning: tcpdump not found, skipping concurrent count")
            return None
        except Exception as e:
            print(f"Warning: tcpdump counter failed: {e}")
            traceback.print_exc()
            return None

    async def _capture_with_tcpdump_async(self, interface, duration):
        """Capture using tcpdump (full line rate), then analyze with scapy.

        Use this for high packet rates where scapy sniff() would drop packets.

        Returns:
            0 on success, 1 on failure. The temporary pcap is removed on every
            path unless ``config.keep_pcap`` is set.
        """
        print(f"\n=== Capturing with tcpdump ({duration} seconds) ===")
        print("(tcpdump captures at line rate; analysis runs afterward)\n")

        keep = self.config.keep_pcap
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='wifi_monitor_') as pcap_f:
            pcap_path = pcap_f.name
        if keep:
            print(f"Output: {pcap_path}")

        try:
            proc = await asyncio.create_subprocess_exec(
                "tcpdump", "-i", interface, "-w", pcap_path, "-n",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE
            )
            await asyncio.sleep(duration)
            stderr_text = await self._stop_tcpdump(proc)

            if not os.path.isfile(pcap_path) or os.path.getsize(pcap_path) == 0:
                print("Error: tcpdump produced no capture file or empty file.")
                if stderr_text.strip():
                    print(f"tcpdump stderr: {stderr_text[:300]}")
                return 1

            packets = rdpcap(pcap_path)
            print(f"Captured {len(packets)} packets. Analyzing...\n")
            self.analyzer.analyze(packets, duration, None)

            if keep:
                print(f"Kept pcap: {pcap_path}")
                print(f" (Use: python3 wifi_monitor.py {pcap_path} to re-analyze)")
            return 0
        except FileNotFoundError:
            print("Error: tcpdump not found. Install tcpdump or use default scapy capture.")
            return 1
        except Exception as e:
            print(f"Error during tcpdump capture: {e}")
            traceback.print_exc()
            return 1
        finally:
            if not keep:
                try:
                    os.unlink(pcap_path)
                except OSError:
                    pass

    async def _main_capture_async(self, interface, duration):
        """Perform the main packet capture (async).

        Sniffs with scapy for ``duration`` seconds while a tcpdump data-frame
        counter runs concurrently (when tcpdump is installed), then analyzes
        the captured packets.

        Returns:
            0 on success, 1 if the capture raised an error.
        """
        print(f"\n=== Starting scapy capture ({duration} seconds) ===")
        print("Press Ctrl+C to stop early\n")
        print("Note: At high packet rates scapy may drop packets. Use --tcpdump-capture (-T) for full-rate capture.\n")

        if self.config.full_packet:
            print("Capturing full packets...")
        else:
            print("Capturing packets (full packets - snaplen not supported for live capture)...")

        packets = []
        pcap_path = None
        if self.config.keep_pcap:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_') as pcap_file:
                pcap_path = pcap_file.name
            print(f"Capturing to file: {pcap_path}")

        # Start tcpdump counter concurrently (only if available)
        tcpdump_task = None
        if shutil.which("tcpdump"):
            print("Starting concurrent tcpdump counter for data frames...")
            tcpdump_task = asyncio.create_task(self._run_tcpdump_counter(interface, duration))
        else:
            print("Note: tcpdump not found, skipping concurrent count")

        capture_error = None
        try:
            # Run blocking sniff in executor
            # Note: snaplen is not supported for live interface capture in scapy
            loop = asyncio.get_running_loop()
            # store=False: packets are already collected through prn, so
            # letting sniff() keep its own list would hold every packet twice.
            await loop.run_in_executor(
                None,
                lambda: sniff(iface=interface, prn=packets.append, timeout=duration, store=False),
            )
            if pcap_path is not None:
                await loop.run_in_executor(None, lambda: wrpcap(pcap_path, packets))
                print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
                print(f"Keeping pcap file: {pcap_path}")
                print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)")
        except KeyboardInterrupt:
            print("\nCapture interrupted by user")
        except OSError as e:
            capture_error = e
            print(f"\nError during capture (system error): {e}")
        except Exception as e:
            capture_error = e
            print(f"\nError during capture: {e}")
            traceback.print_exc()

        # Get tcpdump count (if task was started)
        tcpdump_data_count = None
        if tcpdump_task is not None:
            tcpdump_data_count = await tcpdump_task

        self.analyzer.analyze(packets, duration, tcpdump_data_count)
        return 1 if capture_error else 0


class ArgumentParser:
    """Handles command line argument parsing using argparse."""

    _PCAP_SUFFIXES = ('.pcap', '.cap', '.pcapng')

    def __init__(self, config):
        self.config = config

    def parse(self):
        """Parse command line arguments and apply the flags to the config.

        Returns:
            ``('pcap', path, None, None)`` for file analysis,
            ``('live', interface, channel, seconds)`` for live capture, or
            None when the given pcap file does not exist.
        """
        parser = argparse.ArgumentParser(
            description="Monitor mode WiFi packet capture and analysis using scapy",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
  # Live capture with defaults (wlan0, channel 36, 10 seconds):
  sudo python3 wifi_monitor.py

  # Live capture with specific interface, channel, and time:
  sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --time 10

  # Short form:
  sudo python3 wifi_monitor.py -i wlan1 -c 11 -t 5

  # Save captured packets to pcap file:
  sudo python3 wifi_monitor.py -i wlan0 -c 36 -t 10 --keep-pcap

  # High-rate traffic (e.g. iperf): capture with tcpdump to avoid drops:
  sudo python3 wifi_monitor.py -i wlan0 -c 11 -t 10 --tcpdump-capture

  # Analyze existing pcap file:
  python3 wifi_monitor.py --pcap /tmp/capture.pcap
  python3 wifi_monitor.py /tmp/capture.pcap  # Also supported
"""
        )

        # PCAP file mode
        parser.add_argument(
            '--pcap', metavar='FILE',
            help='Read packets from PCAP file instead of live capture'
        )

        # Live capture mode options
        # The default comes from Config so the WIFI_INTERFACE environment
        # variable is honored when --interface is not given.
        default_iface = self.config.wifi_interface
        parser.add_argument(
            '--interface', '-i', default=default_iface, metavar='IFACE',
            help=f'WiFi interface name (default: {default_iface})'
        )
        parser.add_argument(
            '--channel', '-c', type=int, default=36, metavar='CH',
            help='WiFi channel (default: 36)'
        )
        parser.add_argument(
            '--time', '-t', type=int, default=10, metavar='SECONDS',
            help='Capture time in seconds (default: 10)'
        )

        # Flags
        parser.add_argument(
            '--keep-pcap', '-k', action='store_true',
            help='Save captured packets to a pcap file'
        )
        parser.add_argument(
            '--full-packet', '-f', action='store_true',
            help='Capture full packets (default: header only, not supported for live capture)'
        )
        parser.add_argument(
            '--tcpdump-capture', '-T', action='store_true',
            help='Use tcpdump for capture (recommended for high packet rates; avoids scapy drops)'
        )

        # Backward compatibility: if first positional arg looks like a pcap file, use it
        args, unknown = parser.parse_known_args()
        if unknown and unknown[0].endswith(self._PCAP_SUFFIXES):
            if args.pcap:
                parser.error("Cannot specify both --pcap and positional pcap file argument")
            args.pcap = unknown[0]
            unknown = unknown[1:]
        if unknown:
            parser.error(f"Unrecognized arguments: {' '.join(unknown)}")

        # Apply flags to config
        self.config.keep_pcap = args.keep_pcap
        self.config.full_packet = args.full_packet
        self.config.tcpdump_capture = args.tcpdump_capture

        # Determine mode
        if args.pcap:
            if not os.path.isfile(args.pcap):
                print(f"Error: PCAP file not found: {args.pcap}")
                return None
            return ('pcap', args.pcap, None, None)

        # Live capture mode
        self.config.wifi_interface = args.interface
        return ('live', args.interface, args.channel, args.time)


def main():
    """Main function with single exit point.

    Returns:
        Process exit code: 0 on success, 1 on failure.
    """
    exit_code = 0
    config = Config()
    parsed_args = ArgumentParser(config).parse()

    if parsed_args is None:
        exit_code = 1
    else:
        mode, interface_or_pcap, channel, duration = parsed_args
        capture = PacketCapture(config)
        if mode == 'pcap':
            exit_code = capture.capture_from_pcap(interface_or_pcap)
        else:
            exit_code = capture.capture_live(interface_or_pcap, channel, duration)
    return exit_code


if __name__ == "__main__":
    sys.exit(main())