ESP32/esp32_reconfig.py

236 lines
8.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ESP32 Reconfiguration Tool
Iterates through connected devices and updates their settings (SSID, IP, etc.)
without reflashing firmware. Runs sequentially for reliability.
"""
import sys
import os
import argparse
import ipaddress
import re
import time
import serial
from serial.tools import list_ports
# --- Configuration ---
# Baud rate handed to serial.Serial() when configure_device() opens a port.
BAUD_RATE = 115200
# NOTE(review): TIMEOUT is not referenced by any code visible in this file;
# configure_device() sets its own literal read timeouts. Confirm whether it
# is still needed or should replace those literals.
TIMEOUT = 0.5 # Serial read timeout
class Colors:
    """ANSI escape sequences used to colorize the tool's terminal output.

    Wrap text as ``f"{Colors.RED}message{Colors.RESET}"``; RESET switches the
    terminal back to its default attributes.
    """

    # Bright foreground colors.
    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'

    # Clears any color set by the codes above.
    RESET = '\033[0m'
def detect_devices():
    """Return the device paths of candidate ESP32 serial ports, naturally sorted.

    Ports are selected by handing a regular expression of USB-UART bridge
    identifiers (CP210, FT232, "USB Serial", 10C4:EA60) to
    ``list_ports.grep``. The resulting paths are ordered so that embedded
    numbers compare numerically, e.g. ``ttyUSB2`` sorts before ``ttyUSB10``.
    """

    def natural_key(path):
        # Splitting on a captured digit run yields alternating text/digit
        # pieces; turning the digit pieces into ints gives numeric ordering.
        pieces = re.split(r'(\d+)', path)
        return [int(piece) if piece.isdigit() else piece for piece in pieces]

    matches = list_ports.grep("CP210|FT232|USB Serial|10C4:EA60")
    return sorted((info.device for info in matches), key=natural_key)
def extract_device_number(device_path):
    """Return the integer formed by the digits at the end of *device_path*.

    For example ``/dev/ttyUSB12`` gives 12. A path that does not end in a
    digit gives 0.
    """
    trailing_digits = re.search(r'(\d+)$', device_path)
    if trailing_digits is None:
        return 0
    return int(trailing_digits.group(1))
def _pulse_reset(ser, release_dtr):
    """Drive DTR low and pulse RTS for 0.1 s; raise DTR afterwards if asked."""
    ser.dtr = False
    ser.rts = True
    time.sleep(0.1)
    ser.rts = False
    if release_dtr:
        ser.dtr = True


def _wait_for_prompt(ser, timeout_s):
    """Read boot output until the console looks ready; True if it was seen."""
    # A monotonic clock keeps the deadline correct even if the system time
    # is adjusted while we wait.
    deadline = time.monotonic() + timeout_s
    seen = ""
    while time.monotonic() < deadline:
        try:
            # Read whatever is pending (at least one byte) instead of whole
            # lines, because the prompt is not terminated by a newline.
            chunk = ser.read(ser.in_waiting or 1).decode('utf-8', errors='ignore')
        except (serial.SerialException, OSError):
            # Best effort: a read error here is tolerated and the read is
            # simply retried until the deadline. Anything else (a programming
            # error) is allowed to propagate to the caller's handler.
            chunk = ""
        if chunk:
            seen += chunk
            if "esp32>" in seen or "Entering console loop" in seen:
                return True
            # Only the tail matters for spotting the markers; cap the buffer.
            seen = seen[-1000:]
        time.sleep(0.05)
    return False


def _build_config_payload(target_ip, args):
    """Build the CRLF-terminated ``CFG`` ... ``END`` text block for one device."""
    csi_val = '1' if args.csi_enable else '0'
    role_str = "SERVER" if args.iperf_server else "CLIENT"
    iperf_enable_val = '0' if args.no_iperf else '1'
    # round() rather than int(): int() truncates, so a float product that
    # lands just below a whole number would lose one microsecond.
    period_us = round(args.iperf_period * 1000000)
    config_lines = [
        "CFG",
        f"SSID:{args.ssid}",
        f"PASS:{args.password}",
        f"IP:{target_ip}",
        f"MASK:{args.netmask}",
        f"GW:{args.gateway}",
        "DHCP:0",
        f"BAND:{args.band}",
        f"BW:{args.bandwidth}",
        f"POWERSAVE:{args.powersave}",
        f"MODE:{args.mode}",
        f"MON_CH:{args.monitor_channel}",
        f"CSI:{csi_val}",
        f"IPERF_PERIOD_US:{period_us}",
        f"IPERF_ROLE:{role_str}",
        f"IPERF_PROTO:{args.iperf_proto}",
        f"IPERF_DEST_IP:{args.iperf_dest_ip}",
        f"IPERF_PORT:{args.iperf_port}",
        f"IPERF_BURST:{args.iperf_burst}",
        f"IPERF_LEN:{args.iperf_len}",
        f"IPERF_ENABLED:{iperf_enable_val}",
        "END",
    ]
    # Every line, including the last, is terminated with an explicit CRLF.
    return "\r\n".join(config_lines) + "\r\n"


def _wait_for_confirmation(ser, target_ip, timeout_s):
    """Scan device output for a success marker; True if one arrives in time."""
    # The negative lookahead rejects a longer address that merely starts with
    # target_ip (e.g. target 192.168.1.5 must not match "got ip:192.168.1.51").
    ip_confirmed = re.compile(r"got ip:" + re.escape(target_ip) + r"(?!\d)")
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        line = ser.readline().decode('utf-8', errors='ignore').strip()
        if not line:
            continue
        if "Config saved" in line or "CSI enable state saved" in line:
            return True
        if ip_confirmed.search(line):
            return True
    return False


def configure_device(port, target_ip, args):
    """
    Connect to a single device, reset it, and inject the configuration.

    Steps: open the port, pulse the reset lines, wait up to 10 s for the
    console prompt (continuing even if it never shows), send the ``CFG`` block,
    then wait up to 8 s for a confirmation line. On success the device is
    reset once more so the new settings are applied from a clean boot.

    Args:
        port: Serial device path to open (e.g. ``/dev/ttyUSB0``).
        target_ip: Static IPv4 address, as a string, to assign to the device.
        args: Parsed command-line namespace. Reads ``ssid``, ``password``,
            ``netmask``, ``gateway``, ``band``, ``bandwidth``, ``powersave``,
            ``mode``, ``monitor_channel``, ``csi_enable``, ``iperf_period``
            (seconds), ``iperf_server``, ``iperf_proto``, ``iperf_dest_ip``,
            ``iperf_port``, ``iperf_burst``, ``iperf_len`` and ``no_iperf``.

    Returns:
        True if a confirmation line was seen; False if the port could not be
        opened, an error occurred, or verification timed out. Errors are
        printed, never raised.
    """
    try:
        ser = serial.Serial(port, BAUD_RATE, timeout=0.1)
    except Exception as e:
        print(f"[{port}] {Colors.RED}Connection Failed: {e}{Colors.RESET}")
        return False

    try:
        # 1. Reset Device
        _pulse_reset(ser, release_dtr=True)

        # 2. Wait for App to Settle (Handle GPS delay)
        print(f"[{port}] Waiting for App (GPS timeout ~3s)...", end='', flush=True)
        if _wait_for_prompt(ser, 10.0):
            print(" OK")
        else:
            print(f" {Colors.YELLOW}Timeout waiting for prompt (continuing anyway){Colors.RESET}")

        # 3. Clear Buffers & Wakeup
        ser.reset_input_buffer()
        ser.write(b'\n')  # Send an Enter to clear any partial commands
        time.sleep(0.2)

        # 4./5. Build and send the config block
        config_payload = _build_config_payload(target_ip, args)
        print(f"[{port}] Sending Config ({target_ip})...", end='', flush=True)
        ser.write(config_payload.encode('utf-8'))
        ser.flush()

        # 6. Verify
        ser.timeout = 0.5  # Longer read timeout for line-based reading
        if not _wait_for_confirmation(ser, target_ip, 8.0):
            print(f" {Colors.RED}FAILED (Verify Timeout){Colors.RESET}")
            return False

        print(f" {Colors.GREEN}SUCCESS{Colors.RESET}")
        # Final Reset to apply settings cleanly (DTR is left low here, as in
        # the original sequence).
        _pulse_reset(ser, release_dtr=False)
        return True
    except Exception as e:
        # Per-device boundary: report the problem and let the caller retry.
        print(f"[{port}] Error: {e}")
        return False
    finally:
        if ser.is_open:
            ser.close()
def main():
    """
    Command-line entry point: reconfigure every detected device in turn.

    Each detected port gets the address ``--start-ip`` plus the trailing
    number of its device path (as returned by extract_device_number), and
    configure_device() is attempted up to ``--retries`` times per port with a
    one second pause between attempts.

    Exits with status 1 when no devices are found or when at least one device
    could not be configured. A malformed ``--start-ip`` is rejected by
    argparse with a usage error before any device is touched.
    """
    parser = argparse.ArgumentParser(description='ESP32 Sequential Reconfiguration Tool')
    # type= makes argparse validate the address up front instead of failing
    # with a traceback after device detection.
    parser.add_argument('--start-ip', required=True, type=ipaddress.IPv4Address,
                        help='Start IP (e.g., 192.168.1.51)')
    parser.add_argument('-s', '--ssid', default='ClubHouse2G')
    parser.add_argument('-P', '--password', default='ez2remember')
    parser.add_argument('-g', '--gateway', default='192.168.1.1')
    parser.add_argument('-m', '--netmask', default='255.255.255.0')
    parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'])
    parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'])
    parser.add_argument('-ps', '--powersave', default='NONE')
    parser.add_argument('--iperf-period', type=float, default=0.01)
    parser.add_argument('--iperf-burst', type=int, default=1)
    parser.add_argument('--iperf-len', type=int, default=1470)
    parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'])
    parser.add_argument('--iperf-dest-ip', default='192.168.1.50')
    parser.add_argument('--iperf-port', type=int, default=5001)
    parser.add_argument('--no-iperf', action='store_true')
    parser.add_argument('--iperf-client', action='store_true')
    parser.add_argument('--iperf-server', action='store_true')
    parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'])
    parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
    parser.add_argument('--csi', dest='csi_enable', action='store_true')
    parser.add_argument('--retries', type=int, default=3, help="Retry attempts per device")
    args = parser.parse_args()

    print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
    print(" ESP32 Sequential Reconfig Tool")
    print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")

    devices = detect_devices()
    if not devices:
        print(f"{Colors.RED}No devices found.{Colors.RESET}")
        sys.exit(1)
    print(f"Found {len(devices)} devices. Starting reconfiguration...\n")

    start_ip = args.start_ip
    any_failed = False
    for i, port in enumerate(devices):
        # The address offset comes from the port's trailing number, not from
        # its position in the list.
        offset = extract_device_number(port)
        target_ip = str(start_ip + offset)
        print(f"Device {i+1}/{len(devices)}: {Colors.CYAN}{port}{Colors.RESET} -> {Colors.YELLOW}{target_ip}{Colors.RESET}")

        success = False
        for attempt in range(1, args.retries + 1):
            if attempt > 1:
                # Pause only between attempts; there is nothing to wait for
                # after the final one.
                time.sleep(1.0)
                print(f" Retry {attempt}/{args.retries}...")
            if configure_device(port, target_ip, args):
                success = True
                break

        if not success:
            any_failed = True
            print(f"{Colors.RED} [ERROR] Failed to configure {port} after {args.retries} attempts.{Colors.RESET}\n")
        else:
            print("")

    print(f"{Colors.BLUE}Done.{Colors.RESET}")
    if any_failed:
        # Signal partial failure to calling scripts, matching the non-zero
        # exit already used when no devices are found.
        sys.exit(1)
# Run the tool only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()