390 lines
15 KiB
Python
Executable File
390 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ESP32 WiFi Configuration Tool - Static IP with auto-disable DHCP and CSI control
|
|
"""
|
|
|
|
import serial
|
|
import time
|
|
import sys
|
|
import argparse
|
|
|
|
def log_verbose(message, verbose=False):
    """Write *message* to stdout with a ``[VERBOSE]`` tag, but only when *verbose* is truthy.

    Args:
        message: Text (or any formattable object) to print.
        verbose: When falsy (the default) the call is a no-op.
    """
    # Guard clause: the quiet path does nothing at all.
    if not verbose:
        return
    print(f"[VERBOSE] {message}")
|
|
|
|
_RULE_WIDTH = 70  # width of the "=" / "-" separator rules printed around each section


def _print_plan(port, ip, ssid, password, gateway, netmask, band, bandwidth,
                powersave, mode, monitor_channel, csi_enable, reboot, verbose):
    """Print the banner listing every setting about to be applied."""
    rule = "=" * _RULE_WIDTH
    print(f"\n{rule}")
    print("ESP32 WiFi Configuration (Static IP + Mode + CSI)")
    print(rule)
    print(f"Port: {port}")
    print(f"SSID: {ssid}")
    # Only the length of the password is shown, never its characters.
    print(f"Password: {'*' * len(password)}")
    print(f"IP: {ip} (DHCP disabled)")
    print(f"Gateway: {gateway}")
    print(f"Netmask: {netmask}")
    print(f"Mode: {mode}")
    if mode == "MONITOR":
        print(f"Mon Ch: {monitor_channel}")
    print(f"Band: {band}")
    print(f"Bandwidth: {bandwidth}")
    print(f"PowerSave: {powersave}")
    print(f"CSI: {'ENABLED' if csi_enable else 'DISABLED'}")
    print(f"Reboot: {'Yes' if reboot else 'No'}")
    print(f"Verbose: {verbose}")
    print(f"{rule}\n")


def _build_config_lines(ip, ssid, password, gateway, netmask, band, bandwidth,
                        powersave, mode, monitor_channel, csi_enable):
    """Return the CFG ... END message as a list of lines (one field per line).

    Raises:
        ValueError: if any field value contains a CR or LF, because the
            message is newline-framed and such a value would inject or
            truncate fields.
    """
    fields = [
        ("SSID", ssid),
        ("PASS", password),
        ("IP", ip),
        ("MASK", netmask),
        ("GW", gateway),
        ("DHCP", "0"),  # Always disabled for static IP
        ("BAND", band),
        ("BW", bandwidth),
        ("POWERSAVE", powersave),
        ("MODE", mode),
        ("MON_CH", monitor_channel),
        ("CSI", "1" if csi_enable else "0"),
    ]
    for key, value in fields:
        text = str(value)
        if "\n" in text or "\r" in text:
            raise ValueError(f"{key} value must not contain line breaks")
    return ["CFG"] + [f"{key}:{value}" for key, value in fields] + ["END"]


def _mask_password(line):
    """Return *line* unchanged unless it is the PASS field, which is starred out."""
    return "PASS:********" if line.startswith("PASS:") else line


def _print_device_output(heading, text, limit):
    """Print up to *limit* lines of device output (blank ones skipped) between rules."""
    print(heading)
    print("-" * _RULE_WIDTH)
    for line in text.split('\n')[:limit]:
        if line.strip():
            print(f" {line}")
    print("-" * _RULE_WIDTH)


def _print_group(heading, items):
    """Print *heading* followed by each item, or nothing when *items* is empty."""
    if not items:
        return
    print(heading)
    for item in items:
        print(f" {item}")


def _reported_ip(text):
    """Return the dotted-quad following the first 'got ip:' in *text*, or None."""
    import re  # the file does not import re at module level

    found = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', text, re.IGNORECASE)
    return found.group(1) if found else None


def _assess_config_response(response, ip, csi_enable):
    """Scan the device's reply to the CFG message; return (successes, issues)."""
    successes = []
    issues = []
    lowered = response.lower()

    if "OK" in response:
        successes.append("✓ Configuration acknowledged (OK)")
    if "Config saved" in response or "saved to NVS" in response:
        successes.append("✓ Config saved to NVS")
    if "CSI enable state saved" in response:
        csi_state = "ENABLED" if csi_enable else "DISABLED"
        successes.append(f"✓ CSI {csi_state} saved to NVS")
    if "got ip:" in lowered:
        successes.append("✓ Device connected to WiFi!")
        received_ip = _reported_ip(response)
        if received_ip is not None:
            successes.append(f" Assigned IP: {received_ip}")
            if received_ip != ip:
                issues.append(f"⚠ Warning: Device got {received_ip} instead of configured {ip}")
                issues.append(" This might indicate DHCP is still enabled")
    # "disconnected" contains "connected"; strip it first so a disconnect
    # log line is not reported as a successful connection.
    if "connected" in lowered.replace("disconnected", ""):
        successes.append("✓ WiFi connection established")

    if "failed" in lowered or "disconnect" in lowered:
        issues.append("⚠ WiFi connection may have failed")
    if "error" in lowered:
        issues.append("⚠ Error detected in response")

    return successes, issues


