Add --tcpdump-capture (-T) for high-rate capture; document scapy drops

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Robert McMahon 2026-02-16 17:42:16 -08:00
parent c192d911df
commit ed6a2cd328
1 changed files with 90 additions and 3 deletions

View File

@ -12,6 +12,9 @@ Usage:
# Read from pcap file: # Read from pcap file:
python3 wifi_monitor.py --pcap /tmp/capture.pcap python3 wifi_monitor.py --pcap /tmp/capture.pcap
python3 wifi_monitor.py /tmp/capture.pcap # Also supported for backward compatibility python3 wifi_monitor.py /tmp/capture.pcap # Also supported for backward compatibility
# High-rate traffic (e.g. iperf): capture with tcpdump to avoid scapy drops, then analyze:
sudo python3 wifi_monitor.py -i wlan0 -c 11 -t 10 --tcpdump-capture
""" """
# Standard library imports # Standard library imports
@ -40,6 +43,7 @@ class Config:
self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0") self.wifi_interface = os.environ.get("WIFI_INTERFACE", "wlan0")
self.keep_pcap = False self.keep_pcap = False
self.full_packet = False self.full_packet = False
self.tcpdump_capture = False # Use tcpdump for capture (recommended for high packet rates)
self.default_snaplen = 256 # Header + some payload self.default_snaplen = 256 # Header + some payload
@property @property
@ -680,8 +684,12 @@ class PacketCapture:
self.parser = PacketParser() self.parser = PacketParser()
self.analyzer = CaptureAnalyzer(self.parser) self.analyzer = CaptureAnalyzer(self.parser)
def capture_from_pcap(self, pcap_file): def capture_from_pcap(self, pcap_file, duration_sec=None):
"""Read and analyze packets from a PCAP file.""" """Read and analyze packets from a PCAP file.
duration_sec: If set (e.g. when file was captured with -t N), used for packet rate.
If None, rate uses 1.0 second.
"""
print("=== Reading packets from PCAP file ===") print("=== Reading packets from PCAP file ===")
print(f"PCAP file: {pcap_file}") print(f"PCAP file: {pcap_file}")
print() print()
@ -694,7 +702,8 @@ class PacketCapture:
print(f"File size: {file_size} bytes") print(f"File size: {file_size} bytes")
print() print()
self.analyzer.analyze(packets, 1.0, None) duration = duration_sec if duration_sec is not None and duration_sec > 0 else 1.0
self.analyzer.analyze(packets, duration, None)
return 0 return 0
except OSError as e: except OSError as e:
print(f"Error reading PCAP file: {e}") print(f"Error reading PCAP file: {e}")
@ -727,6 +736,8 @@ class PacketCapture:
if not await self._test_capture_async(interface): if not await self._test_capture_async(interface):
return 1 return 1
if self.config.tcpdump_capture:
return await self._capture_with_tcpdump_async(interface, duration)
return await self._main_capture_async(interface, duration) return await self._main_capture_async(interface, duration)
finally: finally:
await monitor.stop() await monitor.stop()
@ -868,10 +879,77 @@ class PacketCapture:
traceback.print_exc() traceback.print_exc()
return None return None
async def _capture_with_tcpdump_async(self, interface, duration):
"""Capture using tcpdump (full line rate), then analyze with scapy.
Use this for high packet rates where scapy sniff() would drop packets.
"""
print(f"\n=== Capturing with tcpdump ({duration} seconds) ===")
print("(tcpdump captures at line rate; analysis runs afterward)\n")
keep = self.config.keep_pcap
if keep:
pcap_f = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='wifi_monitor_')
pcap_path = pcap_f.name
pcap_f.close()
print(f"Output: {pcap_path}")
else:
pcap_f = tempfile.NamedTemporaryFile(delete=False, suffix='.pcap', prefix='wifi_monitor_')
pcap_path = pcap_f.name
pcap_f.close()
try:
proc = await asyncio.create_subprocess_exec(
"tcpdump",
"-i", interface,
"-w", pcap_path,
"-n",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
await asyncio.sleep(duration)
try:
proc.terminate()
await asyncio.wait_for(proc.wait(), timeout=2.0)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
if not os.path.isfile(pcap_path) or os.path.getsize(pcap_path) == 0:
print("Error: tcpdump produced no capture file or empty file.")
return 1
packets = rdpcap(pcap_path)
print(f"Captured {len(packets)} packets. Analyzing...\n")
self.analyzer.analyze(packets, duration, None)
if keep:
print(f"Kept pcap: {pcap_path}")
print(f" (Use: python3 wifi_monitor.py {pcap_path} to re-analyze)")
else:
try:
os.unlink(pcap_path)
except OSError:
pass
return 0
except FileNotFoundError:
print("Error: tcpdump not found. Install tcpdump or use default scapy capture.")
return 1
except Exception as e:
print(f"Error during tcpdump capture: {e}")
import traceback
traceback.print_exc()
if not keep and os.path.isfile(pcap_path):
try:
os.unlink(pcap_path)
except OSError:
pass
return 1
async def _main_capture_async(self, interface, duration): async def _main_capture_async(self, interface, duration):
"""Perform the main packet capture (async).""" """Perform the main packet capture (async)."""
print(f"\n=== Starting scapy capture ({duration} seconds) ===") print(f"\n=== Starting scapy capture ({duration} seconds) ===")
print("Press Ctrl+C to stop early\n") print("Press Ctrl+C to stop early\n")
print("Note: At high packet rates scapy may drop packets. Use --tcpdump-capture (-T) for full-rate capture.\n")
if self.config.full_packet: if self.config.full_packet:
print("Capturing full packets...") print("Capturing full packets...")
@ -978,6 +1056,9 @@ Examples:
# Save captured packets to pcap file: # Save captured packets to pcap file:
sudo python3 wifi_monitor.py -i wlan0 -c 36 -t 10 --keep-pcap sudo python3 wifi_monitor.py -i wlan0 -c 36 -t 10 --keep-pcap
# High-rate traffic (e.g. iperf): capture with tcpdump to avoid drops:
sudo python3 wifi_monitor.py -i wlan0 -c 11 -t 10 --tcpdump-capture
# Analyze existing pcap file: # Analyze existing pcap file:
python3 wifi_monitor.py --pcap /tmp/capture.pcap python3 wifi_monitor.py --pcap /tmp/capture.pcap
python3 wifi_monitor.py /tmp/capture.pcap # Also supported python3 wifi_monitor.py /tmp/capture.pcap # Also supported
@ -1024,6 +1105,11 @@ Examples:
action='store_true', action='store_true',
help='Capture full packets (default: header only, not supported for live capture)' help='Capture full packets (default: header only, not supported for live capture)'
) )
parser.add_argument(
'--tcpdump-capture', '-T',
action='store_true',
help='Use tcpdump for capture (recommended for high packet rates; avoids scapy drops)'
)
# Backward compatibility: if first positional arg looks like a pcap file, use it # Backward compatibility: if first positional arg looks like a pcap file, use it
args, unknown = parser.parse_known_args() args, unknown = parser.parse_known_args()
@ -1041,6 +1127,7 @@ Examples:
# Apply flags to config # Apply flags to config
self.config.keep_pcap = args.keep_pcap self.config.keep_pcap = args.keep_pcap
self.config.full_packet = args.full_packet self.config.full_packet = args.full_packet
self.config.tcpdump_capture = args.tcpdump_capture
# Determine mode # Determine mode
if args.pcap: if args.pcap: