#!/usr/bin/env python3
"""
Monitor mode WiFi packet capture and analysis using scapy.

Replaces the bash script with a pure Python solution.

Usage:
    # Live capture from interface:
    sudo python3 wifi_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet]
    sudo python3 wifi_monitor.py wlan0 36 10
    sudo python3 wifi_monitor.py wlan0 36 10 --keep-pcap
    sudo python3 wifi_monitor.py wlan0 36 10 --full-packet  # Capture entire packets (default: header only)

    # Read from pcap file:
    python3 wifi_monitor.py <pcap_file>
    python3 wifi_monitor.py /tmp/capture.pcap

Environment:
    WIFI_INTERFACE   default interface name (default: wlan0)
    WIFI_SERVER_MAC  MAC address highlighted in the "server MAC" report section
"""

# Standard library imports
import os
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from collections import Counter, defaultdict

# Third-party imports
from scapy.all import RadioTap, rdpcap, sniff, wrpcap
from scapy.layers.dot11 import (
    Dot11,
    Dot11AssoReq,
    Dot11AssoResp,
    Dot11Auth,
    Dot11Beacon,
    Dot11Deauth,
    Dot11Disas,
    Dot11ProbeReq,
    Dot11ProbeResp,
    Dot11QoS,
)

# MAC address reported on by default in the "server MAC" analysis section.
DEFAULT_SERVER_MAC = "80:84:89:93:c4:b6"

# Defaults used when channel / duration are omitted on the command line.
DEFAULT_CHANNEL = 36
DEFAULT_DURATION = 10


class Config:
    """Configuration class for capture settings."""

    def __init__(self):
        self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0")
        self.keep_pcap = False
        self.full_packet = False
        self.default_snaplen = 256  # Header + some payload
        # MAC address singled out by the analyzer; overridable via environment.
        self.server_mac = os.environ.get("WIFI_SERVER_MAC", DEFAULT_SERVER_MAC)

    @property
    def snaplen(self):
        """Get snaplen value based on full_packet setting (0 means no limit)."""
        return 0 if self.full_packet else self.default_snaplen


