#!/usr/bin/env python3
"""
Monitor mode WiFi packet capture and analysis using scapy.

Replaces the bash script with a pure Python solution.

Usage:
    # Live capture from interface:
    sudo python3 wifi_monitor.py [interface] [channel] [duration_seconds] [--keep-pcap] [--full-packet]
    sudo python3 wifi_monitor.py wlan0 36 10
    sudo python3 wifi_monitor.py wlan0 36 10 --keep-pcap
    sudo python3 wifi_monitor.py wlan0 36 10 --full-packet  # Capture entire packets (default: header only)

    # Read from pcap file:
    python3 wifi_monitor.py <pcap_file>
    python3 wifi_monitor.py /tmp/capture.pcap

Environment:
    WIFI_INTERFACE  Default interface for live capture (default: wlan0)
    SERVER_MAC      MAC address highlighted in the "server MAC" report section
"""

# Standard library imports
import asyncio
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from collections import Counter, defaultdict

# Third-party imports
# The import is guarded so that argument parsing and the usage text work on a
# machine without scapy; main() reports the missing dependency explicitly.
try:
    from scapy.all import RadioTap, rdpcap, sniff, wrpcap
    from scapy.layers.dot11 import (
        Dot11,
        Dot11AssoReq,
        Dot11AssoResp,
        Dot11Auth,
        Dot11Beacon,
        Dot11Deauth,
        Dot11Disas,
        Dot11ProbeReq,
        Dot11ProbeResp,
        Dot11QoS,
    )
except ImportError as _import_error:
    _SCAPY_IMPORT_ERROR = _import_error
    RadioTap = rdpcap = sniff = wrpcap = None
    Dot11 = Dot11AssoReq = Dot11AssoResp = Dot11Auth = Dot11Beacon = None
    Dot11Deauth = Dot11Disas = Dot11ProbeReq = Dot11ProbeResp = Dot11QoS = None
else:
    _SCAPY_IMPORT_ERROR = None

DEFAULT_INTERFACE = "wlan0"
DEFAULT_CHANNEL = 36
DEFAULT_DURATION = 10
DEFAULT_SERVER_MAC = "80:84:89:93:c4:b6"

# Frame-control flag masks, as tested against ``Dot11.FCfield``.
FC_RETRY = 0x08
FC_PROTECTED = 0x40


class Config:
    """Configuration class for capture settings."""

    def __init__(self):
        self.wifi_interface = os.environ.get("WIFI_INTERFACE", DEFAULT_INTERFACE)
        self.server_mac = os.environ.get("SERVER_MAC", DEFAULT_SERVER_MAC)
        self.keep_pcap = False
        self.full_packet = False
        self.default_snaplen = 256  # Header + some payload

    @property
    def snaplen(self):
        """Get snaplen value based on full_packet setting (0 means full packet)."""
        return 0 if self.full_packet else self.default_snaplen


class WiFiMonitor:
    """Handles WiFi interface monitor mode setup and teardown with async support."""

    def __init__(self, interface, channel):
        self.interface = interface
        self.channel = channel
        self._error_message = None
        self._original_mode = None  # kept for compatibility; not used yet
        self._is_started = False
        # True once start() has begun touching the interface, so stop() can
        # undo a setup that failed half-way.
        self._needs_restore = False

    @property
    def error_message(self):
        """Get error message if setup failed."""
        return self._error_message

    @property
    def is_successful(self):
        """Check if setup was successful."""
        return self._error_message is None

    @property
    def is_started(self):
        """Check if monitor mode is currently active."""
        return self._is_started

    async def start(self):
        """Start monitor mode on the WiFi interface.

        Returns:
            True when the interface is in monitor mode, False otherwise. On
            failure ``error_message`` is set and the caller should still call
            ``stop()`` so the interface is handed back to NetworkManager.
        """
        self._error_message = None
        print(f"=== Setting up monitor mode on {self.interface} ===")

        self._needs_restore = True
        await self._unmanage_networkmanager()
        await self._unblock_wifi()
        if not await self._set_monitor_mode():
            # Setting a channel on a non-monitor interface is pointless.
            return False
        await self._set_channel()  # Non-fatal by design
        await self._verify_monitor_mode()

        self._is_started = True
        return True

    async def stop(self):
        """Stop monitor mode and restore interface to managed mode.

        Returns:
            True when nothing had to be restored or restoration succeeded,
            False when switching back to managed mode failed.
        """
        if not (self._is_started or self._needs_restore):
            return True

        print(f"=== Restoring {self.interface} to managed mode ===")
        result = True
        try:
            await self._run("ip", "link", "set", self.interface, "down")
            await asyncio.sleep(0.5)

            returncode, _, stderr = await self._run(
                "iw", "dev", self.interface, "set", "type", "managed"
            )
            if returncode != 0:
                print(f"Warning: Failed to set managed mode: {self._decode_error(stderr)}")
                result = False
            else:
                print("Restored to managed mode")

            await self._manage_networkmanager()
            await self._run("ip", "link", "set", self.interface, "up")
            self._is_started = False
            self._needs_restore = False
        except OSError as e:
            self._error_message = f"Error restoring managed mode: {e}"
            print(self._error_message)
            result = False
        return result

    @staticmethod
    async def _run(*cmd, capture_stdout=False):
        """Run *cmd* and return ``(returncode, stdout_bytes, stderr_bytes)``.

        Raises:
            OSError: if the executable cannot be started.
        """
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE if capture_stdout else asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        return proc.returncode, stdout or b"", stderr or b""

    @staticmethod
    def _decode_error(stderr):
        """Turn captured stderr bytes into a printable one-line message."""
        text = stderr.decode(errors="replace").strip() if stderr else ""
        return text or "Unknown error"

    def _fail(self, message):
        """Record and print a setup error; always returns False."""
        self._error_message = message
        print(message)
        return False

    async def _set_networkmanager_managed(self, managed):
        """Best-effort toggle of NetworkManager control over the interface."""
        if shutil.which("nmcli") is None:
            return
        try:
            await self._run(
                "nmcli", "device", "set", self.interface,
                "managed", "yes" if managed else "no",
            )
        except OSError:
            pass  # NetworkManager is optional; failure here is not fatal

    async def _unmanage_networkmanager(self):
        """Try to unmanage interface from NetworkManager (never fails)."""
        if shutil.which("nmcli") is not None:
            print("Unmanaging interface from NetworkManager...")
        await self._set_networkmanager_managed(False)
        return True

    async def _manage_networkmanager(self):
        """Re-enable NetworkManager management of the interface."""
        await self._set_networkmanager_managed(True)

    async def _unblock_wifi(self):
        """Unblock WiFi if blocked by rfkill (best effort)."""
        try:
            await self._run("rfkill", "unblock", "wifi")
        except OSError:
            pass

    async def _set_monitor_mode(self):
        """Set interface to monitor mode; returns True on success."""
        try:
            returncode, stdout, _ = await self._run(
                "iw", "dev", self.interface, "info", capture_stdout=True
            )
            if returncode == 0 and b"type monitor" in stdout:
                print("Already in monitor mode")
                return True

            print(f"Setting {self.interface} to monitor mode...")
            # The type can only be changed while the link is down.
            await self._run("ip", "link", "set", self.interface, "down")
            await asyncio.sleep(0.5)

            returncode, _, stderr = await self._run(
                "iw", "dev", self.interface, "set", "type", "monitor"
            )
            if returncode != 0:
                return self._fail(f"Error setting monitor mode: {self._decode_error(stderr)}")
            await asyncio.sleep(0.5)

            returncode, _, stderr = await self._run("ip", "link", "set", self.interface, "up")
            if returncode != 0:
                return self._fail(f"Error bringing interface up: {self._decode_error(stderr)}")

            print("Monitor mode activated")
            return True
        except OSError as e:
            return self._fail(f"Error setting monitor mode: {e}")

    async def _set_channel(self):
        """Set channel for monitor mode interface (failure is non-fatal)."""
        try:
            returncode, _, stderr = await self._run(
                "iw", "dev", self.interface, "set", "channel", str(self.channel)
            )
        except OSError as e:
            print(f"Error setting channel: {e}")
            print("Continuing anyway - channel may not be set correctly")
            return True  # Non-fatal

        if returncode == 0:
            print(f"Channel set to {self.channel}")
        else:
            print(f"Error setting channel: {self._decode_error(stderr)}")
            print("Continuing anyway - channel may not be set correctly")
        return True  # Non-fatal

    async def _verify_monitor_mode(self):
        """Print the interface type/channel lines reported by ``iw``."""
        print("\nVerifying monitor mode...")
        try:
            returncode, stdout, _ = await self._run(
                "iw", "dev", self.interface, "info", capture_stdout=True
            )
        except (OSError, subprocess.SubprocessError):
            return
        if returncode != 0:
            return
        for line in stdout.decode(errors="replace").splitlines():
            if "type" in line or "channel" in line:
                print(f"\t{line.strip()}")


