#!/usr/bin/env python3
"""
Monitor mode WiFi packet capture and analysis using scapy.
Replaces the bash script with a pure Python solution.

Usage:
    # Live capture from interface:
    sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --time 10
    sudo python3 wifi_monitor.py -i wlan0 -c 36 -t 10 --keep-pcap
    sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --time 10 --full-packet

    # Read from pcap file:
    python3 wifi_monitor.py --pcap /tmp/capture.pcap
    python3 wifi_monitor.py /tmp/capture.pcap  # Also supported for backward compatibility
"""

# Standard library imports
import argparse
import asyncio
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from collections import Counter, defaultdict
from datetime import datetime

# Third-party imports
from scapy.all import RadioTap, rdpcap, sniff, wrpcap
from scapy.layers.dot11 import (
    Dot11, Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Beacon,
    Dot11Deauth, Dot11Disas, Dot11ProbeReq, Dot11ProbeResp, Dot11QoS
)


class Config:
    """Mutable bag of capture settings.

    Attributes:
        wifi_interface: Interface name; taken from the ``WIFI_INTERFACE``
            environment variable, falling back to ``"wlan0"``.
        keep_pcap: Whether captured packets should be kept in a pcap file.
        full_packet: Whether whole packets (rather than a prefix) are wanted.
        default_snaplen: Snapshot length used when ``full_packet`` is off.
    """

    def __init__(self):
        # Both flags start disabled.
        self.keep_pcap = False
        self.full_packet = False
        # Header plus a little payload.
        self.default_snaplen = 256
        self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0")

    @property
    def snaplen(self):
        """Return 0 when full packets are requested, else ``default_snaplen``."""
        if self.full_packet:
            return 0
        return self.default_snaplen


