diff --git a/wifi_monitor.py b/wifi_monitor.py index fd6155a..7de9192 100755 --- a/wifi_monitor.py +++ b/wifi_monitor.py @@ -16,6 +16,7 @@ Usage: """ # Standard library imports +import asyncio import os import subprocess import sys @@ -45,12 +46,14 @@ class Config: return 0 if self.full_packet else self.default_snaplen -class MonitorModeSetup: - """Handles WiFi interface monitor mode setup.""" +class WiFiMonitor: + """Handles WiFi interface monitor mode setup and teardown with async support.""" def __init__(self, interface, channel): self.interface = interface self.channel = channel self._error_message = None + self._original_mode = None + self._is_started = False @property def error_message(self): @@ -62,121 +65,239 @@ class MonitorModeSetup: """Check if setup was successful.""" return self._error_message is None - def setup(self): - """Set WiFi interface to monitor mode.""" + @property + def is_started(self): + """Check if monitor mode is currently active.""" + return self._is_started + + async def start(self): + """Start monitor mode on the WiFi interface.""" + self._error_message = None result = True print(f"=== Setting up monitor mode on {self.interface} ===") - if not self._unmanage_networkmanager(): + if not await self._unmanage_networkmanager(): result = False - self._unblock_wifi() + await self._unblock_wifi() - if not self._set_monitor_mode(): + if not await self._set_monitor_mode(): result = False - if not self._set_channel(): + if not await self._set_channel(): result = False - self._verify_monitor_mode() + await self._verify_monitor_mode() + + if result: + self._is_started = True return result - def _unmanage_networkmanager(self): + async def stop(self): + """Stop monitor mode and restore interface to managed mode.""" + if not self._is_started: + return True + + print(f"=== Restoring {self.interface} to managed mode ===") + result = True + + try: + # Bring down interface + proc = await asyncio.create_subprocess_exec( + "ip", "link", "set", self.interface, "down", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL + ) + await proc.wait() + await asyncio.sleep(0.5) + + # Set back to managed mode + proc = await asyncio.create_subprocess_exec( + "iw", "dev", self.interface, "set", "type", "managed", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE + ) + _, stderr = await proc.communicate() + if proc.returncode != 0: + error_msg = stderr.decode() if stderr else "Unknown error" + print(f"Warning: Failed to set managed mode: {error_msg}") + result = False + else: + print("Restored to managed mode") + + # Re-enable NetworkManager management + await self._manage_networkmanager() + + # Bring up interface + proc = await asyncio.create_subprocess_exec( + "ip", "link", "set", self.interface, "up", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL + ) + await proc.wait() + + self._is_started = False + except OSError as e: + self._error_message = f"Error restoring managed mode: {e}" + print(self._error_message) + result = False + + return result + + async def _unmanage_networkmanager(self): """Try to unmanage interface from NetworkManager.""" try: - result = subprocess.run(["which", "nmcli"], capture_output=True, text=True) - if result.returncode == 0: + proc = await asyncio.create_subprocess_exec( + "which", "nmcli", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL + ) + await proc.wait() + if proc.returncode == 0: print("Unmanaging interface from NetworkManager...") - subprocess.run( - ["nmcli", "device", "set", self.interface, "managed", "no"], - capture_output=True, - stderr=subprocess.DEVNULL + proc = await asyncio.create_subprocess_exec( + "nmcli", "device", "set", self.interface, "managed", "no", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL ) + await proc.wait() except OSError: pass return True - def _unblock_wifi(self): - """Unblock WiFi if blocked by rfkill.""" + async def _manage_networkmanager(self): + """Re-enable NetworkManager management of the interface.""" try: - subprocess.run(["rfkill", "unblock", "wifi"], check=False) + proc = await asyncio.create_subprocess_exec( + "which", "nmcli", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL + ) + await proc.wait() + if proc.returncode == 0: + proc = await asyncio.create_subprocess_exec( + "nmcli", "device", "set", self.interface, "managed", "yes", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL + ) + await proc.wait() except OSError: pass - def _set_monitor_mode(self): + async def _unblock_wifi(self): + """Unblock WiFi if blocked by rfkill.""" + try: + proc = await asyncio.create_subprocess_exec( + "rfkill", "unblock", "wifi", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL + ) + await proc.wait() + except OSError: + pass + + async def _set_monitor_mode(self): """Set interface to monitor mode.""" try: - result = subprocess.run( - ["iw", "dev", self.interface, "info"], - capture_output=True, - text=True, - check=True + proc = await asyncio.create_subprocess_exec( + "iw", "dev", self.interface, "info", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE ) - if "type monitor" in result.stdout: + stdout, stderr = await proc.communicate() + await proc.wait() + + if proc.returncode == 0 and b"type monitor" in stdout: print(f"Already in monitor mode") return True print(f"Setting {self.interface} to monitor mode...") - subprocess.run( - ["ip", "link", "set", self.interface, "down"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + proc = await asyncio.create_subprocess_exec( + "ip", "link", "set", self.interface, "down", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL ) - time.sleep(0.5) + await proc.wait() + await asyncio.sleep(0.5) - subprocess.run( - ["iw", "dev", self.interface, "set", "type", "monitor"], - check=True, - capture_output=True, - text=True + proc = await asyncio.create_subprocess_exec( + "iw", "dev", self.interface, "set", "type", "monitor", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE ) - time.sleep(0.5) + stderr = await proc.communicate()[1] + await proc.wait() - subprocess.run( - ["ip", "link", "set", self.interface, "up"], - check=True, - capture_output=True, - text=True + if proc.returncode != 0: + error_msg = stderr.decode() if stderr else "Unknown error" + self._error_message = f"Error setting monitor mode: {error_msg}" + print(self._error_message) + return False + + await asyncio.sleep(0.5) + + proc = await asyncio.create_subprocess_exec( + "ip", "link", "set", self.interface, "up", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE ) + stderr = await proc.communicate()[1] + await proc.wait() + + if proc.returncode != 0: + error_msg = stderr.decode() if stderr else "Unknown error" + self._error_message = f"Error bringing interface up: {error_msg}" + print(self._error_message) + return False + print("Monitor mode activated") return True - except subprocess.CalledProcessError as e: + except OSError as e: self._error_message = f"Error setting monitor mode: {e}" - if hasattr(e, 'stderr') and e.stderr: - self._error_message += f"\nError details: {e.stderr}" print(self._error_message) return False - def _set_channel(self): + async def _set_channel(self): """Set channel for monitor mode interface.""" try: - subprocess.run( - ["iw", "dev", self.interface, "set", "channel", str(self.channel)], - check=True, - capture_output=True + proc = await asyncio.create_subprocess_exec( + "iw", "dev", self.interface, "set", "channel", str(self.channel), + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE ) - print(f"Channel set to {self.channel}") - return True - except subprocess.CalledProcessError as e: + stderr = await proc.communicate()[1] + await proc.wait() + + if proc.returncode == 0: + print(f"Channel set to {self.channel}") + return True + else: + print(f"Error setting channel: {stderr.decode() if stderr else 'Unknown error'}") + print("Continuing anyway - channel may not be set correctly") + return True # Non-fatal + except OSError as e: print(f"Error setting channel: {e}") print("Continuing anyway - channel may not be set correctly") return True # Non-fatal - def _verify_monitor_mode(self): + async def _verify_monitor_mode(self): """Verify monitor mode is active.""" print("\nVerifying monitor mode...") try: - result = subprocess.run( - ["iw", "dev", self.interface, "info"], - capture_output=True, - text=True, - check=True + proc = await asyncio.create_subprocess_exec( + "iw", "dev", self.interface, "info", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL ) - for line in result.stdout.splitlines(): - if "type" in line or "channel" in line: - print(f"\t{line.strip()}") + stdout, _ = await proc.communicate() + await proc.wait() + + if proc.returncode == 0: + for line in stdout.decode().splitlines(): + if "type" in line or "channel" in line: + print(f"\t{line.strip()}") except (OSError, subprocess.SubprocessError): pass @@ -261,8 +382,8 @@ class PacketParser: return phy_rate, mcs -class PacketAnalyzer: - """Analyzes captured packets and generates statistics.""" +class CaptureAnalyzer: + """Analyzes packet captures and generates statistics.""" def __init__(self, parser): self.parser = parser @@ -486,7 +607,7 @@ class PacketCapture: def __init__(self, config): self.config = config self.parser = PacketParser() - self.analyzer = PacketAnalyzer(self.parser) + self.analyzer = CaptureAnalyzer(self.parser) def capture_from_pcap(self, pcap_file): """Read and analyze packets from a PCAP file.""" @@ -515,31 +636,43 @@ class PacketCapture: def capture_live(self, interface, channel, duration): """Capture packets from a live interface.""" + return asyncio.run(self._capture_live_async(interface, channel, duration)) + + async def _capture_live_async(self, interface, channel, duration): + """Async implementation of live capture.""" print("=== Testing Monitor Mode with scapy ===") print(f"Interface: {interface}") print(f"Channel: {channel}") print(f"Duration: {duration} seconds") print() - setup = MonitorModeSetup(interface, channel) - if not setup.setup(): - if setup.error_message: - print(setup.error_message) + monitor = WiFiMonitor(interface, channel) + if not await monitor.start(): + if monitor.error_message: + print(monitor.error_message) return 1 - if not self._test_capture(interface): - return 1 + try: + if not await self._test_capture_async(interface): + return 1 - return self._main_capture(interface, duration) + return await self._main_capture_async(interface, duration) + finally: + await monitor.stop() - def _test_capture(self, interface): - """Perform a test capture to verify monitor mode.""" + async def _test_capture_async(self, interface): + """Perform a test capture to verify monitor mode (async).""" print("\nChecking Data Link Type (1 second test capture)...") print("(This may take up to 2 seconds if no packets are present)") test_packets = [] try: - sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=self.config.snaplen) + # Run blocking sniff in executor + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + lambda: sniff(iface=interface, prn=lambda pkt: test_packets.append(pkt), timeout=1, store=False, snaplen=self.config.snaplen) + ) except OSError as e: print(f"Error during test capture (system error): {e}") return False @@ -567,8 +700,8 @@ class PacketCapture: return True - def _main_capture(self, interface, duration): - """Perform the main packet capture.""" + async def _main_capture_async(self, interface, duration): + """Perform the main packet capture (async).""" print(f"\n=== Starting scapy capture ({duration} seconds) ===") print("Press Ctrl+C to stop early\n") @@ -588,14 +721,22 @@ class PacketCapture: capture_error = None try: + # Run blocking sniff in executor + loop = asyncio.get_event_loop() if self.config.keep_pcap: - sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen) - wrpcap(pcap_path, packets) + await loop.run_in_executor( + None, + lambda: sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen) + ) + await loop.run_in_executor(None, lambda: wrpcap(pcap_path, packets)) print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes") print(f"Keeping pcap file: {pcap_path}") print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)") else: - sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen) + await loop.run_in_executor( + None, + lambda: sniff(iface=interface, prn=lambda pkt: packets.append(pkt), timeout=duration, store=True, snaplen=self.config.snaplen) + ) except KeyboardInterrupt: print("\nCapture interrupted by user") except OSError as e: @@ -623,12 +764,12 @@ class ArgumentParser: """Parse command line arguments.""" args = sys.argv[1:] - if len(args) == 0: - self._print_usage() - return None - args = self._process_flags(args) + if len(args) == 0: + # Use defaults: wlan0, channel 36, 10 seconds + return self._parse_live_mode([]) + if self._is_pcap_file(args): return self._parse_pcap_mode(args) @@ -638,6 +779,7 @@ class ArgumentParser: """Print usage information.""" print("Usage:") print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]") + print(" Defaults: wlan0, channel 36, 10 seconds") print(" Read pcap: python3 wifi_monitor.py ") def _process_flags(self, args):