class PacketParser:
    """Handles parsing of 802.11 packet fields."""

    MANAGEMENT_SUBTYPES = {
        0: "Association Request",
        1: "Association Response",
        4: "Probe Request",
        5: "Probe Response",
        8: "Beacon",
        10: "Disassociation",
        11: "Authentication",
        12: "Deauthentication",
    }
    DATA_SUBTYPES = {0: "Data", 8: "QoS Data"}

    @staticmethod
    def frame_type_name(pkt):
        """Get human-readable frame type name."""
        if not pkt.haslayer(Dot11):
            return "Unknown"

        dot11 = pkt[Dot11]
        frame_type, subtype = dot11.type, dot11.subtype
        if frame_type == 0:  # Management
            return PacketParser.MANAGEMENT_SUBTYPES.get(subtype, f"Management ({subtype})")
        if frame_type == 1:  # Control
            return f"Control ({subtype})"
        if frame_type == 2:  # Data
            return PacketParser.DATA_SUBTYPES.get(subtype, f"Data ({subtype})")
        return f"Unknown ({frame_type}/{subtype})"

    @staticmethod
    def _format_mac(addr):
        """Normalise a MAC (str or byte sequence) to lower-case colon form."""
        if not addr:
            return None
        if isinstance(addr, str):
            return addr.lower()
        return ':'.join(f'{b:02x}' for b in addr)

    @staticmethod
    def ra_ta(pkt):
        """Extract RA (addr1) and TA (addr2) from 802.11 frame.

        Returns:
            ``(ra, ta)``; either element is None when the address is absent.
        """
        if not pkt.haslayer(Dot11):
            return None, None
        dot11 = pkt[Dot11]
        return (
            PacketParser._format_mac(getattr(dot11, 'addr1', None)),
            PacketParser._format_mac(getattr(dot11, 'addr2', None)),
        )

    @staticmethod
    def phy_info(pkt):
        """Extract PHY rate (Mbps) and MCS index from radiotap, if available.

        Returns:
            ``(phy_rate, mcs)``; either element is None when not reported.
        """
        if not pkt.haslayer(RadioTap):
            return None, None
        radiotap = pkt[RadioTap]

        rate_val = getattr(radiotap, 'Rate', None)
        phy_rate = rate_val * 0.5 if rate_val else None  # Convert to Mbps

        mcs = None
        mcs_data = getattr(radiotap, 'MCS', None)
        if mcs_data and hasattr(mcs_data, 'index'):
            mcs = mcs_data.index
        else:
            # NOTE(review): scapy's RadioTap layer appears to expose the MCS
            # as a flat ``MCS_index`` field gated by ``present.MCS`` rather
            # than an ``MCS`` object - confirm against the installed scapy.
            present = getattr(radiotap, 'present', None)
            if present is not None and getattr(present, 'MCS', False):
                mcs = getattr(radiotap, 'MCS_index', None)
        return phy_rate, mcs


