#!/usr/bin/env python3
"""
ESP32 Mass Deployment Tool
Parallel or sequential flashing and WiFi configuration for multiple ESP32 devices
"""

import os
import sys
import subprocess
import glob
import time
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    YELLOW = '\033[1;33m'
    BLUE = '\033[0;34m'
    NC = '\033[0m'  # No Color


class DeviceDeployer:
    """Build firmware once, then flash and WiFi-configure every detected board.

    Each detected serial device gets a static IP derived from ``start_ip``:
    the first three octets are kept and the last octet is incremented by the
    device's index in the sorted device list.
    """

    def __init__(self, project_dir, ssid, password, start_ip="192.168.1.51",
                 netmask="255.255.255.0", gateway="192.168.1.1",
                 baud_rate=460800, max_retries=2, verify_ping=True,
                 num_devices=None, verbose=False, parallel=True):
        """Store the deployment settings and pre-parse the starting IP.

        Raises:
            ValueError: if ``start_ip`` is not a dotted quad of four
                decimal octets in the range 0-255.
        """
        self.project_dir = Path(project_dir)
        self.ssid = ssid
        self.password = password
        self.start_ip = start_ip
        self.netmask = netmask
        self.gateway = gateway
        self.baud_rate = baud_rate
        self.max_retries = max_retries
        self.verify_ping = verify_ping
        self.num_devices = num_devices
        self.verbose = verbose
        self.parallel = parallel

        # Parse IP address. Validate up front so a typo surfaces as a clear
        # error instead of an IndexError or a bogus address later on.
        octets = start_ip.split('.')
        well_formed = len(octets) == 4 and all(
            o.isascii() and o.isdigit() and int(o) <= 255 for o in octets
        )
        if not well_formed:
            raise ValueError(f"Invalid start IP address: {start_ip!r}")
        self.ip_base = '.'.join(octets[:3])
        self.ip_start = int(octets[3])

        self.devices = []
        self.results = {}

    def print_banner(self):
        """Print deployment configuration"""
        print()
        print(f"{Colors.BLUE}{'='*70}")
        print("ESP32 Mass Deployment Tool")
        print(f"{'='*70}{Colors.NC}")
        print(f"Project: {self.project_dir}")
        print(f"SSID: {self.ssid}")
        print(f"Password: {'*' * len(self.password)}")
        print(f"Start IP: {self.start_ip}")
        print(f"Gateway: {self.gateway}")
        print(f"Netmask: {self.netmask}")
        print(f"Baud Rate: {self.baud_rate}")
        print(f"Max Retries: {self.max_retries}")
        print(f"Verify Ping: {self.verify_ping}")
        print(f"Mode: {'Parallel' if self.parallel else 'Sequential'}")
        if self.num_devices:
            print(f"Max Devices: {self.num_devices}")
        print(f"{Colors.BLUE}{'='*70}{Colors.NC}")

    def build_firmware(self):
        """Build the firmware using idf.py.

        Returns:
            True when ``idf.py build`` exits successfully, False when it
            exits non-zero or cannot be started at all.
        """
        print()
        print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}")

        try:
            subprocess.run(
                ['idf.py', 'build'],
                cwd=self.project_dir,
                check=True,
                capture_output=not self.verbose
            )
        except subprocess.CalledProcessError as e:
            print(f"{Colors.RED}✗ Build failed!{Colors.NC}")
            # In verbose mode the output was streamed live and e.stderr is
            # None; otherwise it was captured, so show it now or the user
            # never learns why the build failed.
            if e.stderr:
                print(e.stderr.decode(errors='replace'))
            return False
        except OSError as e:
            # idf.py is not on PATH, or the project directory does not exist.
            print(f"{Colors.RED}✗ Build failed!{Colors.NC}")
            print(f"Could not run idf.py: {e}")
            return False

        print(f"{Colors.GREEN}✓ Build complete{Colors.NC}")
        return True

    def detect_devices(self):
        """Detect connected ESP32 devices.

        Populates ``self.devices`` with the sorted ``/dev/ttyUSB*`` and
        ``/dev/ttyACM*`` paths, truncated to ``num_devices`` when set.

        Returns:
            True when at least one device was found and every device can be
            given a valid IP address, False otherwise.
        """
        print()
        print(f"{Colors.YELLOW}[2/4] Detecting ESP32 devices...{Colors.NC}")

        # Find all USB serial devices
        self.devices = sorted(glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*'))

        if not self.devices:
            print(f"{Colors.RED}ERROR: No devices found!{Colors.NC}")
            print("Connect ESP32 devices via USB and try again.")
            return False

        # Limit to num_devices if specified
        if self.num_devices and len(self.devices) > self.num_devices:
            print(f"Limiting to first {self.num_devices} devices")
            self.devices = self.devices[:self.num_devices]

        # Only the last octet is incremented, so it must stay a valid octet.
        last_octet = self.ip_start + len(self.devices) - 1
        if last_octet > 255:
            print(f"{Colors.RED}ERROR: IP range overflow!{Colors.NC}")
            print(f"{len(self.devices)} device(s) starting at {self.start_ip} "
                  f"would need last octet {last_octet} (max 255).")
            return False

        print(f"{Colors.GREEN}Found {len(self.devices)} device(s):{Colors.NC}")
        for i, device in enumerate(self.devices):
            ip = self.get_ip_for_index(i)
            print(f" [{i:2d}] {device:14s} → {ip}")

        return True

    def get_ip_for_index(self, index):
        """Calculate IP address for device index"""
        return f"{self.ip_base}.{self.ip_start + index}"

    def _flash(self, device, attempt, log):
        """Run one ``idf.py flash``; return None on success, else a status string."""
        log("Flashing...")
        try:
            subprocess.run(
                ['idf.py', '-p', device, '-b', str(self.baud_rate), 'flash'],
                cwd=self.project_dir,
                check=True,
                capture_output=not (self.verbose or not self.parallel),
                timeout=300  # 5 minute timeout
            )
        except subprocess.CalledProcessError:
            log(f"✗ Flash failed on attempt {attempt}")
            return 'FAILED'
        except subprocess.TimeoutExpired:
            log(f"✗ Flash timeout on attempt {attempt}")
            return 'TIMEOUT'
        except OSError as e:
            # idf.py missing or project directory gone: report, don't raise.
            log(f"✗ Flash failed on attempt {attempt}: {e}")
            return 'FAILED'
        log("✓ Flash successful")
        return None

    def _send_config(self, device, ip_addr, log):
        """Write the WiFi/static-IP config block to the serial device; return success."""
        log("Configuring WiFi...")
        config = (
            f"CFG\n"
            f"SSID:{self.ssid}\n"
            f"PASS:{self.password}\n"
            f"IP:{ip_addr}\n"
            f"MASK:{self.netmask}\n"
            f"GW:{self.gateway}\n"
            f"DHCP:0\n"
            f"END\n"
        )
        try:
            with open(device, 'w') as f:
                f.write(config)
        except (OSError, UnicodeError) as e:
            log(f"✗ Config error: {e}")
            return False
        log("✓ Config sent")
        return True

    def _ping(self, ip_addr, attempt, log):
        """Ping ``ip_addr`` twice; return True only when ping exits with 0."""
        log("Verifying connectivity...")
        try:
            result = subprocess.run(
                ['ping', '-c', '2', '-W', '3', ip_addr],
                capture_output=True,
                timeout=10
            )
        except subprocess.TimeoutExpired:
            log(f"✗ Ping timeout on attempt {attempt}")
            return False
        except OSError as e:
            log(f"✗ Ping could not be run on attempt {attempt}: {e}")
            return False
        if result.returncode == 0:
            log("✓ Ping successful")
            return True
        log(f"✗ Ping failed on attempt {attempt}")
        return False

    def flash_and_configure(self, index, device):
        """Flash and configure a single device, retrying up to ``max_retries``.

        Args:
            index: position of the device in ``self.devices``; selects its IP.
            device: serial device path used for flashing and configuration.

        Returns:
            A dict with keys ``index``, ``device``, ``ip``, ``status`` and
            ``log``. ``status`` is ``'SUCCESS'``, or the outcome of the last
            attempt: ``'FAILED'``, ``'TIMEOUT'`` (flash timed out) or
            ``'NO_PING'`` (flashed but unreachable).
        """
        ip_addr = self.get_ip_for_index(index)
        log_lines = []

        def log(msg):
            log_lines.append(msg)
            if self.verbose or not self.parallel:
                print(f"[{index}] {msg}")

        def outcome(status):
            return {
                'index': index,
                'device': device,
                'ip': ip_addr,
                'status': status,
                'log': log_lines
            }

        # Reported if every attempt fails (or if max_retries < 1).
        status = 'FAILED'

        for attempt in range(1, self.max_retries + 1):
            if attempt > 1:
                # Uniform back-off between attempts, none after the last one.
                time.sleep(2)

            log(f"=== Device {index}: {device} (Attempt {attempt}/{self.max_retries}) ===")
            log(f"Target IP: {ip_addr}")

            # Flash firmware
            flash_error = self._flash(device, attempt, log)
            if flash_error is not None:
                status = flash_error
                continue

            # Wait for device to boot
            log("Waiting for boot...")
            time.sleep(3)

            # Configure WiFi
            config_sent = self._send_config(device, ip_addr, log)
            if not config_sent and not self.verify_ping:
                # Without a ping there is no other evidence the device is
                # configured, so an unsent config must not count as success.
                status = 'FAILED'
                continue

            # Wait for network to initialize
            log("Waiting for network...")
            time.sleep(5)

            if not self.verify_ping:
                return outcome('SUCCESS')

            # Verify connectivity
            if self._ping(ip_addr, attempt, log):
                return outcome('SUCCESS')
            status = 'NO_PING'

        # If we get here, all retries failed
        return outcome(status)

    def deploy_all_parallel(self):
        """Deploy to all devices in parallel"""
        print()
        print(f"{Colors.YELLOW}[3/4] Flashing and configuring (parallel)...{Colors.NC}")
        print()

        if not self.devices:
            # ThreadPoolExecutor rejects max_workers=0.
            return

        # Use ThreadPoolExecutor for parallel execution
        max_workers = min(32, len(self.devices))

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all jobs
            futures = {
                executor.submit(self.flash_and_configure, i, device): (i, device)
                for i, device in enumerate(self.devices)
            }

            # Collect results as they complete
            for future in as_completed(futures):
                i, device = futures[future]
                try:
                    result = future.result()
                except Exception as e:
                    # Worker boundary: an unexpected error on one device must
                    # not abort the deployment of all the others.
                    result = {
                        'index': i,
                        'device': device,
                        'ip': self.get_ip_for_index(i),
                        'status': 'FAILED',
                        'log': [f"Unexpected error: {e}"]
                    }
                self.results[result['index']] = result

                # Print immediate status
                self.print_device_status(result)

    def deploy_all_sequential(self):
        """Deploy to devices one at a time (sequential)"""
        print()
        print(f"{Colors.YELLOW}[3/4] Flashing and configuring (sequential)...{Colors.NC}")
        print()

        for i, device in enumerate(self.devices):
            print(f"\n{Colors.BLUE}--- Device {i+1}/{len(self.devices)} ---{Colors.NC}")
            result = self.flash_and_configure(i, device)
            self.results[result['index']] = result

            # Print status after each device
            self.print_device_status(result)
            print()

    def print_device_status(self, result):
        """Print status for a single device"""
        status_color = {
            'SUCCESS': Colors.GREEN,
            'NO_PING': Colors.YELLOW,
            'FAILED': Colors.RED,
            'TIMEOUT': Colors.RED
        }.get(result['status'], Colors.RED)

        status_text = {
            'SUCCESS': 'OK',
            'NO_PING': 'FLASHED, NO PING',
            'FAILED': 'FAILED',
            'TIMEOUT': 'TIMEOUT'
        }.get(result['status'], 'ERROR')

        print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s} → "
              f"{result['ip']:15s} [{status_text}]{Colors.NC}")

    def deploy_all(self):
        """Deploy to all devices (parallel or sequential)"""
        if self.parallel:
            self.deploy_all_parallel()
        else:
            self.deploy_all_sequential()

    def print_summary(self):
        """Print deployment summary.

        Returns:
            The number of devices whose status is ``'FAILED'`` or
            ``'TIMEOUT'``; ``'NO_PING'`` devices are reported as warnings
            and are not counted.
        """
        print()
        print(f"{Colors.YELLOW}[4/4] Deployment Summary{Colors.NC}")
        print(f"{Colors.BLUE}{'='*70}{Colors.NC}")

        # Count statuses
        statuses = [r['status'] for r in self.results.values()]
        success_count = statuses.count('SUCCESS')
        no_ping_count = statuses.count('NO_PING')
        failed_count = statuses.count('FAILED') + statuses.count('TIMEOUT')

        # Print all devices
        for i, _ in enumerate(self.devices):
            result = self.results.get(i)
            if result is None:
                continue
            status_icon = {
                'SUCCESS': f"{Colors.GREEN}✓{Colors.NC}",
                'NO_PING': f"{Colors.YELLOW}⚠{Colors.NC}",
                'FAILED': f"{Colors.RED}✗{Colors.NC}",
                'TIMEOUT': f"{Colors.RED}✗{Colors.NC}"
            }.get(result['status'], f"{Colors.RED}?{Colors.NC}")

            status_msg = {
                'NO_PING': " (no ping response)",
                'FAILED': " (failed)",
                'TIMEOUT': " (timeout)"
            }.get(result['status'], "")

            print(f"{status_icon} {result['device']:14s} → {result['ip']}{status_msg}")

        print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
        print(f"Total: {len(self.devices)} devices")
        print(f"{Colors.GREEN}Success: {success_count}{Colors.NC}")
        if no_ping_count > 0:
            print(f"{Colors.YELLOW}Warning: {no_ping_count} (flashed but no ping){Colors.NC}")
        if failed_count > 0:
            print(f"{Colors.RED}Failed: {failed_count}{Colors.NC}")
        print(f"{Colors.BLUE}{'='*70}{Colors.NC}")

        # Print test commands
        print()
        print("Test commands:")
        print(" # Ping all devices")
        ip_range = f"{self.ip_start}..{self.ip_start + len(self.devices) - 1}"
        print(f" for i in {{{ip_range}}}; do ping -c 1 {self.ip_base}.$i & done; wait")
        print()
        print(" # Test first device with iperf")
        print(f" iperf -c {self.get_ip_for_index(0)}")
        print()
        print(" # Check all device status")
        print(" ./check_device_status.py --reset")
        print()

        return failed_count


