diff --git a/wifi_monitor.py b/wifi_monitor.py index d4068ae..5c517b1 100755 --- a/wifi_monitor.py +++ b/wifi_monitor.py @@ -5,17 +5,17 @@ Replaces the bash script with a pure Python solution. Usage: # Live capture from interface: - sudo python3 wifi_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet] - sudo python3 wifi_monitor.py wlan0 36 10 - sudo python3 wifi_monitor.py wlan0 36 10 --keep-pcap - sudo python3 wifi_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only) + sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --duration 10 + sudo python3 wifi_monitor.py -i wlan0 -c 36 -d 10 --keep-pcap + sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --duration 10 --full-packet # Read from pcap file: - python3 wifi_monitor.py - python3 wifi_monitor.py /tmp/capture.pcap + python3 wifi_monitor.py --pcap /tmp/capture.pcap + python3 wifi_monitor.py /tmp/capture.pcap # Also supported for backward compatibility """ # Standard library imports +import argparse import asyncio import os import subprocess @@ -301,6 +301,28 @@ class WiFiMonitor: class PacketParser: """Handles parsing of 802.11 packet fields.""" + @staticmethod + def _get_subtype_from_fc(pkt): + """Extract subtype from Frame Control field, even for encrypted frames.""" + if not pkt.haslayer(Dot11): + return None + + dot11 = pkt[Dot11] + # Try direct access first + if hasattr(dot11, 'subtype'): + return dot11.subtype + + # For encrypted frames, parse from Frame Control field + # Frame Control format: Protocol Version (2 bits) | Type (2 bits) | Subtype (4 bits) | ... + if hasattr(dot11, 'FCfield'): + # FCfield might be the full 16-bit field, extract subtype + # Subtype is bits 4-7 (0-indexed from right) + fc = dot11.FCfield if hasattr(dot11, 'FCfield') else 0 + subtype = (fc >> 4) & 0x0F + return subtype + + return None + @staticmethod def frame_type_name(pkt): """Get human-readable frame type name.""" @@ -309,37 +331,41 @@ class PacketParser: dot11 = pkt[Dot11] fc = dot11.FCfield + + # Get type and subtype + frame_type = dot11.type if hasattr(dot11, 'type') else ((fc >> 2) & 0x03) + subtype = PacketParser._get_subtype_from_fc(pkt) - if dot11.type == 0: # Management - if dot11.subtype == 8: + if frame_type == 0: # Management + if subtype == 8: return "Beacon" - elif dot11.subtype == 4: + elif subtype == 4: return "Probe Request" - elif dot11.subtype == 5: + elif subtype == 5: return "Probe Response" - elif dot11.subtype == 11: + elif subtype == 11: return "Authentication" - elif dot11.subtype == 12: + elif subtype == 12: return "Deauthentication" - elif dot11.subtype == 0: + elif subtype == 0: return "Association Request" - elif dot11.subtype == 1: + elif subtype == 1: return "Association Response" - elif dot11.subtype == 10: + elif subtype == 10: return "Disassociation" else: - return f"Management ({dot11.subtype})" - elif dot11.type == 1: # Control - return f"Control ({dot11.subtype})" - elif dot11.type == 2: # Data - if dot11.subtype == 8: + return f"Management ({subtype})" + elif frame_type == 1: # Control + return f"Control ({subtype})" + elif frame_type == 2: # Data + if subtype == 8: return "QoS Data" - elif dot11.subtype == 0: + elif subtype == 0: return "Data" else: - return f"Data ({dot11.subtype})" + return f"Data ({subtype})" else: - return f"Unknown ({dot11.type}/{dot11.subtype})" + return f"Unknown ({frame_type}/{subtype})" @staticmethod def ra_ta(pkt): @@ -408,7 +434,6 @@ class CaptureAnalyzer: self._print_phy_histograms(packets) self._print_frame_type_breakdown(packets) self._print_data_frame_analysis(packets) - self._print_server_mac_analysis(packets) self._print_summary(total_count, plcp_count) def _print_no_packets_message(self): @@ -526,8 +551,31 @@ class CaptureAnalyzer: def _print_data_frame_analysis(self, packets): """Print data frame analysis.""" - print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):") - qos_data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8] + print("Data frame analysis (QoS Data frames, subtype 8):") + + # Get all data frames + data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2] + data_count = len(data_frames) + print(f" Total data frames: {data_count}") + + # Analyze subtypes + subtype_counts = Counter() + for pkt in data_frames: + if pkt.haslayer(Dot11): + subtype = self.parser._get_subtype_from_fc(pkt) + if subtype is not None: + subtype_counts[subtype] += 1 + else: + subtype_counts['unknown'] += 1 + + if subtype_counts: + print(" Data frame subtypes:") + for subtype, count in sorted(subtype_counts.items()): + subtype_name = self._get_data_subtype_name(subtype) + print(f" Subtype {subtype} ({subtype_name}): {count} frame(s)") + + # Find QoS Data frames (subtype 8) + qos_data_frames = [pkt for pkt in data_frames if self.parser._get_subtype_from_fc(pkt) == 8] qos_count = len(qos_data_frames) print(f" QoS Data frames (type 2, subtype 8): {qos_count}") @@ -537,7 +585,7 @@ class CaptureAnalyzer: print(f" Unencrypted: {unencrypted_count}") if qos_count > 0: - print(" Sample QoS Data frames (likely iperf traffic):") + print(" Sample QoS Data frames:") for i, pkt in enumerate(qos_data_frames[:5]): ra, ta = self.parser.ra_ta(pkt) ra_str = ra if ra else "N/A" @@ -546,47 +594,43 @@ class CaptureAnalyzer: retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A" print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}") - print() - - def _print_server_mac_analysis(self, packets): - """Print analysis of frames involving server MAC.""" - server_mac = "80:84:89:93:c4:b6" - print(f"Frames involving server MAC ({server_mac}):") - server_frames = [] - for pkt in packets: - ra, ta = self.parser.ra_ta(pkt) - if (ra and ra.lower() == server_mac.lower()) or (ta and ta.lower() == server_mac.lower()): - server_frames.append(pkt) - - server_count = len(server_frames) - print(f" Total frames with server MAC: {server_count}") - if server_count > 0: - server_frame_types = Counter() - for pkt in server_frames: - if pkt.haslayer(Dot11): - dot11 = pkt[Dot11] - if dot11.type == 0: - server_frame_types["Management"] += 1 - elif dot11.type == 1: - server_frame_types["Control"] += 1 - elif dot11.type == 2: - server_frame_types["Data"] += 1 - - print(" Frame type breakdown:") - for frame_type, count in server_frame_types.most_common(): - print(f" {frame_type}: {count} frame(s)") - - print(" Sample frames:") - for i, pkt in enumerate(server_frames[:5]): + elif data_count > 0: + print(" Sample data frames (all subtypes):") + for i, pkt in enumerate(data_frames[:5]): ra, ta = self.parser.ra_ta(pkt) ra_str = ra if ra else "N/A" ta_str = ta if ta else "N/A" - frame_type = self.parser.frame_type_name(pkt) - encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted" - retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else "" - duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A" - print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}") + dot11 = pkt[Dot11] + subtype = self.parser._get_subtype_from_fc(pkt) + subtype_name = self._get_data_subtype_name(subtype) if subtype is not None else "Unknown" + encrypted = "encrypted" if (dot11.FCfield & 0x40) else "unencrypted" + retry = " [retry]" if (dot11.FCfield & 0x08) else "" + duration_val = dot11.Duration if hasattr(dot11, 'Duration') else "N/A" + print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, subtype={subtype} ({subtype_name}), {encrypted}, dur={duration_val}{retry}") print() + + def _get_data_subtype_name(self, subtype): + """Get human-readable data frame subtype name.""" + subtype_names = { + 0: "Data", + 1: "Data+CF-Ack", + 2: "Data+CF-Poll", + 3: "Data+CF-Ack+CF-Poll", + 4: "Null", + 5: "CF-Ack", + 6: "CF-Poll", + 7: "CF-Ack+CF-Poll", + 8: "QoS Data", + 9: "QoS Data+CF-Ack", + 10: "QoS Data+CF-Poll", + 11: "QoS Data+CF-Ack+CF-Poll", + 12: "QoS Null", + 13: "Reserved", + 14: "QoS CF-Poll", + 15: "QoS CF-Ack+CF-Poll" + } + return subtype_names.get(subtype, f"Unknown ({subtype})") + def _print_summary(self, total_count, plcp_count): """Print summary statistics.""" @@ -768,71 +812,103 @@ class PacketCapture: class ArgumentParser: - """Handles command line argument parsing.""" + """Handles command line argument parsing using argparse.""" def __init__(self, config): self.config = config def parse(self): """Parse command line arguments.""" - args = sys.argv[1:] + parser = argparse.ArgumentParser( + description="Monitor mode WiFi packet capture and analysis using scapy", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Live capture with defaults (wlan0, channel 36, 10 seconds): + sudo python3 wifi_monitor.py - args = self._process_flags(args) + # Live capture with specific interface, channel, and duration: + sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --duration 10 - if len(args) == 0: - # Use defaults: wlan0, channel 36, 10 seconds - return self._parse_live_mode([]) + # Short form: + sudo python3 wifi_monitor.py -i wlan1 -c 11 -d 5 - if self._is_pcap_file(args): - return self._parse_pcap_mode(args) + # Save captured packets to pcap file: + sudo python3 wifi_monitor.py -i wlan0 -c 36 -d 10 --keep-pcap - return self._parse_live_mode(args) + # Analyze existing pcap file: + python3 wifi_monitor.py --pcap /tmp/capture.pcap + python3 wifi_monitor.py /tmp/capture.pcap # Also supported + """ + ) - def _print_usage(self): - """Print usage information.""" - print("Usage:") - print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]") - print(" Defaults: wlan0, channel 36, 10 seconds") - print(" Read pcap: python3 wifi_monitor.py ") + # PCAP file mode + parser.add_argument( + '--pcap', + metavar='FILE', + help='Read packets from PCAP file instead of live capture' + ) - def _process_flags(self, args): - """Process command line flags.""" - if "--keep-pcap" in args or "-k" in args: - self.config.keep_pcap = True - args = [a for a in args if a not in ["--keep-pcap", "-k"]] + # Live capture mode options + parser.add_argument( + '--interface', '-i', + default='wlan0', + metavar='IFACE', + help='WiFi interface name (default: wlan0)' + ) + parser.add_argument( + '--channel', '-c', + type=int, + default=36, + metavar='CH', + help='WiFi channel (default: 36)' + ) + parser.add_argument( + '--duration', '-d', + type=int, + default=10, + metavar='SECONDS', + help='Capture duration in seconds (default: 10)' + ) - if "--full-packet" in args or "-f" in args: - self.config.full_packet = True - args = [a for a in args if a not in ["--full-packet", "-f"]] + # Flags + parser.add_argument( + '--keep-pcap', '-k', + action='store_true', + help='Save captured packets to a pcap file' + ) + parser.add_argument( + '--full-packet', '-f', + action='store_true', + help='Capture full packets (default: header only, not supported for live capture)' + ) - return args + # Backward compatibility: if first positional arg looks like a pcap file, use it + args, unknown = parser.parse_known_args() - def _is_pcap_file(self, args): - """Check if first argument is a PCAP file.""" - return len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap')) + # Check for backward compatibility: first unknown arg might be a pcap file + if unknown and (unknown[0].endswith('.pcap') or unknown[0].endswith('.cap')): + if args.pcap: + parser.error("Cannot specify both --pcap and positional pcap file argument") + args.pcap = unknown[0] + unknown = unknown[1:] - def _parse_pcap_mode(self, args): - """Parse arguments for PCAP file mode.""" - pcap_file = args[0] - if not os.path.isfile(pcap_file): - print(f"Error: PCAP file not found: {pcap_file}") - return None - return ('pcap', pcap_file, None, None) + if unknown: + parser.error(f"Unrecognized arguments: {' '.join(unknown)}") - def _parse_live_mode(self, args): - """Parse arguments for live capture mode.""" - if len(args) > 0: - if args[0].startswith("wl") and len(args[0]) <= 6: - self.config.wifi_interface = args[0] - channel = int(args[1]) if len(args) > 1 else 36 - duration = int(args[2]) if len(args) > 2 else 10 - else: - channel = int(args[0]) if len(args) > 0 else 36 - duration = int(args[1]) if len(args) > 1 else 10 + # Apply flags to config + self.config.keep_pcap = args.keep_pcap + self.config.full_packet = args.full_packet + + # Determine mode + if args.pcap: + if not os.path.isfile(args.pcap): + print(f"Error: PCAP file not found: {args.pcap}") + return None + return ('pcap', args.pcap, None, None) else: - channel = 36 - duration = 10 - - return ('live', self.config.wifi_interface, channel, duration) + # Live capture mode + self.config.wifi_interface = args.interface + return ('live', args.interface, args.channel, args.duration) def main():