Refactor CLI to use argparse key-value pairs and improve data frame analysis

- Replace custom argument parsing with argparse for better CLI design
- Use key-value pairs: --interface/-i, --channel/-c, --duration/-d, --pcap
- Remove iperf-specific references (server MAC analysis, iperf mentions)
- Improve data frame subtype detection for encrypted frames
- Add _get_subtype_from_fc() to parse Frame Control field directly
- Show data frame subtype breakdown in analysis output
- Add better debugging when QoS Data frames aren't found
- Maintain backward compatibility for positional pcap file argument

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Robert McMahon 2026-02-13 15:40:08 -08:00
parent 9ff3cb9793
commit 12c57df2a2
1 changed files with 188 additions and 112 deletions

View File

@ -5,17 +5,17 @@ Replaces the bash script with a pure Python solution.
Usage:
# Live capture from interface:
sudo python3 wifi_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet]
sudo python3 wifi_monitor.py wlan0 36 10
sudo python3 wifi_monitor.py wlan0 36 10 --keep-pcap
sudo python3 wifi_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only)
sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --duration 10
sudo python3 wifi_monitor.py -i wlan0 -c 36 -d 10 --keep-pcap
sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --duration 10 --full-packet
# Read from pcap file:
python3 wifi_monitor.py <pcap_file>
python3 wifi_monitor.py /tmp/capture.pcap
python3 wifi_monitor.py --pcap /tmp/capture.pcap
python3 wifi_monitor.py /tmp/capture.pcap # Also supported for backward compatibility
"""
# Standard library imports
import argparse
import asyncio
import os
import subprocess
@ -301,6 +301,28 @@ class WiFiMonitor:
class PacketParser:
"""Handles parsing of 802.11 packet fields."""
@staticmethod
def _get_subtype_from_fc(pkt):
"""Extract subtype from Frame Control field, even for encrypted frames."""
if not pkt.haslayer(Dot11):
return None
dot11 = pkt[Dot11]
# Try direct access first
if hasattr(dot11, 'subtype'):
return dot11.subtype
# For encrypted frames, parse from Frame Control field
# Frame Control format: Protocol Version (2 bits) | Type (2 bits) | Subtype (4 bits) | ...
if hasattr(dot11, 'FCfield'):
# FCfield might be the full 16-bit field, extract subtype
# Subtype is bits 4-7 (0-indexed from right)
fc = dot11.FCfield if hasattr(dot11, 'FCfield') else 0
subtype = (fc >> 4) & 0x0F
return subtype
return None
@staticmethod
def frame_type_name(pkt):
"""Get human-readable frame type name."""
@ -309,37 +331,41 @@ class PacketParser:
dot11 = pkt[Dot11]
fc = dot11.FCfield
# Get type and subtype
frame_type = dot11.type if hasattr(dot11, 'type') else ((fc >> 2) & 0x03)
subtype = PacketParser._get_subtype_from_fc(pkt)
if dot11.type == 0: # Management
if dot11.subtype == 8:
if frame_type == 0: # Management
if subtype == 8:
return "Beacon"
elif dot11.subtype == 4:
elif subtype == 4:
return "Probe Request"
elif dot11.subtype == 5:
elif subtype == 5:
return "Probe Response"
elif dot11.subtype == 11:
elif subtype == 11:
return "Authentication"
elif dot11.subtype == 12:
elif subtype == 12:
return "Deauthentication"
elif dot11.subtype == 0:
elif subtype == 0:
return "Association Request"
elif dot11.subtype == 1:
elif subtype == 1:
return "Association Response"
elif dot11.subtype == 10:
elif subtype == 10:
return "Disassociation"
else:
return f"Management ({dot11.subtype})"
elif dot11.type == 1: # Control
return f"Control ({dot11.subtype})"
elif dot11.type == 2: # Data
if dot11.subtype == 8:
return f"Management ({subtype})"
elif frame_type == 1: # Control
return f"Control ({subtype})"
elif frame_type == 2: # Data
if subtype == 8:
return "QoS Data"
elif dot11.subtype == 0:
elif subtype == 0:
return "Data"
else:
return f"Data ({dot11.subtype})"
return f"Data ({subtype})"
else:
return f"Unknown ({dot11.type}/{dot11.subtype})"
return f"Unknown ({frame_type}/{subtype})"
@staticmethod
def ra_ta(pkt):
@ -408,7 +434,6 @@ class CaptureAnalyzer:
self._print_phy_histograms(packets)
self._print_frame_type_breakdown(packets)
self._print_data_frame_analysis(packets)
self._print_server_mac_analysis(packets)
self._print_summary(total_count, plcp_count)
def _print_no_packets_message(self):
@ -526,8 +551,31 @@ class CaptureAnalyzer:
def _print_data_frame_analysis(self, packets):
"""Print data frame analysis."""
print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):")
qos_data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8]
print("Data frame analysis (QoS Data frames, subtype 8):")
# Get all data frames
data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2]
data_count = len(data_frames)
print(f" Total data frames: {data_count}")
# Analyze subtypes
subtype_counts = Counter()
for pkt in data_frames:
if pkt.haslayer(Dot11):
subtype = self.parser._get_subtype_from_fc(pkt)
if subtype is not None:
subtype_counts[subtype] += 1
else:
subtype_counts['unknown'] += 1
if subtype_counts:
print(" Data frame subtypes:")
for subtype, count in sorted(subtype_counts.items()):
subtype_name = self._get_data_subtype_name(subtype)
print(f" Subtype {subtype} ({subtype_name}): {count} frame(s)")
# Find QoS Data frames (subtype 8)
qos_data_frames = [pkt for pkt in data_frames if self.parser._get_subtype_from_fc(pkt) == 8]
qos_count = len(qos_data_frames)
print(f" QoS Data frames (type 2, subtype 8): {qos_count}")
@ -537,7 +585,7 @@ class CaptureAnalyzer:
print(f" Unencrypted: {unencrypted_count}")
if qos_count > 0:
print(" Sample QoS Data frames (likely iperf traffic):")
print(" Sample QoS Data frames:")
for i, pkt in enumerate(qos_data_frames[:5]):
ra, ta = self.parser.ra_ta(pkt)
ra_str = ra if ra else "N/A"
@ -546,47 +594,43 @@ class CaptureAnalyzer:
retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else ""
duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A"
print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}")
print()
def _print_server_mac_analysis(self, packets):
"""Print analysis of frames involving server MAC."""
server_mac = "80:84:89:93:c4:b6"
print(f"Frames involving server MAC ({server_mac}):")
server_frames = []
for pkt in packets:
ra, ta = self.parser.ra_ta(pkt)
if (ra and ra.lower() == server_mac.lower()) or (ta and ta.lower() == server_mac.lower()):
server_frames.append(pkt)
server_count = len(server_frames)
print(f" Total frames with server MAC: {server_count}")
if server_count > 0:
server_frame_types = Counter()
for pkt in server_frames:
if pkt.haslayer(Dot11):
dot11 = pkt[Dot11]
if dot11.type == 0:
server_frame_types["Management"] += 1
elif dot11.type == 1:
server_frame_types["Control"] += 1
elif dot11.type == 2:
server_frame_types["Data"] += 1
print(" Frame type breakdown:")
for frame_type, count in server_frame_types.most_common():
print(f" {frame_type}: {count} frame(s)")
print(" Sample frames:")
for i, pkt in enumerate(server_frames[:5]):
elif data_count > 0:
print(" Sample data frames (all subtypes):")
for i, pkt in enumerate(data_frames[:5]):
ra, ta = self.parser.ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
frame_type = self.parser.frame_type_name(pkt)
encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted"
retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else ""
duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A"
print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}")
dot11 = pkt[Dot11]
subtype = self.parser._get_subtype_from_fc(pkt)
subtype_name = self._get_data_subtype_name(subtype) if subtype is not None else "Unknown"
encrypted = "encrypted" if (dot11.FCfield & 0x40) else "unencrypted"
retry = " [retry]" if (dot11.FCfield & 0x08) else ""
duration_val = dot11.Duration if hasattr(dot11, 'Duration') else "N/A"
print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, subtype={subtype} ({subtype_name}), {encrypted}, dur={duration_val}{retry}")
print()
def _get_data_subtype_name(self, subtype):
"""Get human-readable data frame subtype name."""
subtype_names = {
0: "Data",
1: "Data+CF-Ack",
2: "Data+CF-Poll",
3: "Data+CF-Ack+CF-Poll",
4: "Null",
5: "CF-Ack",
6: "CF-Poll",
7: "CF-Ack+CF-Poll",
8: "QoS Data",
9: "QoS Data+CF-Ack",
10: "QoS Data+CF-Poll",
11: "QoS Data+CF-Ack+CF-Poll",
12: "QoS Null",
13: "Reserved",
14: "QoS CF-Poll",
15: "QoS CF-Ack+CF-Poll"
}
return subtype_names.get(subtype, f"Unknown ({subtype})")
def _print_summary(self, total_count, plcp_count):
"""Print summary statistics."""
@ -768,71 +812,103 @@ class PacketCapture:
class ArgumentParser:
"""Handles command line argument parsing."""
"""Handles command line argument parsing using argparse."""
def __init__(self, config):
self.config = config
def parse(self):
"""Parse command line arguments."""
args = sys.argv[1:]
parser = argparse.ArgumentParser(
description="Monitor mode WiFi packet capture and analysis using scapy",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Live capture with defaults (wlan0, channel 36, 10 seconds):
sudo python3 wifi_monitor.py
args = self._process_flags(args)
# Live capture with specific interface, channel, and duration:
sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --duration 10
if len(args) == 0:
# Use defaults: wlan0, channel 36, 10 seconds
return self._parse_live_mode([])
# Short form:
sudo python3 wifi_monitor.py -i wlan1 -c 11 -d 5
if self._is_pcap_file(args):
return self._parse_pcap_mode(args)
# Save captured packets to pcap file:
sudo python3 wifi_monitor.py -i wlan0 -c 36 -d 10 --keep-pcap
return self._parse_live_mode(args)
# Analyze existing pcap file:
python3 wifi_monitor.py --pcap /tmp/capture.pcap
python3 wifi_monitor.py /tmp/capture.pcap # Also supported
"""
)
def _print_usage(self):
"""Print usage information."""
print("Usage:")
print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]")
print(" Defaults: wlan0, channel 36, 10 seconds")
print(" Read pcap: python3 wifi_monitor.py <pcap_file>")
# PCAP file mode
parser.add_argument(
'--pcap',
metavar='FILE',
help='Read packets from PCAP file instead of live capture'
)
def _process_flags(self, args):
"""Process command line flags."""
if "--keep-pcap" in args or "-k" in args:
self.config.keep_pcap = True
args = [a for a in args if a not in ["--keep-pcap", "-k"]]
# Live capture mode options
parser.add_argument(
'--interface', '-i',
default='wlan0',
metavar='IFACE',
help='WiFi interface name (default: wlan0)'
)
parser.add_argument(
'--channel', '-c',
type=int,
default=36,
metavar='CH',
help='WiFi channel (default: 36)'
)
parser.add_argument(
'--duration', '-d',
type=int,
default=10,
metavar='SECONDS',
help='Capture duration in seconds (default: 10)'
)
if "--full-packet" in args or "-f" in args:
self.config.full_packet = True
args = [a for a in args if a not in ["--full-packet", "-f"]]
# Flags
parser.add_argument(
'--keep-pcap', '-k',
action='store_true',
help='Save captured packets to a pcap file'
)
parser.add_argument(
'--full-packet', '-f',
action='store_true',
help='Capture full packets (default: header only, not supported for live capture)'
)
return args
# Backward compatibility: if first positional arg looks like a pcap file, use it
args, unknown = parser.parse_known_args()
def _is_pcap_file(self, args):
"""Check if first argument is a PCAP file."""
return len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap'))
# Check for backward compatibility: first unknown arg might be a pcap file
if unknown and (unknown[0].endswith('.pcap') or unknown[0].endswith('.cap')):
if args.pcap:
parser.error("Cannot specify both --pcap and positional pcap file argument")
args.pcap = unknown[0]
unknown = unknown[1:]
def _parse_pcap_mode(self, args):
"""Parse arguments for PCAP file mode."""
pcap_file = args[0]
if not os.path.isfile(pcap_file):
print(f"Error: PCAP file not found: {pcap_file}")
return None
return ('pcap', pcap_file, None, None)
if unknown:
parser.error(f"Unrecognized arguments: {' '.join(unknown)}")
def _parse_live_mode(self, args):
"""Parse arguments for live capture mode."""
if len(args) > 0:
if args[0].startswith("wl") and len(args[0]) <= 6:
self.config.wifi_interface = args[0]
channel = int(args[1]) if len(args) > 1 else 36
duration = int(args[2]) if len(args) > 2 else 10
else:
channel = int(args[0]) if len(args) > 0 else 36
duration = int(args[1]) if len(args) > 1 else 10
# Apply flags to config
self.config.keep_pcap = args.keep_pcap
self.config.full_packet = args.full_packet
# Determine mode
if args.pcap:
if not os.path.isfile(args.pcap):
print(f"Error: PCAP file not found: {args.pcap}")
return None
return ('pcap', args.pcap, None, None)
else:
channel = 36
duration = 10
return ('live', self.config.wifi_interface, channel, duration)
# Live capture mode
self.config.wifi_interface = args.interface
return ('live', args.interface, args.channel, args.duration)
def main():