def _assess_boot_log(boot_msg, ip, csi_enable):
    """Scan the post-reset boot output; return (successes, issues)."""
    successes = []
    issues = []

    if "WiFi config loaded from NVS" in boot_msg:
        successes.append("✓ Config successfully loaded from NVS")
    elif "No WiFi config" in boot_msg or "YELLOW LED" in boot_msg:
        issues.append("✗ NO CONFIG found in NVS")
        issues.append(" Device does not see saved config")

    # Compare the CSI state the device reports against what was requested,
    # in both directions.
    if "CSI Capture: ENABLED" in boot_msg:
        if csi_enable:
            successes.append("✓ CSI capture is ENABLED")
        else:
            issues.append("⚠ CSI is ENABLED but was configured as DISABLED")
    elif "CSI Capture: DISABLED" in boot_msg:
        if csi_enable:
            issues.append("⚠ CSI is DISABLED but was configured as ENABLED")
        else:
            successes.append("✓ CSI capture is DISABLED (as configured)")

    # Check if device got the correct static IP
    received_ip = _reported_ip(boot_msg)
    if received_ip is not None:
        if received_ip == ip:
            successes.append(f"✓ Device got correct static IP: {ip}")
        else:
            issues.append(f"⚠ Device got {received_ip} instead of {ip}")
            issues.append(" DHCP may still be enabled or IP conflict exists")

    if "WiFi CONNECTED" in boot_msg:
        successes.append("✓ WiFi connection confirmed")

    return successes, issues


def _pulse_reset(ser):
    """Toggle DTR/RTS in the sequence the script uses as a hardware reset.

    NOTE(review): presumably relies on the usual USB-serial auto-reset wiring
    of ESP32 dev boards - confirm for custom hardware.
    """
    ser.dtr = False
    ser.rts = True
    time.sleep(0.1)

    ser.rts = False
    time.sleep(0.1)

    ser.dtr = True


def _print_summary(port, ip, ssid, mode, band, bandwidth, powersave, csi_enable):
    """Print the closing summary and the suggested follow-up commands."""
    rule = "=" * _RULE_WIDTH
    print(f"\n{rule}")
    print("Configuration Summary")
    print(rule)
    print(f"Port: {port}")
    print(f"Static IP: {ip}")
    print(f"SSID: {ssid}")
    print(f"Mode: {mode}")
    print(f"Band: {band}")
    print(f"Bandwidth: {bandwidth}")
    print(f"PowerSave: {powersave}")
    print(f"CSI: {'ENABLED' if csi_enable else 'DISABLED'}")
    print("DHCP: Disabled (static IP mode)")
    print(rule)
    print("\nNext steps:")
    print(" 1. Test connection:")
    print(f" ping {ip}")
    print(f" iperf -c {ip}")
    print("\n 2. Verify device has correct IP:")
    print(f" idf.py -p {port} monitor")
    print(f" Look for: 'got ip:{ip}'")
    if csi_enable:
        print("\n 3. Verify CSI is capturing:")
        print(" Look for: 'CSI Capture: ENABLED'")
        print(" 'Captured X CSI packets'")


