#!/usr/bin/env python3
"""
Raspberry Pi 5 WiFi Monitor - Capture RA/TA Addresses

Uses scapy to parse 802.11 frames and display RA/TA addresses clearly.

Requirements:
    sudo apt-get install python3-pip
    sudo pip3 install scapy

Usage:
    sudo python3 rpi_capture_ra_ta_python.py [channel] [filter_mac]

Example:
    sudo python3 rpi_capture_ra_ta_python.py 11
    sudo python3 rpi_capture_ra_ta_python.py 11 80:84:89:93:c4:b6
"""

import sys
import os
import subprocess
import signal
from datetime import datetime

try:
    from scapy.all import *
    from scapy.layers.dot11 import Dot11, Dot11QoS
except ImportError as _exc:
    # Defer the failure to main() so the user gets an install hint instead of
    # a bare traceback; the placeholders keep the module importable.
    _SCAPY_IMPORT_ERROR = _exc
    sniff = None

    class Dot11:
        """Placeholder used only when scapy is not installed."""

    class Dot11QoS:
        """Placeholder used only when scapy is not installed."""
else:
    _SCAPY_IMPORT_ERROR = None

# Configuration
WIFI_INTERFACE = "wlan0"
DEFAULT_CHANNEL = 11
# CHANNEL and FILTER_MAC are filled in from the command line by main().
CHANNEL = DEFAULT_CHANNEL
FILTER_MAC = None

_HEX_DIGITS = "0123456789abcdef"

# Human-readable names indexed by [frame type][frame subtype], following the
# IEEE 802.11 Frame Control type/subtype assignments.
_FRAME_TYPE_NAMES = {
    0: {  # Management
        0: "Association Request",
        1: "Association Response",
        2: "Reassociation Request",
        3: "Reassociation Response",
        4: "Probe Request",
        5: "Probe Response",
        8: "Beacon",
        9: "ATIM",
        10: "Disassociation",
        11: "Authentication",
        12: "Deauthentication",
        13: "Action",
        14: "Action No Ack",
    },
    1: {  # Control
        8: "Block Ack Request",
        9: "Block Ack",
        10: "PS-Poll",
        11: "RTS",
        12: "CTS",
        13: "ACK",
        14: "CF-End",
        15: "CF-End+CF-Ack",
    },
    2: {  # Data
        0: "Data",
        1: "Data+CF-Ack",
        2: "Data+CF-Poll",
        3: "Data+CF-Ack+CF-Poll",
        4: "Null",
        8: "QoS Data",
        9: "QoS Data+CF-Ack",
        10: "QoS Data+CF-Poll",
        11: "QoS Data+CF-Ack+CF-Poll",
        12: "QoS Null",
    },
}


def _normalize_mac(mac):
    """Return *mac* lower-cased with ':' and '-' separators removed."""
    return mac.lower().replace(":", "").replace("-", "")


def _bpf_filter_for_mac(mac):
    """Build a capture filter matching *mac* as addr1 (RA) or addr2 (TA).

    Returns None when *mac* is not a complete 48-bit address (for example an
    OUI prefix); in that case filtering is left to packet_handler(), which
    does substring matching.
    """
    digits = _normalize_mac(mac)
    if len(digits) != 12 or any(c not in _HEX_DIGITS for c in digits):
        return None
    canonical = ":".join(digits[i:i + 2] for i in range(0, 12, 2))
    # "ether host" on an 802.11 link type matches SA/DA and skips control
    # frames; addr1/addr2 are the receiver/transmitter for every frame type.
    return f"wlan addr1 {canonical} or wlan addr2 {canonical}"


def _parse_args(argv):
    """Return (channel, filter_mac) parsed from *argv*.

    Raises:
        ValueError: if the channel argument is not an integer.
    """
    channel = DEFAULT_CHANNEL
    if len(argv) > 1:
        try:
            channel = int(argv[1])
        except ValueError:
            raise ValueError(
                f"Invalid channel {argv[1]!r}: expected an integer"
            ) from None
    filter_mac = argv[2] if len(argv) > 2 else None
    return channel, filter_mac


def setup_monitor_mode():
    """Set WiFi interface to monitor mode and tune it to CHANNEL.

    Runs ``iw``/``ip`` against WIFI_INTERFACE. Exits the process with
    status 1 if any command fails or the tools are not installed.
    """
    print(f"=== Setting up monitor mode on {WIFI_INTERFACE} ===")

    # Check current mode
    try:
        result = subprocess.run(
            ["iw", "dev", WIFI_INTERFACE, "info"],
            capture_output=True,
            text=True,
            check=True,
        )
        if "type monitor" in result.stdout:
            print("Already in monitor mode")
        else:
            print(f"Setting {WIFI_INTERFACE} to monitor mode...")
            # The interface type can only be changed while the link is down.
            subprocess.run(
                ["ip", "link", "set", WIFI_INTERFACE, "down"], check=True
            )
            subprocess.run(
                ["iw", "dev", WIFI_INTERFACE, "set", "type", "monitor"],
                check=True,
            )
            subprocess.run(
                ["ip", "link", "set", WIFI_INTERFACE, "up"], check=True
            )
            print("Monitor mode activated")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing iw/ip binary.
        print(f"Error setting monitor mode: {e}")
        sys.exit(1)

    # Set channel
    try:
        subprocess.run(
            ["iw", "dev", WIFI_INTERFACE, "set", "channel", str(CHANNEL)],
            check=True,
        )
        print(f"Channel set to {CHANNEL}")
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error setting channel: {e}")
        sys.exit(1)