class WiFiMonitor:
    """Switch a WiFi interface into monitor mode and put it back afterwards.

    The work is done by spawning ``nmcli``, ``rfkill``, ``ip`` and ``iw``
    through asyncio subprocesses. ``start()`` and ``stop()`` report success as
    a bool; the text of the most recent failure is exposed as
    ``error_message``.
    """

    # Pause (seconds) after a link-state or interface-type change before the
    # next command is issued.
    _SETTLE_DELAY = 0.5

    def __init__(self, interface, channel):
        """Remember the interface name and channel; no command is run here."""
        self.interface = interface
        self.channel = channel
        self._error_message = None
        self._original_mode = None  # kept for compatibility; not read in this class
        self._is_started = False

    @property
    def error_message(self):
        """Text of the last recorded failure, or None."""
        return self._error_message

    @property
    def is_successful(self):
        """True while no failure has been recorded."""
        return self._error_message is None

    @property
    def is_started(self):
        """True after a successful ``start()`` until the interface is restored."""
        return self._is_started

    async def start(self):
        """Put the interface into monitor mode on the configured channel.

        Returns:
            True when the monitor-mode step succeeded (a channel failure is
            only reported, never fatal). On failure the interface is restored
            on a best-effort basis - the NetworkManager release and any
            partial mode change are undone - and False is returned with
            ``error_message`` describing the original failure.
        """
        self._error_message = None
        print(f"=== Setting up monitor mode on {self.interface} ===")

        ok = await self._unmanage_networkmanager()
        await self._unblock_wifi()
        if not await self._set_monitor_mode():
            ok = False
        if not await self._set_channel():
            ok = False
        await self._verify_monitor_mode()

        if ok:
            self._is_started = True
            return True

        # stop() is a no-op unless start() succeeded, so a failed start has to
        # undo its own side effects here; keep the original failure text.
        failure = self._error_message
        await self._restore_managed()
        if failure is not None:
            self._error_message = failure
        return False

    async def stop(self):
        """Restore managed mode if ``start()`` succeeded earlier.

        Returns:
            True when nothing had to be done or the restore succeeded, False
            when a restore step failed.
        """
        if not self._is_started:
            return True
        return await self._restore_managed()

    async def _restore_managed(self):
        """Take the link down, set type ``managed``, re-manage it and bring it up."""
        print(f"=== Restoring {self.interface} to managed mode ===")
        restored = True
        try:
            await self._run("ip", "link", "set", self.interface, "down")
            await asyncio.sleep(self._SETTLE_DELAY)

            code, _, err = await self._run(
                "iw", "dev", self.interface, "set", "type", "managed",
                want_stderr=True,
            )
            if code != 0:
                print(f"Warning: Failed to set managed mode: {self._describe(err)}")
                restored = False
            else:
                print("Restored to managed mode")

            await self._manage_networkmanager()
            await self._run("ip", "link", "set", self.interface, "up")
            self._is_started = False
        except OSError as e:
            self._error_message = f"Error restoring managed mode: {e}"
            print(self._error_message)
            restored = False
        return restored

    async def _run(self, *command, want_stdout=False, want_stderr=False):
        """Run *command* and return ``(returncode, stdout_bytes, stderr_bytes)``.

        Streams that were not requested are sent to DEVNULL and come back as
        ``b""``. Raises OSError when the executable cannot be started.
        """
        sink = asyncio.subprocess.DEVNULL
        pipe = asyncio.subprocess.PIPE
        proc = await asyncio.create_subprocess_exec(
            *command,
            stdout=pipe if want_stdout else sink,
            stderr=pipe if want_stderr else sink,
        )
        out, err = await proc.communicate()
        return proc.returncode, out or b"", err or b""

    @staticmethod
    def _describe(stderr):
        """Turn captured stderr bytes into printable text."""
        return stderr.decode(errors="replace") if stderr else "Unknown error"

    def _nmcli_available(self):
        """True when an ``nmcli`` executable is on PATH."""
        return shutil.which("nmcli") is not None

    async def _unmanage_networkmanager(self):
        """Ask NetworkManager (if installed) to release the interface.

        Best effort: always returns True.
        """
        try:
            if self._nmcli_available():
                print("Unmanaging interface from NetworkManager...")
                await self._run(
                    "nmcli", "device", "set", self.interface, "managed", "no"
                )
        except OSError:
            pass  # deliberate best effort
        return True

    async def _manage_networkmanager(self):
        """Hand the interface back to NetworkManager (if installed); best effort."""
        try:
            if self._nmcli_available():
                await self._run(
                    "nmcli", "device", "set", self.interface, "managed", "yes"
                )
        except OSError:
            pass  # deliberate best effort

    async def _unblock_wifi(self):
        """Run ``rfkill unblock wifi``; failures are ignored."""
        try:
            await self._run("rfkill", "unblock", "wifi")
        except OSError:
            pass  # rfkill may not be installed

    async def _set_monitor_mode(self):
        """Set the interface type to ``monitor`` unless it already is.

        Returns:
            True on success. On failure records and prints ``error_message``
            and returns False.
        """
        try:
            code, out, _ = await self._run(
                "iw", "dev", self.interface, "info",
                want_stdout=True, want_stderr=True,
            )
            if code == 0 and b"type monitor" in out:
                print("Already in monitor mode")
                return True

            print(f"Setting {self.interface} to monitor mode...")

            # The type is changed while the link is down.
            await self._run("ip", "link", "set", self.interface, "down")
            await asyncio.sleep(self._SETTLE_DELAY)

            code, _, err = await self._run(
                "iw", "dev", self.interface, "set", "type", "monitor",
                want_stderr=True,
            )
            if code != 0:
                self._error_message = (
                    f"Error setting monitor mode: {self._describe(err)}"
                )
                print(self._error_message)
                return False

            await asyncio.sleep(self._SETTLE_DELAY)

            code, _, err = await self._run(
                "ip", "link", "set", self.interface, "up", want_stderr=True
            )
            if code != 0:
                self._error_message = (
                    f"Error bringing interface up: {self._describe(err)}"
                )
                print(self._error_message)
                return False

            print("Monitor mode activated")
            return True
        except OSError as e:
            self._error_message = f"Error setting monitor mode: {e}"
            print(self._error_message)
            return False

    async def _set_channel(self):
        """Tune the interface to ``self.channel``.

        A failure is printed but treated as non-fatal, so this always returns
        True.
        """
        try:
            code, _, err = await self._run(
                "iw", "dev", self.interface, "set", "channel", str(self.channel),
                want_stderr=True,
            )
            if code == 0:
                print(f"Channel set to {self.channel}")
                return True
            print(f"Error setting channel: {self._describe(err)}")
        except OSError as e:
            print(f"Error setting channel: {e}")
        print("Continuing anyway - channel may not be set correctly")
        return True  # Non-fatal

    async def _verify_monitor_mode(self):
        """Print the ``type`` / ``channel`` lines reported by ``iw dev <if> info``."""
        print("\nVerifying monitor mode...")
        try:
            code, out, _ = await self._run(
                "iw", "dev", self.interface, "info", want_stdout=True
            )
        except OSError:
            return  # purely informational
        if code != 0:
            return
        for line in out.decode(errors="replace").splitlines():
            if "type" in line or "channel" in line:
                print(f"\t{line.strip()}")


class PacketParser:
    """Stateless helpers that pull 802.11 header details out of captured packets.

    Packets are expected to offer ``haslayer(cls)`` and ``pkt[cls]`` the way
    scapy packets do.
    """

    # Management-frame subtype -> display name.
    _MGMT_SUBTYPE_NAMES = {
        0: "Association Request",
        1: "Association Response",
        4: "Probe Request",
        5: "Probe Response",
        8: "Beacon",
        10: "Disassociation",
        11: "Authentication",
        12: "Deauthentication",
    }

    # Data-frame subtypes that get a dedicated display name.
    _DATA_SUBTYPE_NAMES = {0: "Data", 8: "QoS Data"}

    @staticmethod
    def _get_subtype_from_fc(pkt):
        """Return the Dot11 ``subtype`` of *pkt*, or None when unavailable.

        ``FCfield`` is deliberately not used as a fallback: elsewhere in this
        file it is tested with ``& 0x40`` (protected) and ``& 0x08`` (retry),
        i.e. it holds the frame-control flag bits, so shifting it would not
        yield the subtype.
        """
        if not pkt.haslayer(Dot11):
            return None
        return getattr(pkt[Dot11], "subtype", None)

    @staticmethod
    def frame_type_name(pkt):
        """Return a human-readable frame type name.

        Returns ``"Unknown"`` for packets without a Dot11 layer and
        ``"Unknown (<type>/<subtype>)"`` for types other than 0, 1 and 2.
        """
        if not pkt.haslayer(Dot11):
            return "Unknown"

        frame_type = getattr(pkt[Dot11], "type", None)
        subtype = PacketParser._get_subtype_from_fc(pkt)

        if frame_type == 0:  # Management
            return PacketParser._MGMT_SUBTYPE_NAMES.get(
                subtype, f"Management ({subtype})"
            )
        if frame_type == 1:  # Control
            return f"Control ({subtype})"
        if frame_type == 2:  # Data
            return PacketParser._DATA_SUBTYPE_NAMES.get(subtype, f"Data ({subtype})")
        return f"Unknown ({frame_type}/{subtype})"

    @staticmethod
    def _format_mac(addr):
        """Lower-case a string address or hex-join a byte sequence.

        Falsy values (None, empty) are returned unchanged.
        """
        if not addr:
            return addr
        if isinstance(addr, str):
            return addr.lower()
        return ':'.join(f'{b:02x}' for b in addr)

    @staticmethod
    def ra_ta(pkt):
        """Return ``(RA, TA)`` - ``addr1`` and ``addr2`` of the Dot11 layer.

        Either element is None when the address is missing; the result is
        ``(None, None)`` for packets without a Dot11 layer.
        """
        if not pkt.haslayer(Dot11):
            return None, None

        dot11 = pkt[Dot11]
        ra = PacketParser._format_mac(getattr(dot11, "addr1", None))
        ta = PacketParser._format_mac(getattr(dot11, "addr2", None))
        return ra, ta

    @staticmethod
    def phy_info(pkt):
        """Return ``(phy_rate_mbps, mcs_index)`` from the RadioTap layer.

        Each element is None when the corresponding information is absent.
        """
        phy_rate = None
        mcs = None

        if not pkt.haslayer(RadioTap):
            return phy_rate, mcs

        radiotap = pkt[RadioTap]

        rate_val = getattr(radiotap, "Rate", None)
        if rate_val:
            # Radiotap carries the legacy rate in 500 kbps units.
            # NOTE(review): recent scapy releases appear to expose ``Rate``
            # already scaled to Mbps; if so this halves the value - confirm
            # against the installed scapy version.
            phy_rate = rate_val * 0.5

        mcs_data = getattr(radiotap, "MCS", None)
        if mcs_data and hasattr(mcs_data, "index"):
            mcs = mcs_data.index

        if mcs is None:
            # Fallback for layers that expose the index as a flat
            # ``MCS_index`` field; only trusted when the ``present`` flags
            # say the MCS block is actually there, so a default value is
            # never mistaken for a real MCS 0.
            present = getattr(radiotap, "present", None)
            if present and getattr(present, "MCS", False):
                mcs = getattr(radiotap, "MCS_index", None)

        return phy_rate, mcs