def config_device(port, ip, ssid="ClubHouse2G", password="ez2remember",
                  gateway="192.168.1.1", netmask="255.255.255.0",
                  band="2.4G", bandwidth="HT20", powersave="NONE",
                  mode="STA", monitor_channel=36, csi_enable=False,
                  reboot=True, verbose=False):
    """Configure ESP32 device via serial with static IP and CSI control.

    Opens *port* at 115200 baud, sends a newline-framed ``CFG`` ... ``END``
    message (always with ``DHCP:0``), prints and interprets whatever the device
    answers within a fixed 3 s wait, optionally resets the device through
    DTR/RTS and interprets its boot output, then prints a summary.

    Args:
        port: Serial device name passed to ``serial.Serial``.
        ip: Static IP sent as ``IP:`` and compared against any ``got ip:`` line.
        ssid, password: WiFi credentials (``SSID:`` / ``PASS:``).
        gateway, netmask: Sent as ``GW:`` / ``MASK:``.
        band, bandwidth, powersave, mode: Sent verbatim as ``BAND:``, ``BW:``,
            ``POWERSAVE:`` and ``MODE:``.
        monitor_channel: Sent as ``MON_CH:`` regardless of mode.
        csi_enable: Sent as ``CSI:1`` when truthy, ``CSI:0`` otherwise.
        reboot: When truthy, pulse DTR/RTS after configuring and read boot logs.
        verbose: Enable ``[VERBOSE]`` diagnostics and tracebacks.

    Returns:
        True when the exchange ran to completion (a silent device is reported
        on stdout but still yields True); False on a serial error, on Ctrl-C,
        or on any other exception, including a field value that contains a
        line break.
    """
    _print_plan(port, ip, ssid, password, gateway, netmask, band, bandwidth,
                powersave, mode, monitor_channel, csi_enable, reboot, verbose)

    ser = None  # set once the port is open so `finally` can always release it
    try:
        # Build (and validate) the message before touching the port.
        # DHCP is always disabled (0) when IP address is provided
        config_lines = _build_config_lines(ip, ssid, password, gateway, netmask,
                                           band, bandwidth, powersave, mode,
                                           monitor_channel, csi_enable)
        payload = ('\n'.join(config_lines) + '\n').encode('utf-8')
        masked_lines = [_mask_password(line) for line in config_lines]

        # Open serial connection
        log_verbose(f"Opening serial port {port} at 115200 baud...", verbose)
        ser = serial.Serial(port, 115200, timeout=0.5, write_timeout=0.5)
        log_verbose("Serial port opened successfully", verbose)
        log_verbose(f"Port settings: {ser}", verbose)

        time.sleep(0.2)

        # Drain anything already buffered so it is not mistaken for the reply.
        if ser.in_waiting:
            log_verbose(f"{ser.in_waiting} bytes waiting in buffer", verbose)
            existing = ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
            log_verbose(f"Existing data: {existing[:100]}", verbose)

        # Report the encoded size: that is what actually goes over the wire.
        log_verbose(f"Config message size: {len(payload)} bytes", verbose)
        if verbose:
            print("[VERBOSE] Config message:")
            for shown in masked_lines:
                print(f"[VERBOSE] {shown}")

        # Send config
        print("Sending configuration...")
        print("\nConfiguration being sent:")
        for shown in masked_lines:
            print(f" {shown}")
        print()

        start_time = time.time()
        bytes_written = ser.write(payload)
        ser.flush()
        send_time = time.time() - start_time

        log_verbose(f"Wrote {bytes_written} bytes in {send_time:.3f}s", verbose)
        print(f"Sent {bytes_written} bytes")

        print("Waiting for response...")
        time.sleep(3)

        # Read response
        response_size = ser.in_waiting
        if response_size:
            print(f"\n✓ Response received: {response_size} bytes")
            response = ser.read(response_size).decode('utf-8', errors='ignore')
            _print_device_output("\nDevice response:", response, 30)

            successes, issues = _assess_config_response(response, ip, csi_enable)
            _print_group("\nStatus indicators:", successes)
            _print_group("\nWarnings:", issues)
        else:
            print("\n⚠ No response from device")
            print(" This could mean:")
            print(" - Device is not running config handler")
            print(" - Wrong serial port")
            print(" - Baud rate mismatch")

        # Reboot device if requested
        if reboot:
            print("\n" + "=" * _RULE_WIDTH)
            print("Rebooting device...")
            print("=" * _RULE_WIDTH)
            log_verbose("Performing hardware reset via DTR/RTS", verbose)

            _pulse_reset(ser)
            print("✓ Reset signal sent - waiting for boot...")

            time.sleep(3)

            if ser.in_waiting:
                boot_msg = ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
                _print_device_output("\nBoot messages:", boot_msg, 40)

                boot_success, boot_warnings = _assess_boot_log(boot_msg, ip, csi_enable)
                _print_group("\nBoot Status - SUCCESS:", boot_success)
                _print_group("\nBoot Status - ISSUES:", boot_warnings)
            else:
                print("\n⚠ No boot messages received")
                print(" Device may still be booting...")

        # Guarded so the port is not queried at all in quiet mode.
        if verbose:
            log_verbose(f"Input buffer: {ser.in_waiting} bytes", verbose)
            log_verbose(f"Output buffer empty: {ser.out_waiting == 0}", verbose)

        ser.close()
        log_verbose("Serial port closed", verbose)

        _print_summary(port, ip, ssid, mode, band, bandwidth, powersave, csi_enable)
        return True

    except serial.SerialException as e:
        print(f"\n✗ Serial error: {e}")
        log_verbose(f"Serial exception details: {type(e).__name__}", verbose)
        print(" Is another program using this port?")
        return False
    except KeyboardInterrupt:
        print("\n\nConfiguration cancelled by user")
        if ser is not None and ser.is_open:
            ser.close()
            log_verbose("Serial port closed after interrupt", verbose)
        return False
    except Exception as e:
        print(f"\n✗ Error: {e}")
        if verbose:
            import traceback
            print("\n[VERBOSE] Full traceback:")
            traceback.print_exc()
        return False
    finally:
        # Error paths used to leave the port open; release it no matter how
        # the function exits. No-op when it was already closed above.
        if ser is not None and ser.is_open:
            ser.close()
            log_verbose("Serial port closed after error", verbose)
