431 lines
14 KiB
Python
Executable File
431 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ESP32 Parallel Mass Flash Script
|
|
Build and flash multiple ESP32 devices concurrently for much faster deployment
|
|
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
import time
|
|
import argparse
|
|
import shutil
|
|
from pathlib import Path
|
|
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
|
|
from multiprocessing import cpu_count
|
|
|
|
# Import the detection script
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
try:
|
|
import detect_esp32
|
|
except ImportError:
|
|
print("Error: detect_esp32.py must be in the same directory")
|
|
sys.exit(1)
|
|
|
|
|
|
def detect_device_type(port_info):
|
|
"""Detect ESP32 variant based on USB chip"""
|
|
if port_info.vid == 0x303A:
|
|
return 'esp32s3'
|
|
return 'esp32'
|
|
|
|
|
|
def probe_chip_type(port):
|
|
"""Probe the actual chip type using esptool.py"""
|
|
try:
|
|
result = subprocess.run(
|
|
['esptool.py', '--port', port, 'chip_id'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
|
|
output = result.stdout + result.stderr
|
|
|
|
if 'ESP32-S3' in output:
|
|
return 'esp32s3'
|
|
elif 'ESP32-S2' in output:
|
|
return 'esp32s2'
|
|
elif 'ESP32-C3' in output:
|
|
return 'esp32c3'
|
|
elif 'ESP32' in output:
|
|
return 'esp32'
|
|
|
|
except Exception as e:
|
|
print(f" Warning: Could not probe {port}: {e}")
|
|
|
|
return 'esp32'
|
|
|
|
|
|
def create_sdkconfig(build_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'):
|
|
"""Create sdkconfig.defaults file with WiFi and IP configuration"""
|
|
sdkconfig_path = os.path.join(build_dir, 'sdkconfig.defaults')
|
|
|
|
config_content = f"""# WiFi Configuration
|
|
CONFIG_WIFI_SSID="{ssid}"
|
|
CONFIG_WIFI_PASSWORD="{password}"
|
|
CONFIG_WIFI_MAXIMUM_RETRY=5
|
|
|
|
# Static IP Configuration
|
|
CONFIG_USE_STATIC_IP=y
|
|
CONFIG_STATIC_IP_ADDR="{ip_addr}"
|
|
CONFIG_STATIC_GATEWAY_ADDR="{gateway}"
|
|
CONFIG_STATIC_NETMASK_ADDR="{netmask}"
|
|
"""
|
|
|
|
with open(sdkconfig_path, 'w') as f:
|
|
f.write(config_content)
|
|
|
|
|
|
def build_firmware(device_info, project_dir, build_dir, ssid, password):
|
|
"""Build firmware for a single device with unique configuration"""
|
|
dev_num = device_info['number']
|
|
chip_type = device_info['chip']
|
|
ip_addr = device_info['ip']
|
|
|
|
print(f"[Device {dev_num}] Building for {chip_type} with IP {ip_addr}")
|
|
|
|
try:
|
|
# Create build directory
|
|
os.makedirs(build_dir, exist_ok=True)
|
|
|
|
# Copy project files to build directory
|
|
for item in ['main', 'CMakeLists.txt']:
|
|
src = os.path.join(project_dir, item)
|
|
dst = os.path.join(build_dir, item)
|
|
if os.path.isdir(src):
|
|
if os.path.exists(dst):
|
|
shutil.rmtree(dst)
|
|
shutil.copytree(src, dst)
|
|
else:
|
|
shutil.copy2(src, dst)
|
|
|
|
# Create sdkconfig.defaults
|
|
create_sdkconfig(build_dir, ssid, password, ip_addr)
|
|
|
|
# Set target
|
|
result = subprocess.run(
|
|
['idf.py', 'set-target', chip_type],
|
|
cwd=build_dir,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return {
|
|
'success': False,
|
|
'device': dev_num,
|
|
'error': f"Set target failed: {result.stderr[:200]}"
|
|
}
|
|
|
|
# Build
|
|
result = subprocess.run(
|
|
['idf.py', 'build'],
|
|
cwd=build_dir,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return {
|
|
'success': False,
|
|
'device': dev_num,
|
|
'error': f"Build failed: {result.stderr[-500:]}"
|
|
}
|
|
|
|
print(f"[Device {dev_num}] ✓ Build complete")
|
|
|
|
return {
|
|
'success': True,
|
|
'device': dev_num,
|
|
'build_dir': build_dir
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'device': dev_num,
|
|
'error': str(e)
|
|
}
|
|
|
|
|
|
def flash_device(device_info, build_dir):
|
|
"""Flash a single device"""
|
|
dev_num = device_info['number']
|
|
port = device_info['port']
|
|
ip_addr = device_info['ip']
|
|
|
|
print(f"[Device {dev_num}] Flashing {port} -> {ip_addr}")
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
['idf.py', '-p', port, 'flash'],
|
|
cwd=build_dir,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=120
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return {
|
|
'success': False,
|
|
'device': dev_num,
|
|
'port': port,
|
|
'error': f"Flash failed: {result.stderr[-500:]}"
|
|
}
|
|
|
|
print(f"[Device {dev_num}] ✓ Flash complete at {ip_addr}")
|
|
|
|
return {
|
|
'success': True,
|
|
'device': dev_num,
|
|
'port': port,
|
|
'ip': ip_addr
|
|
}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
return {
|
|
'success': False,
|
|
'device': dev_num,
|
|
'port': port,
|
|
'error': "Flash timeout"
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'device': dev_num,
|
|
'port': port,
|
|
'error': str(e)
|
|
}
|
|
|
|
|
|
def build_and_flash(device_info, project_dir, work_dir, ssid, password):
|
|
"""Combined build and flash for a single device"""
|
|
dev_num = device_info['number']
|
|
build_dir = os.path.join(work_dir, f'build_device_{dev_num}')
|
|
|
|
# Build
|
|
build_result = build_firmware(device_info, project_dir, build_dir, ssid, password)
|
|
if not build_result['success']:
|
|
return build_result
|
|
|
|
# Flash
|
|
flash_result = flash_device(device_info, build_dir)
|
|
|
|
# Clean up build directory to save space
|
|
try:
|
|
shutil.rmtree(build_dir)
|
|
except:
|
|
pass
|
|
|
|
return flash_result
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Parallel mass flash ESP32 devices')
|
|
parser.add_argument('--ssid', required=True, help='WiFi SSID')
|
|
parser.add_argument('--password', required=True, help='WiFi password')
|
|
parser.add_argument('--start-ip', default='192.168.1.50',
|
|
help='Starting IP address (default: 192.168.1.50)')
|
|
parser.add_argument('--gateway', default='192.168.1.1',
|
|
help='Gateway IP (default: 192.168.1.1)')
|
|
parser.add_argument('--project-dir', default=None,
|
|
help='ESP32 iperf project directory')
|
|
parser.add_argument('--probe', action='store_true',
|
|
help='Probe each device to detect exact chip type (slower)')
|
|
parser.add_argument('--dry-run', action='store_true',
|
|
help='Show what would be done without building/flashing')
|
|
parser.add_argument('--build-parallel', type=int, default=None,
|
|
help='Number of parallel builds (default: CPU cores)')
|
|
parser.add_argument('--flash-parallel', type=int, default=8,
|
|
help='Number of parallel flash operations (default: 8)')
|
|
parser.add_argument('--strategy', choices=['build-then-flash', 'build-and-flash'],
|
|
default='build-and-flash',
|
|
help='Deployment strategy (default: build-and-flash)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Determine parallelism
|
|
if args.build_parallel is None:
|
|
args.build_parallel = max(1, cpu_count() - 1)
|
|
|
|
# Find project directory
|
|
if args.project_dir:
|
|
project_dir = args.project_dir
|
|
else:
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
project_dir = script_dir
|
|
|
|
if not os.path.exists(os.path.join(project_dir, 'main')):
|
|
project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf')
|
|
|
|
if not os.path.exists(project_dir):
|
|
print(f"ERROR: Project directory not found: {project_dir}")
|
|
sys.exit(1)
|
|
|
|
# Create work directory for builds
|
|
work_dir = os.path.join(project_dir, '.builds')
|
|
os.makedirs(work_dir, exist_ok=True)
|
|
|
|
print(f"Using project directory: {project_dir}")
|
|
print(f"Work directory: {work_dir}")
|
|
|
|
# Detect devices
|
|
print("\nDetecting ESP32 devices...")
|
|
devices = detect_esp32.detect_esp32_devices()
|
|
|
|
if not devices:
|
|
print("No ESP32 devices detected!")
|
|
sys.exit(1)
|
|
|
|
print(f"Found {len(devices)} device(s)")
|
|
|
|
# Prepare device list with IPs
|
|
base_parts = args.start_ip.split('.')
|
|
device_list = []
|
|
|
|
for idx, device in enumerate(devices, 1):
|
|
if args.probe:
|
|
print(f"Probing {device.device}...")
|
|
chip_type = probe_chip_type(device.device)
|
|
else:
|
|
chip_type = detect_device_type(device)
|
|
|
|
ip_last = int(base_parts[3]) + idx - 1
|
|
ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last}"
|
|
|
|
device_list.append({
|
|
'number': idx,
|
|
'port': device.device,
|
|
'chip': chip_type,
|
|
'ip': ip,
|
|
'info': device
|
|
})
|
|
|
|
# Display plan
|
|
print(f"\n{'='*70}")
|
|
print("PARALLEL FLASH PLAN")
|
|
print(f"{'='*70}")
|
|
print(f"SSID: {args.ssid}")
|
|
print(f"Strategy: {args.strategy}")
|
|
print(f"Build parallelism: {args.build_parallel}")
|
|
print(f"Flash parallelism: {args.flash_parallel}")
|
|
print()
|
|
|
|
for dev in device_list:
|
|
print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {dev['ip']}")
|
|
|
|
if args.dry_run:
|
|
print("\nDry run - no devices will be built or flashed")
|
|
return
|
|
|
|
# Confirm
|
|
print(f"\n{'='*70}")
|
|
response = input("Proceed with parallel flashing? (yes/no): ").strip().lower()
|
|
if response != 'yes':
|
|
print("Aborted.")
|
|
return
|
|
|
|
print(f"\n{'='*70}")
|
|
print("STARTING PARALLEL DEPLOYMENT")
|
|
print(f"{'='*70}\n")
|
|
|
|
start_time = time.time()
|
|
|
|
if args.strategy == 'build-then-flash':
|
|
# Strategy 1: Build all, then flash all
|
|
print(f"Phase 1: Building {len(device_list)} configurations with {args.build_parallel} parallel builds...")
|
|
|
|
build_results = []
|
|
with ProcessPoolExecutor(max_workers=args.build_parallel) as executor:
|
|
futures = {}
|
|
for dev in device_list:
|
|
build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}')
|
|
future = executor.submit(
|
|
build_firmware, dev, project_dir, build_dir, args.ssid, args.password
|
|
)
|
|
futures[future] = dev
|
|
|
|
for future in as_completed(futures):
|
|
result = future.result()
|
|
build_results.append(result)
|
|
if not result['success']:
|
|
print(f"[Device {result['device']}] ✗ Build failed: {result['error']}")
|
|
|
|
# Flash phase
|
|
successful_builds = [r for r in build_results if r['success']]
|
|
print(f"\nPhase 2: Flashing {len(successful_builds)} devices with {args.flash_parallel} parallel operations...")
|
|
|
|
flash_results = []
|
|
with ThreadPoolExecutor(max_workers=args.flash_parallel) as executor:
|
|
futures = {}
|
|
for result in successful_builds:
|
|
dev = device_list[result['device'] - 1]
|
|
build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}')
|
|
future = executor.submit(flash_device, dev, build_dir)
|
|
futures[future] = dev
|
|
|
|
for future in as_completed(futures):
|
|
result = future.result()
|
|
flash_results.append(result)
|
|
if not result['success']:
|
|
print(f"[Device {result['device']}] ✗ Flash failed: {result['error']}")
|
|
|
|
# Cleanup
|
|
print("\nCleaning up build directories...")
|
|
try:
|
|
shutil.rmtree(work_dir)
|
|
except:
|
|
pass
|
|
|
|
final_results = flash_results
|
|
|
|
else:
|
|
# Strategy 2: Build and flash together (limited parallelism)
|
|
print(f"Building and flashing with {args.build_parallel} parallel operations...")
|
|
|
|
final_results = []
|
|
with ProcessPoolExecutor(max_workers=args.build_parallel) as executor:
|
|
futures = {}
|
|
for dev in device_list:
|
|
future = executor.submit(
|
|
build_and_flash, dev, project_dir, work_dir, args.ssid, args.password
|
|
)
|
|
futures[future] = dev
|
|
|
|
for future in as_completed(futures):
|
|
result = future.result()
|
|
final_results.append(result)
|
|
if not result['success']:
|
|
print(f"[Device {result['device']}] ✗ Failed: {result['error']}")
|
|
|
|
# Summary
|
|
elapsed_time = time.time() - start_time
|
|
success_count = sum(1 for r in final_results if r['success'])
|
|
failed_devices = [r['device'] for r in final_results if not r['success']]
|
|
|
|
print(f"\n{'='*70}")
|
|
print("DEPLOYMENT SUMMARY")
|
|
print(f"{'='*70}")
|
|
print(f"Successfully deployed: {success_count}/{len(device_list)} devices")
|
|
print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)")
|
|
print(f"Average time per device: {elapsed_time/len(device_list):.1f} seconds")
|
|
|
|
if failed_devices:
|
|
print(f"\nFailed devices: {', '.join(map(str, failed_devices))}")
|
|
|
|
print(f"{'='*70}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
print("\n\nInterrupted by user")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
print(f"\nFATAL ERROR: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|