fiwi_monitor/wifi_monitor.py

850 lines
31 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Monitor mode WiFi packet capture and analysis using scapy.
Replaces the bash script with a pure Python solution.
Usage:
# Live capture from interface:
sudo python3 wifi_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet]
sudo python3 wifi_monitor.py wlan0 36 10
sudo python3 wifi_monitor.py wlan0 36 10 --keep-pcap
sudo python3 wifi_monitor.py wlan0 36 10 --full-packet # Capture entire packets (default: header only)
# Read from pcap file:
python3 wifi_monitor.py <pcap_file>
python3 wifi_monitor.py /tmp/capture.pcap
"""
# Standard library imports
import asyncio
import os
import subprocess
import sys
import tempfile
import time
from collections import Counter, defaultdict
# Third-party imports
from scapy.all import RadioTap, rdpcap, sniff, wrpcap
from scapy.layers.dot11 import (
Dot11, Dot11AssoReq, Dot11AssoResp, Dot11Auth, Dot11Beacon,
Dot11Deauth, Dot11Disas, Dot11ProbeReq, Dot11ProbeResp, Dot11QoS
)
class Config:
"""Configuration class for capture settings."""
def __init__(self):
self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0")
self.keep_pcap = False
self.full_packet = False
self.default_snaplen = 256 # Header + some payload
@property
def snaplen(self):
"""Get snaplen value based on full_packet setting."""
return 0 if self.full_packet else self.default_snaplen
class WiFiMonitor:
"""Handles WiFi interface monitor mode setup and teardown with async support."""
def __init__(self, interface, channel):
self.interface = interface
self.channel = channel
self._error_message = None
self._original_mode = None
self._is_started = False
@property
def error_message(self):
"""Get error message if setup failed."""
return self._error_message
@property
def is_successful(self):
"""Check if setup was successful."""
return self._error_message is None
@property
def is_started(self):
"""Check if monitor mode is currently active."""
return self._is_started
async def start(self):
"""Start monitor mode on the WiFi interface."""
self._error_message = None
result = True
print(f"=== Setting up monitor mode on {self.interface} ===")
if not await self._unmanage_networkmanager():
result = False
await self._unblock_wifi()
if not await self._set_monitor_mode():
result = False
if not await self._set_channel():
result = False
await self._verify_monitor_mode()
if result:
self._is_started = True
return result
async def stop(self):
"""Stop monitor mode and restore interface to managed mode."""
if not self._is_started:
return True
print(f"=== Restoring {self.interface} to managed mode ===")
result = True
try:
# Bring down interface
proc = await asyncio.create_subprocess_exec(
"ip", "link", "set", self.interface, "down",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
await asyncio.sleep(0.5)
# Set back to managed mode
proc = await asyncio.create_subprocess_exec(
"iw", "dev", self.interface, "set", "type", "managed",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
_, stderr = await proc.communicate()
if proc.returncode != 0:
error_msg = stderr.decode() if stderr else "Unknown error"
print(f"Warning: Failed to set managed mode: {error_msg}")
result = False
else:
print("Restored to managed mode")
# Re-enable NetworkManager management
await self._manage_networkmanager()
# Bring up interface
proc = await asyncio.create_subprocess_exec(
"ip", "link", "set", self.interface, "up",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
self._is_started = False
except OSError as e:
self._error_message = f"Error restoring managed mode: {e}"
print(self._error_message)
result = False
return result
async def _unmanage_networkmanager(self):
"""Try to unmanage interface from NetworkManager."""
try:
proc = await asyncio.create_subprocess_exec(
"which", "nmcli",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
if proc.returncode == 0:
print("Unmanaging interface from NetworkManager...")
proc = await asyncio.create_subprocess_exec(
"nmcli", "device", "set", self.interface, "managed", "no",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
except OSError:
pass
return True
async def _manage_networkmanager(self):
"""Re-enable NetworkManager management of the interface."""
try:
proc = await asyncio.create_subprocess_exec(
"which", "nmcli",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
if proc.returncode == 0:
proc = await asyncio.create_subprocess_exec(
"nmcli", "device", "set", self.interface, "managed", "yes",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
except OSError:
pass
async def _unblock_wifi(self):
"""Unblock WiFi if blocked by rfkill."""
try:
proc = await asyncio.create_subprocess_exec(
"rfkill", "unblock", "wifi",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
except OSError:
pass
async def _set_monitor_mode(self):
"""Set interface to monitor mode."""
try:
proc = await asyncio.create_subprocess_exec(
"iw", "dev", self.interface, "info",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
await proc.wait()
if proc.returncode == 0 and b"type monitor" in stdout:
print(f"Already in monitor mode")
return True
print(f"Setting {self.interface} to monitor mode...")
proc = await asyncio.create_subprocess_exec(
"ip", "link", "set", self.interface, "down",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
await asyncio.sleep(0.5)
proc = await asyncio.create_subprocess_exec(
"iw", "dev", self.interface, "set", "type", "monitor",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
stderr = await proc.communicate()[1]
await proc.wait()
if proc.returncode != 0:
error_msg = stderr.decode() if stderr else "Unknown error"
self._error_message = f"Error setting monitor mode: {error_msg}"
print(self._error_message)
return False
await asyncio.sleep(0.5)
proc = await asyncio.create_subprocess_exec(
"ip", "link", "set", self.interface, "up",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
stderr = await proc.communicate()[1]
await proc.wait()
if proc.returncode != 0:
error_msg = stderr.decode() if stderr else "Unknown error"
self._error_message = f"Error bringing interface up: {error_msg}"
print(self._error_message)
return False
print("Monitor mode activated")
return True
except OSError as e:
self._error_message = f"Error setting monitor mode: {e}"
print(self._error_message)
return False
async def _set_channel(self):
"""Set channel for monitor mode interface."""
try:
proc = await asyncio.create_subprocess_exec(
"iw", "dev", self.interface, "set", "channel", str(self.channel),
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
stderr = await proc.communicate()[1]
await proc.wait()
if proc.returncode == 0:
print(f"Channel set to {self.channel}")
return True
else:
print(f"Error setting channel: {stderr.decode() if stderr else 'Unknown error'}")
print("Continuing anyway - channel may not be set correctly")
return True # Non-fatal
except OSError as e:
print(f"Error setting channel: {e}")
print("Continuing anyway - channel may not be set correctly")
return True # Non-fatal
async def _verify_monitor_mode(self):
"""Verify monitor mode is active."""
print("\nVerifying monitor mode...")
try:
proc = await asyncio.create_subprocess_exec(
"iw", "dev", self.interface, "info",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL
)
stdout, _ = await proc.communicate()
await proc.wait()
if proc.returncode == 0:
for line in stdout.decode().splitlines():
if "type" in line or "channel" in line:
print(f"\t{line.strip()}")
except (OSError, subprocess.SubprocessError):
pass
class PacketParser:
"""Handles parsing of 802.11 packet fields."""
@staticmethod
def frame_type_name(pkt):
"""Get human-readable frame type name."""
if not pkt.haslayer(Dot11):
return "Unknown"
dot11 = pkt[Dot11]
fc = dot11.FCfield
if dot11.type == 0: # Management
if dot11.subtype == 8:
return "Beacon"
elif dot11.subtype == 4:
return "Probe Request"
elif dot11.subtype == 5:
return "Probe Response"
elif dot11.subtype == 11:
return "Authentication"
elif dot11.subtype == 12:
return "Deauthentication"
elif dot11.subtype == 0:
return "Association Request"
elif dot11.subtype == 1:
return "Association Response"
elif dot11.subtype == 10:
return "Disassociation"
else:
return f"Management ({dot11.subtype})"
elif dot11.type == 1: # Control
return f"Control ({dot11.subtype})"
elif dot11.type == 2: # Data
if dot11.subtype == 8:
return "QoS Data"
elif dot11.subtype == 0:
return "Data"
else:
return f"Data ({dot11.subtype})"
else:
return f"Unknown ({dot11.type}/{dot11.subtype})"
@staticmethod
def ra_ta(pkt):
"""Extract RA and TA from 802.11 frame."""
if not pkt.haslayer(Dot11):
return None, None
dot11 = pkt[Dot11]
ra = dot11.addr1 if hasattr(dot11, 'addr1') else None
ta = dot11.addr2 if hasattr(dot11, 'addr2') else None
if ra:
ra = ra.lower() if isinstance(ra, str) else ':'.join(f'{b:02x}' for b in ra)
if ta:
ta = ta.lower() if isinstance(ta, str) else ':'.join(f'{b:02x}' for b in ta)
return ra, ta
@staticmethod
def phy_info(pkt):
"""Extract PHY rate and MCS from packet (if available in radiotap)."""
phy_rate = None
mcs = None
if pkt.haslayer(RadioTap):
radiotap = pkt[RadioTap]
if hasattr(radiotap, 'Rate'):
rate_val = radiotap.Rate
if rate_val:
phy_rate = rate_val * 0.5 # Convert to Mbps
if hasattr(radiotap, 'MCS'):
mcs_data = radiotap.MCS
if mcs_data and hasattr(mcs_data, 'index'):
mcs = mcs_data.index
return phy_rate, mcs
class CaptureAnalyzer:
"""Analyzes packet captures and generates statistics."""
def __init__(self, parser):
self.parser = parser
def analyze(self, packets, duration):
"""Analyze captured packets and generate statistics."""
print("\n=== Capture Statistics ===")
total_count = len(packets)
print(f"Total packets captured: {total_count}")
if total_count == 0:
self._print_no_packets_message()
return
plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap))
print(f"PLCP headers: {plcp_count}")
if total_count > 0:
rate = total_count / duration
print(f"Packet rate: {rate:.1f} packets/second")
print()
self._print_sample_packets(packets)
self._print_ra_ta_pairs(packets)
self._print_phy_histograms(packets)
self._print_frame_type_breakdown(packets)
self._print_data_frame_analysis(packets)
self._print_server_mac_analysis(packets)
self._print_summary(total_count, plcp_count)
def _print_no_packets_message(self):
"""Print message when no packets are captured."""
print("\n(No packets captured)")
print("\n=== Summary ===")
print("✗ No packets captured. Check:")
print(f" 1. Is there WiFi traffic on the channel?")
print(f" 2. Is the interface actually in monitor mode?")
print(f" 3. Try a different channel or longer duration")
def _print_sample_packets(self, packets):
"""Print sample packets."""
print("Sample packets (first 10):")
for i, pkt in enumerate(packets[:10]):
ra, ta = self.parser.ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
frame_type = self.parser.frame_type_name(pkt)
encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted"
retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else ""
duration_val = "N/A"
if pkt.haslayer(Dot11):
duration_val = pkt[Dot11].Duration if hasattr(pkt[Dot11], 'Duration') else "N/A"
plcp = "yes" if pkt.haslayer(RadioTap) else "no"
print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}, PLCP={plcp}{retry}")
print()
def _print_ra_ta_pairs(self, packets):
"""Print unique RA/TA pairs."""
print("Unique RA/TA pairs (with counts):")
ra_ta_pairs = Counter()
for pkt in packets:
ra, ta = self.parser.ra_ta(pkt)
if ra or ta:
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
pair = f"{ra_str} -> {ta_str}"
ra_ta_pairs[pair] += 1
if ra_ta_pairs:
for pair, count in ra_ta_pairs.most_common():
print(f" {pair}: {count} frame(s)")
else:
print(" (no valid RA/TA pairs found)")
print()
def _print_phy_histograms(self, packets):
"""Print PHY rate and MCS histograms per RA/TA pair."""
print("PHY Rate and MCS Histograms per RA/TA pair:")
rate_histograms = defaultdict(Counter)
mcs_histograms = defaultdict(Counter)
for pkt in packets:
ra, ta = self.parser.ra_ta(pkt)
if not (ra or ta):
continue
if not pkt.haslayer(Dot11) or pkt[Dot11].type != 2:
continue
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
pair = f"{ra_str} -> {ta_str}"
phy_rate, mcs = self.parser.phy_info(pkt)
if phy_rate:
rate_histograms[pair][phy_rate] += 1
if mcs is not None:
mcs_histograms[pair][mcs] += 1
for pair in sorted(set(list(rate_histograms.keys()) + list(mcs_histograms.keys()))):
print(f"\n {pair}:")
print(" PHY Rate (Mbps):")
if pair in rate_histograms:
for rate in sorted(rate_histograms[pair].keys()):
print(f" {rate} Mbps: {rate_histograms[pair][rate]} frame(s)")
else:
print(" (no PHY rate data)")
print(" MCS Index:")
if pair in mcs_histograms:
for mcs_val in sorted(mcs_histograms[pair].keys()):
print(f" MCS {mcs_val}: {mcs_histograms[pair][mcs_val]} frame(s)")
else:
print(" (no MCS data)")
print()
def _print_frame_type_breakdown(self, packets):
"""Print frame type breakdown."""
print("Frame type breakdown:")
frame_types = Counter()
for pkt in packets:
if pkt.haslayer(Dot11):
dot11 = pkt[Dot11]
if dot11.type == 0:
frame_types["Management"] += 1
elif dot11.type == 1:
frame_types["Control"] += 1
elif dot11.type == 2:
frame_types["Data"] += 1
else:
frame_types["Unknown"] += 1
for frame_type, count in frame_types.most_common():
print(f" {frame_type}: {count} frame(s)")
print()
def _print_data_frame_analysis(self, packets):
"""Print data frame analysis."""
print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):")
qos_data_frames = [pkt for pkt in packets if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8]
qos_count = len(qos_data_frames)
print(f" QoS Data frames (type 2, subtype 8): {qos_count}")
encrypted_count = sum(1 for pkt in qos_data_frames if pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40)
unencrypted_count = qos_count - encrypted_count
print(f" Encrypted: {encrypted_count}")
print(f" Unencrypted: {unencrypted_count}")
if qos_count > 0:
print(" Sample QoS Data frames (likely iperf traffic):")
for i, pkt in enumerate(qos_data_frames[:5]):
ra, ta = self.parser.ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted"
retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else ""
duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A"
print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, {encrypted}, dur={duration_val}{retry}")
print()
def _print_server_mac_analysis(self, packets):
"""Print analysis of frames involving server MAC."""
server_mac = "80:84:89:93:c4:b6"
print(f"Frames involving server MAC ({server_mac}):")
server_frames = []
for pkt in packets:
ra, ta = self.parser.ra_ta(pkt)
if (ra and ra.lower() == server_mac.lower()) or (ta and ta.lower() == server_mac.lower()):
server_frames.append(pkt)
server_count = len(server_frames)
print(f" Total frames with server MAC: {server_count}")
if server_count > 0:
server_frame_types = Counter()
for pkt in server_frames:
if pkt.haslayer(Dot11):
dot11 = pkt[Dot11]
if dot11.type == 0:
server_frame_types["Management"] += 1
elif dot11.type == 1:
server_frame_types["Control"] += 1
elif dot11.type == 2:
server_frame_types["Data"] += 1
print(" Frame type breakdown:")
for frame_type, count in server_frame_types.most_common():
print(f" {frame_type}: {count} frame(s)")
print(" Sample frames:")
for i, pkt in enumerate(server_frames[:5]):
ra, ta = self.parser.ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
frame_type = self.parser.frame_type_name(pkt)
encrypted = "encrypted" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x40) else "unencrypted"
retry = " [retry]" if (pkt.haslayer(Dot11) and pkt[Dot11].FCfield & 0x08) else ""
duration_val = pkt[Dot11].Duration if pkt.haslayer(Dot11) and hasattr(pkt[Dot11], 'Duration') else "N/A"
print(f" Frame {i+1}: RA={ra_str}, TA={ta_str}, type={frame_type}, {encrypted}, dur={duration_val}{retry}")
print()
def _print_summary(self, total_count, plcp_count):
"""Print summary statistics."""
print("=== Summary ===")
if total_count > 0:
print(f"✓ Monitor mode is working! Captured {total_count} packet(s)")
if plcp_count > 0:
print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
else:
print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")
class PacketCapture:
"""Handles packet capture operations."""
def __init__(self, config):
self.config = config
self.parser = PacketParser()
self.analyzer = CaptureAnalyzer(self.parser)
def capture_from_pcap(self, pcap_file):
"""Read and analyze packets from a PCAP file."""
print("=== Reading packets from PCAP file ===")
print(f"PCAP file: {pcap_file}")
print()
try:
packets = rdpcap(pcap_file)
print(f"Loaded {len(packets)} packets from {pcap_file}")
file_size = os.path.getsize(pcap_file)
print(f"File size: {file_size} bytes")
print()
self.analyzer.analyze(packets, 1.0)
return 0
except OSError as e:
print(f"Error reading PCAP file: {e}")
return 1
except Exception as e:
print(f"Error reading PCAP file: {e}")
import traceback
traceback.print_exc()
return 1
def capture_live(self, interface, channel, duration):
"""Capture packets from a live interface."""
return asyncio.run(self._capture_live_async(interface, channel, duration))
async def _capture_live_async(self, interface, channel, duration):
"""Async implementation of live capture."""
print("=== Testing Monitor Mode with scapy ===")
print(f"Interface: {interface}")
print(f"Channel: {channel}")
print(f"Duration: {duration} seconds")
print()
monitor = WiFiMonitor(interface, channel)
if not await monitor.start():
if monitor.error_message:
print(monitor.error_message)
return 1
try:
if not await self._test_capture_async(interface):
return 1
return await self._main_capture_async(interface, duration)
finally:
await monitor.stop()
async def _test_capture_async(self, interface):
"""Perform a test capture to verify monitor mode (async)."""
print("\nChecking Data Link Type (1 second test capture)...")
print("(This may take up to 2 seconds if no packets are present)")
test_packets = []
try:
# Run blocking sniff in executor
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=self.config.snaplen)
)
except OSError as e:
print(f"Error during test capture (system error): {e}")
return False
except Exception as e:
print(f"Error during test capture: {e}")
return False
test_count = len(test_packets)
test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))
if test_count > 0:
print("Sample packets:")
for i, pkt in enumerate(test_packets[:5]):
ra, ta = self.parser.ra_ta(pkt)
ra_str = ra if ra else "N/A"
ta_str = ta if ta else "N/A"
plcp = "yes" if pkt.haslayer(RadioTap) else "no"
print(f" Frame {i+1 if i > 0 else test_count}: RA={ra_str}, TA={ta_str}, PLCP={plcp}")
print(f"\nTest capture results:")
print(f" Packets captured: {test_count}")
print(f" PLCP headers: {test_plcp}")
if test_plcp == 0 and test_count > 0:
print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")
return True
async def _main_capture_async(self, interface, duration):
"""Perform the main packet capture (async)."""
print(f"\n=== Starting scapy capture ({duration} seconds) ===")
print("Press Ctrl+C to stop early\n")
if self.config.full_packet:
print("Capturing full packets...")
else:
print(f"Capturing packets (snaplen={self.config.snaplen} bytes, header + some payload)...")
packets = []
pcap_path = None
if self.config.keep_pcap:
pcap_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='scapy_capture_')
pcap_path = pcap_file.name
pcap_file.close()
print(f"Capturing to file: {pcap_path}")
capture_error = None
try:
# Run blocking sniff in executor
loop = asyncio.get_event_loop()
if self.config.keep_pcap:
await loop.run_in_executor(
None,
lambda: sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen)
)
await loop.run_in_executor(None, lambda: wrpcap(pcap_path, packets))
print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
print(f"Keeping pcap file: {pcap_path}")
print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)")
else:
await loop.run_in_executor(
None,
lambda: sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen)
)
except KeyboardInterrupt:
print("\nCapture interrupted by user")
except OSError as e:
capture_error = e
print(f"\nError during capture (system error): {e}")
except Exception as e:
capture_error = e
print(f"\nError during capture: {e}")
import traceback
traceback.print_exc()
self.analyzer.analyze(packets, duration)
if capture_error:
return 1
return 0
class ArgumentParser:
"""Handles command line argument parsing."""
def __init__(self, config):
self.config = config
def parse(self):
"""Parse command line arguments."""
args = sys.argv[1:]
args = self._process_flags(args)
if len(args) == 0:
# Use defaults: wlan0, channel 36, 10 seconds
return self._parse_live_mode([])
if self._is_pcap_file(args):
return self._parse_pcap_mode(args)
return self._parse_live_mode(args)
def _print_usage(self):
"""Print usage information."""
print("Usage:")
print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]")
print(" Defaults: wlan0, channel 36, 10 seconds")
print(" Read pcap: python3 wifi_monitor.py <pcap_file>")
def _process_flags(self, args):
"""Process command line flags."""
if "--keep-pcap" in args or "-k" in args:
self.config.keep_pcap = True
args = [a for a in args if a not in ["--keep-pcap", "-k"]]
if "--full-packet" in args or "-f" in args:
self.config.full_packet = True
args = [a for a in args if a not in ["--full-packet", "-f"]]
return args
def _is_pcap_file(self, args):
"""Check if first argument is a PCAP file."""
return len(args) > 0 and (args[0].endswith('.pcap') or args[0].endswith('.cap'))
def _parse_pcap_mode(self, args):
"""Parse arguments for PCAP file mode."""
pcap_file = args[0]
if not os.path.isfile(pcap_file):
print(f"Error: PCAP file not found: {pcap_file}")
return None
return ('pcap', pcap_file, None, None)
def _parse_live_mode(self, args):
"""Parse arguments for live capture mode."""
if len(args) > 0:
if args[0].startswith("wl") and len(args[0]) <= 6:
self.config.wifi_interface = args[0]
channel = int(args[1]) if len(args) > 1 else 36
duration = int(args[2]) if len(args) > 2 else 10
else:
channel = int(args[0]) if len(args) > 0 else 36
duration = int(args[1]) if len(args) > 1 else 10
else:
channel = 36
duration = 10
return ('live', self.config.wifi_interface, channel, duration)
def main():
"""Main function with single exit point."""
exit_code = 0
config = Config()
arg_parser = ArgumentParser(config)
parsed_args = arg_parser.parse()
if parsed_args is None:
exit_code = 1
else:
mode, interface_or_pcap, channel, duration = parsed_args
capture = PacketCapture(config)
if mode == 'pcap':
exit_code = capture.capture_from_pcap(interface_or_pcap)
else:
exit_code = capture.capture_live(interface_or_pcap, channel, duration)
return exit_code
if __name__ == "__main__":
sys.exit(main())