class CaptureAnalyzer:
    """Print human-readable statistics for a sequence of captured packets.

    ``parser`` must provide ``ra_ta``, ``frame_type_name``, ``phy_info`` and
    ``_get_subtype_from_fc``; all output goes to stdout.
    """

    # Dot11 ``type`` value -> frame class name.
    _FRAME_CLASSES = {0: "Management", 1: "Control", 2: "Data"}

    # Data-frame subtype -> display name.
    _DATA_SUBTYPE_NAMES = {
        0: "Data",
        1: "Data+CF-Ack",
        2: "Data+CF-Poll",
        3: "Data+CF-Ack+CF-Poll",
        4: "Null",
        5: "CF-Ack",
        6: "CF-Poll",
        7: "CF-Ack+CF-Poll",
        8: "QoS Data",
        9: "QoS Data+CF-Ack",
        10: "QoS Data+CF-Poll",
        11: "QoS Data+CF-Ack+CF-Poll",
        12: "QoS Null",
        13: "Reserved",
        14: "QoS CF-Poll",
        15: "QoS CF-Ack+CF-Poll",
    }

    def __init__(self, parser):
        """Store the packet parser used for every per-packet lookup."""
        self.parser = parser

    @staticmethod
    def _format_timestamp(pkt):
        """Format ``pkt.time`` as local ``HH:MM:SS.mmm``, or ``"N/A"`` if unset."""
        stamp = getattr(pkt, "time", None)
        if not stamp:
            return "N/A"
        # float(): the timestamp is not guaranteed to be a plain float (it may
        # be Decimal-based), and fromtimestamp() rejects such values.
        moment = datetime.fromtimestamp(float(stamp))
        return moment.strftime("%H:%M:%S.%f")[:-3]  # Truncate to milliseconds

    @staticmethod
    def _is_data_frame(pkt):
        """True when *pkt* has a Dot11 layer whose ``type`` is 2 (data)."""
        return pkt.haslayer(Dot11) and pkt[Dot11].type == 2

    @staticmethod
    def _frame_flags(pkt):
        """Return ``(encryption_label, retry_suffix, duration)`` for *pkt*.

        The duration is read from a ``Duration`` attribute when the layer has
        one, otherwise from ``ID``; ``"N/A"`` when neither is available.
        """
        if not pkt.haslayer(Dot11):
            return "unencrypted", "", "N/A"
        dot11 = pkt[Dot11]
        encrypted = "encrypted" if dot11.FCfield & 0x40 else "unencrypted"
        retry = " [retry]" if dot11.FCfield & 0x08 else ""
        duration = "N/A"
        for field in ("Duration", "ID"):
            value = getattr(dot11, field, None)
            if value is not None:
                duration = value
                break
        return encrypted, retry, duration

    @staticmethod
    def _pair_label(ra, ta):
        """Build the ``"<RA> -> <TA>"`` key used by the per-pair tables."""
        return f"{ra if ra else 'N/A'} -> {ta if ta else 'N/A'}"

    def analyze(self, packets, duration, tcpdump_data_count=None):
        """Print the full report for *packets*.

        Args:
            packets: Sliceable sequence of packets.
            duration: Capture length in seconds, used for the packet rate; a
                zero or negative value prints the rate as unavailable.
            tcpdump_data_count: Data-frame count from the tcpdump cross-check,
                or None when that count is unavailable.
        """
        print("\n=== Capture Statistics ===")

        total_count = len(packets)
        print(f"Total packets captured (scapy): {total_count}")

        scapy_data_count = sum(1 for pkt in packets if self._is_data_frame(pkt))

        if tcpdump_data_count is None:
            print(f"Data frames: {scapy_data_count} (scapy) - tcpdump counter unavailable")
        elif tcpdump_data_count > 0:
            ratio = scapy_data_count / tcpdump_data_count
            print(f"Data frames: {scapy_data_count} (scapy) vs {tcpdump_data_count} (tcpdump), ratio: {ratio:.1%}")
        elif scapy_data_count > 0:
            print(f"Data frames: {scapy_data_count} (scapy) vs 0 (tcpdump) - tcpdump found 0 data frames")
        else:
            print(f"Data frames: {scapy_data_count} (scapy) vs {tcpdump_data_count} (tcpdump)")

        if total_count == 0:
            self._print_no_packets_message()
            return

        plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap))
        print(f"PLCP headers: {plcp_count}")

        if duration and duration > 0:
            rate = total_count / duration
            print(f"Packet rate: {rate:.1f} packets/second")
        else:
            # Avoid dividing by a zero-length capture window.
            print("Packet rate: N/A (capture duration is zero)")
        print()

        self._print_sample_packets(packets)
        self._print_ra_ta_pairs(packets)
        self._print_phy_histograms(packets)
        self._print_frame_type_breakdown(packets)
        self._print_data_frame_analysis(packets)
        self._print_summary(total_count, plcp_count, tcpdump_data_count, scapy_data_count)

    def _print_no_packets_message(self):
        """Print troubleshooting hints for an empty capture."""
        print("\n(No packets captured)")
        print("\n=== Summary ===")
        print("✗ No packets captured. Check:")
        print(" 1. Is there WiFi traffic on the channel?")
        print(" 2. Is the interface actually in monitor mode?")
        print(" 3. Try a different channel or longer duration")

    def _print_sample_packets(self, packets):
        """Print a one-line description of each of the first ten packets."""
        print("Sample packets (first 10):")
        for number, pkt in enumerate(packets[:10], start=1):
            ra, ta = self.parser.ra_ta(pkt)
            ra_str = ra if ra else "N/A"
            ta_str = ta if ta else "N/A"
            frame_type = self.parser.frame_type_name(pkt)
            encrypted, retry, duration_val = self._frame_flags(pkt)
            plcp = "yes" if pkt.haslayer(RadioTap) else "no"
            timestamp = self._format_timestamp(pkt)
            print(f" Frame {number}: time={timestamp}, RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}")
        print()

    def _print_ra_ta_pairs(self, packets):
        """Print every distinct RA/TA pair with its frame count, most common first."""
        print("Unique RA/TA pairs (with counts):")
        ra_ta_pairs = Counter()
        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if ra or ta:
                ra_ta_pairs[self._pair_label(ra, ta)] += 1

        if ra_ta_pairs:
            for pair, count in ra_ta_pairs.most_common():
                print(f" {pair}: {count} frame(s)")
        else:
            print(" (no valid RA/TA pairs found)")
        print()

    def _print_phy_histograms(self, packets):
        """Print PHY-rate and MCS histograms of data frames per RA/TA pair."""
        print("PHY Rate and MCS Histograms per RA/TA pair:")
        rate_histograms = defaultdict(Counter)
        mcs_histograms = defaultdict(Counter)

        for pkt in packets:
            ra, ta = self.parser.ra_ta(pkt)
            if not (ra or ta):
                continue
            if not self._is_data_frame(pkt):
                continue

            pair = self._pair_label(ra, ta)
            phy_rate, mcs = self.parser.phy_info(pkt)
            if phy_rate:
                rate_histograms[pair][phy_rate] += 1
            if mcs is not None:
                mcs_histograms[pair][mcs] += 1

        for pair in sorted(rate_histograms.keys() | mcs_histograms.keys()):
            print(f"\n {pair}:")

            print(" PHY Rate (Mbps):")
            if pair in rate_histograms:
                for rate in sorted(rate_histograms[pair]):
                    print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)")
            else:
                print(" (no PHY rate data)")

            print(" MCS Index:")
            if pair in mcs_histograms:
                for mcs_val in sorted(mcs_histograms[pair]):
                    print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)")
            else:
                print(" (no MCS data)")
        print()

    def _print_frame_type_breakdown(self, packets):
        """Print how many Dot11 frames are management, control, data or unknown."""
        print("Frame type breakdown:")
        frame_types = Counter(
            self._FRAME_CLASSES.get(pkt[Dot11].type, "Unknown")
            for pkt in packets
            if pkt.haslayer(Dot11)
        )
        for frame_type, count in frame_types.most_common():
            print(f" {frame_type}: {count} frame(s)")
        print()

    def _print_data_frame_analysis(self, packets):
        """Print data-frame subtype counts, QoS encryption counts and samples."""
        print("Data frame analysis (QoS Data frames, subtype 8):")

        data_frames = [pkt for pkt in packets if self._is_data_frame(pkt)]
        data_count = len(data_frames)
        print(f" Total data frames: {data_count}")

        subtype_counts = Counter()
        for pkt in data_frames:
            subtype = self.parser._get_subtype_from_fc(pkt)
            subtype_counts['unknown' if subtype is None else subtype] += 1

        if subtype_counts:
            print(" Data frame subtypes:")
            # Numeric subtypes in order, the 'unknown' bucket last.
            ordered = sorted(
                subtype_counts.items(),
                key=lambda item: (item[0] == 'unknown', item[0]),
            )
            for subtype, count in ordered:
                subtype_name = self._get_data_subtype_name(subtype)
                print(f" Subtype {subtype} ({subtype_name}): {count} frame(s)")

        qos_data_frames = [
            pkt for pkt in data_frames
            if self.parser._get_subtype_from_fc(pkt) == 8
        ]
        qos_count = len(qos_data_frames)
        print(f" QoS Data frames (type 2, subtype 8): {qos_count}")

        encrypted_count = sum(1 for pkt in qos_data_frames if pkt[Dot11].FCfield & 0x40)
        unencrypted_count = qos_count - encrypted_count
        print(f" Encrypted: {encrypted_count}")
        print(f" Unencrypted: {unencrypted_count}")

        if qos_count > 0:
            print(" Sample QoS Data frames:")
            for number, pkt in enumerate(qos_data_frames[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                encrypted, retry, duration_val = self._frame_flags(pkt)
                print(f" Frame {number}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}")
        elif data_count > 0:
            print(" Sample data frames (all subtypes):")
            for number, pkt in enumerate(data_frames[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                subtype = self.parser._get_subtype_from_fc(pkt)
                subtype_name = self._get_data_subtype_name(subtype) if subtype is not None else "Unknown"
                encrypted, retry, duration_val = self._frame_flags(pkt)
                print(f" Frame {number}: RA={ra_str}, TA={ta_str}, subtype={subtype} ({subtype_name}), {encrypted}, dur={duration_val}{retry}")
        print()

    def _get_data_subtype_name(self, subtype):
        """Return the display name of a data-frame subtype.

        Unlisted values come back as ``"Unknown (<subtype>)"``.
        """
        return self._DATA_SUBTYPE_NAMES.get(subtype, f"Unknown ({subtype})")

    def _print_summary(self, total_count, plcp_count, tcpdump_data_count=None, scapy_data_count=0):
        """Print the closing summary block."""
        print("=== Summary ===")
        if total_count > 0:
            print(f"✓ Monitor mode is working! Captured {total_count} packet(s) (scapy)")
            if tcpdump_data_count is not None and tcpdump_data_count > 0:
                print(f"✓ Data frames: {scapy_data_count} (scapy) vs {tcpdump_data_count} (tcpdump)")
            if plcp_count > 0:
                print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
            else:
                print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")


class PacketCapture:
    """Capture packets (live or from a pcap file) and hand them to the analyzer."""

    def __init__(self, config):
        """Keep *config* and build the parser / analyzer pair used for reports."""
        self.config = config
        self.parser = PacketParser()
        self.analyzer = CaptureAnalyzer(self.parser)

    @staticmethod
    def _capture_span(packets):
        """Return the seconds between the earliest and latest packet timestamp.

        Returns None when fewer than two packets carry a timestamp or the
        span is not positive.
        """
        stamps = [float(pkt.time) for pkt in packets if getattr(pkt, "time", None)]
        if len(stamps) < 2:
            return None
        span = max(stamps) - min(stamps)
        return span if span > 0 else None

    @staticmethod
    def _parse_tcpdump_count(stderr_text):
        """Extract the packet count from tcpdump's summary on stderr.

        The first ``N packets captured`` / ``N packets received by filter``
        line wins. Before such a line is seen, the largest ``N packets`` figure
        on any line not mentioning "dropped" is remembered. Returns 0 when
        nothing matches.
        """
        count = 0
        for line in stderr_text.splitlines():
            summary = re.search(
                r'(\d+)\s+packets?\s+(captured|received by filter)', line, re.IGNORECASE
            )
            if summary:
                count = int(summary.group(1))
                break
            loose = re.search(r'(\d+)\s+packets?', line, re.IGNORECASE)
            if loose and "dropped" not in line.lower():
                count = max(count, int(loose.group(1)))
        return count

    def capture_from_pcap(self, pcap_file):
        """Read *pcap_file*, print its analysis and return a process exit code.

        The packet rate is computed over the time span covered by the packet
        timestamps; 1.0 second is used when that span cannot be determined.

        Returns:
            0 on success, 1 when the file could not be read or analysed.
        """
        print("=== Reading packets from PCAP file ===")
        print(f"PCAP file: {pcap_file}")
        print()

        try:
            packets = rdpcap(pcap_file)
            print(f"Loaded {len(packets)} packets from {pcap_file}")

            file_size = os.path.getsize(pcap_file)
            print(f"File size: {file_size} bytes")
            print()

            duration = self._capture_span(packets) or 1.0
            self.analyzer.analyze(packets, duration, None)
            return 0
        except OSError as e:
            print(f"Error reading PCAP file: {e}")
            return 1
        except Exception as e:
            # Parsing problems are reported with a traceback for diagnosis.
            print(f"Error reading PCAP file: {e}")
            traceback.print_exc()
            return 1

    def capture_live(self, interface, channel, duration):
        """Capture from *interface* for *duration* seconds; returns an exit code."""
        return asyncio.run(self._capture_live_async(interface, channel, duration))

    async def _capture_live_async(self, interface, channel, duration):
        """Set up monitor mode, run the test and main captures, then restore."""
        print("=== Testing Monitor Mode with scapy ===")
        print(f"Interface: {interface}")
        print(f"Channel: {channel}")
        print(f"Time: {duration} seconds")
        print()

        monitor = WiFiMonitor(interface, channel)
        if not await monitor.start():
            if monitor.error_message:
                print(monitor.error_message)
            return 1

        try:
            if not await self._test_capture_async(interface):
                return 1
            return await self._main_capture_async(interface, duration)
        finally:
            await monitor.stop()

    async def _test_capture_async(self, interface):
        """Sniff for one second and report what arrived.

        Returns:
            False when sniffing raised, True otherwise (even with no packets).
        """
        print("\nChecking Data Link Type (1 second test capture)...")
        print("(This may take up to 2 seconds if no packets are present)")

        test_packets = []
        try:
            # sniff() blocks, so it runs in the default executor.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: sniff(
                    iface=interface, prn=test_packets.append, timeout=1, store=False
                ),
            )
        except OSError as e:
            print(f"Error during test capture (system error): {e}")
            return False
        except Exception as e:
            print(f"Error during test capture: {e}")
            return False

        test_count = len(test_packets)
        test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))

        if test_count > 0:
            print("Sample packets:")
            for number, pkt in enumerate(test_packets[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                plcp = "yes" if pkt.haslayer(RadioTap) else "no"
                timestamp = self.analyzer._format_timestamp(pkt)
                print(f" Frame {number}: time={timestamp}, RA={ra_str}, TA={ta_str}, PLCP={plcp}")

        print("\nTest capture results:")
        print(f" Packets captured: {test_count}")
        print(f" PLCP headers: {test_plcp}")
        if test_plcp == 0 and test_count > 0:
            print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")

        return True

    async def _run_tcpdump_counter(self, interface, duration):
        """Count data frames with tcpdump for *duration* seconds.

        Returns:
            The count parsed from tcpdump's stderr summary (possibly 0), or
            None when tcpdump is missing, failed, or printed nothing.
        """
        try:
            if shutil.which("tcpdump") is None:
                print("Warning: tcpdump not found, skipping concurrent count")
                return None

            # wlan[0] is the first frame-control byte; bits 2-3 hold the frame
            # type, so (wlan[0] & 0x0C) == 0x08 selects type 2 (data).
            proc = await asyncio.create_subprocess_exec(
                "tcpdump",
                "-i", interface,
                "-n",  # Don't resolve addresses
                "-q",  # Quiet mode (less verbose)
                "-c", "1000000",  # Large count limit
                "wlan[0] & 0x0C == 0x08",  # BPF filter: type == 2 (data frames)
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )

            await asyncio.sleep(duration)

            try:
                proc.terminate()
            except ProcessLookupError:
                # tcpdump already exited (for example on a startup error);
                # its stderr is still collected and reported below.
                pass
            try:
                await asyncio.wait_for(proc.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()

            # tcpdump writes its packet-count summary to stderr.
            _, stderr = await proc.communicate()
            stderr_text = stderr.decode(errors="replace") if stderr else ""

            if proc.returncode != 0 and proc.returncode != -15:  # -15 is SIGTERM, which is expected
                print(f"Warning: tcpdump exited with code {proc.returncode}")
                if stderr_text:
                    print(f"tcpdump stderr: {stderr_text[:300]}")
                return None

            if not stderr_text.strip():
                print("Warning: tcpdump produced no output")
                return None

            data_frame_count = self._parse_tcpdump_count(stderr_text)
            if data_frame_count == 0:
                print(f"Warning: tcpdump found 0 data frames. tcpdump stderr: {stderr_text[:200]}")

            # A zero count is still returned so it can be displayed.
            return data_frame_count
        except FileNotFoundError:
            print("Warning: tcpdump not found, skipping concurrent count")
            return None
        except Exception as e:
            print(f"Warning: tcpdump counter failed: {e}")
            traceback.print_exc()
            return None

    async def _main_capture_async(self, interface, duration):
        """Run the main capture, optionally save a pcap, and print the analysis.

        Returns:
            1 when sniffing or writing the pcap raised, otherwise 0.
        """
        print(f"\n=== Starting scapy capture ({duration} seconds) ===")
        print("Press Ctrl+C to stop early\n")

        if self.config.full_packet:
            print("Capturing full packets...")
        else:
            print("Capturing packets (full packets - snaplen not supported for live capture)...")

        packets = []
        pcap_path = None

        if self.config.keep_pcap:
            # Only the unique name is needed here; the file is written later.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix='.pcap', prefix='scapy_capture_'
            ) as handle:
                pcap_path = handle.name
            print(f"Capturing to file: {pcap_path}")

        # Start the tcpdump cross-check concurrently when tcpdump exists.
        tcpdump_task = None
        if shutil.which("tcpdump") is not None:
            print("Starting concurrent tcpdump counter for data frames...")
            tcpdump_task = asyncio.create_task(
                self._run_tcpdump_counter(interface, duration)
            )
        else:
            print("Note: tcpdump not found, skipping concurrent count")

        capture_error = None
        try:
            loop = asyncio.get_running_loop()
            # store=False: the prn callback already collects every packet, so
            # letting sniff() keep its own list would hold each packet twice.
            await loop.run_in_executor(
                None,
                lambda: sniff(
                    iface=interface, prn=packets.append, timeout=duration, store=False
                ),
            )
            if pcap_path is not None:
                await loop.run_in_executor(None, lambda: wrpcap(pcap_path, packets))
                print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
                print(f"Keeping pcap file: {pcap_path}")
                print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)")
        except KeyboardInterrupt:
            print("\nCapture interrupted by user")
        except OSError as e:
            capture_error = e
            print(f"\nError during capture (system error): {e}")
        except Exception as e:
            capture_error = e
            print(f"\nError during capture: {e}")
            traceback.print_exc()

        tcpdump_data_count = None
        if tcpdump_task is not None:
            tcpdump_data_count = await tcpdump_task

        self.analyzer.analyze(packets, duration, tcpdump_data_count)

        return 1 if capture_error else 0


class ArgumentParser:
    """Parse the command line and copy the chosen flags into the config object."""

    # File extensions that mark a bare positional argument as a capture file.
    _PCAP_SUFFIXES = ('.pcap', '.pcapng', '.cap')

    _EPILOG = """
Examples:
# Live capture with defaults (wlan0, channel 36, 10 seconds):
sudo python3 wifi_monitor.py

# Live capture with specific interface, channel, and time:
sudo python3 wifi_monitor.py --interface wlan0 --channel 36 --time 10

# Short form:
sudo python3 wifi_monitor.py -i wlan1 -c 11 -t 5

# Save captured packets to pcap file:
sudo python3 wifi_monitor.py -i wlan0 -c 36 -t 10 --keep-pcap

# Analyze existing pcap file:
python3 wifi_monitor.py --pcap /tmp/capture.pcap
python3 wifi_monitor.py /tmp/capture.pcap # Also supported
"""

    def __init__(self, config):
        """Keep the config object that ``parse()`` updates."""
        self.config = config

    def parse(self):
        """Parse ``sys.argv`` and update the config.

        The ``--interface`` default comes from ``config.wifi_interface`` (or
        ``"wlan0"`` when that is unset), so a value already placed on the
        config is honoured.

        Returns:
            ``('pcap', path, None, None)`` for file analysis,
            ``('live', interface, channel, seconds)`` for live capture, or
            None when the requested pcap file does not exist. Invalid
            arguments end the process through ``argparse``'s ``error()``.
        """
        default_interface = getattr(self.config, "wifi_interface", None) or "wlan0"

        parser = argparse.ArgumentParser(
            description="Monitor mode WiFi packet capture and analysis using scapy",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog=self._EPILOG,
        )

        # PCAP file mode
        parser.add_argument(
            '--pcap',
            metavar='FILE',
            help='Read packets from PCAP file instead of live capture',
        )

        # Live capture mode options
        parser.add_argument(
            '--interface', '-i',
            default=default_interface,
            metavar='IFACE',
            help='WiFi interface name (default: %(default)s)',
        )
        parser.add_argument(
            '--channel', '-c',
            type=int,
            default=36,
            metavar='CH',
            help='WiFi channel (default: 36)',
        )
        parser.add_argument(
            '--time', '-t',
            type=int,
            default=10,
            metavar='SECONDS',
            help='Capture time in seconds (default: 10)',
        )

        # Flags
        parser.add_argument(
            '--keep-pcap', '-k',
            action='store_true',
            help='Save captured packets to a pcap file',
        )
        parser.add_argument(
            '--full-packet', '-f',
            action='store_true',
            help='Capture full packets (default: header only, not supported for live capture)',
        )

        args, unknown = parser.parse_known_args()

        # Backward compatibility: a leading bare argument that looks like a
        # capture file is treated as --pcap.
        if unknown and unknown[0].lower().endswith(self._PCAP_SUFFIXES):
            if args.pcap:
                parser.error("Cannot specify both --pcap and positional pcap file argument")
            args.pcap = unknown[0]
            unknown = unknown[1:]

        if unknown:
            parser.error(f"Unrecognized arguments: {' '.join(unknown)}")

        # Apply flags to config
        self.config.keep_pcap = args.keep_pcap
        self.config.full_packet = args.full_packet

        if args.pcap:
            if not os.path.isfile(args.pcap):
                print(f"Error: PCAP file not found: {args.pcap}")
                return None
            return ('pcap', args.pcap, None, None)

        # Live capture mode: a non-positive duration cannot produce a capture.
        if args.time <= 0:
            parser.error("--time must be a positive number of seconds")

        self.config.wifi_interface = args.interface
        return ('live', args.interface, args.channel, args.time)


def main():
    """Parse the command line, run the selected mode and return its exit code.

    Returns 1 when argument parsing yields nothing to run; otherwise the code
    reported by the pcap analysis or the live capture.
    """
    settings = Config()
    request = ArgumentParser(settings).parse()
    if request is None:
        return 1

    mode, interface_or_pcap, channel, duration = request
    runner = PacketCapture(settings)
    if mode == 'pcap':
        return runner.capture_from_pcap(interface_or_pcap)
    return runner.capture_live(interface_or_pcap, channel, duration)


# Script entry point: exit with whatever status main() reports.
if __name__ == "__main__":
    raise SystemExit(main())