|
|
|
|
def _ipv4_text(value):
    """argparse ``type=`` callback: accept a dotted-quad IPv4 string.

    Returns the string unchanged so downstream code still receives exactly
    what the user typed; raises ``argparse.ArgumentTypeError`` otherwise.
    """
    import ipaddress  # local import: only needed for command-line validation

    try:
        ipaddress.IPv4Address(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid IPv4 address: {value!r}") from None
    return value


def main():
    """Command-line entry point: parse arguments, validate them, configure the device.

    Exits with status 0 when ``config_device`` returns True and 1 when it
    returns False or when VHT80 is requested on the 2.4G band. argparse itself
    exits with status 2 on malformed arguments, including an ``--ip``,
    ``--gateway`` or ``--netmask`` that is not a dotted-quad IPv4 value.
    """
    parser = argparse.ArgumentParser(
        description='Configure ESP32 WiFi with static IP (DHCP automatically disabled) and CSI control',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Configure device #1 for STA mode with CSI DISABLED (baseline testing)
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA

# Configure device #1 for STA mode with CSI ENABLED
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA --csi

# Configure device #25 for MONITOR mode (collapse detection, CSI not needed)
%(prog)s -p /dev/ttyUSB1 -i 192.168.1.90 -M MONITOR -mc 36

# STA mode with CSI for iperf + CSI correlation testing
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA --csi -ps NONE

# Monitor mode on 2.4GHz channel 6
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.91 -M MONITOR -mc 6 -b 2.4G

# STA mode on 5GHz with 40MHz bandwidth and CSI
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA -b 5G -B HT40 --csi

# With verbose output
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.51 -v

Note:
- Mode and CSI enable state are saved to NVS
- Device will auto-start in configured mode on boot
- CSI defaults to DISABLED unless --csi flag is used
- DHCP is always disabled when using this script (static IP mode)
"""
    )

    parser.add_argument('-p', '--port', required=True,
                        help='Serial port (e.g., /dev/ttyUSB0)')
    # Address-like options are syntax-checked up front so a typo is rejected
    # here instead of being written to the device.
    parser.add_argument('-i', '--ip', required=True, type=_ipv4_text,
                        help='Static IP address (DHCP will be disabled)')
    parser.add_argument('-s', '--ssid', default='ClubHouse2G',
                        help='WiFi SSID (default: ClubHouse2G)')
    parser.add_argument('-P', '--password', default='ez2remember',
                        help='WiFi password (default: ez2remember)')
    parser.add_argument('-g', '--gateway', default='192.168.1.1', type=_ipv4_text,
                        help='Gateway IP (default: 192.168.1.1)')
    parser.add_argument('-m', '--netmask', default='255.255.255.0', type=_ipv4_text,
                        help='Netmask (default: 255.255.255.0)')
    parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G'],
                        help='WiFi band: 2.4G or 5G (default: 2.4G)')
    parser.add_argument('-B', '--bandwidth', default='HT20',
                        choices=['HT20', 'HT40', 'VHT80'],
                        help='Channel bandwidth: HT20 (20MHz), HT40 (40MHz), VHT80 (80MHz, 5GHz only) (default: HT20)')
    parser.add_argument('-ps', '--powersave', default='NONE',
                        choices=['NONE', 'MIN', 'MIN_MODEM', 'MAX', 'MAX_MODEM'],
                        help='Power save mode: NONE (no PS, best for CSI), MIN/MIN_MODEM, MAX/MAX_MODEM (default: NONE)')
    parser.add_argument('-M', '--mode', default='STA',
                        choices=['STA', 'MONITOR'],
                        help='Operating mode: STA (connect to AP, CSI+iperf) or MONITOR (promiscuous, collapse detection) (default: STA)')
    parser.add_argument('-mc', '--monitor-channel', type=int, default=36,
                        help='Monitor mode channel (1-11 for 2.4GHz, 36-165 for 5GHz) (default: 36)')
    parser.add_argument('--csi', action='store_true',
                        help='Enable CSI capture (default: disabled). Use for devices that need CSI data collection.')
    parser.add_argument('-r', '--no-reboot', action='store_true',
                        help='Do NOT reboot device after configuration')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Enable verbose output')

    args = parser.parse_args()

    # Validate bandwidth selection
    if args.bandwidth == 'VHT80' and args.band == '2.4G':
        print("\n✗ Error: VHT80 (80MHz) is only supported on 5GHz band")
        print(" Either use -b 5G or choose HT20/HT40 bandwidth")
        sys.exit(1)

    success = config_device(
        port=args.port,
        ip=args.ip,
        ssid=args.ssid,
        password=args.password,
        gateway=args.gateway,
        netmask=args.netmask,
        band=args.band,
        bandwidth=args.bandwidth,
        powersave=args.powersave,
        mode=args.mode,
        monitor_channel=args.monitor_channel,
        csi_enable=args.csi,
        reboot=not args.no_reboot,  # the CLI flag is the negative form
        verbose=args.verbose,
    )

    sys.exit(0 if success else 1)
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|