def get_frame_type_name(dot11):
    """Get human-readable frame type name.

    Args:
        dot11: object exposing integer ``type`` and ``subtype`` attributes
            (normally a scapy Dot11 layer).

    Returns:
        The frame name, or ``"Type<t>/Subtype<s>"`` for unknown combinations.
    """
    return _FRAME_TYPE_NAMES.get(dot11.type, {}).get(
        dot11.subtype, f"Type{dot11.type}/Subtype{dot11.subtype}"
    )


def parse_80211_frame(pkt):
    """Parse 802.11 frame and extract RA/TA addresses.

    Address layout for data frames, by To DS / From DS bits:

        To DS=0, From DS=0 (ad-hoc):  addr1=RA=DA, addr2=TA=SA,    addr3=BSSID
        To DS=0, From DS=1 (AP->STA): addr1=RA=DA, addr2=TA=BSSID, addr3=SA
        To DS=1, From DS=0 (STA->AP): addr1=RA=BSSID, addr2=TA=SA, addr3=DA
        To DS=1, From DS=1 (WDS):     addr1=RA, addr2=TA, addr3=DA, addr4=SA

    Args:
        pkt: captured packet supporting ``haslayer``, indexing by layer and
            ``len`` (normally a scapy packet).

    Returns:
        A dict describing the frame (keys: type, subtype, name, ra, ta,
        bssid, context, rssi, len, qos, qos_info, retry), or None if *pkt*
        has no Dot11 layer. Missing addresses are reported as ``"N/A"``.
    """
    if not pkt.haslayer(Dot11):
        return None

    dot11 = pkt[Dot11]
    frame_type = dot11.type
    frame_subtype = dot11.subtype

    # Absent addresses (e.g. addr2 in ACK/CTS) come back as None.
    addr1 = getattr(dot11, "addr1", None) or "N/A"
    addr2 = getattr(dot11, "addr2", None) or "N/A"
    addr3 = getattr(dot11, "addr3", None) or "N/A"
    addr4 = getattr(dot11, "addr4", None) or None

    # Frame Control flags: bit 0 = To DS, bit 1 = From DS, bit 3 = Retry.
    fc_flags = int(dot11.FCfield)
    to_ds = bool(fc_flags & 0x1)
    from_ds = bool(fc_flags & 0x2)
    retry = bool(fc_flags & 0x8)

    # addr1 is always the receiver and addr2 (when present) the transmitter.
    ra = addr1
    ta = addr2

    if frame_type == 2:  # Data frame
        if to_ds and from_ds:
            # WDS frame: addr3=DA, addr4=SA, no BSSID field
            bssid = "N/A"
            context = f"WDS: DA={addr3}, SA={addr4 if addr4 else 'N/A'}"
        elif to_ds:
            # STA to AP: addr1=BSSID, addr3=DA
            bssid = addr1
            context = f"STA→AP: DA={addr3}"
        elif from_ds:
            # AP to STA: addr2=BSSID, addr3=SA
            bssid = addr2
            context = f"AP→STA: SA={addr3}"
        else:
            # Ad-hoc: addr3=BSSID
            bssid = addr3
            context = f"Ad-hoc: BSSID={addr3}"
    else:
        # Management frames carry the BSSID in addr3; control frames have
        # no addr3, so it stays "N/A".
        bssid = addr3
        context = f"BSSID={addr3}" if addr3 != "N/A" else ""

    # Get RSSI if available. The attribute can exist but be None when the
    # radiotap header lacks the antenna-signal field.
    signal_dbm = getattr(pkt, "dBm_AntSignal", None)
    rssi = f"{signal_dbm} dBm" if signal_dbm is not None else "N/A"

    # Check for QoS data
    is_qos = bool(pkt.haslayer(Dot11QoS))
    qos_info = ""
    if is_qos:
        tid = getattr(pkt[Dot11QoS], "TID", "N/A")
        qos_info = f", TID={tid}"

    return {
        "type": frame_type,
        "subtype": frame_subtype,
        "name": get_frame_type_name(dot11),
        "ra": ra,
        "ta": ta,
        "bssid": bssid,
        "context": context,
        "rssi": rssi,
        "len": len(pkt),
        "qos": is_qos,
        "qos_info": qos_info,
        "retry": retry,
    }


# Statistics
stats = {
    "total": 0,
    "by_ta": {},
    "by_ra": {},
    "by_type": {},
}


def _bump(counter, key):
    """Increment the count stored under *key* in the dict *counter*."""
    counter[key] = counter.get(key, 0) + 1


def packet_handler(pkt):
    """Handle captured packets.

    Parses *pkt*, applies the optional FILTER_MAC (substring match against
    RA and TA, ignoring case and separators), updates ``stats`` and prints a
    summary of the frame. Non-802.11 packets are ignored.
    """
    frame_info = parse_80211_frame(pkt)
    if not frame_info:
        return

    ra = frame_info["ra"]
    ta = frame_info["ta"]

    # Apply MAC filter if specified
    if FILTER_MAC:
        wanted = _normalize_mac(FILTER_MAC)
        ta_clean = _normalize_mac(ta) if ta != "N/A" else ""
        ra_clean = _normalize_mac(ra) if ra != "N/A" else ""
        if wanted not in ta_clean and wanted not in ra_clean:
            return  # Skip this frame

    # Update statistics
    stats["total"] += 1
    if ta != "N/A":
        _bump(stats["by_ta"], ta)
    if ra != "N/A":
        _bump(stats["by_ra"], ra)
    _bump(stats["by_type"], f"{frame_info['name']}")

    # Print frame information
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    retry_str = " [RETRY]" if frame_info["retry"] else ""

    print(f"\n[{timestamp}] {frame_info['name']}{retry_str}")
    print(f"  RA (Receiver):    {ra}")
    print(f"  TA (Transmitter): {ta}")
    if frame_info["bssid"] != "N/A":
        print(f"  BSSID:            {frame_info['bssid']}")
    if frame_info["context"]:
        print(f"  Context:          {frame_info['context']}")
    print(f"  RSSI:             {frame_info['rssi']}")
    print(f"  Length:           {frame_info['len']} bytes{frame_info['qos_info']}")


def print_statistics():
    """Print the capture summary accumulated in ``stats``."""
    print("\n\n" + "=" * 60)
    print("Capture stopped by user")
    print("=" * 60)
    print("\nStatistics:")
    print(f"  Total frames captured: {stats['total']}")

    def by_count(table):
        # Most frequent entries first.
        return sorted(table.items(), key=lambda item: item[1], reverse=True)

    print("\n  Top 5 TAs (Transmitters):")
    for ta, count in by_count(stats["by_ta"])[:5]:
        print(f"    {ta}: {count} frames")

    print("\n  Top 5 RAs (Receivers):")
    for ra, count in by_count(stats["by_ra"])[:5]:
        print(f"    {ra}: {count} frames")

    print("\n  Frame types:")
    for ftype, count in by_count(stats["by_type"]):
        print(f"    {ftype}: {count}")


def signal_handler(sig, frame):
    """Handle Ctrl+C gracefully.

    Raises KeyboardInterrupt rather than exiting, so the capture loop
    unwinds and main() can still print the statistics summary.
    """
    print("\n\nStopping capture...")
    raise KeyboardInterrupt


def main():
    """Main function: parse arguments, enter monitor mode and capture."""
    global CHANNEL, FILTER_MAC

    try:
        CHANNEL, FILTER_MAC = _parse_args(sys.argv)
    except ValueError as e:
        print(f"Error: {e}")
        print(f"Usage: sudo python3 {sys.argv[0]} [channel] [filter_mac]")
        sys.exit(1)

    print("=== Raspberry Pi 5 WiFi Monitor - RA/TA Capture ===")
    print(f"Interface: {WIFI_INTERFACE}")
    print(f"Channel: {CHANNEL}")
    if FILTER_MAC:
        print(f"Filter: {FILTER_MAC}")
    print("")

    if _SCAPY_IMPORT_ERROR is not None:
        print(f"Error: scapy is not available ({_SCAPY_IMPORT_ERROR})")
        print("Install it with: sudo pip3 install scapy")
        sys.exit(1)

    # Check if running as root
    if os.geteuid() != 0:
        print("Error: This script must be run as root (use sudo)")
        sys.exit(1)

    # Setup monitor mode
    setup_monitor_mode()

    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    print("\n=== Starting Capture (showing RA/TA addresses) ===")
    print("Press Ctrl+C to stop\n")

    # Build filter. Only a complete MAC can be pushed down to the capture
    # filter; partial filters are handled in packet_handler().
    bpf_filter = _bpf_filter_for_mac(FILTER_MAC) if FILTER_MAC else None

    # Start capturing
    try:
        sniff(
            iface=WIFI_INTERFACE,
            prn=packet_handler,
            filter=bpf_filter,
            store=False,
        )
    except KeyboardInterrupt:
        # sniff() normally absorbs the interrupt itself and returns; this
        # covers an interrupt delivered outside its loop.
        pass
    except Exception as e:
        print(f"\nError during capture: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

    print_statistics()


if __name__ == "__main__":
    main()