class CaptureAnalyzer:
    """Analyzes packet captures and generates statistics."""

    FRAME_CLASSES = {0: "Management", 1: "Control", 2: "Data"}

    def __init__(self, parser, server_mac=DEFAULT_SERVER_MAC):
        """
        Args:
            parser: object providing ``ra_ta``, ``frame_type_name`` and
                ``phy_info`` (normally a ``PacketParser``).
            server_mac: MAC address reported in the "server MAC" section.
        """
        self.parser = parser
        self.server_mac = (server_mac or DEFAULT_SERVER_MAC).lower()

    def analyze(self, packets, duration):
        """Analyze captured packets and print statistics.

        Args:
            packets: sequence of scapy packets.
            duration: capture length in seconds; None or a non-positive value
                suppresses the packet-rate figure instead of dividing by zero.
        """
        print("\n=== Capture Statistics ===")

        total_count = len(packets)
        print(f"Total packets captured: {total_count}")
        if total_count == 0:
            self._print_no_packets_message()
            return

        plcp_count = sum(1 for pkt in packets if pkt.haslayer(RadioTap))
        print(f"PLCP headers: {plcp_count}")

        if duration and duration > 0:
            print(f"Packet rate: {total_count / duration:.1f} packets/second")
        else:
            print("Packet rate: N/A (capture duration unknown)")
        print()

        self._print_sample_packets(packets)
        self._print_ra_ta_pairs(packets)
        self._print_phy_histograms(packets)
        self._print_frame_type_breakdown(packets)
        self._print_data_frame_analysis(packets)
        self._print_server_mac_analysis(packets)
        self._print_summary(total_count, plcp_count)

    # ----- small per-packet helpers -------------------------------------

    @staticmethod
    def _has_flag(pkt, mask):
        """True when the frame-control field of *pkt* has *mask* set."""
        return bool(pkt.haslayer(Dot11) and pkt[Dot11].FCfield & mask)

    @staticmethod
    def _duration(pkt):
        """Return the Duration/ID value of *pkt*, or None when unavailable."""
        if not pkt.haslayer(Dot11):
            return None
        dot11 = pkt[Dot11]
        # NOTE(review): scapy's Dot11 layer appears to name this field ``ID``;
        # ``Duration`` is tried first to preserve the original lookup.
        for field_name in ("Duration", "ID"):
            value = getattr(dot11, field_name, None)
            if value is not None:
                return value
        return None

    @classmethod
    def _frame_class(cls, pkt):
        """Return "Management"/"Control"/"Data"/"Unknown", or None if not 802.11."""
        if not pkt.haslayer(Dot11):
            return None
        return cls.FRAME_CLASSES.get(pkt[Dot11].type, "Unknown")

    def _pair_label(self, pkt):
        """Return the "RA -> TA" label for *pkt*, or None if it has neither."""
        ra, ta = self.parser.ra_ta(pkt)
        if not (ra or ta):
            return None
        return f"{ra if ra else 'N/A'} -> {ta if ta else 'N/A'}"

    def _frame_fields(self, pkt):
        """Collect the printable per-frame fields shared by all sample lists."""
        ra, ta = self.parser.ra_ta(pkt)
        duration_val = self._duration(pkt)
        return {
            "ra": ra if ra else "N/A",
            "ta": ta if ta else "N/A",
            "encrypted": "encrypted" if self._has_flag(pkt, FC_PROTECTED) else "unencrypted",
            "retry": " [retry]" if self._has_flag(pkt, FC_RETRY) else "",
            "dur": "N/A" if duration_val is None else duration_val,
        }

    # ----- report sections ----------------------------------------------

    def _print_no_packets_message(self):
        """Print message when no packets are captured."""
        print("\n(No packets captured)")
        print("\n=== Summary ===")
        print("✗ No packets captured. Check:")
        print(" 1. Is there WiFi traffic on the channel?")
        print(" 2. Is the interface actually in monitor mode?")
        print(" 3. Try a different channel or longer duration")

    def _print_sample_packets(self, packets):
        """Print sample packets."""
        print("Sample packets (first 10):")
        for number, pkt in enumerate(packets[:10], start=1):
            f = self._frame_fields(pkt)
            frame_type = self.parser.frame_type_name(pkt)
            plcp = "yes" if pkt.haslayer(RadioTap) else "no"
            print(
                f" Frame {number}: RA={f['ra']}, TA={f['ta']}, type={frame_type}, "
                f"{f['encrypted']}, dur={f['dur']}, PLCP={plcp}{f['retry']}"
            )
        print()

    def _print_ra_ta_pairs(self, packets):
        """Print unique RA/TA pairs."""
        print("Unique RA/TA pairs (with counts):")
        labels = (self._pair_label(pkt) for pkt in packets)
        ra_ta_pairs = Counter(label for label in labels if label is not None)

        if ra_ta_pairs:
            for pair, count in ra_ta_pairs.most_common():
                print(f" {pair}: {count} frame(s)")
        else:
            print(" (no valid RA/TA pairs found)")
        print()

    def _print_phy_histograms(self, packets):
        """Print PHY rate and MCS histograms per RA/TA pair (data frames only)."""
        print("PHY Rate and MCS Histograms per RA/TA pair:")
        rate_histograms = defaultdict(Counter)
        mcs_histograms = defaultdict(Counter)

        for pkt in packets:
            pair = self._pair_label(pkt)
            if pair is None:
                continue
            if not pkt.haslayer(Dot11) or pkt[Dot11].type != 2:
                continue
            phy_rate, mcs = self.parser.phy_info(pkt)
            if phy_rate:
                rate_histograms[pair][phy_rate] += 1
            if mcs is not None:
                mcs_histograms[pair][mcs] += 1

        for pair in sorted(rate_histograms.keys() | mcs_histograms.keys()):
            print(f"\n {pair}:")
            print(" PHY Rate (Mbps):")
            if pair in rate_histograms:
                for rate, count in sorted(rate_histograms[pair].items()):
                    print(f" {rate} Mbps: {count} frame(s)")
            else:
                print(" (no PHY rate data)")

            print(" MCS Index:")
            if pair in mcs_histograms:
                for mcs_val, count in sorted(mcs_histograms[pair].items()):
                    print(f" MCS {mcs_val}: {count} frame(s)")
            else:
                print(" (no MCS data)")
        print()

    def _print_frame_type_breakdown(self, packets):
        """Print frame type breakdown."""
        print("Frame type breakdown:")
        classes = (self._frame_class(pkt) for pkt in packets)
        frame_types = Counter(name for name in classes if name is not None)
        for frame_type, count in frame_types.most_common():
            print(f" {frame_type}: {count} frame(s)")
        print()

    def _print_data_frame_analysis(self, packets):
        """Print data frame analysis."""
        print("Data frame analysis (iperf typically uses QoS Data frames, subtype 8):")
        qos_data_frames = [
            pkt for pkt in packets
            if pkt.haslayer(Dot11) and pkt[Dot11].type == 2 and pkt[Dot11].subtype == 8
        ]
        qos_count = len(qos_data_frames)
        print(f" QoS Data frames (type 2, subtype 8): {qos_count}")

        encrypted_count = sum(1 for pkt in qos_data_frames if self._has_flag(pkt, FC_PROTECTED))
        print(f" Encrypted: {encrypted_count}")
        print(f" Unencrypted: {qos_count - encrypted_count}")

        if qos_count > 0:
            print(" Sample QoS Data frames (likely iperf traffic):")
            for number, pkt in enumerate(qos_data_frames[:5], start=1):
                f = self._frame_fields(pkt)
                print(
                    f" Frame {number}: RA={f['ra']}, TA={f['ta']}, "
                    f"{f['encrypted']}, dur={f['dur']}{f['retry']}"
                )
        print()

    def _print_server_mac_analysis(self, packets):
        """Print analysis of frames involving the configured server MAC."""
        server_mac = self.server_mac
        print(f"Frames involving server MAC ({server_mac}):")

        # ra_ta() already returns lower-cased addresses.
        server_frames = [pkt for pkt in packets if server_mac in self.parser.ra_ta(pkt)]
        server_count = len(server_frames)
        print(f" Total frames with server MAC: {server_count}")

        if server_count > 0:
            classes = (self._frame_class(pkt) for pkt in server_frames)
            server_frame_types = Counter(name for name in classes if name is not None)
            print(" Frame type breakdown:")
            for frame_type, count in server_frame_types.most_common():
                print(f" {frame_type}: {count} frame(s)")

            print(" Sample frames:")
            for number, pkt in enumerate(server_frames[:5], start=1):
                f = self._frame_fields(pkt)
                frame_type = self.parser.frame_type_name(pkt)
                print(
                    f" Frame {number}: RA={f['ra']}, TA={f['ta']}, type={frame_type}, "
                    f"{f['encrypted']}, dur={f['dur']}{f['retry']}"
                )
        print()

    def _print_summary(self, total_count, plcp_count):
        """Print summary statistics."""
        print("=== Summary ===")
        if total_count > 0:
            print(f"✓ Monitor mode is working! Captured {total_count} packet(s)")
            if plcp_count > 0:
                print(f"✓ PLCP headers detected: {plcp_count} packet(s) with radiotap information")
            else:
                print("⚠ No PLCP headers detected (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")


