#!/usr/bin/env python3 """ ESP32 Reconfiguration Tool Iterates through connected devices and updates their settings (SSID, IP, etc.) without reflashing firmware. Runs sequentially for reliability. """ import sys import os import argparse import ipaddress import re import time import serial from serial.tools import list_ports # --- Configuration --- BAUD_RATE = 115200 TIMEOUT = 0.5 # Serial read timeout class Colors: GREEN = '\033[92m' RED = '\033[91m' YELLOW = '\033[93m' BLUE = '\033[94m' CYAN = '\033[96m' RESET = '\033[0m' def detect_devices(): """Returns a sorted list of ESP32 USB serial ports.""" candidates = list(list_ports.grep("CP210|FT232|USB Serial|10C4:EA60")) ports = [p.device for p in candidates] ports.sort(key=lambda x: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', x)]) return ports def extract_device_number(device_path): match = re.search(r'(\d+)$', device_path) return int(match.group(1)) if match else 0 def configure_device(port, target_ip, args): """ Connects to a single device, resets it, and injects the config. Returns True if verification succeeds. """ try: ser = serial.Serial(port, BAUD_RATE, timeout=0.1) except Exception as e: print(f"[{port}] {Colors.RED}Connection Failed: {e}{Colors.RESET}") return False try: # 1. Reset Device ser.dtr = False ser.rts = True time.sleep(0.1) ser.rts = False ser.dtr = True # 2. Wait for App to Settle (Handle GPS delay) print(f"[{port}] Waiting for App (GPS timeout ~3s)...", end='', flush=True) start_time = time.time() # We wait until we see the "esp32>" prompt OR specific log lines # The prompt is the safest indicator that the console is ready. prompt_detected = False buffer = "" while time.time() - start_time < 10.0: try: # Read char by char to catch prompts that don't end in newline chunk = ser.read(ser.in_waiting or 1).decode('utf-8', errors='ignore') if chunk: buffer += chunk # Check for prompt or end of init if "esp32>" in buffer or "Entering console loop" in buffer: prompt_detected = True break # Keep buffer size manageable if len(buffer) > 1000: buffer = buffer[-1000:] except Exception: pass time.sleep(0.05) if not prompt_detected: print(f" {Colors.YELLOW}Timeout waiting for prompt (continuing anyway){Colors.RESET}") else: print(" OK") # 3. Clear Buffers & Wakeup ser.reset_input_buffer() ser.write(b'\n') # Send an Enter to clear any partial commands time.sleep(0.2) # 4. Construct Config String (Using CRLF \r\n for safety) csi_val = '1' if args.csi_enable else '0' role_str = "SERVER" if args.iperf_server else "CLIENT" iperf_enable_val = '0' if args.no_iperf else '1' period_us = int(args.iperf_period * 1000000) # Note: We send \r\n explicitly config_lines = [ "CFG", f"SSID:{args.ssid}", f"PASS:{args.password}", f"IP:{target_ip}", f"MASK:{args.netmask}", f"GW:{args.gateway}", f"DHCP:0", f"BAND:{args.band}", f"BW:{args.bandwidth}", f"POWERSAVE:{args.powersave}", f"MODE:{args.mode}", f"MON_CH:{args.monitor_channel}", f"CSI:{csi_val}", f"IPERF_PERIOD_US:{period_us}", f"IPERF_ROLE:{role_str}", f"IPERF_PROTO:{args.iperf_proto}", f"IPERF_DEST_IP:{args.iperf_dest_ip}", f"IPERF_PORT:{args.iperf_port}", f"IPERF_BURST:{args.iperf_burst}", f"IPERF_LEN:{args.iperf_len}", f"IPERF_ENABLED:{iperf_enable_val}", "END" ] config_payload = "\r\n".join(config_lines) + "\r\n" # 5. Send Config print(f"[{port}] Sending Config ({target_ip})...", end='', flush=True) ser.write(config_payload.encode('utf-8')) ser.flush() # 6. Verify verify_start = time.time() verified = False ser.timeout = 0.5 # Increase timeout for line reading while time.time() - verify_start < 8.0: line = ser.readline().decode('utf-8', errors='ignore').strip() if not line: continue # Check for success indicators if "Config saved" in line or "CSI enable state saved" in line: verified = True break # Check for IP confirmation if f"got ip:{target_ip}" in line: verified = True break if verified: print(f" {Colors.GREEN}SUCCESS{Colors.RESET}") # Final Reset to apply settings cleanly ser.dtr = False ser.rts = True time.sleep(0.1) ser.rts = False return True else: print(f" {Colors.RED}FAILED (Verify Timeout){Colors.RESET}") return False except Exception as e: print(f"[{port}] Error: {e}") return False finally: if ser.is_open: ser.close() def main(): parser = argparse.ArgumentParser(description='ESP32 Sequential Reconfiguration Tool') parser.add_argument('--start-ip', required=True, help='Start IP (e.g., 192.168.1.51)') parser.add_argument('-s', '--ssid', default='ClubHouse2G') parser.add_argument('-P', '--password', default='ez2remember') parser.add_argument('-g', '--gateway', default='192.168.1.1') parser.add_argument('-m', '--netmask', default='255.255.255.0') parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G']) parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80']) parser.add_argument('-ps', '--powersave', default='NONE') parser.add_argument('--iperf-period', type=float, default=0.01) parser.add_argument('--iperf-burst', type=int, default=1) parser.add_argument('--iperf-len', type=int, default=1470) parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP']) parser.add_argument('--iperf-dest-ip', default='192.168.1.50') parser.add_argument('--iperf-port', type=int, default=5001) parser.add_argument('--no-iperf', action='store_true') parser.add_argument('--iperf-client', action='store_true') parser.add_argument('--iperf-server', action='store_true') parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR']) parser.add_argument('-mc', '--monitor-channel', type=int, default=36) parser.add_argument('--csi', dest='csi_enable', action='store_true') parser.add_argument('--retries', type=int, default=3, help="Retry attempts per device") args = parser.parse_args() print(f"{Colors.BLUE}{'='*60}{Colors.RESET}") print(f" ESP32 Sequential Reconfig Tool") print(f"{Colors.BLUE}{'='*60}{Colors.RESET}") devices = detect_devices() if not devices: print(f"{Colors.RED}No devices found.{Colors.RESET}") sys.exit(1) print(f"Found {len(devices)} devices. Starting reconfiguration...\n") start_ip = ipaddress.IPv4Address(args.start_ip) for i, port in enumerate(devices): offset = extract_device_number(port) target_ip = str(start_ip + offset) print(f"Device {i+1}/{len(devices)}: {Colors.CYAN}{port}{Colors.RESET} -> {Colors.YELLOW}{target_ip}{Colors.RESET}") success = False for attempt in range(1, args.retries + 1): if attempt > 1: print(f" Retry {attempt}/{args.retries}...") if configure_device(port, target_ip, args): success = True break time.sleep(1.0) if not success: print(f"{Colors.RED} [ERROR] Failed to configure {port} after {args.retries} attempts.{Colors.RESET}\n") else: print("") print(f"{Colors.BLUE}Done.{Colors.RESET}") if __name__ == '__main__': main()