def main():
    """Parse the command line, run the four deployment stages, and exit.

    The exit status is the number of failed devices (capped at 255), 1 when
    the password is missing or build/detection fails, and argparse's usage
    error status for invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description='ESP32 Mass Deployment Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Deploy to first 15 devices (parallel, default)
  %(prog)s -s ClubHouse2G -p mypassword

  # Deploy sequentially (easier debugging)
  %(prog)s -s ClubHouse2G -p mypassword --sequential

  # Deploy to 31 devices in parallel
  %(prog)s -s ClubHouse2G -p mypassword -n 31

  # Custom IP range
  %(prog)s -s ClubHouse2G -p mypassword --start-ip 10.0.0.100

  # From environment variables
  export WIFI_SSID="MyNetwork"
  export WIFI_PASSWORD="secret123"
  %(prog)s

  # Verbose sequential mode (see everything)
  %(prog)s -s ClubHouse2G -p mypassword --sequential -v

  # Skip ping verification (faster)
  %(prog)s -s ClubHouse2G -p mypassword --no-verify
"""
    )

    parser.add_argument('-d', '--dir', default=os.getcwd(),
                        help='ESP-IDF project directory (default: current dir)')
    parser.add_argument('-s', '--ssid', default=os.environ.get('WIFI_SSID', 'ClubHouse2G'),
                        help='WiFi SSID (default: ClubHouse2G or $WIFI_SSID)')
    parser.add_argument('-p', '--password', default=os.environ.get('WIFI_PASSWORD', ''),
                        help='WiFi password (default: $WIFI_PASSWORD)')
    parser.add_argument('--start-ip', default='192.168.1.51',
                        help='Starting IP address (default: 192.168.1.51)')
    parser.add_argument('-n', '--num-devices', type=int, default=15,
                        help='Number of devices to deploy (default: 15, use 0 for all)')
    parser.add_argument('-g', '--gateway', default='192.168.1.1',
                        help='Gateway IP (default: 192.168.1.1)')
    parser.add_argument('-m', '--netmask', default='255.255.255.0',
                        help='Network mask (default: 255.255.255.0)')
    parser.add_argument('-b', '--baud', type=int, default=460800,
                        help='Baud rate for flashing (default: 460800)')
    parser.add_argument('-r', '--retries', type=int, default=2,
                        help='Max retries per device (default: 2)')
    parser.add_argument('--no-verify', action='store_true',
                        help='Skip ping verification')
    parser.add_argument('--sequential', action='store_true',
                        help='Flash devices sequentially instead of parallel (easier debugging)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose output')

    args = parser.parse_args()

    # Validate password
    if not args.password:
        print(f"{Colors.RED}ERROR: WiFi password not set!{Colors.NC}")
        print()
        print("Provide password via:")
        print(" 1. Command line: -p 'your_password'")
        print(" 2. Environment: export WIFI_PASSWORD='your_password'")
        print()
        print("Example:")
        print(f" {sys.argv[0]} -s MyWiFi -p mypassword")
        sys.exit(1)

    # With fewer than one attempt every device would be reported as failed
    # without ever being touched.
    if args.retries < 1:
        parser.error("--retries must be at least 1")

    # Create deployer
    try:
        deployer = DeviceDeployer(
            project_dir=args.dir,
            ssid=args.ssid,
            password=args.password,
            start_ip=args.start_ip,
            netmask=args.netmask,
            gateway=args.gateway,
            baud_rate=args.baud,
            max_retries=args.retries,
            verify_ping=not args.no_verify,
            num_devices=args.num_devices if args.num_devices > 0 else None,
            verbose=args.verbose,
            parallel=not args.sequential
        )
    except ValueError as e:
        parser.error(str(e))

    # Run deployment
    deployer.print_banner()

    if not deployer.build_firmware():
        sys.exit(1)

    if not deployer.detect_devices():
        sys.exit(1)

    deployer.deploy_all()
    failed_count = deployer.print_summary()

    # Exit statuses wrap modulo 256 on POSIX; cap so that e.g. 256 failures
    # cannot be reported as success.
    sys.exit(min(failed_count, 255))


if __name__ == '__main__':
    main()