#!/usr/bin/env python3 """ ESP32 Parallel Mass Flash Script Build and flash multiple ESP32 devices concurrently for much faster deployment """ import subprocess import sys import os import time import argparse import shutil from pathlib import Path from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from multiprocessing import cpu_count # Import the detection script sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) try: import detect_esp32 except ImportError: print("Error: detect_esp32.py must be in the same directory") sys.exit(1) def detect_device_type(port_info): """Detect ESP32 variant based on USB chip""" if port_info.vid == 0x303A: return 'esp32s3' return 'esp32' def probe_chip_type(port): """Probe the actual chip type using esptool.py""" try: result = subprocess.run( ['esptool.py', '--port', port, 'chip_id'], capture_output=True, text=True, timeout=10 ) output = result.stdout + result.stderr if 'ESP32-S3' in output: return 'esp32s3' elif 'ESP32-S2' in output: return 'esp32s2' elif 'ESP32-C3' in output: return 'esp32c3' elif 'ESP32' in output: return 'esp32' except Exception as e: print(f" Warning: Could not probe {port}: {e}") return 'esp32' def create_sdkconfig(build_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'): """Create sdkconfig.defaults file with WiFi and IP configuration""" sdkconfig_path = os.path.join(build_dir, 'sdkconfig.defaults') config_content = f"""# WiFi Configuration CONFIG_WIFI_SSID="{ssid}" CONFIG_WIFI_PASSWORD="{password}" CONFIG_WIFI_MAXIMUM_RETRY=5 # Static IP Configuration CONFIG_USE_STATIC_IP=y CONFIG_STATIC_IP_ADDR="{ip_addr}" CONFIG_STATIC_GATEWAY_ADDR="{gateway}" CONFIG_STATIC_NETMASK_ADDR="{netmask}" """ with open(sdkconfig_path, 'w') as f: f.write(config_content) def build_firmware(device_info, project_dir, build_dir, ssid, password): """Build firmware for a single device with unique configuration""" dev_num = device_info['number'] chip_type = device_info['chip'] ip_addr = device_info['ip'] print(f"[Device {dev_num}] Building for {chip_type} with IP {ip_addr}") try: # Create build directory os.makedirs(build_dir, exist_ok=True) # Copy project files to build directory for item in ['main', 'CMakeLists.txt']: src = os.path.join(project_dir, item) dst = os.path.join(build_dir, item) if os.path.isdir(src): if os.path.exists(dst): shutil.rmtree(dst) shutil.copytree(src, dst) else: shutil.copy2(src, dst) # Create sdkconfig.defaults create_sdkconfig(build_dir, ssid, password, ip_addr) # Set target result = subprocess.run( ['idf.py', 'set-target', chip_type], cwd=build_dir, capture_output=True, text=True ) if result.returncode != 0: return { 'success': False, 'device': dev_num, 'error': f"Set target failed: {result.stderr[:200]}" } # Build result = subprocess.run( ['idf.py', 'build'], cwd=build_dir, capture_output=True, text=True ) if result.returncode != 0: return { 'success': False, 'device': dev_num, 'error': f"Build failed: {result.stderr[-500:]}" } print(f"[Device {dev_num}] ✓ Build complete") return { 'success': True, 'device': dev_num, 'build_dir': build_dir } except Exception as e: return { 'success': False, 'device': dev_num, 'error': str(e) } def flash_device(device_info, build_dir): """Flash a single device""" dev_num = device_info['number'] port = device_info['port'] ip_addr = device_info['ip'] print(f"[Device {dev_num}] Flashing {port} -> {ip_addr}") try: result = subprocess.run( ['idf.py', '-p', port, 'flash'], cwd=build_dir, capture_output=True, text=True, timeout=120 ) if result.returncode != 0: return { 'success': False, 'device': dev_num, 'port': port, 'error': f"Flash failed: {result.stderr[-500:]}" } print(f"[Device {dev_num}] ✓ Flash complete at {ip_addr}") return { 'success': True, 'device': dev_num, 'port': port, 'ip': ip_addr } except subprocess.TimeoutExpired: return { 'success': False, 'device': dev_num, 'port': port, 'error': "Flash timeout" } except Exception as e: return { 'success': False, 'device': dev_num, 'port': port, 'error': str(e) } def build_and_flash(device_info, project_dir, work_dir, ssid, password): """Combined build and flash for a single device""" dev_num = device_info['number'] build_dir = os.path.join(work_dir, f'build_device_{dev_num}') # Build build_result = build_firmware(device_info, project_dir, build_dir, ssid, password) if not build_result['success']: return build_result # Flash flash_result = flash_device(device_info, build_dir) # Clean up build directory to save space try: shutil.rmtree(build_dir) except: pass return flash_result def main(): parser = argparse.ArgumentParser(description='Parallel mass flash ESP32 devices') parser.add_argument('--ssid', required=True, help='WiFi SSID') parser.add_argument('--password', required=True, help='WiFi password') parser.add_argument('--start-ip', default='192.168.1.50', help='Starting IP address (default: 192.168.1.50)') parser.add_argument('--gateway', default='192.168.1.1', help='Gateway IP (default: 192.168.1.1)') parser.add_argument('--project-dir', default=None, help='ESP32 iperf project directory') parser.add_argument('--probe', action='store_true', help='Probe each device to detect exact chip type (slower)') parser.add_argument('--dry-run', action='store_true', help='Show what would be done without building/flashing') parser.add_argument('--build-parallel', type=int, default=None, help='Number of parallel builds (default: CPU cores)') parser.add_argument('--flash-parallel', type=int, default=8, help='Number of parallel flash operations (default: 8)') parser.add_argument('--strategy', choices=['build-then-flash', 'build-and-flash'], default='build-and-flash', help='Deployment strategy (default: build-and-flash)') args = parser.parse_args() # Determine parallelism if args.build_parallel is None: args.build_parallel = max(1, cpu_count() - 1) # Find project directory if args.project_dir: project_dir = args.project_dir else: script_dir = os.path.dirname(os.path.abspath(__file__)) project_dir = script_dir if not os.path.exists(os.path.join(project_dir, 'main')): project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf') if not os.path.exists(project_dir): print(f"ERROR: Project directory not found: {project_dir}") sys.exit(1) # Create work directory for builds work_dir = os.path.join(project_dir, '.builds') os.makedirs(work_dir, exist_ok=True) print(f"Using project directory: {project_dir}") print(f"Work directory: {work_dir}") # Detect devices print("\nDetecting ESP32 devices...") devices = detect_esp32.detect_esp32_devices() if not devices: print("No ESP32 devices detected!") sys.exit(1) print(f"Found {len(devices)} device(s)") # Prepare device list with IPs base_parts = args.start_ip.split('.') device_list = [] for idx, device in enumerate(devices, 1): if args.probe: print(f"Probing {device.device}...") chip_type = probe_chip_type(device.device) else: chip_type = detect_device_type(device) ip_last = int(base_parts[3]) + idx - 1 ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last}" device_list.append({ 'number': idx, 'port': device.device, 'chip': chip_type, 'ip': ip, 'info': device }) # Display plan print(f"\n{'='*70}") print("PARALLEL FLASH PLAN") print(f"{'='*70}") print(f"SSID: {args.ssid}") print(f"Strategy: {args.strategy}") print(f"Build parallelism: {args.build_parallel}") print(f"Flash parallelism: {args.flash_parallel}") print() for dev in device_list: print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {dev['ip']}") if args.dry_run: print("\nDry run - no devices will be built or flashed") return # Confirm print(f"\n{'='*70}") response = input("Proceed with parallel flashing? (yes/no): ").strip().lower() if response != 'yes': print("Aborted.") return print(f"\n{'='*70}") print("STARTING PARALLEL DEPLOYMENT") print(f"{'='*70}\n") start_time = time.time() if args.strategy == 'build-then-flash': # Strategy 1: Build all, then flash all print(f"Phase 1: Building {len(device_list)} configurations with {args.build_parallel} parallel builds...") build_results = [] with ProcessPoolExecutor(max_workers=args.build_parallel) as executor: futures = {} for dev in device_list: build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}') future = executor.submit( build_firmware, dev, project_dir, build_dir, args.ssid, args.password ) futures[future] = dev for future in as_completed(futures): result = future.result() build_results.append(result) if not result['success']: print(f"[Device {result['device']}] ✗ Build failed: {result['error']}") # Flash phase successful_builds = [r for r in build_results if r['success']] print(f"\nPhase 2: Flashing {len(successful_builds)} devices with {args.flash_parallel} parallel operations...") flash_results = [] with ThreadPoolExecutor(max_workers=args.flash_parallel) as executor: futures = {} for result in successful_builds: dev = device_list[result['device'] - 1] build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}') future = executor.submit(flash_device, dev, build_dir) futures[future] = dev for future in as_completed(futures): result = future.result() flash_results.append(result) if not result['success']: print(f"[Device {result['device']}] ✗ Flash failed: {result['error']}") # Cleanup print("\nCleaning up build directories...") try: shutil.rmtree(work_dir) except: pass final_results = flash_results else: # Strategy 2: Build and flash together (limited parallelism) print(f"Building and flashing with {args.build_parallel} parallel operations...") final_results = [] with ProcessPoolExecutor(max_workers=args.build_parallel) as executor: futures = {} for dev in device_list: future = executor.submit( build_and_flash, dev, project_dir, work_dir, args.ssid, args.password ) futures[future] = dev for future in as_completed(futures): result = future.result() final_results.append(result) if not result['success']: print(f"[Device {result['device']}] ✗ Failed: {result['error']}") # Summary elapsed_time = time.time() - start_time success_count = sum(1 for r in final_results if r['success']) failed_devices = [r['device'] for r in final_results if not r['success']] print(f"\n{'='*70}") print("DEPLOYMENT SUMMARY") print(f"{'='*70}") print(f"Successfully deployed: {success_count}/{len(device_list)} devices") print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)") print(f"Average time per device: {elapsed_time/len(device_list):.1f} seconds") if failed_devices: print(f"\nFailed devices: {', '.join(map(str, failed_devices))}") print(f"{'='*70}") if __name__ == '__main__': try: main() except KeyboardInterrupt: print("\n\nInterrupted by user") sys.exit(1) except Exception as e: print(f"\nFATAL ERROR: {e}") import traceback traceback.print_exc() sys.exit(1)