#!/usr/bin/env python3 """ ESP32 Mass Deployment Tool Parallel flashing and WiFi configuration for multiple ESP32 devices """ import os import sys import subprocess import glob import time import argparse from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path class Colors: RED = '\033[0;31m' GREEN = '\033[0;32m' YELLOW = '\033[1;33m' BLUE = '\033[0;34m' NC = '\033[0m' # No Color class DeviceDeployer: def __init__(self, project_dir, ssid, password, start_ip="192.168.1.51", netmask="255.255.255.0", gateway="192.168.1.1", baud_rate=460800, max_retries=2, verify_ping=True, num_devices=None, verbose=False, parallel=True): self.project_dir = Path(project_dir) self.ssid = ssid self.password = password self.start_ip = start_ip self.netmask = netmask self.gateway = gateway self.baud_rate = baud_rate self.max_retries = max_retries self.verify_ping = verify_ping self.num_devices = num_devices self.verbose = verbose self.parallel = parallel # Parse IP address ip_parts = start_ip.split('.') self.ip_base = '.'.join(ip_parts[:3]) self.ip_start = int(ip_parts[3]) self.devices = [] self.results = {} self.log_dir = Path('/tmp') def print_banner(self): """Print deployment configuration""" print() print(f"{Colors.BLUE}{'='*70}") print("ESP32 Mass Deployment Tool") print(f"{'='*70}{Colors.NC}") print(f"Project: {self.project_dir}") print(f"SSID: {self.ssid}") print(f"Password: {'*' * len(self.password)}") print(f"Start IP: {self.start_ip}") print(f"Gateway: {self.gateway}") print(f"Netmask: {self.netmask}") print(f"Baud Rate: {self.baud_rate}") print(f"Max Retries: {self.max_retries}") print(f"Verify Ping: {self.verify_ping}") print(f"Mode: {'Parallel' if self.parallel else 'Sequential'}") if self.num_devices: print(f"Max Devices: {self.num_devices}") print(f"{Colors.BLUE}{'='*70}{Colors.NC}") def build_firmware(self): """Build the firmware using idf.py""" print() print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}") try: result = subprocess.run( ['idf.py', 'build'], cwd=self.project_dir, check=True, capture_output=not self.verbose ) print(f"{Colors.GREEN}✓ Build complete{Colors.NC}") return True except subprocess.CalledProcessError as e: print(f"{Colors.RED}✗ Build failed!{Colors.NC}") if self.verbose: print(e.stderr.decode() if e.stderr else "") return False def detect_devices(self): """Detect connected ESP32 devices""" print() print(f"{Colors.YELLOW}[2/4] Detecting ESP32 devices...{Colors.NC}") # Find all USB serial devices self.devices = sorted(glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*')) if not self.devices: print(f"{Colors.RED}ERROR: No devices found!{Colors.NC}") print("Connect ESP32 devices via USB and try again.") return False # Limit to num_devices if specified if self.num_devices and len(self.devices) > self.num_devices: print(f"Limiting to first {self.num_devices} devices") self.devices = self.devices[:self.num_devices] print(f"{Colors.GREEN}Found {len(self.devices)} device(s):{Colors.NC}") for i, device in enumerate(self.devices): ip = self.get_ip_for_index(i) print(f" [{i:2d}] {device:14s} → {ip}") return True def get_ip_for_index(self, index): """Calculate IP address for device index""" return f"{self.ip_base}.{self.ip_start + index}" def flash_and_configure(self, index, device): """Flash and configure a single device""" ip_addr = self.get_ip_for_index(index) log_file = self.log_dir / f"esp32_deploy_{index}.log" log_lines = [] def log(msg): log_lines.append(msg) if self.verbose or not self.parallel: print(f"[{index}] {msg}") for attempt in range(1, self.max_retries + 1): log(f"=== Device {index}: {device} (Attempt {attempt}/{self.max_retries}) ===") log(f"Target IP: {ip_addr}") # Flash firmware log("Flashing...") try: result = subprocess.run( ['idf.py', '-p', device, '-b', str(self.baud_rate), 'flash'], cwd=self.project_dir, check=True, capture_output=True, timeout=300 # 5 minute timeout ) log("✓ Flash successful") except subprocess.CalledProcessError as e: log(f"✗ Flash failed on attempt {attempt}") if attempt == self.max_retries: # Write log file with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return { 'index': index, 'device': device, 'ip': ip_addr, 'status': 'FAILED', 'log': log_lines } time.sleep(2) continue except subprocess.TimeoutExpired: log(f"✗ Flash timeout on attempt {attempt}") if attempt == self.max_retries: with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return { 'index': index, 'device': device, 'ip': ip_addr, 'status': 'TIMEOUT', 'log': log_lines } continue # Wait for device to boot log("Waiting for boot...") time.sleep(3) # Configure WiFi log("Configuring WiFi...") try: config = ( f"CFG\n" f"SSID:{self.ssid}\n" f"PASS:{self.password}\n" f"IP:{ip_addr}\n" f"MASK:{self.netmask}\n" f"GW:{self.gateway}\n" f"DHCP:0\n" f"END\n" ) with open(device, 'w') as f: f.write(config) log("✓ Config sent") except Exception as e: log(f"✗ Config error: {e}") # Wait for network to initialize log("Waiting for network...") time.sleep(5) # Verify connectivity if self.verify_ping: log("Verifying connectivity...") try: result = subprocess.run( ['ping', '-c', '2', '-W', '3', ip_addr], capture_output=True, timeout=10 ) if result.returncode == 0: log("✓ Ping successful") # Write log file with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return { 'index': index, 'device': device, 'ip': ip_addr, 'status': 'SUCCESS', 'log': log_lines } else: log(f"✗ Ping failed on attempt {attempt}") if attempt == self.max_retries: with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return { 'index': index, 'device': device, 'ip': ip_addr, 'status': 'NO_PING', 'log': log_lines } except subprocess.TimeoutExpired: log(f"✗ Ping timeout on attempt {attempt}") else: # Write log file with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return { 'index': index, 'device': device, 'ip': ip_addr, 'status': 'SUCCESS', 'log': log_lines } time.sleep(2) # If we get here, all retries failed with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return { 'index': index, 'device': device, 'ip': ip_addr, 'status': 'FAILED', 'log': log_lines } def deploy_all_parallel(self): """Deploy to all devices in parallel""" print() print(f"{Colors.YELLOW}[3/4] Flashing and configuring (parallel)...{Colors.NC}") print() # Clean old log files for f in self.log_dir.glob('esp32_deploy_*.log'): f.unlink() # Use ThreadPoolExecutor for parallel execution max_workers = min(32, len(self.devices)) with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all jobs futures = { executor.submit(self.flash_and_configure, i, device): (i, device) for i, device in enumerate(self.devices) } # Collect results as they complete for future in as_completed(futures): result = future.result() self.results[result['index']] = result # Print immediate status self.print_device_status(result) def deploy_all_sequential(self): """Deploy to devices one at a time (sequential)""" print() print(f"{Colors.YELLOW}[3/4] Flashing and configuring (sequential)...{Colors.NC}") print() # Clean old log files for f in self.log_dir.glob('esp32_deploy_*.log'): f.unlink() for i, device in enumerate(self.devices): print(f"\n{Colors.BLUE}--- Device {i+1}/{len(self.devices)} ---{Colors.NC}") result = self.flash_and_configure(i, device) self.results[result['index']] = result # Print status after each device self.print_device_status(result) print() def print_device_status(self, result): """Print status for a single device""" status_color = { 'SUCCESS': Colors.GREEN, 'NO_PING': Colors.YELLOW, 'FAILED': Colors.RED, 'TIMEOUT': Colors.RED }.get(result['status'], Colors.RED) status_text = { 'SUCCESS': 'OK', 'NO_PING': 'FLASHED, NO PING', 'FAILED': 'FAILED', 'TIMEOUT': 'TIMEOUT' }.get(result['status'], 'ERROR') print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s} → " f"{result['ip']:15s} [{status_text}]{Colors.NC}") def deploy_all(self): """Deploy to all devices (parallel or sequential)""" if self.parallel: self.deploy_all_parallel() else: self.deploy_all_sequential() def print_summary(self): """Print deployment summary""" print() print(f"{Colors.YELLOW}[4/4] Deployment Summary{Colors.NC}") print(f"{Colors.BLUE}{'='*70}{Colors.NC}") # Count statuses success_count = sum(1 for r in self.results.values() if r['status'] == 'SUCCESS') no_ping_count = sum(1 for r in self.results.values() if r['status'] == 'NO_PING') failed_count = sum(1 for r in self.results.values() if r['status'] in ['FAILED', 'TIMEOUT']) # Print all devices for i in range(len(self.devices)): if i in self.results: result = self.results[i] status_icon = { 'SUCCESS': f"{Colors.GREEN}✓{Colors.NC}", 'NO_PING': f"{Colors.YELLOW}⚠{Colors.NC}", 'FAILED': f"{Colors.RED}✗{Colors.NC}", 'TIMEOUT': f"{Colors.RED}✗{Colors.NC}" }.get(result['status'], f"{Colors.RED}?{Colors.NC}") status_msg = { 'NO_PING': " (no ping response)", 'FAILED': " (failed)", 'TIMEOUT': " (timeout)" }.get(result['status'], "") print(f"{status_icon} {result['device']:14s} → {result['ip']}{status_msg}") print(f"{Colors.BLUE}{'='*70}{Colors.NC}") print(f"Total: {len(self.devices)} devices") print(f"{Colors.GREEN}Success: {success_count}{Colors.NC}") if no_ping_count > 0: print(f"{Colors.YELLOW}Warning: {no_ping_count} (flashed but no ping){Colors.NC}") if failed_count > 0: print(f"{Colors.RED}Failed: {failed_count}{Colors.NC}") print(f"{Colors.BLUE}{'='*70}{Colors.NC}") # Print log location print() print(f"Logs: /tmp/esp32_deploy_*.log") # Print test commands print() print("Test commands:") print(f" # Test first device") print(f" iperf -c {self.get_ip_for_index(0)}") print() print(f" # Ping all devices") ip_range = f"{self.ip_start}..{self.ip_start + len(self.devices) - 1}" print(f" for i in {{{ip_range}}}; do ping -c 1 {self.ip_base}.$i & done; wait") print() return failed_count def main(): parser = argparse.ArgumentParser( description='ESP32 Mass Deployment Tool', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Deploy to first 30 devices (default) %(prog)s -s ClubHouse2G -p mypassword # Deploy sequentially (easier debugging) %(prog)s -s ClubHouse2G -p mypassword --sequential # Deploy to all connected devices %(prog)s -s ClubHouse2G -p mypassword -n 0 # Custom IP range %(prog)s -s ClubHouse2G -p mypassword --start-ip 10.0.0.100 # From environment variables export SSID="MyNetwork" export PASSWORD="secret123" export START_IP="192.168.1.51" %(prog)s # Verbose sequential mode (see everything) %(prog)s -s ClubHouse2G -p mypassword --sequential -v """ ) parser.add_argument('-d', '--dir', default=os.getcwd(), help='ESP-IDF project directory (default: current dir)') parser.add_argument('-s', '--ssid', default=os.environ.get('SSID', 'ClubHouse2G'), help='WiFi SSID (default: ClubHouse2G or $SSID)') parser.add_argument('-p', '--password', default=os.environ.get('PASSWORD', ''), help='WiFi password (default: $PASSWORD)') parser.add_argument('--start-ip', default=os.environ.get('START_IP', '192.168.1.51'), help='Starting IP address (default: 192.168.1.51 or $START_IP)') parser.add_argument('-n', '--num-devices', type=int, default=30, help='Number of devices to deploy (default: 30, use 0 for all)') parser.add_argument('-g', '--gateway', default=os.environ.get('GATEWAY', '192.168.1.1'), help='Gateway IP (default: 192.168.1.1 or $GATEWAY)') parser.add_argument('-m', '--netmask', default=os.environ.get('NETMASK', '255.255.255.0'), help='Network mask (default: 255.255.255.0 or $NETMASK)') parser.add_argument('-b', '--baud', type=int, default=int(os.environ.get('BAUD_RATE', 460800)), help='Baud rate for flashing (default: 460800 or $BAUD_RATE)') parser.add_argument('-r', '--retries', type=int, default=int(os.environ.get('MAX_RETRIES', 2)), help='Max retries per device (default: 2 or $MAX_RETRIES)') parser.add_argument('--no-verify', action='store_true', help='Skip ping verification') parser.add_argument('--sequential', action='store_true', help='Flash devices sequentially instead of parallel') parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') args = parser.parse_args() # Validate password if not args.password: print(f"{Colors.RED}ERROR: WiFi password not set!{Colors.NC}") print() print("Provide password via:") print(" 1. Command line: -p 'your_password'") print(" 2. Environment: export PASSWORD='your_password'") print() print("Example:") print(f" {sys.argv[0]} -s MyWiFi -p mypassword") sys.exit(1) # Create deployer deployer = DeviceDeployer( project_dir=args.dir, ssid=args.ssid, password=args.password, start_ip=args.start_ip, netmask=args.netmask, gateway=args.gateway, baud_rate=args.baud, max_retries=args.retries, verify_ping=not args.no_verify, num_devices=args.num_devices if args.num_devices > 0 else None, verbose=args.verbose, parallel=not args.sequential ) # Run deployment deployer.print_banner() if not deployer.build_firmware(): sys.exit(1) if not deployer.detect_devices(): sys.exit(1) deployer.deploy_all() failed_count = deployer.print_summary() sys.exit(failed_count) if __name__ == '__main__': main()