class PacketCapture:
    """Handles packet capture operations."""

    def __init__(self, config):
        self.config = config
        self.parser = PacketParser()
        self.analyzer = CaptureAnalyzer(
            self.parser, getattr(config, "server_mac", DEFAULT_SERVER_MAC)
        )

    @staticmethod
    def _capture_span(packets):
        """Seconds between the earliest and latest packet timestamp, or None."""
        try:
            stamps = [float(pkt.time) for pkt in packets]
        except (AttributeError, TypeError, ValueError):
            return None
        if len(stamps) < 2:
            return None
        span = max(stamps) - min(stamps)
        return span if span > 0 else None

    def capture_from_pcap(self, pcap_file):
        """Read and analyze packets from a PCAP file.

        Returns:
            Process exit code: 0 on success, 1 on error.
        """
        print("=== Reading packets from PCAP file ===")
        print(f"PCAP file: {pcap_file}")
        print()

        try:
            packets = rdpcap(pcap_file)
            print(f"Loaded {len(packets)} packets from {pcap_file}")
            print(f"File size: {os.path.getsize(pcap_file)} bytes")
            print()
            # The real capture length comes from the packet timestamps.
            self.analyzer.analyze(packets, self._capture_span(packets))
            return 0
        except OSError as e:
            print(f"Error reading PCAP file: {e}")
            return 1
        except Exception as e:
            print(f"Error reading PCAP file: {e}")
            traceback.print_exc()
            return 1

    def capture_live(self, interface, channel, duration):
        """Capture packets from a live interface; returns a process exit code."""
        try:
            return asyncio.run(self._capture_live_async(interface, channel, duration))
        except KeyboardInterrupt:
            # Older asyncio versions re-raise Ctrl+C after running cleanup.
            print("\nInterrupted by user")
            return 130

    async def _capture_live_async(self, interface, channel, duration):
        """Async implementation of live capture."""
        print("=== Testing Monitor Mode with scapy ===")
        print(f"Interface: {interface}")
        print(f"Channel: {channel}")
        print(f"Duration: {duration} seconds")
        print()

        monitor = WiFiMonitor(interface, channel)
        try:
            # WiFiMonitor prints its own error message on failure.
            if not await monitor.start():
                return 1
            if not await self._test_capture_async(interface):
                return 1
            return await self._main_capture_async(interface, duration)
        finally:
            # Also runs after a failed start(), so a half-configured interface
            # is handed back to NetworkManager.
            await monitor.stop()

    async def _sniff(self, interface, timeout, sink, stop_event=None):
        """Run scapy's blocking ``sniff`` in the default executor.

        Args:
            interface: interface name to capture on.
            timeout: capture length in seconds.
            sink: list that receives every captured packet.
            stop_event: optional ``threading.Event``; once set, the sniffer
                stops at the next packet it sees.
        """
        # NOTE(review): ``snaplen`` is forwarded by sniff() to the listen
        # socket; confirm the installed scapy socket class accepts it.
        kwargs = {
            "iface": interface,
            "prn": sink.append,
            "timeout": timeout,
            "store": False,  # packets are collected through ``prn`` only
            "snaplen": self.config.snaplen,
        }
        if stop_event is not None:
            kwargs["stop_filter"] = lambda _pkt: stop_event.is_set()

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, lambda: sniff(**kwargs))

    async def _test_capture_async(self, interface):
        """Perform a 1-second test capture to verify monitor mode (async).

        Returns:
            False when the capture itself failed, True otherwise (even when
            no packets were seen).
        """
        print("\nChecking Data Link Type (1 second test capture)...")
        print("(This may take up to 2 seconds if no packets are present)")

        test_packets = []
        try:
            await self._sniff(interface, 1, test_packets)
        except OSError as e:
            print(f"Error during test capture (system error): {e}")
            return False
        except Exception as e:
            print(f"Error during test capture: {e}")
            return False

        test_count = len(test_packets)
        test_plcp = sum(1 for pkt in test_packets if pkt.haslayer(RadioTap))

        if test_count > 0:
            print("Sample packets:")
            for number, pkt in enumerate(test_packets[:5], start=1):
                ra, ta = self.parser.ra_ta(pkt)
                ra_str = ra if ra else "N/A"
                ta_str = ta if ta else "N/A"
                plcp = "yes" if pkt.haslayer(RadioTap) else "no"
                print(f" Frame {number}: RA={ra_str}, TA={ta_str}, PLCP={plcp}")

        print("\nTest capture results:")
        print(f" Packets captured: {test_count}")
        print(f" PLCP headers: {test_plcp}")
        if test_plcp == 0 and test_count > 0:
            print(" Note: Packets captured but no radiotap headers (may be using DLT_IEEE802_11 instead of DLT_IEEE802_11_RADIO)")
        return True

    async def _write_pcap(self, pcap_path, packets):
        """Write *packets* to *pcap_path*; returns True on success."""
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, wrpcap, pcap_path, packets)
            print(f"Pcap file size: {os.path.getsize(pcap_path)} bytes")
        except Exception as e:
            print(f"\nError writing pcap file: {e}")
            return False
        print(f"Keeping pcap file: {pcap_path}")
        print(f" (Use: python3 wifi_monitor.py {pcap_path} to analyze)")
        return True

    async def _main_capture_async(self, interface, duration):
        """Perform the main packet capture (async).

        Returns:
            Process exit code: 0 on success, 1 if capturing or saving failed.
        """
        print(f"\n=== Starting scapy capture ({duration} seconds) ===")
        print("Press Ctrl+C to stop early\n")

        if self.config.full_packet:
            print("Capturing full packets...")
        else:
            print(f"Capturing packets (snaplen={self.config.snaplen} bytes, header + some payload)...")

        pcap_path = None
        if self.config.keep_pcap:
            with tempfile.NamedTemporaryFile(
                delete=False, suffix='.pcap', prefix='scapy_capture_'
            ) as pcap_file:
                pcap_path = pcap_file.name
            print(f"Capturing to file: {pcap_path}")

        packets = []
        stop_event = threading.Event()
        failed = False
        completed = False
        started_at = time.monotonic()
        try:
            await self._sniff(interface, duration, packets, stop_event)
            completed = True
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run() Ctrl+C reaches the coroutine as a
            # cancellation; tell the sniffer thread to wind down and still
            # report on what was captured.
            stop_event.set()
            print("\nCapture interrupted by user")
        except OSError as e:
            failed = True
            print(f"\nError during capture (system error): {e}")
        except Exception as e:
            failed = True
            print(f"\nError during capture: {e}")
            traceback.print_exc()
        elapsed = time.monotonic() - started_at

        # Snapshot: the sniffer thread may still append after an interrupt.
        captured = list(packets)
        if pcap_path is not None and not await self._write_pcap(pcap_path, captured):
            failed = True

        self.analyzer.analyze(captured, duration if completed else elapsed)
        return 1 if failed else 0


class ArgumentParser:
    """Handles command line argument parsing."""

    PCAP_SUFFIXES = ('.pcap', '.pcapng', '.cap')

    def __init__(self, config):
        self.config = config

    def parse(self):
        """Parse command line arguments.

        Returns:
            ``(mode, interface_or_pcap, channel, duration)`` where mode is
            ``'live'``, ``'pcap'`` or ``'help'``; None on invalid arguments.
        """
        raw_args = sys.argv[1:]
        if "-h" in raw_args or "--help" in raw_args:
            self._print_usage()
            return ('help', None, None, None)

        args = self._process_flags(raw_args)

        unknown = [a for a in args if a.startswith("-") and not self._looks_like_int(a)]
        if unknown:
            print(f"Error: unknown option(s): {' '.join(unknown)}")
            self._print_usage()
            return None

        if not args:
            # Use defaults: wlan0, channel 36, 10 seconds
            return self._parse_live_mode([])
        if self._is_pcap_file(args):
            return self._parse_pcap_mode(args)
        return self._parse_live_mode(args)

    def _print_usage(self):
        """Print usage information."""
        print("Usage:")
        print(" Live capture: sudo python3 wifi_monitor.py [interface] [channel] [duration] [--keep-pcap] [--full-packet]")
        print(" Defaults: wlan0, channel 36, 10 seconds")
        print(" Read pcap: python3 wifi_monitor.py <pcap_file>")

    def _process_flags(self, args):
        """Apply ``--keep-pcap``/``--full-packet`` to the config.

        Returns:
            The remaining positional arguments.
        """
        keep_flags = ("--keep-pcap", "-k")
        full_flags = ("--full-packet", "-f")
        if any(a in keep_flags for a in args):
            self.config.keep_pcap = True
        if any(a in full_flags for a in args):
            self.config.full_packet = True
        return [a for a in args if a not in keep_flags + full_flags]

    def _is_pcap_file(self, args):
        """Check if first argument looks like a capture file name."""
        return len(args) > 0 and args[0].lower().endswith(self.PCAP_SUFFIXES)

    def _parse_pcap_mode(self, args):
        """Parse arguments for PCAP file mode."""
        pcap_file = args[0]
        if not os.path.isfile(pcap_file):
            print(f"Error: PCAP file not found: {pcap_file}")
            return None
        return ('pcap', pcap_file, None, None)

    @staticmethod
    def _looks_like_int(text):
        """True when *text* parses as an integer."""
        try:
            int(text)
        except ValueError:
            return False
        return True

    def _parse_live_mode(self, args):
        """Parse arguments for live capture mode.

        Accepted forms: ``[interface] [channel] [duration]``. The first
        argument is taken as the interface name unless it is an integer.

        Returns:
            ``('live', interface, channel, duration)`` or None on bad input.
        """
        positional = list(args)
        if positional and not self._looks_like_int(positional[0]):
            self.config.wifi_interface = positional.pop(0)

        try:
            channel = int(positional[0]) if positional else DEFAULT_CHANNEL
            duration = int(positional[1]) if len(positional) > 1 else DEFAULT_DURATION
        except ValueError:
            print(f"Error: channel and duration must be integers (got: {' '.join(positional[:2])})")
            self._print_usage()
            return None

        if channel <= 0 or duration <= 0:
            print("Error: channel and duration must be positive")
            self._print_usage()
            return None

        return ('live', self.config.wifi_interface, channel, duration)


def main():
    """Parse the command line, run the requested mode, return the exit code."""
    config = Config()
    parsed_args = ArgumentParser(config).parse()
    if parsed_args is None:
        return 1

    mode, interface_or_pcap, channel, duration = parsed_args
    if mode == 'help':
        return 0

    if _SCAPY_IMPORT_ERROR is not None:
        print(f"Error: scapy is required but could not be imported: {_SCAPY_IMPORT_ERROR}")
        return 1

    capture = PacketCapture(config)
    if mode == 'pcap':
        return capture.capture_from_pcap(interface_or_pcap)
    return capture.capture_live(interface_or_pcap, channel, duration)


if __name__ == "__main__":
    sys.exit(main())