class MonitorModeSetup:
    """Handles WiFi interface monitor mode setup via ``nmcli``, ``rfkill``, ``ip`` and ``iw``."""

    def __init__(self, interface, channel):
        self.interface = interface
        self.channel = channel
        self._error_message = None

    @property
    def error_message(self):
        """Get error message if setup failed (None otherwise)."""
        return self._error_message

    @property
    def is_successful(self):
        """Check if setup was successful (no error has been recorded)."""
        return self._error_message is None

    def setup(self):
        """Set WiFi interface to monitor mode.

        Every step is attempted even if an earlier one failed, so the final
        verification output always shows the state the interface ended up in.

        Returns:
            True if all mandatory steps succeeded, False otherwise.
        """
        print(f"=== Setting up monitor mode on {self.interface} ===")
        ok = self._unmanage_networkmanager()
        self._unblock_wifi()
        if not self._set_monitor_mode():
            ok = False
        if not self._set_channel():
            ok = False
        self._verify_monitor_mode()
        return ok

    def _unmanage_networkmanager(self):
        """Try to unmanage interface from NetworkManager (best effort, never fails)."""
        if shutil.which("nmcli") is None:
            return True
        print("Unmanaging interface from NetworkManager...")
        try:
            # capture_output=True cannot be combined with an explicit stderr=
            # (subprocess.run raises ValueError), so discard both streams directly.
            subprocess.run(
                ["nmcli", "device", "set", self.interface, "managed", "no"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        except OSError:
            # Best effort only: NetworkManager may simply not be in use.
            pass
        return True

    def _unblock_wifi(self):
        """Unblock WiFi if blocked by rfkill (best effort)."""
        try:
            subprocess.run(["rfkill", "unblock", "wifi"], check=False)
        except OSError:
            # rfkill is optional; a missing binary is not an error.
            pass

    def _set_monitor_mode(self):
        """Set interface to monitor mode.

        Returns:
            True on success (or if already in monitor mode). On failure the
            error is printed, stored in ``error_message`` and False is returned.
        """
        try:
            info = subprocess.run(
                ["iw", "dev", self.interface, "info"],
                capture_output=True, text=True, check=True,
            )
            if "type monitor" in info.stdout:
                print("Already in monitor mode")
                return True

            print(f"Setting {self.interface} to monitor mode...")
            # Bringing the link down is allowed to fail (it may already be down).
            subprocess.run(
                ["ip", "link", "set", self.interface, "down"],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
            time.sleep(0.5)
            subprocess.run(
                ["iw", "dev", self.interface, "set", "type", "monitor"],
                check=True, capture_output=True, text=True,
            )
            time.sleep(0.5)
            subprocess.run(
                ["ip", "link", "set", self.interface, "up"],
                check=True, capture_output=True, text=True,
            )
        except (OSError, subprocess.CalledProcessError) as e:
            # OSError covers a missing iw/ip binary; CalledProcessError a failed command.
            self._error_message = f"Error setting monitor mode: {e}"
            stderr = getattr(e, "stderr", None)
            if stderr:
                self._error_message += f"\nError details: {stderr}"
            print(self._error_message)
            return False

        print("Monitor mode activated")
        return True

    def _set_channel(self):
        """Set channel for monitor mode interface.

        A failure here is reported but deliberately treated as non-fatal,
        so this method always returns True.
        """
        try:
            subprocess.run(
                ["iw", "dev", self.interface, "set", "channel", str(self.channel)],
                check=True, capture_output=True,
            )
        except (OSError, subprocess.CalledProcessError) as e:
            print(f"Error setting channel: {e}")
            print("Continuing anyway - channel may not be set correctly")
            return True  # Non-fatal

        print(f"Channel set to {self.channel}")
        return True

    def _verify_monitor_mode(self):
        """Print the interface's current type and channel as reported by ``iw``."""
        print("\nVerifying monitor mode...")
        try:
            info = subprocess.run(
                ["iw", "dev", self.interface, "info"],
                capture_output=True, text=True, check=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            # Verification is informational only; report and carry on.
            print(f"\t(could not query interface: {e})")
            return
        for line in info.stdout.splitlines():
            if "type" in line or "channel" in line:
                print(f"\t{line.strip()}")


class PacketParser:
    """Handles parsing of 802.11 packet fields."""

    # Human-readable names for the management (type 0) subtypes we recognise.
    _MGMT_SUBTYPES = {
        0: "Association Request",
        1: "Association Response",
        4: "Probe Request",
        5: "Probe Response",
        8: "Beacon",
        10: "Disassociation",
        11: "Authentication",
        12: "Deauthentication",
    }

    # Human-readable names for the data (type 2) subtypes we recognise.
    _DATA_SUBTYPES = {0: "Data", 8: "QoS Data"}

    @staticmethod
    def frame_type_name(pkt):
        """Get human-readable frame type name ("Unknown" if there is no Dot11 layer)."""
        if not pkt.haslayer(Dot11):
            return "Unknown"

        dot11 = pkt[Dot11]
        frame_type, subtype = dot11.type, dot11.subtype
        if frame_type == 0:  # Management
            return PacketParser._MGMT_SUBTYPES.get(subtype, f"Management ({subtype})")
        if frame_type == 1:  # Control
            return f"Control ({subtype})"
        if frame_type == 2:  # Data
            return PacketParser._DATA_SUBTYPES.get(subtype, f"Data ({subtype})")
        return f"Unknown ({frame_type}/{subtype})"

    @staticmethod
    def _format_mac(addr):
        """Normalise an address to a lowercase colon-separated string (None if empty)."""
        if not addr:
            return None
        if isinstance(addr, str):
            return addr.lower()
        return ':'.join(f'{b:02x}' for b in addr)

    @staticmethod
    def ra_ta(pkt):
        """Extract RA (addr1) and TA (addr2) from 802.11 frame.

        Returns:
            Tuple ``(ra, ta)``; each element is a lowercase MAC string or None.
        """
        if not pkt.haslayer(Dot11):
            return None, None

        dot11 = pkt[Dot11]
        ra = PacketParser._format_mac(getattr(dot11, 'addr1', None))
        ta = PacketParser._format_mac(getattr(dot11, 'addr2', None))
        return ra, ta

    @staticmethod
    def phy_info(pkt):
        """Extract PHY rate and MCS from packet (if available in radiotap).

        Returns:
            Tuple ``(phy_rate, mcs)``; either element is None when unavailable.
        """
        if not pkt.haslayer(RadioTap):
            return None, None

        radiotap = pkt[RadioTap]

        phy_rate = None
        rate_val = getattr(radiotap, 'Rate', None)
        if rate_val:
            # Radiotap encodes the rate in 500 kbps units.
            # NOTE(review): newer scapy releases may already scale Rate to
            # Mbps when the field is read - confirm against the scapy version in use.
            phy_rate = rate_val * 0.5  # Convert to Mbps

        mcs = None
        mcs_data = getattr(radiotap, 'MCS', None)
        if mcs_data and hasattr(mcs_data, 'index'):
            mcs = mcs_data.index
        else:
            # scapy's RadioTap layer exposes the MCS index as a flat
            # ``MCS_index`` field rather than an ``MCS`` object.
            mcs = getattr(radiotap, 'MCS_index', None)

        return phy_rate, mcs


class PacketAnalyzer:
    """Analyzes captured packets and generates statistics."""

    # Names of the 802.11 frame classes keyed by the Dot11 ``type`` value.
    _FRAME_CLASSES = {0: "Management", 1: "Control", 2: "Data"}

    def __init__(self, parser, server_mac=DEFAULT_SERVER_MAC):
        """
        Args:
            parser: object providing ``ra_ta``, ``frame_type_name`` and ``phy_info``.
            server_mac: MAC address highlighted in the server MAC report section.
        """
        self.parser = parser
        self.server_mac = server_mac

    def analyze(self, packets, duration):
        """Analyze captured packets and print statistics.

        Args:
            packets: sequence of captured scapy packets.
            duration: capture length in seconds; the packet rate line is
                skipped when this is missing or not positive.
        """
        print("\n=== Capture Statistics ===")

        total_count = len(packets)
        print(f"Total packets captured: {total_count}")

        if total_count == 0:
            self._print_no_packets_message()
            return

        plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap))
        print(f"PLCP headers: {plcp_count}")

        # Guard against a zero/negative duration instead of dividing by it.
        if duration and duration > 0:
            rate = total_count / duration
            print(f"Packet rate: {rate:.1f} packets/second")
        print()

        self._print_sample_packets(packets)
        self._print_ra_ta_pairs(packets)
        self._print_phy_histograms(packets)
        self._print_frame_type_breakdown(packets)
        self._print_data_frame_analysis(packets)
        self._print_server_mac_analysis(packets)
        self._print_summary(total_count, plcp_count)

    # -- shared formatting helpers -------------------------------------

    @staticmethod
    def _pair_label(ra, ta):
        """Format an RA/TA pair, substituting "N/A" for missing addresses."""
        return f"{ra or 'N/A'} -> {ta or 'N/A'}"

    @staticmethod
    def _frame_flags(pkt):
        """Return ``(encryption label, retry suffix, duration)`` for a packet."""
        if not pkt.haslayer(Dot11):
            return "unencrypted", "", "N/A"

        dot11 = pkt[Dot11]
        encrypted = "encrypted" if dot11.FCfield & 0x40 else "unencrypted"
        retry = " [retry]" if dot11.FCfield & 0x08 else ""

        # scapy names the 802.11 Duration/ID field "ID"; "Duration" is tried
        # first so any layer that does expose it keeps working.
        duration_val = "N/A"
        for field_name in ("Duration", "ID"):
            value = getattr(dot11, field_name, None)
            if value is not None:
                duration_val = value
                break
        return encrypted, retry, duration_val

    @classmethod
    def _count_frame_classes(cls, packets):
        """Count Management/Control/Data/Unknown frames among Dot11 packets."""
        return Counter(
            cls._FRAME_CLASSES.get(pkt[Dot11].type, "Unknown")
            for pkt in packets
            if pkt.haslayer(Dot11)
        )

    # -- report sections ---------------------------------------------

    def _print_no_packets_message(self):
        """Print message when no packets are captured."""
        print("\n(No packets captured)")
        print("\n=== Summary ===")
        print("✗ No packets captured. Check:")
        print(" 1. Is there WiFi traffic on the channel?")
        print(" 2. Is the interface actually in monitor mode?")
        print(" 3. Try a different channel or longer duration")

    def _print_sample_packets(self, packets):
        """Print the first ten packets."""
        print("Sample packets (first 10):")
        for index, pkt in enumerate(packets[:10], start=1):
            ra, ta = self.parser.ra_ta(pkt)
            ra_str = ra if ra else "N/A"
            ta_str = ta if ta else "N/A"
            frame_type = self.parser.frame_type_name(pkt)
            encrypted, retry, duration_val = self._frame_flags(pkt)
            plcp = "yes" if pkt.haslayer(RadioTap) else "no"
            print(f" Frame {index}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}")
        print()

    def _print_ra_ta_pairs(self, packets):
        """Print unique RA/TA pairs with their frame counts."""
        print("Unique RA/TA pairs (with counts):")
        ra_ta_pairs = Counter()
        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if ra or ta:
                ra_ta_pairs[self._pair_label(ra, ta)] += 1

        if ra_ta_pairs:
            for pair, count in ra_ta_pairs.most_common():
                print(f" {pair}: {count} frame(s)")
        else:
            print(" (no valid RA/TA pairs found)")
        print()

    def _print_phy_histograms(self, packets):
        """Print PHY rate and MCS histograms per RA/TA pair (data frames only)."""
        print("PHY Rate and MCS Histograms per RA/TA pair:")
        rate_histograms = defaultdict(Counter)
        mcs_histograms = defaultdict(Counter)

        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if not (ra or ta):
                continue
            if not pkt.haslayer(Dot11) or pkt[Dot11].type != 2:
                continue

            pair = self._pair_label(ra, ta)
            phy_rate, mcs = self.parser.phy_info(pkt)
            if phy_rate:
                rate_histograms[pair][phy_rate] += 1
            if mcs is not None:
                mcs_histograms[pair][mcs] += 1

        for pair in sorted(rate_histograms.keys() | mcs_histograms.keys()):
            print(f"\n {pair}:")

            print(" PHY Rate (Mbps):")
            if pair in rate_histograms:
                for rate in sorted(rate_histograms[pair]):
                    print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)")
            else:
                print(" (no PHY rate data)")

            print(" MCS Index:")
            if pair in mcs_histograms:
                for mcs_val in sorted(mcs_histograms[pair]):
                    print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)")
            else:
                print(" (no MCS data)")
        print()

    def _print_frame_type_breakdown(self, packets):
        """Print frame type breakdown."""
        print("Frame type breakdown:")
        for frame_type, count in self._count_frame_classes(packets).most_common():
            print(f" {frame_type}: {count} frame(s)")
        print()

    def _print_data_frame_analysis(self, packets):
        """Print data frame analysis (QoS Data frames and their encryption state)."""
        print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):")
        qos_data_frames = [
            pkt for pkt in packets
            if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8
        ]
        qos_count = len(qos_data_frames)
        print(f" QoS Data frames (type 2, subtype 8): {qos_count}")

        encrypted_count = sum(1 for pkt in qos_data_frames if pkt[Dot11].FCfield & 0x40)
        unencrypted_count = qos_count - encrypted_count
        print(f" Encrypted: {encrypted_count}")
        print(f" Unencrypted: {unencrypted_count}")

        if qos_count > 0:
            print(" Sample QoS Data frames (likely iperf traffic):")
            for index, pkt in enumerate(qos_data_frames[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                encrypted, retry, duration_val = self._frame_flags(pkt)
                print(f" Frame {index}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}")
        print()

    def _print_server_mac_analysis(self, packets):
        """Print analysis of frames whose RA or TA is the configured server MAC."""
        server_mac = self.server_mac
        wanted = server_mac.lower()
        print(f"Frames involving server MAC ({server_mac}):")

        server_frames = []
        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if (ra and ra.lower() == wanted) or (ta and ta.lower() == wanted):
                server_frames.append(pkt)

        server_count = len(server_frames)
        print(f" Total frames with server MAC: {server_count}")

        if server_count > 0:
            print(" Frame type breakdown:")
            for frame_type, count in self._count_frame_classes(server_frames).most_common():
                print(f" {frame_type}: {count} frame(s)")

            print(" Sample frames:")
            for index, pkt in enumerate(server_frames[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                frame_type = self.parser.frame_type_name(pkt)
                encrypted, retry, duration_val = self._frame_flags(pkt)
                print(f" Frame {index}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}")
        print()

    def _print_summary(self, total_count, plcp_count):
        """Print summary statistics."""
        print("=== Summary ===")
        if total_count > 0:
            print(f"✓ Monitor mode is working! Captured {total_count} packet(s)")
            if plcp_count > 0:
                print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
            else:
                print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")


class PacketCapture:
    """Handles packet capture operations."""

    def __init__(self, config):
        self.config = config
        self.parser = PacketParser()
        self.analyzer = PacketAnalyzer(
            self.parser,
            server_mac=getattr(config, "server_mac", DEFAULT_SERVER_MAC),
        )

    @staticmethod
    def _capture_span(packets):
        """Return the time in seconds between the earliest and latest packet timestamps."""
        times = [float(pkt.time) for pkt in packets]
        return max(times) - min(times) if times else 0.0

    def capture_from_pcap(self, pcap_file):
        """Read and analyze packets from a PCAP file.

        Returns:
            0 on success, 1 if the file could not be read or analyzed.
        """
        print("=== Reading packets from PCAP file ===")
        print(f"PCAP file: {pcap_file}")
        print()

        try:
            packets = rdpcap(pcap_file)
            print(f"Loaded {len(packets)} packets from {pcap_file}")
            file_size = os.path.getsize(pcap_file)
            print(f"File size: {file_size} bytes")
            print()
            # Derive the capture length from the packet timestamps so the
            # reported packet rate reflects the recording, not a fixed 1 s.
            self.analyzer.analyze(packets, self._capture_span(packets))
            return 0
        except OSError as e:
            print(f"Error reading PCAP file: {e}")
            return 1
        except Exception as e:  # top-level boundary: report anything scapy raises
            print(f"Error reading PCAP file: {e}")
            traceback.print_exc()
            return 1

    def capture_live(self, interface, channel, duration):
        """Capture packets from a live interface.

        Returns:
            0 on success, 1 if setup or capture failed.
        """
        print("=== Testing Monitor Mode with scapy ===")
        print(f"Interface: {interface}")
        print(f"Channel: {channel}")
        print(f"Duration: {duration} seconds")
        print()

        monitor = MonitorModeSetup(interface, channel)
        if not monitor.setup():
            # The failing setup step has already printed its error message.
            return 1

        if not self._test_capture(interface):
            return 1

        return self._main_capture(interface, duration)

    def _test_capture(self, interface):
        """Perform a short test capture to verify monitor mode.

        Returns:
            True if the capture ran (even with zero packets), False on error.
        """
        print("\nChecking Data Link Type (1 second test capture)...")
        print("(This may take up to 2 seconds if no packets are present)")

        test_packets = []
        try:
            # NOTE(review): confirm that the installed scapy's sniff() accepts
            # ``snaplen`` for the capture socket in use.
            sniff(iface=interface, prn=test_packets.append,
                  timeout=1, store=False, snaplen=self.config.snaplen)
        except OSError as e:
            print(f"Error during test capture (system error): {e}")
            return False
        except Exception as e:  # top-level boundary around third-party capture code
            print(f"Error during test capture: {e}")
            return False

        test_count = len(test_packets)
        test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))

        if test_count > 0:
            print("Sample packets:")
            for index, pkt in enumerate(test_packets[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                plcp = "yes" if pkt.haslayer(RadioTap) else "no"
                print(f" Frame {index}: RA={ra_str}, TA={ta_str}, PLCP={plcp}")

        print("\nTest capture results:")
        print(f" Packets captured: {test_count}")
        print(f" PLCP headers: {test_plcp}")

        if test_plcp == 0 and test_count > 0:
            print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")

        return True

    def _main_capture(self, interface, duration):
        """Perform the main packet capture, optionally save it, then analyze it.

        Returns:
            0 on success, 1 if an error occurred during capture or saving.
        """
        print(f"\n=== Starting scapy capture ({duration} seconds) ===")
        print("Press Ctrl+C to stop early\n")

        if self.config.full_packet:
            print("Capturing full packets...")
        else:
            print(f"Capturing packets (snaplen={self.config.snaplen} bytes, header + some payload)...")

        packets = []
        pcap_path = None

        if self.config.keep_pcap:
            # Only the unique name is needed here; the file is written after capture.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap',
                                             prefix='scapy_capture_') as pcap_file:
                pcap_path = pcap_file.name
            print(f"Capturing to file: {pcap_path}")

        capture_error = None
        started = time.monotonic()
        elapsed = None
        try:
            # Packets are collected through ``prn``; store=False stops scapy
            # from keeping a second copy of every packet in memory.
            # NOTE(review): confirm that the installed scapy's sniff() accepts
            # ``snaplen`` for the capture socket in use.
            sniff(iface=interface, prn=packets.append,
                  timeout=duration, store=False, snaplen=self.config.snaplen)
            elapsed = time.monotonic() - started
            if pcap_path is not None:
                wrpcap(pcap_path, packets)
                print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
                print(f"Keeping pcap file: {pcap_path}")
                print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)")
        except KeyboardInterrupt:
            print("\nCapture interrupted by user")
        except OSError as e:
            capture_error = e
            print(f"\nError during capture (system error): {e}")
        except Exception as e:  # top-level boundary around third-party capture code
            capture_error = e
            print(f"\nError during capture: {e}")
            traceback.print_exc()

        if elapsed is None:
            elapsed = time.monotonic() - started
        # Use the time actually spent capturing so an early stop (Ctrl+C)
        # does not understate the packet rate.
        self.analyzer.analyze(packets, elapsed if elapsed > 0 else duration)

        return 1 if capture_error else 0


class ArgumentParser:
    """Handles command line argument parsing."""

    _KEEP_PCAP_FLAGS = ("--keep-pcap", "-k")
    _FULL_PACKET_FLAGS = ("--full-packet", "-f")
    _HELP_FLAGS = ("--help", "-h")
    _PCAP_SUFFIXES = ('.pcap', '.cap', '.pcapng')

    def __init__(self, config):
        self.config = config

    def parse(self):
        """Parse command line arguments.

        Returns:
            ``('pcap', path, None, None)`` or
            ``('live', interface, channel, duration)``; None if usage was
            printed or the arguments were invalid.
        """
        args = sys.argv[1:]

        if not args or any(arg in self._HELP_FLAGS for arg in args):
            self._print_usage()
            return None

        args = self._process_flags(args)

        # Anything still looking like an option (and not a number) is a typo.
        for arg in args:
            if arg.startswith("-") and not self._is_int(arg):
                print(f"Error: unknown option: {arg}")
                self._print_usage()
                return None

        if self._is_pcap_file(args):
            return self._parse_pcap_mode(args)

        return self._parse_live_mode(args)

    @staticmethod
    def _is_int(text):
        """Return True if *text* parses as an integer."""
        try:
            int(text)
        except ValueError:
            return False
        return True

    def _print_usage(self):
        """Print usage information."""
        print("Usage:")
        print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]")
        print(" Read pcap: python3 wifi_monitor.py <pcap_file>")

    def _process_flags(self, args):
        """Apply flag arguments to the config and return the remaining positionals."""
        if any(arg in self._KEEP_PCAP_FLAGS for arg in args):
            self.config.keep_pcap = True
        if any(arg in self._FULL_PACKET_FLAGS for arg in args):
            self.config.full_packet = True

        flags = self._KEEP_PCAP_FLAGS + self._FULL_PACKET_FLAGS
        return [arg for arg in args if arg not in flags]

    def _is_pcap_file(self, args):
        """Check if first argument is a PCAP file (by file extension)."""
        return len(args) > 0 and args[0].endswith(self._PCAP_SUFFIXES)

    def _parse_pcap_mode(self, args):
        """Parse arguments for PCAP file mode."""
        pcap_file = args[0]
        if not os.path.isfile(pcap_file):
            print(f"Error: PCAP file not found: {pcap_file}")
            return None
        return ('pcap', pcap_file, None, None)

    def _parse_live_mode(self, args):
        """Parse arguments for live capture mode.

        Accepted forms: ``[interface] [channel] [duration]`` or
        ``[channel] [duration]``. A first argument that is not an integer is
        taken to be the interface name.
        """
        positional = list(args)
        if positional and not self._is_int(positional[0]):
            self.config.wifi_interface = positional.pop(0)

        try:
            channel = int(positional[0]) if positional else DEFAULT_CHANNEL
            duration = int(positional[1]) if len(positional) > 1 else DEFAULT_DURATION
        except ValueError:
            print("Error: channel and duration must be integers")
            self._print_usage()
            return None

        if channel <= 0 or duration <= 0:
            print("Error: channel and duration must be positive")
            return None

        return ('live', self.config.wifi_interface, channel, duration)


def main():
    """Main function with single exit point; returns the process exit code."""
    exit_code = 0
    config = Config()
    parsed_args = ArgumentParser(config).parse()

    if parsed_args is None:
        exit_code = 1
    else:
        mode, interface_or_pcap, channel, duration = parsed_args
        capture = PacketCapture(config)
        if mode == 'pcap':
            exit_code = capture.capture_from_pcap(interface_or_pcap)
        else:
            exit_code = capture.capture_live(interface_or_pcap, channel, duration)

    return exit_code


if __name__ == "__main__":
    sys.exit(main())