#!/usr/bin/env python3 """ ESP32 Async Mass Deployment Tool Combines parallel flashing (via esptool) with async configuration. Features: - Semaphore-limited flashing (prevents USB hub crashes) - Regex-based boot detection (faster/reliable config) - Parallel verification """ import asyncio import serial_asyncio import sys import os import argparse import ipaddress import re import time import logging from pathlib import Path # Ensure detection script is available sys.path.append(os.path.dirname(os.path.abspath(__file__))) try: import detect_esp32 except ImportError: print("Error: 'detect_esp32.py' not found.") sys.exit(1) # --- Configuration --- MAX_CONCURRENT_FLASH = 8 # Limit active flashes to prevent USB brownouts class Colors: GREEN = '\033[92m' RED = '\033[91m' YELLOW = '\033[93m' BLUE = '\033[94m' RESET = '\033[0m' # Logger Adapter for context class DeviceLoggerAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): return '[%s] %s' % (self.extra['connid'], msg), kwargs logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') logger = logging.getLogger("Deploy") class DeployWorker: def __init__(self, port, target_ip, args, build_dir, flash_sem): self.port = port self.target_ip = target_ip self.args = args self.build_dir = build_dir self.flash_sem = flash_sem self.log = DeviceLoggerAdapter(logger, {'connid': port}) # Regex Patterns self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) async def run(self): """Main deployment workflow""" try: # 1. FLASHING PHASE (Protected by Semaphore) async with self.flash_sem: if self.args.erase: if not await self._erase_flash(): return False if not await self._flash_firmware(): return False # 2. CONFIG PHASE (Serial interaction) # We assume flash resets device. We open serial immediately to catch boot. # Note: We wait a tiny bit to let esptool release the port handle await asyncio.sleep(0.5) if self.args.ssid and self.args.password: if not await self._configure_device(): return False else: self.log.info(f"{Colors.GREEN}Flash Complete (NVS Preserved){Colors.RESET}") return True except Exception as e: self.log.error(f"Worker Exception: {e}") return False async def _erase_flash(self): self.log.info("Erasing flash...") cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash'] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: self.log.info("Erase successful.") return True else: self.log.error(f"Erase failed: {stderr.decode()}") return False async def _flash_firmware(self): self.log.info("Flashing firmware...") # Use relative path for flash_args (must be run from build_dir) cmd = [ 'esptool.py', '-p', self.port, '-b', str(self.args.baud), '--before', 'default_reset', '--after', 'hard_reset', 'write_flash', '@flash_args' ] proc = await asyncio.create_subprocess_exec( *cmd, cwd=self.build_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Wait with timeout try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) except asyncio.TimeoutError: proc.kill() self.log.error("Flash timed out.") return False if proc.returncode == 0: self.log.info("Flash successful.") return True else: self.log.error(f"Flash failed: {stderr.decode()}") return False async def _configure_device(self): """Connects via Serial, waits for boot, sends config""" self.log.info("Connecting to console...") try: reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) except Exception as e: self.log.error(f"Serial open failed: {e}") return False try: # A. Wait for Boot self.log.info("Waiting for boot...") booted = False end_time = time.time() + 8 # 8s boot timeout while time.time() < end_time: try: line_b = await asyncio.wait_for(reader.readline(), timeout=0.5) line = line_b.decode('utf-8', errors='ignore').strip() if self.regex_ready.search(line): booted = True break except asyncio.TimeoutError: continue if not booted: # Even if we didn't see the specific line, we might still be able to config self.log.warning("Boot prompt not detected, trying to config anyway...") # B. Send Config self.log.info(f"Sending config for {self.target_ip}...") config_str = ( f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\n" f"IP:{self.target_ip}\nMASK:{self.args.netmask}\nGW:{self.args.gateway}\n" f"DHCP:0\nEND\n" ) writer.write(config_str.encode('utf-8')) await writer.drain() # C. Verify self.log.info("Verifying IP...") start_verify = time.time() while time.time() < start_verify + 10: try: line_b = await asyncio.wait_for(reader.readline(), timeout=1.0) line = line_b.decode('utf-8', errors='ignore') m = self.regex_got_ip.search(line) if m: if m.group(1) == self.target_ip: self.log.info(f"{Colors.GREEN}SUCCESS: Configured & Connected{Colors.RESET}") return True except asyncio.TimeoutError: continue self.log.error("Config sent, but no IP confirmation received.") return False except Exception as e: self.log.error(f"Config error: {e}") return False finally: writer.close() await writer.wait_closed() async def main_async(): parser = argparse.ArgumentParser(description='Async ESP32 Mass Deployment') parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir') parser.add_argument('-s', '--ssid', help='WiFi SSID') parser.add_argument('-p', '--password', help='WiFi Password') parser.add_argument('--start-ip', default='192.168.1.51', help='Start IP') parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud') parser.add_argument('--erase', action='store_true', help='Full erase first') parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway') parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') args = parser.parse_args() project_dir = Path(args.dir).resolve() build_dir = project_dir / 'build' # 1. Build Firmware (Sync blocking) print(f"{Colors.YELLOW}[1/3] Building Firmware...{Colors.RESET}") proc = await asyncio.create_subprocess_exec( 'idf.py', 'build', cwd=project_dir, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE ) _, stderr = await proc.communicate() if proc.returncode != 0: print(f"{Colors.RED}Build Failed:\n{stderr.decode()}{Colors.RESET}") return if not (build_dir / 'flash_args').exists(): print(f"{Colors.RED}Error: build/flash_args missing.{Colors.RESET}") return print(f"{Colors.GREEN}Build Complete.{Colors.RESET}") # 2. Detect Devices print(f"{Colors.YELLOW}[2/3] Scanning Devices...{Colors.RESET}") devices = detect_esp32.detect_esp32_devices() if not devices: print(f"{Colors.RED}No devices found.{Colors.RESET}") return # Sort naturally def natural_keys(d): return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] devices.sort(key=natural_keys) # 3. Deploy print(f"{Colors.YELLOW}[3/3] Deploying to {len(devices)} devices...{Colors.RESET}") print(f"Max Concurrent Flashes: {MAX_CONCURRENT_FLASH}") try: start_ip_obj = ipaddress.IPv4Address(args.start_ip) except: print("Invalid Start IP") return flash_sem = asyncio.Semaphore(MAX_CONCURRENT_FLASH) tasks = [] for i, dev in enumerate(devices): target_ip = str(start_ip_obj + i) worker = DeployWorker(dev.device, target_ip, args, build_dir, flash_sem) tasks.append(worker.run()) results = await asyncio.gather(*tasks) # 4. Summary success = results.count(True) print(f"\n{Colors.BLUE}{'='*40}{Colors.RESET}") print(f"Total: {len(devices)}") print(f"Success: {Colors.GREEN}{success}{Colors.RESET}") print(f"Failed: {Colors.RED}{len(devices) - success}{Colors.RESET}") print(f"{Colors.BLUE}{'='*40}{Colors.RESET}") if __name__ == '__main__': try: # Windows loop fix if needed if os.name == 'nt': asyncio.set_event_loop(asyncio.ProactorEventLoop()) asyncio.run(main_async()) except KeyboardInterrupt: print("\nCancelled.")