From b67107e53e045e6dbfe08cfb5b947da8dde587b6 Mon Sep 17 00:00:00 2001 From: Robert McMahon Date: Fri, 13 Feb 2026 15:19:18 -0800 Subject: [PATCH] Refactor WiFi monitor script: improve code quality and rename - Rename test_monitor.py to wifi_monitor.py (better naming) - Delete parse_tshark_pcap.py (deprecated, replaced by wifi_monitor.py) - Refactor to class-based architecture for better modularity: * Config: Configuration management * MonitorModeSetup: Monitor mode setup operations * PacketParser: Packet parsing utilities * PacketAnalyzer: Packet analysis and statistics * PacketCapture: Capture operations (PCAP and live) * ArgumentParser: Command line argument parsing - Add properties where appropriate (snaplen, is_successful, error_message) - Remove 'get_' prefix from utility methods (Python convention) - Fix Python convention violations: * Replace wildcard imports with explicit imports * Use specific exception types instead of bare Exception * Organize imports (stdlib, third-party) * Remove unused imports - Single exit point pattern throughout code - Remove trailing whitespace Co-authored-by: Cursor --- parse_tshark_pcap.py | 404 ------------------------- test_monitor.py | 586 ----------------------------------- wifi_monitor.py | 707 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 707 insertions(+), 990 deletions(-) delete mode 100755 parse_tshark_pcap.py delete mode 100755 test_monitor.py create mode 100755 wifi_monitor.py diff --git a/parse_tshark_pcap.py b/parse_tshark_pcap.py deleted file mode 100755 index 25cd1f2..0000000 --- a/parse_tshark_pcap.py +++ /dev/null @@ -1,404 +0,0 @@ -#!/usr/bin/env python3 -""" -Parse tshark pcap file and extract 802.11 frame information. -This script handles missing fields gracefully and provides detailed statistics. -""" - -import sys -import subprocess -import collections -from typing import List, Dict, Tuple, Optional - -# Field indices (0-based) -FRAME_NUMBER = 0 -FRAME_TIME = 1 -WLAN_RA = 2 -WLAN_TA = 3 -WLAN_FC_TYPE = 4 -WLAN_FC_SUBTYPE = 5 -WLAN_FC_TYPE_SUBTYPE = 6 -WLAN_FC_PROTECTED = 7 -WLAN_FC_RETRY = 8 -WLAN_DURATION = 9 -RADIOTAP_PRESENT = 10 -RADIOTAP_DATARATE = 11 -RADIOTAP_MCS_INDEX = 12 -WLAN_RADIO_DATA_RATE = 13 -WLAN_RADIO_MCS_INDEX = 14 - -def safe_get_field(fields: List[str], index: int, default: str = "N/A") -> str: - """Safely get a field value, handling missing or empty fields.""" - if index < len(fields): - value = fields[index].strip() - if value and value != "-" and value != "": - return value - return default - -def parse_tshark_output(pcap_file: str) -> List[List[str]]: - """Extract fields from pcap file using tshark.""" - fields = [ - "frame.number", - "frame.time", - "wlan.ra", - "wlan.ta", - "wlan.fc.type", - "wlan.fc.subtype", - "wlan.fc.type_subtype", - "wlan.fc.protected", - "wlan.fc.retry", - "wlan.duration", - "radiotap.present", - "radiotap.datarate", - "radiotap.mcs.index", - "wlan_radio.data_rate", - "wlan_radio.mcs.index", - ] - - cmd = [ - "tshark", "-q", "-r", pcap_file, "-n", "-T", "fields" - ] - for field in fields: - cmd.extend(["-e", field]) - - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - check=False # Don't fail on errors - ) - - # Debug: Check if we got any output - if result.stderr: - # Print stderr warnings but don't fail - pass # We'll filter these out - - # Filter out error messages and status lines - lines = [] - raw_lines = result.stdout.splitlines() - - # Debug: If no lines parsed, show what we got - if len(raw_lines) == 0: - return [] - - for line in raw_lines: - line = line.rstrip() # Only strip trailing whitespace, keep leading tabs - # Skip empty lines and tshark status messages - if not line: - continue - if line.startswith("Running as") or line.startswith("Capturing"): - continue - if "tshark:" in line.lower() or "packets captured" in line.lower(): - continue - - # Split by tab to get fields - fields = line.split("\t") - if len(fields) == 0: - continue - - # Check if first field (frame.number) is a valid number - # This handles cases where frame.number might be empty or the line starts with tabs - first_field = fields[0].strip() - # Try to parse as integer - if it succeeds, it's a valid frame number - try: - frame_num = int(first_field) - if frame_num > 0: # Valid frame numbers are positive - lines.append(fields) - except (ValueError, IndexError): - # Not a valid frame number, skip this line - continue - - return lines - - except Exception as e: - print(f"Error running tshark: {e}", file=sys.stderr) - return [] - -def count_packets(packets: List[List[str]]) -> int: - """Count total valid packets.""" - return len(packets) - -def count_plcp_headers(packets: List[List[str]]) -> int: - """Count packets with PLCP headers (radiotap present).""" - count = 0 - for packet in packets: - radiotap = safe_get_field(packet, RADIOTAP_PRESENT, "") - if radiotap and radiotap != "0" and radiotap != "N/A" and radiotap != "-": - count += 1 - return count - -def get_unique_ra_ta_pairs(packets: List[List[str]]) -> Dict[str, int]: - """Count unique RA/TA pairs.""" - pairs = collections.Counter() - for packet in packets: - ra = safe_get_field(packet, WLAN_RA, "N/A") - ta = safe_get_field(packet, WLAN_TA, "N/A") - if ra != "N/A" or ta != "N/A": - pair = f"{ra} -> {ta}" - pairs[pair] += 1 - return dict(pairs) - -def get_phy_rate(packet: List[str]) -> Optional[str]: - """Extract PHY rate from packet (try radiotap first, then wlan_radio).""" - rate = safe_get_field(packet, RADIOTAP_DATARATE, "") - if rate and rate != "0" and rate != "N/A" and rate != "-": - return rate - rate = safe_get_field(packet, WLAN_RADIO_DATA_RATE, "") - if rate and rate != "0" and rate != "N/A" and rate != "-": - return rate - return None - -def get_mcs_index(packet: List[str]) -> Optional[str]: - """Extract MCS index from packet (try radiotap first, then wlan_radio).""" - mcs = safe_get_field(packet, RADIOTAP_MCS_INDEX, "") - if mcs and mcs != "0" and mcs != "255" and mcs != "N/A" and mcs != "-": - return mcs - mcs = safe_get_field(packet, WLAN_RADIO_MCS_INDEX, "") - if mcs and mcs != "0" and mcs != "255" and mcs != "N/A" and mcs != "-": - return mcs - return None - -def get_histograms_per_pair(packets: List[List[str]]) -> Dict[str, Dict[str, Dict[str, int]]]: - """Generate PHY rate and MCS histograms per RA/TA pair.""" - histograms = {} # {pair: {"rate": {rate: count}, "mcs": {mcs: count}}} - - for packet in packets: - ra = safe_get_field(packet, WLAN_RA, "N/A") - ta = safe_get_field(packet, WLAN_TA, "N/A") - if ra == "N/A" and ta == "N/A": - continue - - pair = f"{ra} -> {ta}" - frame_type = safe_get_field(packet, WLAN_FC_TYPE, "") - - # Only process data frames (type 2) for histograms - if frame_type != "2": - continue - - if pair not in histograms: - histograms[pair] = {"rate": collections.Counter(), "mcs": collections.Counter()} - - # Get PHY rate - rate = get_phy_rate(packet) - if rate: - histograms[pair]["rate"][rate] += 1 - - # Get MCS index - mcs = get_mcs_index(packet) - if mcs: - histograms[pair]["mcs"][mcs] += 1 - - return histograms - -def get_frame_type_breakdown(packets: List[List[str]]) -> Dict[str, int]: - """Count frames by type.""" - breakdown = collections.Counter() - for packet in packets: - frame_type = safe_get_field(packet, WLAN_FC_TYPE, "unknown") - type_name = "Unknown" - if frame_type == "0": - type_name = "Management" - elif frame_type == "1": - type_name = "Control" - elif frame_type == "2": - type_name = "Data" - breakdown[type_name] += 1 - return dict(breakdown) - -def get_data_frame_analysis(packets: List[List[str]]) -> Tuple[int, int, int]: - """Analyze data frames (QoS Data frames, subtype 8).""" - data_frames = [] - for packet in packets: - frame_type = safe_get_field(packet, WLAN_FC_TYPE, "") - frame_subtype = safe_get_field(packet, WLAN_FC_SUBTYPE, "") - if frame_type == "2" and frame_subtype == "8": # QoS Data frames - data_frames.append(packet) - - encrypted = 0 - unencrypted = 0 - for packet in data_frames: - protected = safe_get_field(packet, WLAN_FC_PROTECTED, "") - if protected == "1" or protected == "1.0": - encrypted += 1 - elif protected and protected != "-" and protected != "N/A": - unencrypted += 1 - - return len(data_frames), encrypted, unencrypted - -def format_sample_packet(packet: List[str], index: int) -> str: - """Format a packet for display.""" - frame_num = safe_get_field(packet, FRAME_NUMBER, str(index + 1)) - ra = safe_get_field(packet, WLAN_RA, "N/A") - ta = safe_get_field(packet, WLAN_TA, "N/A") - frame_type = safe_get_field(packet, WLAN_FC_TYPE, "N/A") - frame_subtype = safe_get_field(packet, WLAN_FC_SUBTYPE, "N/A") - protected = safe_get_field(packet, WLAN_FC_PROTECTED, "") - retry = safe_get_field(packet, WLAN_FC_RETRY, "") - duration = safe_get_field(packet, WLAN_DURATION, "N/A") - radiotap = safe_get_field(packet, RADIOTAP_PRESENT, "") - - protected_str = "encrypted" if (protected == "1" or protected == "1.0") else "unencrypted" - retry_str = " [retry]" if (retry == "1" or retry == "1.0") else "" - plcp_str = "yes" if (radiotap == "1" or radiotap == "1.0") else ("no" if radiotap != "N/A" and radiotap != "-" else "N/A") - - return f" Frame {frame_num}: RA={ra}, TA={ta}, type={frame_type}/{frame_subtype}, {protected_str}, dur={duration}, PLCP={plcp_str}{retry_str}" - -def main(): - if len(sys.argv) < 2: - print("Usage: parse_tshark_pcap.py [duration_seconds] [raw_packet_count]", file=sys.stderr) - sys.exit(1) - - pcap_file = sys.argv[1] - duration = float(sys.argv[2]) if len(sys.argv) > 2 else 10.0 - raw_packet_count = int(sys.argv[3]) if len(sys.argv) > 3 else 0 - - print("=== Capture Statistics ===") - - # Parse packets - packets = parse_tshark_output(pcap_file) - final_count = count_packets(packets) - - # Debug: If no packets parsed but raw count shows packets, try to see what tshark output - if final_count == 0 and raw_packet_count > 0: - # Try a simple test to see if tshark can read the file - import subprocess - test_result = subprocess.run( - ["tshark", "-q", "-r", pcap_file, "-n", "-T", "fields", "-e", "frame.number"], - capture_output=True, - text=True - ) - if test_result.stdout: - sample_lines = test_result.stdout.splitlines()[:5] - print(f"Debug: tshark -T fields -e frame.number returned {len(test_result.stdout.splitlines())} lines") - print(f"Debug: First 5 lines: {sample_lines}") - else: - print("Debug: tshark returned no output") - plcp_count = count_plcp_headers(packets) - - # Check for parsing issues - if final_count < raw_packet_count and raw_packet_count > 10: - print(f"Warning: Parsed {final_count} packets but pcap file contains {raw_packet_count} packets") - print(" This may indicate field extraction issues.") - print() - - # Display basic stats - print(f"Total packets captured: {final_count}") - print(f"PLCP headers: {plcp_count}") - if final_count > 0: - rate = final_count / duration - print(f"Packet rate: {rate:.1f} packets/second") - print() - - if final_count == 0: - print("(No packets captured)") - print() - print("=== Summary ===") - print("✗ No packets captured. Check:") - print(f" 1. Is there WiFi traffic on the channel?") - print(f" 2. Is the interface actually in monitor mode?") - print(f" 3. Try a different channel or longer duration") - return - - # Display sample packets - print("Sample packets (first 10):") - for i, packet in enumerate(packets[:10]): - print(format_sample_packet(packet, i)) - print() - - # Unique RA/TA pairs - print("Unique RA/TA pairs (with counts):") - pairs = get_unique_ra_ta_pairs(packets) - if pairs: - for pair, count in sorted(pairs.items(), key=lambda x: x[1], reverse=True): - print(f" {pair}: {count} frame(s)") - else: - print(" (no valid RA/TA pairs found)") - print() - - # PHY rate and MCS histograms per RA/TA pair - print("PHY Rate and MCS Histograms per RA/TA pair:") - histograms = get_histograms_per_pair(packets) - for pair in sorted(histograms.keys()): - print(f"\n {pair}:") - - # PHY Rate histogram - print(" PHY Rate (Mbps):") - rate_hist = histograms[pair]["rate"] - if rate_hist: - for rate in sorted(rate_hist.keys(), key=lambda x: float(x) if x.replace(".", "").isdigit() else 0): - print(f" {rate} Mbps: {rate_hist[rate]} frame(s)") - else: - print(" (no PHY rate data)") - - # MCS histogram - print(" MCS Index:") - mcs_hist = histograms[pair]["mcs"] - if mcs_hist: - for mcs in sorted(mcs_hist.keys(), key=lambda x: int(x) if x.isdigit() else 0): - print(f" MCS {mcs}: {mcs_hist[mcs]} frame(s)") - else: - print(" (no MCS data)") - print() - - # Frame type breakdown - print("Frame type breakdown:") - breakdown = get_frame_type_breakdown(packets) - for frame_type, count in sorted(breakdown.items(), key=lambda x: x[1], reverse=True): - print(f" {frame_type}: {count} frame(s)") - print() - - # Data frame analysis - print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):") - data_count, encrypted, unencrypted = get_data_frame_analysis(packets) - print(f" QoS Data frames (type 2, subtype 8): {data_count}") - print(f" Encrypted: {encrypted}") - print(f" Unencrypted: {unencrypted}") - - if data_count > 0: - print(" Sample QoS Data frames (likely iperf traffic):") - data_frames = [p for p in packets if safe_get_field(p, WLAN_FC_TYPE, "") == "2" and safe_get_field(p, WLAN_FC_SUBTYPE, "") == "8"] - for i, packet in enumerate(data_frames[:5]): - frame_num = safe_get_field(packet, FRAME_NUMBER, str(i + 1)) - ra = safe_get_field(packet, WLAN_RA, "N/A") - ta = safe_get_field(packet, WLAN_TA, "N/A") - protected = safe_get_field(packet, WLAN_FC_PROTECTED, "") - retry = safe_get_field(packet, WLAN_FC_RETRY, "") - duration = safe_get_field(packet, WLAN_DURATION, "N/A") - protected_str = "encrypted" if (protected == "1" or protected == "1.0") else "unencrypted" - retry_str = " [retry]" if (retry == "1" or retry == "1.0") else "" - print(f" Frame {frame_num}: RA={ra}, TA={ta}, {protected_str}, dur={duration}{retry_str}") - print() - - # Frames involving server MAC - server_mac = "80:84:89:93:c4:b6" - print(f"Frames involving server MAC ({server_mac}):") - server_frames = [] - for packet in packets: - ra = safe_get_field(packet, WLAN_RA, "") - ta = safe_get_field(packet, WLAN_TA, "") - if ra == server_mac or ta == server_mac: - server_frames.append(packet) - - server_count = len(server_frames) - print(f" Total frames with server MAC: {server_count}") - if server_count > 0: - server_breakdown = get_frame_type_breakdown(server_frames) - print(" Frame type breakdown:") - for frame_type, count in sorted(server_breakdown.items(), key=lambda x: x[1], reverse=True): - print(f" {frame_type}: {count} frame(s)") - print(" Sample frames:") - for i, packet in enumerate(server_frames[:5]): - print(format_sample_packet(packet, i)) - print() - - # Summary - print("=== Summary ===") - if final_count > 0: - print(f"✓ Monitor mode is working! Captured {final_count} packet(s)") - if plcp_count > 0: - print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information") - else: - print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") - -if __name__ == "__main__": - main() diff --git a/test_monitor.py b/test_monitor.py deleted file mode 100755 index 47a0295..0000000 --- a/test_monitor.py +++ /dev/null @@ -1,586 +0,0 @@ -#!/usr/bin/env python3 -""" -Monitor mode WiFi packet capture and analysis using scapy. -Replaces the bash script with a pure Python solution. - -Usage: - # Live capture from interface: - sudo python3 test_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet] - sudo python3 test_monitor.py wlan0 36 10 - sudo python3 test_monitor.py wlan0 36 10 --keep-pcap - sudo python3 test_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only) - - # Read from pcap file: - python3 test_monitor.py - python3 test_monitor.py /tmp/capture.pcap -""" - -import sys -import os -import subprocess -import signal -import time -import tempfile -from datetime import datetime -from collections import Counter, defaultdict -from scapy.all import * -from scapy.layers.dot11 import Dot11, Dot11QoS, Dot11Beacon, Dot11ProbeReq, Dot11ProbeResp -from scapy.layers.dot11 import Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Deauth, Dot11Disas - -# Configuration -WIFI_INTERFACE = os.environ.get("WIFI_INTERFACE", "wlan0") -KEEP_PCAP = False -FULL_PACKET = False -# Default snaplen: 256 bytes (enough for 802.11 header ~30-40 bytes + some payload) -# 802.11 header can be up to ~40 bytes, plus radiotap header ~20-30 bytes -# 256 bytes gives us header + ~200 bytes of payload, which is usually enough for analysis -DEFAULT_SNAPLEN = 256 - -def parse_args(): - """Parse command line arguments. - - Returns: - tuple: (mode, interface_or_pcap, channel, duration) - mode: 'live' or 'pcap' - interface_or_pcap: interface name (for live) or pcap file path (for pcap) - channel: channel number (for live mode only) - duration: capture duration in seconds (for live mode only) - """ - global WIFI_INTERFACE, KEEP_PCAP, FULL_PACKET - - args = sys.argv[1:] - - if len(args) == 0: - print("Usage:") - print(" Live capture: sudo python3 test_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]") - print(" Read pcap: python3 test_monitor.py ") - sys.exit(1) - - # Check for --keep-pcap flag - if "--keep-pcap" in args or "-k" in args: - KEEP_PCAP = True - args = [a for a in args if a not in ["--keep-pcap", "-k"]] - - # Check for --full-packet flag - if "--full-packet" in args or "-f" in args: - FULL_PACKET = True - args = [a for a in args if a not in ["--full-packet", "-f"]] - - # Check if first argument is a pcap file (ends with .pcap or .cap) - if len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap')): - pcap_file = args[0] - if not os.path.isfile(pcap_file): - print(f"Error: PCAP file not found: {pcap_file}") - sys.exit(1) - return ('pcap', pcap_file, None, None) - - # Otherwise, parse as live capture arguments - if len(args) > 0: - # Check if first arg is an interface name - if args[0].startswith("wl") and len(args[0]) <= 6: - WIFI_INTERFACE = args[0] - CHANNEL = int(args[1]) if len(args) > 1 else 36 - DURATION = int(args[2]) if len(args) > 2 else 10 - else: - # First arg is channel (using default interface) - CHANNEL = int(args[0]) if len(args) > 0 else 36 - DURATION = int(args[1]) if len(args) > 1 else 10 - else: - CHANNEL = 36 - DURATION = 10 - - return ('live', WIFI_INTERFACE, CHANNEL, DURATION) - -def setup_monitor_mode(interface, channel): - """Set WiFi interface to monitor mode.""" - print(f"=== Setting up monitor mode on {interface} ===") - - # Try to unmanage interface from NetworkManager - try: - result = subprocess.run(["which", "nmcli"], capture_output=True, text=True) - if result.returncode == 0: - print("Unmanaging interface from NetworkManager...") - subprocess.run( - ["nmcli", "device", "set", interface, "managed", "no"], - capture_output=True, - stderr=subprocess.DEVNULL - ) - except Exception: - pass - - # Unblock WiFi if blocked by rfkill - try: - subprocess.run(["rfkill", "unblock", "wifi"], check=False) - except Exception: - pass - - # Check current mode - try: - result = subprocess.run( - ["iw", "dev", interface, "info"], - capture_output=True, - text=True, - check=True - ) - if "type monitor" in result.stdout: - print(f"Already in monitor mode") - else: - print(f"Setting {interface} to monitor mode...") - - # Bring down interface - subprocess.run( - ["ip", "link", "set", interface, "down"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL - ) - time.sleep(0.5) - - # Set monitor mode - subprocess.run( - ["iw", "dev", interface, "set", "type", "monitor"], - check=True, - capture_output=True, - text=True - ) - time.sleep(0.5) - - # Bring up interface - subprocess.run( - ["ip", "link", "set", interface, "up"], - capture_output=True, - text=True, - check=True - ) - print("Monitor mode activated") - except subprocess.CalledProcessError as e: - print(f"Error setting monitor mode: {e}") - if hasattr(e, 'stderr') and e.stderr: - print(f"Error details: {e.stderr}") - sys.exit(1) - - # Set channel - try: - subprocess.run( - ["iw", "dev", interface, "set", "channel", str(channel)], - check=True, - capture_output=True - ) - print(f"Channel set to {channel}") - except subprocess.CalledProcessError as e: - print(f"Error setting channel: {e}") - print("Continuing anyway - channel may not be set correctly") - - # Verify monitor mode - print("\nVerifying monitor mode...") - try: - result = subprocess.run( - ["iw", "dev", interface, "info"], - capture_output=True, - text=True, - check=True - ) - for line in result.stdout.splitlines(): - if "type" in line or "channel" in line: - print(f"\t{line.strip()}") - except Exception: - pass - -def get_frame_type_name(pkt): - """Get human-readable frame type name.""" - if not pkt.haslayer(Dot11): - return "Unknown" - - dot11 = pkt[Dot11] - fc = dot11.FCfield - - if dot11.type == 0: # Management - if dot11.subtype == 8: - return "Beacon" - elif dot11.subtype == 4: - return "Probe Request" - elif dot11.subtype == 5: - return "Probe Response" - elif dot11.subtype == 11: - return "Authentication" - elif dot11.subtype == 12: - return "Deauthentication" - elif dot11.subtype == 0: - return "Association Request" - elif dot11.subtype == 1: - return "Association Response" - elif dot11.subtype == 10: - return "Disassociation" - else: - return f"Management ({dot11.subtype})" - elif dot11.type == 1: # Control - return f"Control ({dot11.subtype})" - elif dot11.type == 2: # Data - if dot11.subtype == 8: - return "QoS Data" - elif dot11.subtype == 0: - return "Data" - else: - return f"Data ({dot11.subtype})" - else: - return f"Unknown ({dot11.type}/{dot11.subtype})" - -def get_ra_ta(pkt): - """Extract RA and TA from 802.11 frame.""" - if not pkt.haslayer(Dot11): - return None, None - - dot11 = pkt[Dot11] - - # RA is typically addr1, TA is typically addr2 - # But this depends on frame type and direction - ra = dot11.addr1 if hasattr(dot11, 'addr1') else None - ta = dot11.addr2 if hasattr(dot11, 'addr2') else None - - # Format MAC addresses - if ra: - ra = ra.lower() if isinstance(ra, str) else ':'.join(f'{b:02x}' for b in ra) - if ta: - ta = ta.lower() if isinstance(ta, str) else ':'.join(f'{b:02x}' for b in ta) - - return ra, ta - -def get_phy_info(pkt): - """Extract PHY rate and MCS from packet (if available in radiotap).""" - phy_rate = None - mcs = None - - # Check for radiotap layer - if pkt.haslayer(RadioTap): - radiotap = pkt[RadioTap] - # Try to get rate (in 0.5 Mbps units) - if hasattr(radiotap, 'Rate'): - rate_val = radiotap.Rate - if rate_val: - phy_rate = rate_val * 0.5 # Convert to Mbps - # Try to get MCS - if hasattr(radiotap, 'MCS'): - mcs_data = radiotap.MCS - if mcs_data and hasattr(mcs_data, 'index'): - mcs = mcs_data.index - - return phy_rate, mcs - -def analyze_packets(packets, duration): - """Analyze captured packets and generate statistics.""" - print("\n=== Capture Statistics ===") - - total_count = len(packets) - print(f"Total packets captured: {total_count}") - - if total_count == 0: - print("\n(No packets captured)") - print("\n=== Summary ===") - print("✗ No packets captured. Check:") - print(f" 1. Is there WiFi traffic on the channel?") - print(f" 2. Is the interface actually in monitor mode?") - print(f" 3. Try a different channel or longer duration") - return - - # Count PLCP headers (radiotap present) - plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap)) - print(f"PLCP headers: {plcp_count}") - - if total_count > 0: - rate = total_count / duration - print(f"Packet rate: {rate:.1f} packets/second") - print() - - # Sample packets - print("Sample packets (first 10):") - for i, pkt in enumerate(packets[:10]): - ra, ta = get_ra_ta(pkt) - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - - frame_type = get_frame_type_name(pkt) - - # Check if encrypted - encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" - - # Check retry - retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" - - # Duration - duration_val = "N/A" - if pkt.haslayer(Dot11): - duration_val = pkt[Dot11].Duration if hasattr(pkt[Dot11], 'Duration') else "N/A" - - # PLCP - plcp = "yes" if pkt.haslayer(RadioTap) else "no" - - print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}") - print() - - # Unique RA/TA pairs - print("Unique RA/TA pairs (with counts):") - ra_ta_pairs = Counter() - for pkt in packets: - ra, ta = get_ra_ta(pkt) - if ra or ta: - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - pair = f"{ra_str} -> {ta_str}" - ra_ta_pairs[pair] += 1 - - if ra_ta_pairs: - for pair, count in ra_ta_pairs.most_common(): - print(f" {pair}: {count} frame(s)") - else: - print(" (no valid RA/TA pairs found)") - print() - - # PHY rate and MCS histograms per RA/TA pair - print("PHY Rate and MCS Histograms per RA/TA pair:") - rate_histograms = defaultdict(Counter) - mcs_histograms = defaultdict(Counter) - - for pkt in packets: - ra, ta = get_ra_ta(pkt) - if not (ra or ta): - continue - - # Only process data frames (type 2) - if not pkt.haslayer(Dot11) or pkt[Dot11].type != 2: - continue - - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - pair = f"{ra_str} -> {ta_str}" - - phy_rate, mcs = get_phy_info(pkt) - if phy_rate: - rate_histograms[pair][phy_rate] += 1 - if mcs is not None: - mcs_histograms[pair][mcs] += 1 - - for pair in sorted(set(list(rate_histograms.keys()) + list(mcs_histograms.keys()))): - print(f"\n {pair}:") - - # PHY Rate histogram - print(" PHY Rate (Mbps):") - if pair in rate_histograms: - for rate in sorted(rate_histograms[pair].keys()): - print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)") - else: - print(" (no PHY rate data)") - - # MCS histogram - print(" MCS Index:") - if pair in mcs_histograms: - for mcs_val in sorted(mcs_histograms[pair].keys()): - print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)") - else: - print(" (no MCS data)") - print() - - # Frame type breakdown - print("Frame type breakdown:") - frame_types = Counter() - for pkt in packets: - if pkt.haslayer(Dot11): - dot11 = pkt[Dot11] - if dot11.type == 0: - frame_types["Management"] += 1 - elif dot11.type == 1: - frame_types["Control"] += 1 - elif dot11.type == 2: - frame_types["Data"] += 1 - else: - frame_types["Unknown"] += 1 - - for frame_type, count in frame_types.most_common(): - print(f" {frame_type}: {count} frame(s)") - print() - - # Data frame analysis (QoS Data frames, subtype 8) - print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):") - qos_data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8] - qos_count = len(qos_data_frames) - print(f" QoS Data frames (type 2, subtype 8): {qos_count}") - - encrypted_count = sum(1 for pkt in qos_data_frames if pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) - unencrypted_count = qos_count - encrypted_count - print(f" Encrypted: {encrypted_count}") - print(f" Unencrypted: {unencrypted_count}") - - if qos_count > 0: - print(" Sample QoS Data frames (likely iperf traffic):") - for i, pkt in enumerate(qos_data_frames[:5]): - ra, ta = get_ra_ta(pkt) - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" - retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" - duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A" - print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}") - print() - - # Frames involving server MAC (80:84:89:93:c4:b6) - server_mac = "80:84:89:93:c4:b6" - print(f"Frames involving server MAC ({server_mac}):") - server_frames = [] - for pkt in packets: - ra, ta = get_ra_ta(pkt) - if (ra and ra.lower() == server_mac.lower()) or (ta and ta.lower() == server_mac.lower()): - server_frames.append(pkt) - - server_count = len(server_frames) - print(f" Total frames with server MAC: {server_count}") - if server_count > 0: - server_frame_types = Counter() - for pkt in server_frames: - if pkt.haslayer(Dot11): - dot11 = pkt[Dot11] - if dot11.type == 0: - server_frame_types["Management"] += 1 - elif dot11.type == 1: - server_frame_types["Control"] += 1 - elif dot11.type == 2: - server_frame_types["Data"] += 1 - - print(" Frame type breakdown:") - for frame_type, count in server_frame_types.most_common(): - print(f" {frame_type}: {count} frame(s)") - - print(" Sample frames:") - for i, pkt in enumerate(server_frames[:5]): - ra, ta = get_ra_ta(pkt) - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - frame_type = get_frame_type_name(pkt) - encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" - retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" - duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A" - print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}") - print() - - # Summary - print("=== Summary ===") - if total_count > 0: - print(f"✓ Monitor mode is working! Captured {total_count} packet(s)") - if plcp_count > 0: - print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information") - else: - print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") - -def packet_handler(pkt, packets_list): - """Callback function to handle captured packets.""" - packets_list.append(pkt) - -def main(): - """Main function.""" - mode, interface_or_pcap, channel, duration = parse_args() - - if mode == 'pcap': - # Read from pcap file - pcap_file = interface_or_pcap - print("=== Reading packets from PCAP file ===") - print(f"PCAP file: {pcap_file}") - print() - - try: - packets = rdpcap(pcap_file) - print(f"Loaded {len(packets)} packets from {pcap_file}") - - # Get file size - file_size = os.path.getsize(pcap_file) - print(f"File size: {file_size} bytes") - print() - - # Analyze packets (duration doesn't matter for pcap mode) - analyze_packets(packets, 1.0) # Use 1.0 as placeholder duration - - except Exception as e: - print(f"Error reading PCAP file: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - - else: - # Live capture from interface - interface = interface_or_pcap - - print("=== Testing Monitor Mode with scapy ===") - print(f"Interface: {interface}") - print(f"Channel: {channel}") - print(f"Duration: {duration} seconds") - print() - - # Setup monitor mode - setup_monitor_mode(interface, channel) - - # Test capture (1 second) - print("\nChecking Data Link Type (1 second test capture)...") - print("(This may take up to 2 seconds if no packets are present)") - - test_packets = [] - snaplen = 0 if FULL_PACKET else DEFAULT_SNAPLEN - try: - sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=snaplen) - except Exception as e: - print(f"Error during test capture: {e}") - - test_count = len(test_packets) - test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap)) - - if test_count > 0: - print("Sample packets:") - for i, pkt in enumerate(test_packets[:5]): - ra, ta = get_ra_ta(pkt) - ra_str = ra if ra else "N/A" - ta_str = ta if ta else "N/A" - plcp = "yes" if pkt.haslayer(RadioTap) else "no" - print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}") - - print(f"\nTest capture results:") - print(f" Packets captured: {test_count}") - print(f" PLCP headers: {test_plcp}") - if test_plcp == 0 and test_count > 0: - print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") - - # Main capture - print(f"\n=== Starting scapy capture ({duration} seconds) ===") - print("Press Ctrl+C to stop early\n") - - # Set snaplen based on FULL_PACKET flag - snaplen = 0 if FULL_PACKET else DEFAULT_SNAPLEN - if FULL_PACKET: - print("Capturing full packets...") - else: - print(f"Capturing packets (snaplen={snaplen} bytes, header + some payload)...") - - packets = [] - pcap_file = None - - if KEEP_PCAP: - pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_') - pcap_path = pcap_file.name - pcap_file.close() - print(f"Capturing to file: {pcap_path}") - - try: - if KEEP_PCAP: - sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=snaplen) - wrpcap(pcap_path, packets) - print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes") - print(f"Keeping pcap file: {pcap_path}") - print(f" (Use: python3 test_monitor.py {pcap_path} to analyze)") - else: - sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=snaplen) - except KeyboardInterrupt: - print("\nCapture interrupted by user") - except Exception as e: - print(f"\nError during capture: {e}") - import traceback - traceback.print_exc() - - # Analyze packets - analyze_packets(packets, duration) - -if __name__ == "__main__": - main() diff --git a/wifi_monitor.py b/wifi_monitor.py new file mode 100755 index 0000000..fd6155a --- /dev/null +++ b/wifi_monitor.py @@ -0,0 +1,707 @@ +#!/usr/bin/env python3 +""" +Monitor mode WiFi packet capture and analysis using scapy. +Replaces the bash script with a pure Python solution. + +Usage: + # Live capture from interface: + sudo python3 wifi_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet] + sudo python3 wifi_monitor.py wlan0 36 10 + sudo python3 wifi_monitor.py wlan0 36 10 --keep-pcap + sudo python3 wifi_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only) + + # Read from pcap file: + python3 wifi_monitor.py + python3 wifi_monitor.py /tmp/capture.pcap +""" + +# Standard library imports +import os +import subprocess +import sys +import tempfile +import time +from collections import Counter, defaultdict + +# Third-party imports +from scapy.all import RadioTap, rdpcap, sniff, wrpcap +from scapy.layers.dot11 import ( + Dot11, Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Beacon, + Dot11Deauth, Dot11Disas, Dot11ProbeReq, Dot11ProbeResp, Dot11QoS +) + + +class Config: + """Configuration class for capture settings.""" + def __init__(self): + self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0") + self.keep_pcap = False + self.full_packet = False + self.default_snaplen = 256 # Header + some payload + + @property + def snaplen(self): + """Get snaplen value based on full_packet setting.""" + return 0 if self.full_packet else self.default_snaplen + + +class MonitorModeSetup: + """Handles WiFi interface monitor mode setup.""" + def __init__(self, interface, channel): + self.interface = interface + self.channel = channel + self._error_message = None + + @property + def error_message(self): + """Get error message if setup failed.""" + return self._error_message + + @property + def is_successful(self): + """Check if setup was successful.""" + return self._error_message is None + + def setup(self): + """Set WiFi interface to monitor mode.""" + result = True + print(f"=== Setting up monitor mode on {self.interface} ===") + + if not self._unmanage_networkmanager(): + result = False + + self._unblock_wifi() + + if not self._set_monitor_mode(): + result = False + + if not self._set_channel(): + result = False + + self._verify_monitor_mode() + + return result + + def _unmanage_networkmanager(self): + """Try to unmanage interface from NetworkManager.""" + try: + result = subprocess.run(["which", "nmcli"], capture_output=True, text=True) + if result.returncode == 0: + print("Unmanaging interface from NetworkManager...") + subprocess.run( + ["nmcli", "device", "set", self.interface, "managed", "no"], + capture_output=True, + stderr=subprocess.DEVNULL + ) + except OSError: + pass + return True + + def _unblock_wifi(self): + """Unblock WiFi if blocked by rfkill.""" + try: + subprocess.run(["rfkill", "unblock", "wifi"], check=False) + except OSError: + pass + + def _set_monitor_mode(self): + """Set interface to monitor mode.""" + try: + result = subprocess.run( + ["iw", "dev", self.interface, "info"], + capture_output=True, + text=True, + check=True + ) + if "type monitor" in result.stdout: + print(f"Already in monitor mode") + return True + + print(f"Setting {self.interface} to monitor mode...") + + subprocess.run( + ["ip", "link", "set", self.interface, "down"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + time.sleep(0.5) + + subprocess.run( + ["iw", "dev", self.interface, "set", "type", "monitor"], + check=True, + capture_output=True, + text=True + ) + time.sleep(0.5) + + subprocess.run( + ["ip", "link", "set", self.interface, "up"], + check=True, + capture_output=True, + text=True + ) + print("Monitor mode activated") + return True + except subprocess.CalledProcessError as e: + self._error_message = f"Error setting monitor mode: {e}" + if hasattr(e, 'stderr') and e.stderr: + self._error_message += f"\nError details: {e.stderr}" + print(self._error_message) + return False + + def _set_channel(self): + """Set channel for monitor mode interface.""" + try: + subprocess.run( + ["iw", "dev", self.interface, "set", "channel", str(self.channel)], + check=True, + capture_output=True + ) + print(f"Channel set to {self.channel}") + return True + except subprocess.CalledProcessError as e: + print(f"Error setting channel: {e}") + print("Continuing anyway - channel may not be set correctly") + return True # Non-fatal + + def _verify_monitor_mode(self): + """Verify monitor mode is active.""" + print("\nVerifying monitor mode...") + try: + result = subprocess.run( + ["iw", "dev", self.interface, "info"], + capture_output=True, + text=True, + check=True + ) + for line in result.stdout.splitlines(): + if "type" in line or "channel" in line: + print(f"\t{line.strip()}") + except (OSError, subprocess.SubprocessError): + pass + + +class PacketParser: + """Handles parsing of 802.11 packet fields.""" + @staticmethod + def frame_type_name(pkt): + """Get human-readable frame type name.""" + if not pkt.haslayer(Dot11): + return "Unknown" + + dot11 = pkt[Dot11] + fc = dot11.FCfield + + if dot11.type == 0: # Management + if dot11.subtype == 8: + return "Beacon" + elif dot11.subtype == 4: + return "Probe Request" + elif dot11.subtype == 5: + return "Probe Response" + elif dot11.subtype == 11: + return "Authentication" + elif dot11.subtype == 12: + return "Deauthentication" + elif dot11.subtype == 0: + return "Association Request" + elif dot11.subtype == 1: + return "Association Response" + elif dot11.subtype == 10: + return "Disassociation" + else: + return f"Management ({dot11.subtype})" + elif dot11.type == 1: # Control + return f"Control ({dot11.subtype})" + elif dot11.type == 2: # Data + if dot11.subtype == 8: + return "QoS Data" + elif dot11.subtype == 0: + return "Data" + else: + return f"Data ({dot11.subtype})" + else: + return f"Unknown ({dot11.type}/{dot11.subtype})" + + @staticmethod + def ra_ta(pkt): + """Extract RA and TA from 802.11 frame.""" + if not pkt.haslayer(Dot11): + return None, None + + dot11 = pkt[Dot11] + + ra = dot11.addr1 if hasattr(dot11, 'addr1') else None + ta = dot11.addr2 if hasattr(dot11, 'addr2') else None + + if ra: + ra = ra.lower() if isinstance(ra, str) else ':'.join(f'{b:02x}' for b in ra) + if ta: + ta = ta.lower() if isinstance(ta, str) else ':'.join(f'{b:02x}' for b in ta) + + return ra, ta + + @staticmethod + def phy_info(pkt): + """Extract PHY rate and MCS from packet (if available in radiotap).""" + phy_rate = None + mcs = None + + if pkt.haslayer(RadioTap): + radiotap = pkt[RadioTap] + if hasattr(radiotap, 'Rate'): + rate_val = radiotap.Rate + if rate_val: + phy_rate = rate_val * 0.5 # Convert to Mbps + if hasattr(radiotap, 'MCS'): + mcs_data = radiotap.MCS + if mcs_data and hasattr(mcs_data, 'index'): + mcs = mcs_data.index + + return phy_rate, mcs + + +class PacketAnalyzer: + """Analyzes captured packets and generates statistics.""" + def __init__(self, parser): + self.parser = parser + + def analyze(self, packets, duration): + """Analyze captured packets and generate statistics.""" + print("\n=== Capture Statistics ===") + + total_count = len(packets) + print(f"Total packets captured: {total_count}") + + if total_count == 0: + self._print_no_packets_message() + return + + plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap)) + print(f"PLCP headers: {plcp_count}") + + if total_count > 0: + rate = total_count / duration + print(f"Packet rate: {rate:.1f} packets/second") + print() + + self._print_sample_packets(packets) + self._print_ra_ta_pairs(packets) + self._print_phy_histograms(packets) + self._print_frame_type_breakdown(packets) + self._print_data_frame_analysis(packets) + self._print_server_mac_analysis(packets) + self._print_summary(total_count, plcp_count) + + def _print_no_packets_message(self): + """Print message when no packets are captured.""" + print("\n(No packets captured)") + print("\n=== Summary ===") + print("✗ No packets captured. Check:") + print(f" 1. Is there WiFi traffic on the channel?") + print(f" 2. Is the interface actually in monitor mode?") + print(f" 3. Try a different channel or longer duration") + + def _print_sample_packets(self, packets): + """Print sample packets.""" + print("Sample packets (first 10):") + for i, pkt in enumerate(packets[:10]): + ra, ta = self.parser.ra_ta(pkt) + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + + frame_type = self.parser.frame_type_name(pkt) + + encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" + + retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" + + duration_val = "N/A" + if pkt.haslayer(Dot11): + duration_val = pkt[Dot11].Duration if hasattr(pkt[Dot11], 'Duration') else "N/A" + + plcp = "yes" if pkt.haslayer(RadioTap) else "no" + + print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}") + print() + + def _print_ra_ta_pairs(self, packets): + """Print unique RA/TA pairs.""" + print("Unique RA/TA pairs (with counts):") + ra_ta_pairs = Counter() + for pkt in packets: + ra, ta = self.parser.ra_ta(pkt) + if ra or ta: + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + pair = f"{ra_str} -> {ta_str}" + ra_ta_pairs[pair] += 1 + + if ra_ta_pairs: + for pair, count in ra_ta_pairs.most_common(): + print(f" {pair}: {count} frame(s)") + else: + print(" (no valid RA/TA pairs found)") + print() + + def _print_phy_histograms(self, packets): + """Print PHY rate and MCS histograms per RA/TA pair.""" + print("PHY Rate and MCS Histograms per RA/TA pair:") + rate_histograms = defaultdict(Counter) + mcs_histograms = defaultdict(Counter) + + for pkt in packets: + ra, ta = self.parser.ra_ta(pkt) + if not (ra or ta): + continue + + if not pkt.haslayer(Dot11) or pkt[Dot11].type != 2: + continue + + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + pair = f"{ra_str} -> {ta_str}" + + phy_rate, mcs = self.parser.phy_info(pkt) + if phy_rate: + rate_histograms[pair][phy_rate] += 1 + if mcs is not None: + mcs_histograms[pair][mcs] += 1 + + for pair in sorted(set(list(rate_histograms.keys()) + list(mcs_histograms.keys()))): + print(f"\n {pair}:") + + print(" PHY Rate (Mbps):") + if pair in rate_histograms: + for rate in sorted(rate_histograms[pair].keys()): + print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)") + else: + print(" (no PHY rate data)") + + print(" MCS Index:") + if pair in mcs_histograms: + for mcs_val in sorted(mcs_histograms[pair].keys()): + print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)") + else: + print(" (no MCS data)") + print() + + def _print_frame_type_breakdown(self, packets): + """Print frame type breakdown.""" + print("Frame type breakdown:") + frame_types = Counter() + for pkt in packets: + if pkt.haslayer(Dot11): + dot11 = pkt[Dot11] + if dot11.type == 0: + frame_types["Management"] += 1 + elif dot11.type == 1: + frame_types["Control"] += 1 + elif dot11.type == 2: + frame_types["Data"] += 1 + else: + frame_types["Unknown"] += 1 + + for frame_type, count in frame_types.most_common(): + print(f" {frame_type}: {count} frame(s)") + print() + + def _print_data_frame_analysis(self, packets): + """Print data frame analysis.""" + print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):") + qos_data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8] + qos_count = len(qos_data_frames) + print(f" QoS Data frames (type 2, subtype 8): {qos_count}") + + encrypted_count = sum(1 for pkt in qos_data_frames if pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) + unencrypted_count = qos_count - encrypted_count + print(f" Encrypted: {encrypted_count}") + print(f" Unencrypted: {unencrypted_count}") + + if qos_count > 0: + print(" Sample QoS Data frames (likely iperf traffic):") + for i, pkt in enumerate(qos_data_frames[:5]): + ra, ta = self.parser.ra_ta(pkt) + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" + retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" + duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A" + print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}") + print() + + def _print_server_mac_analysis(self, packets): + """Print analysis of frames involving server MAC.""" + server_mac = "80:84:89:93:c4:b6" + print(f"Frames involving server MAC ({server_mac}):") + server_frames = [] + for pkt in packets: + ra, ta = self.parser.ra_ta(pkt) + if (ra and ra.lower() == server_mac.lower()) or (ta and ta.lower() == server_mac.lower()): + server_frames.append(pkt) + + server_count = len(server_frames) + print(f" Total frames with server MAC: {server_count}") + if server_count > 0: + server_frame_types = Counter() + for pkt in server_frames: + if pkt.haslayer(Dot11): + dot11 = pkt[Dot11] + if dot11.type == 0: + server_frame_types["Management"] += 1 + elif dot11.type == 1: + server_frame_types["Control"] += 1 + elif dot11.type == 2: + server_frame_types["Data"] += 1 + + print(" Frame type breakdown:") + for frame_type, count in server_frame_types.most_common(): + print(f" {frame_type}: {count} frame(s)") + + print(" Sample frames:") + for i, pkt in enumerate(server_frames[:5]): + ra, ta = self.parser.ra_ta(pkt) + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + frame_type = self.parser.frame_type_name(pkt) + encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" + retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" + duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A" + print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}") + print() + + def _print_summary(self, total_count, plcp_count): + """Print summary statistics.""" + print("=== Summary ===") + if total_count > 0: + print(f"✓ Monitor mode is working! Captured {total_count} packet(s)") + if plcp_count > 0: + print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information") + else: + print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") + + +class PacketCapture: + """Handles packet capture operations.""" + def __init__(self, config): + self.config = config + self.parser = PacketParser() + self.analyzer = PacketAnalyzer(self.parser) + + def capture_from_pcap(self, pcap_file): + """Read and analyze packets from a PCAP file.""" + print("=== Reading packets from PCAP file ===") + print(f"PCAP file: {pcap_file}") + print() + + try: + packets = rdpcap(pcap_file) + print(f"Loaded {len(packets)} packets from {pcap_file}") + + file_size = os.path.getsize(pcap_file) + print(f"File size: {file_size} bytes") + print() + + self.analyzer.analyze(packets, 1.0) + return 0 + except OSError as e: + print(f"Error reading PCAP file: {e}") + return 1 + except Exception as e: + print(f"Error reading PCAP file: {e}") + import traceback + traceback.print_exc() + return 1 + + def capture_live(self, interface, channel, duration): + """Capture packets from a live interface.""" + print("=== Testing Monitor Mode with scapy ===") + print(f"Interface: {interface}") + print(f"Channel: {channel}") + print(f"Duration: {duration} seconds") + print() + + setup = MonitorModeSetup(interface, channel) + if not setup.setup(): + if setup.error_message: + print(setup.error_message) + return 1 + + if not self._test_capture(interface): + return 1 + + return self._main_capture(interface, duration) + + def _test_capture(self, interface): + """Perform a test capture to verify monitor mode.""" + print("\nChecking Data Link Type (1 second test capture)...") + print("(This may take up to 2 seconds if no packets are present)") + + test_packets = [] + try: + sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=self.config.snaplen) + except OSError as e: + print(f"Error during test capture (system error): {e}") + return False + except Exception as e: + print(f"Error during test capture: {e}") + return False + + test_count = len(test_packets) + test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap)) + + if test_count > 0: + print("Sample packets:") + for i, pkt in enumerate(test_packets[:5]): + ra, ta = self.parser.ra_ta(pkt) + ra_str = ra if ra else "N/A" + ta_str = ta if ta else "N/A" + plcp = "yes" if pkt.haslayer(RadioTap) else "no" + print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}") + + print(f"\nTest capture results:") + print(f" Packets captured: {test_count}") + print(f" PLCP headers: {test_plcp}") + if test_plcp == 0 and test_count > 0: + print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)") + + return True + + def _main_capture(self, interface, duration): + """Perform the main packet capture.""" + print(f"\n=== Starting scapy capture ({duration} seconds) ===") + print("Press Ctrl+C to stop early\n") + + if self.config.full_packet: + print("Capturing full packets...") + else: + print(f"Capturing packets (snaplen={self.config.snaplen} bytes, header + some payload)...") + + packets = [] + pcap_path = None + + if self.config.keep_pcap: + pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_') + pcap_path = pcap_file.name + pcap_file.close() + print(f"Capturing to file: {pcap_path}") + + capture_error = None + try: + if self.config.keep_pcap: + sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen) + wrpcap(pcap_path, packets) + print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes") + print(f"Keeping pcap file: {pcap_path}") + print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)") + else: + sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen) + except KeyboardInterrupt: + print("\nCapture interrupted by user") + except OSError as e: + capture_error = e + print(f"\nError during capture (system error): {e}") + except Exception as e: + capture_error = e + print(f"\nError during capture: {e}") + import traceback + traceback.print_exc() + + self.analyzer.analyze(packets, duration) + + if capture_error: + return 1 + return 0 + + +class ArgumentParser: + """Handles command line argument parsing.""" + def __init__(self, config): + self.config = config + + def parse(self): + """Parse command line arguments.""" + args = sys.argv[1:] + + if len(args) == 0: + self._print_usage() + return None + + args = self._process_flags(args) + + if self._is_pcap_file(args): + return self._parse_pcap_mode(args) + + return self._parse_live_mode(args) + + def _print_usage(self): + """Print usage information.""" + print("Usage:") + print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]") + print(" Read pcap: python3 wifi_monitor.py ") + + def _process_flags(self, args): + """Process command line flags.""" + if "--keep-pcap" in args or "-k" in args: + self.config.keep_pcap = True + args = [a for a in args if a not in ["--keep-pcap", "-k"]] + + if "--full-packet" in args or "-f" in args: + self.config.full_packet = True + args = [a for a in args if a not in ["--full-packet", "-f"]] + + return args + + def _is_pcap_file(self, args): + """Check if first argument is a PCAP file.""" + return len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap')) + + def _parse_pcap_mode(self, args): + """Parse arguments for PCAP file mode.""" + pcap_file = args[0] + if not os.path.isfile(pcap_file): + print(f"Error: PCAP file not found: {pcap_file}") + return None + return ('pcap', pcap_file, None, None) + + def _parse_live_mode(self, args): + """Parse arguments for live capture mode.""" + if len(args) > 0: + if args[0].startswith("wl") and len(args[0]) <= 6: + self.config.wifi_interface = args[0] + channel = int(args[1]) if len(args) > 1 else 36 + duration = int(args[2]) if len(args) > 2 else 10 + else: + channel = int(args[0]) if len(args) > 0 else 36 + duration = int(args[1]) if len(args) > 1 else 10 + else: + channel = 36 + duration = 10 + + return ('live', self.config.wifi_interface, channel, duration) + + +def main(): + """Main function with single exit point.""" + exit_code = 0 + + config = Config() + arg_parser = ArgumentParser(config) + parsed_args = arg_parser.parse() + + if parsed_args is None: + exit_code = 1 + else: + mode, interface_or_pcap, channel, duration = parsed_args + capture = PacketCapture(config) + + if mode == 'pcap': + exit_code = capture.capture_from_pcap(interface_or_pcap) + else: + exit_code = capture.capture_live(interface_or_pcap, channel, duration) + + return exit_code + + +if __name__ == "__main__": + sys.exit(main())