#!/usr/bin/env python3 """ ESP32 Async Batch Configuration Tool The definitive parallel configuration tool. Configures 30+ ESP32 devices concurrently using non-blocking I/O. Features: - Concurrent execution (configure 30 devices in <20 seconds) - Robust Regex-based state detection - Supports verifying both Station Mode (IP check) and Monitor Mode - Context-aware logging - CSI enable/disable control """ import asyncio import serial_asyncio import sys import os import argparse import ipaddress import re import time import logging # Ensure detection script is available sys.path.append(os.path.dirname(os.path.abspath(__file__))) try: import detect_esp32 except ImportError: print("Error: 'detect_esp32.py' not found.") sys.exit(1) # --- Logging Setup --- class DeviceLoggerAdapter(logging.LoggerAdapter): """Prefixes log messages with the device port name""" def process(self, msg, kwargs): return '[%s] %s' % (self.extra['connid'], msg), kwargs logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') logger = logging.getLogger("BatchConfig") class Esp32Configurator: """ Manages the lifecycle of configuring a single ESP32 device via Async Serial. """ def __init__(self, port, target_ip, args): self.port = port self.target_ip = target_ip self.args = args self.log = DeviceLoggerAdapter(logger, {'connid': port}) # --- Regex Patterns --- # Success indicators self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_monitor_success = re.compile(r'Monitor mode active', re.IGNORECASE) self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE) # Prompts indicating device is booting/ready self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) # Error indicators self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE) async def run(self): """Main execution workflow for this device""" try: reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) except Exception as e: self.log.error(f"Failed to open port: {e}") return False try: # 1. Hardware Reset (DTR/RTS) self.log.info("Resetting...") writer.transport.serial.dtr = False writer.transport.serial.rts = True await asyncio.sleep(0.1) writer.transport.serial.rts = False await asyncio.sleep(0.1) writer.transport.serial.dtr = True # 2. Wait for Boot # We assume the device is ready when we see logs or a prompt if not await self._wait_for_boot(reader): self.log.warning("Boot prompt missed, attempting config anyway...") # 3. Send Configuration await self._send_config(writer) # 4. Verify Success return await self._verify_configuration(reader) except Exception as e: self.log.error(f"Exception: {e}") return False finally: writer.close() await writer.wait_closed() async def _wait_for_boot(self, reader): """Reads stream until a known 'ready' prompt appears""" self.log.info("Waiting for boot...") timeout = time.time() + 5 # 5 second boot timeout while time.time() < timeout: try: line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5) line = line_bytes.decode('utf-8', errors='ignore').strip() if not line: continue if self.regex_ready.search(line): return True except asyncio.TimeoutError: continue return False async def _send_config(self, writer): """Builds and transmits the configuration command""" csi_val = '1' if self.args.csi_enable else '0' self.log.info(f"Sending config for IP {self.target_ip} (CSI:{csi_val})...") # Construct command block config_str = ( f"CFG\n" f"SSID:{self.args.ssid}\n" f"PASS:{self.args.password}\n" f"IP:{self.target_ip}\n" f"MASK:{self.args.netmask}\n" f"GW:{self.args.gateway}\n" f"DHCP:0\n" f"BAND:{self.args.band}\n" f"BW:{self.args.bandwidth}\n" f"POWERSAVE:{self.args.powersave}\n" f"MODE:{self.args.mode}\n" f"MON_CH:{self.args.monitor_channel}\n" f"CSI:{csi_val}\n" f"END\n" ) writer.write(config_str.encode('utf-8')) await writer.drain() async def _verify_configuration(self, reader): """Monitors output for confirmation of Success""" self.log.info("Verifying configuration...") timeout = time.time() + 15 # 15s verification timeout csi_saved = False while time.time() < timeout: try: line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0) line = line_bytes.decode('utf-8', errors='ignore').strip() if not line: continue # Check for CSI save confirmation if self.regex_csi_saved.search(line): csi_saved = True # Check for Station Mode Success (IP Address) m_ip = self.regex_got_ip.search(line) if m_ip: got_ip = m_ip.group(1) if got_ip == self.target_ip: csi_status = "CSI saved" if csi_saved else "" self.log.info(f"SUCCESS: Assigned {got_ip} {csi_status}") return True else: self.log.warning(f"MISMATCH: Wanted {self.target_ip}, got {got_ip}") # Check for Monitor Mode Success if self.regex_monitor_success.search(line): self.log.info("SUCCESS: Monitor Mode Active") return True # Check for errors if self.regex_error.search(line): self.log.warning(f"Device Reported Error: {line}") except asyncio.TimeoutError: continue self.log.error("Timeout: Device did not confirm configuration.") return False async def main_async(): parser = argparse.ArgumentParser( description='Async ESP32 Batch Config with CSI Control', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Configure 20 iperf baseline devices (NO CSI) %(prog)s --start-ip 192.168.1.81 # Configure devices WITH CSI enabled %(prog)s --start-ip 192.168.1.111 --csi # Configure for monitor mode on channel 36 %(prog)s --start-ip 192.168.1.90 -M MONITOR -mc 36 # 5GHz with 40MHz bandwidth %(prog)s --start-ip 192.168.1.81 -b 5G -B HT40 """ ) # Arguments parser.add_argument('--start-ip', required=True, help='Starting Static IP') parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='WiFi SSID') parser.add_argument('-P', '--password', default='ez2remember', help='WiFi password') parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP') parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G']) parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80']) parser.add_argument('-ps', '--powersave', default='NONE') parser.add_argument('-M', '--mode', default='STA') parser.add_argument('-mc', '--monitor-channel', type=int, default=36) parser.add_argument('--csi', dest='csi_enable', action='store_true', help='Enable CSI capture (default: disabled)') args = parser.parse_args() # 1. Detect print("Step 1: Detecting Devices...") devices = detect_esp32.detect_esp32_devices() if not devices: print("No devices found.") return # Sort naturally (ttyUSB2 before ttyUSB10) def natural_keys(d): return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] devices.sort(key=natural_keys) try: start_ip_obj = ipaddress.IPv4Address(args.start_ip) except: print(f"Invalid IP: {args.start_ip}") return # 2. Configure Concurrently csi_status = "ENABLED" if args.csi_enable else "DISABLED" print(f"Step 2: Configuring {len(devices)} devices concurrently (CSI: {csi_status})...") tasks = [] for i, dev in enumerate(devices): current_ip = str(start_ip_obj + i) configurator = Esp32Configurator(dev.device, current_ip, args) tasks.append(configurator.run()) # Run everything at once results = await asyncio.gather(*tasks) # 3. Report success_count = results.count(True) print("\n" + "="*40) print(f"Total Devices: {len(devices)}") print(f"Success: {success_count}") print(f"Failed: {len(devices) - success_count}") print(f"CSI Setting: {csi_status}") print("="*40) if __name__ == '__main__': try: if os.name == 'nt': asyncio.set_event_loop(asyncio.ProactorEventLoop()) asyncio.run(main_async()) except KeyboardInterrupt: print("\nAborted by user.")