308 lines
9.9 KiB
Python
Executable File
308 lines
9.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Raspberry Pi 5 WiFi Monitor - Capture RA/TA Addresses
|
|
Uses scapy to parse 802.11 frames and display RA/TA addresses clearly
|
|
|
|
Requirements:
|
|
sudo apt-get install python3-pip
|
|
sudo pip3 install scapy
|
|
|
|
Usage:
|
|
sudo python3 rpi_capture_ra_ta_python.py [channel] [filter_mac]
|
|
|
|
Example:
|
|
sudo python3 rpi_capture_ra_ta_python.py 11
|
|
sudo python3 rpi_capture_ra_ta_python.py 11 80:84:89:93:c4:b6
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import signal
|
|
from datetime import datetime
|
|
from scapy.all import *
|
|
from scapy.layers.dot11 import Dot11, Dot11QoS
|
|
|
|
# Configuration
|
|
WIFI_INTERFACE = "wlan0"
|
|
CHANNEL = int(sys.argv[1]) if len(sys.argv) > 1 else 11
|
|
FILTER_MAC = sys.argv[2] if len(sys.argv) > 2 else None
|
|
|
|
def setup_monitor_mode():
|
|
"""Set WiFi interface to monitor mode"""
|
|
print(f"=== Setting up monitor mode on {WIFI_INTERFACE} ===")
|
|
|
|
# Check current mode
|
|
try:
|
|
result = subprocess.run(
|
|
["iw", "dev", WIFI_INTERFACE, "info"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
if "type monitor" in result.stdout:
|
|
print(f"Already in monitor mode")
|
|
else:
|
|
print(f"Setting {WIFI_INTERFACE} to monitor mode...")
|
|
subprocess.run(["ip", "link", "set", WIFI_INTERFACE, "down"], check=True)
|
|
subprocess.run(["iw", "dev", WIFI_INTERFACE, "set", "type", "monitor"], check=True)
|
|
subprocess.run(["ip", "link", "set", WIFI_INTERFACE, "up"], check=True)
|
|
print("Monitor mode activated")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error setting monitor mode: {e}")
|
|
sys.exit(1)
|
|
|
|
# Set channel
|
|
try:
|
|
subprocess.run(["iw", "dev", WIFI_INTERFACE, "set", "channel", str(CHANNEL)], check=True)
|
|
print(f"Channel set to {CHANNEL}")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error setting channel: {e}")
|
|
sys.exit(1)
|
|
|
|
def get_frame_type_name(dot11):
|
|
"""Get human-readable frame type name"""
|
|
type_names = {
|
|
0: { # Management
|
|
0: "Association Request",
|
|
1: "Association Response",
|
|
2: "Reassociation Request",
|
|
3: "Reassociation Response",
|
|
4: "Probe Request",
|
|
5: "Probe Response",
|
|
8: "Beacon",
|
|
10: "Disassociation",
|
|
11: "Authentication",
|
|
12: "Deauthentication"
|
|
},
|
|
1: { # Control
|
|
10: "RTS",
|
|
11: "CTS",
|
|
12: "ACK",
|
|
13: "CF-End",
|
|
14: "CF-End+CF-Ack"
|
|
},
|
|
2: { # Data
|
|
0: "Data",
|
|
1: "Data+CF-Ack",
|
|
2: "Data+CF-Poll",
|
|
3: "Data+CF-Ack+CF-Poll",
|
|
4: "Null",
|
|
8: "QoS Data",
|
|
9: "QoS Data+CF-Ack",
|
|
10: "QoS Data+CF-Poll",
|
|
11: "QoS Data+CF-Ack+CF-Poll",
|
|
12: "QoS Null"
|
|
}
|
|
}
|
|
return type_names.get(dot11.type, {}).get(dot11.subtype,
|
|
f"Type{dot11.type}/Subtype{dot11.subtype}")
|
|
|
|
def parse_80211_frame(pkt):
|
|
"""Parse 802.11 frame and extract RA/TA addresses"""
|
|
if not pkt.haslayer(Dot11):
|
|
return None
|
|
|
|
dot11 = pkt[Dot11]
|
|
|
|
# 802.11 addressing:
|
|
# For Data frames (To DS=1, From DS=1):
|
|
# addr1 = RA (Receiver Address) = Next hop destination
|
|
# addr2 = TA (Transmitter Address) = Transmitting station
|
|
# addr3 = DA (Destination Address) = Final destination
|
|
# addr4 = SA (Source Address) = Original source
|
|
|
|
# For Data frames (To DS=0, From DS=1): AP to STA
|
|
# addr1 = RA = DA (Destination STA)
|
|
# addr2 = TA = SA (Source AP)
|
|
# addr3 = BSSID
|
|
|
|
# For Data frames (To DS=1, From DS=0): STA to AP
|
|
# addr1 = RA = BSSID (AP)
|
|
# addr2 = TA = SA (Source STA)
|
|
# addr3 = DA (Destination)
|
|
|
|
frame_type = dot11.type
|
|
frame_subtype = dot11.subtype
|
|
|
|
# Get addresses
|
|
addr1 = dot11.addr1 if dot11.addr1 else "N/A"
|
|
addr2 = dot11.addr2 if dot11.addr2 else "N/A"
|
|
addr3 = dot11.addr3 if dot11.addr3 else "N/A"
|
|
addr4 = dot11.addr4 if hasattr(dot11, 'addr4') and dot11.addr4 else None
|
|
|
|
# Extract To DS and From DS flags
|
|
to_ds = dot11.FCfield & 0x1
|
|
from_ds = (dot11.FCfield >> 1) & 0x1
|
|
|
|
# Determine RA and TA based on frame type
|
|
if frame_type == 2: # Data frame
|
|
# For data frames:
|
|
# addr1 is always RA (receiver)
|
|
# addr2 is always TA (transmitter)
|
|
ra = addr1
|
|
ta = addr2
|
|
|
|
# Additional context based on To DS / From DS
|
|
if to_ds and from_ds:
|
|
# WDS frame: addr3=DA, addr4=SA
|
|
context = f"WDS: DA={addr3}, SA={addr4 if addr4 else 'N/A'}"
|
|
elif to_ds and not from_ds:
|
|
# STA to AP: addr3=DA
|
|
context = f"STA→AP: DA={addr3}"
|
|
elif not to_ds and from_ds:
|
|
# AP to STA: addr3=BSSID
|
|
context = f"AP→STA: BSSID={addr3}"
|
|
else:
|
|
# Ad-hoc: addr3=BSSID
|
|
context = f"Ad-hoc: BSSID={addr3}"
|
|
else:
|
|
# For management/control frames, addr1=DA, addr2=SA
|
|
ra = addr1 # Receiver/Destination
|
|
ta = addr2 # Transmitter/Source
|
|
context = f"BSSID={addr3}" if addr3 != "N/A" else ""
|
|
|
|
# Get RSSI if available
|
|
rssi = "N/A"
|
|
if hasattr(pkt, "dBm_AntSignal"):
|
|
rssi = f"{pkt.dBm_AntSignal} dBm"
|
|
elif hasattr(pkt, "notdecoded"):
|
|
# Try to extract from radiotap header if present
|
|
pass
|
|
|
|
# Check for QoS data
|
|
is_qos = pkt.haslayer(Dot11QoS)
|
|
qos_info = ""
|
|
if is_qos:
|
|
qos = pkt[Dot11QoS]
|
|
tid = qos.TID if hasattr(qos, 'TID') else "N/A"
|
|
qos_info = f", TID={tid}"
|
|
|
|
return {
|
|
"type": frame_type,
|
|
"subtype": frame_subtype,
|
|
"name": get_frame_type_name(dot11),
|
|
"ra": ra,
|
|
"ta": ta,
|
|
"bssid": addr3,
|
|
"context": context,
|
|
"rssi": rssi,
|
|
"len": len(pkt),
|
|
"qos": is_qos,
|
|
"qos_info": qos_info,
|
|
"retry": bool(dot11.FCfield & 0x8) if hasattr(dot11, 'FCfield') else False
|
|
}
|
|
|
|
# Statistics
|
|
stats = {
|
|
"total": 0,
|
|
"by_ta": {},
|
|
"by_ra": {},
|
|
"by_type": {}
|
|
}
|
|
|
|
def packet_handler(pkt):
|
|
"""Handle captured packets"""
|
|
frame_info = parse_80211_frame(pkt)
|
|
if not frame_info:
|
|
return
|
|
|
|
# Apply MAC filter if specified
|
|
if FILTER_MAC:
|
|
filter_mac_clean = FILTER_MAC.lower().replace(":", "").replace("-", "")
|
|
ta_clean = frame_info["ta"].replace(":", "").replace("-", "").lower() if frame_info["ta"] != "N/A" else ""
|
|
ra_clean = frame_info["ra"].replace(":", "").replace("-", "").lower() if frame_info["ra"] != "N/A" else ""
|
|
|
|
if filter_mac_clean.lower() not in ta_clean and filter_mac_clean.lower() not in ra_clean:
|
|
return # Skip this frame
|
|
|
|
# Update statistics
|
|
stats["total"] += 1
|
|
if frame_info["ta"] != "N/A":
|
|
stats["by_ta"][frame_info["ta"]] = stats["by_ta"].get(frame_info["ta"], 0) + 1
|
|
if frame_info["ra"] != "N/A":
|
|
stats["by_ra"][frame_info["ra"]] = stats["by_ra"].get(frame_info["ra"], 0) + 1
|
|
frame_type_key = f"{frame_info['name']}"
|
|
stats["by_type"][frame_type_key] = stats["by_type"].get(frame_type_key, 0) + 1
|
|
|
|
# Print frame information
|
|
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
retry_str = " [RETRY]" if frame_info["retry"] else ""
|
|
|
|
print(f"\n[{timestamp}] {frame_info['name']}{retry_str}")
|
|
print(f" RA (Receiver): {frame_info['ra']}")
|
|
print(f" TA (Transmitter): {frame_info['ta']}")
|
|
if frame_info['bssid'] != "N/A":
|
|
print(f" BSSID: {frame_info['bssid']}")
|
|
if frame_info['context']:
|
|
print(f" Context: {frame_info['context']}")
|
|
print(f" RSSI: {frame_info['rssi']}")
|
|
print(f" Length: {frame_info['len']} bytes{frame_info['qos_info']}")
|
|
|
|
def signal_handler(sig, frame):
|
|
"""Handle Ctrl+C gracefully"""
|
|
print("\n\nStopping capture...")
|
|
sys.exit(0)
|
|
|
|
def main():
|
|
"""Main function"""
|
|
print(f"=== Raspberry Pi 5 WiFi Monitor - RA/TA Capture ===")
|
|
print(f"Interface: {WIFI_INTERFACE}")
|
|
print(f"Channel: {CHANNEL}")
|
|
if FILTER_MAC:
|
|
print(f"Filter: {FILTER_MAC}")
|
|
print("")
|
|
|
|
# Check if running as root
|
|
if os.geteuid() != 0:
|
|
print("Error: This script must be run as root (use sudo)")
|
|
sys.exit(1)
|
|
|
|
# Setup monitor mode
|
|
setup_monitor_mode()
|
|
|
|
# Register signal handler
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
print("\n=== Starting Capture (showing RA/TA addresses) ===")
|
|
print("Press Ctrl+C to stop\n")
|
|
|
|
# Build filter
|
|
bpf_filter = None
|
|
if FILTER_MAC:
|
|
bpf_filter = f"ether host {FILTER_MAC}"
|
|
|
|
# Start capturing
|
|
try:
|
|
sniff(
|
|
iface=WIFI_INTERFACE,
|
|
prn=packet_handler,
|
|
filter=bpf_filter,
|
|
store=False
|
|
)
|
|
except KeyboardInterrupt:
|
|
print("\n\n" + "="*60)
|
|
print("Capture stopped by user")
|
|
print("="*60)
|
|
print(f"\nStatistics:")
|
|
print(f" Total frames captured: {stats['total']}")
|
|
print(f"\n Top 5 TAs (Transmitters):")
|
|
sorted_tas = sorted(stats['by_ta'].items(), key=lambda x: x[1], reverse=True)[:5]
|
|
for ta, count in sorted_tas:
|
|
print(f" {ta}: {count} frames")
|
|
print(f"\n Top 5 RAs (Receivers):")
|
|
sorted_ras = sorted(stats['by_ra'].items(), key=lambda x: x[1], reverse=True)[:5]
|
|
for ra, count in sorted_ras:
|
|
print(f" {ra}: {count} frames")
|
|
print(f"\n Frame types:")
|
|
for ftype, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True):
|
|
print(f" {ftype}: {count}")
|
|
except Exception as e:
|
|
print(f"\nError during capture: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|