ESP32/mass_deploy.py

320 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ESP32 Mass Deployment Tool (Fixed for Parallel Flashing & Path Issues)
Uses esptool.py from the build directory to resolve relative paths correctly.
"""
import os
import sys
import subprocess
import glob
import time
import argparse
import serial
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
class Colors:
    """ANSI escape sequences used to colorize the tool's terminal output."""

    RED = "\033[0;31m"     # errors and failed devices
    GREEN = "\033[0;32m"   # success messages
    YELLOW = "\033[1;33m"  # step headers and mode highlights
    BLUE = "\033[0;34m"    # banners and separator rules
    NC = "\033[0m"         # "no color": resets all attributes
class DeviceDeployer:
    """Build an ESP-IDF project and flash it to every attached serial device.

    When both ``ssid`` and ``password`` are given ("config mode") the deployer
    also sends a static WiFi configuration to each device over its serial port
    after flashing and, optionally, pings the assigned address. Otherwise the
    devices are only flashed.

    Args:
        project_dir: ESP-IDF project directory; ``build/`` is expected below it.
        ssid: WiFi SSID, or None for flash-only mode.
        password: WiFi password, or None for flash-only mode.
        start_ip: Dotted-quad IPv4 address given to device index 0; device
            ``i`` gets the last octet increased by ``i``. A falsy value falls
            back to 192.168.1.51.
        netmask: Netmask sent to the devices.
        gateway: Gateway sent to the devices.
        baud_rate: Baud rate passed to esptool for flashing.
        max_retries: Number of attempts per device (at least one is made).
        verify_ping: Ping each device after configuring it.
        num_devices: Maximum number of devices to handle, or None for all.
        verbose: Stream build output and per-device log lines to stdout.
        parallel: Flash devices concurrently instead of one by one.

    Raises:
        ValueError: If ``start_ip`` is not a valid dotted-quad IPv4 address.
    """

    # Argument file written by `idf.py build`; its paths are relative to build/.
    _FLASH_ARGS_FILE = 'flash_args'
    # Upper bound on concurrent flashing threads.
    _MAX_WORKERS = 10
    # Baud rate of the serial link used to send the WiFi configuration.
    _CONFIG_BAUD = 115200

    def __init__(self, project_dir, ssid, password, start_ip="192.168.1.51",
                 netmask="255.255.255.0", gateway="192.168.1.1",
                 baud_rate=460800, max_retries=2, verify_ping=True,
                 num_devices=None, verbose=False, parallel=True):
        # Absolute path so that later cwd changes cannot invalidate it.
        self.project_dir = Path(project_dir).resolve()
        self.build_dir = self.project_dir / 'build'
        self.ssid = ssid
        self.password = password
        self.start_ip = start_ip
        self.netmask = netmask
        self.gateway = gateway
        self.baud_rate = baud_rate
        self.max_retries = max_retries
        self.verify_ping = verify_ping
        self.num_devices = num_devices
        self.verbose = verbose
        self.parallel = parallel
        self.config_mode = (self.ssid is not None and self.password is not None)

        if self.start_ip:
            octets = start_ip.split('.')
            well_formed = len(octets) == 4 and all(
                o.isascii() and o.isdigit() and int(o) <= 255 for o in octets
            )
            if not well_formed:
                raise ValueError(f"Invalid start IP address: {start_ip!r}")
            self.ip_base = '.'.join(octets[:3])
            self.ip_start = int(octets[3])
        else:
            self.ip_base = "192.168.1"
            self.ip_start = 51

        self.devices = []
        self.results = {}
        self.log_dir = Path('/tmp')

    def print_banner(self):
        """Print the effective settings before any work starts."""
        rule = '=' * 70
        print()
        print(f"{Colors.BLUE}{rule}")
        print("ESP32 Mass Deployment Tool")
        print(f"{rule}{Colors.NC}")
        print(f"Project: {self.project_dir}")
        print(f"Build Dir: {self.build_dir}")
        if self.config_mode:
            print(f"Mode: {Colors.YELLOW}FLASH + CONFIGURE{Colors.NC}")
            print(f"SSID: {self.ssid}")
            # Never echo the password itself, only its length.
            print(f"Password: {'*' * len(self.password)}")
            # Show the address actually used, including the fallback default.
            print(f"Start IP: {self.get_ip_for_index(0)}")
        else:
            print(f"Mode: {Colors.GREEN}FLASH ONLY (Preserve NVS){Colors.NC}")
        print(f"Flash Baud: {self.baud_rate}")
        print(f"Parallel: {self.parallel}")
        if self.num_devices:
            print(f"Max Devices: {self.num_devices}")
        print(f"{Colors.BLUE}{rule}{Colors.NC}")

    def build_firmware(self):
        """Run ``idf.py build`` in the project directory.

        Returns:
            True when the build succeeded and ``build/flash_args`` exists,
            False otherwise (the reason is printed).
        """
        print()
        print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}")
        try:
            subprocess.run(
                ['idf.py', 'build'],
                cwd=self.project_dir,
                check=True,
                capture_output=not self.verbose,
            )
        except subprocess.CalledProcessError as e:
            print(f"{Colors.RED}✗ Build failed!{Colors.NC}")
            # Output is captured only in non-verbose mode (verbose mode streams
            # it straight to the terminal), so print whatever was captured.
            captured = e.stderr or e.stdout
            if captured:
                print(captured.decode(errors='replace'))
            return False
        except OSError as e:
            # idf.py missing from PATH, project directory missing, ...
            print(f"{Colors.RED}✗ Build failed: cannot run idf.py ({e}){Colors.NC}")
            return False

        if not (self.build_dir / self._FLASH_ARGS_FILE).exists():
            print(f"{Colors.RED}Error: build/flash_args not found.{Colors.NC}")
            return False
        print(f"{Colors.GREEN}✓ Build complete{Colors.NC}")
        return True

    @staticmethod
    def _device_sort_key(path):
        """Return a key ordering ports by name, then by numeric suffix.

        Plain string sorting would place ``ttyUSB10`` before ``ttyUSB2``.
        """
        stem = path.rstrip('0123456789')
        digits = path[len(stem):]
        return (stem, int(digits) if digits else -1)

    def detect_devices(self):
        """Find serial ports under /dev/ttyUSB* and /dev/ttyACM*.

        Stores the (possibly truncated) list in ``self.devices``.

        Returns:
            True when at least one device was found and, in config mode, every
            device can be given a valid address; False otherwise.
        """
        print()
        print(f"{Colors.YELLOW}[2/4] Detecting ESP32 devices...{Colors.NC}")
        self.devices = sorted(
            glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*'),
            key=self._device_sort_key,
        )
        if not self.devices:
            print(f"{Colors.RED}ERROR: No devices found!{Colors.NC}")
            return False
        if self.num_devices and len(self.devices) > self.num_devices:
            print(f"Limiting to first {self.num_devices} devices")
            self.devices = self.devices[:self.num_devices]

        if self.config_mode:
            # Addresses are made by incrementing the last octet only, so the
            # highest index must still fit into a single octet.
            last_octet = self.ip_start + len(self.devices) - 1
            if last_octet > 255:
                print(
                    f"{Colors.RED}ERROR: {len(self.devices)} device(s) starting at "
                    f"{self.get_ip_for_index(0)} would need the invalid address "
                    f"{self.ip_base}.{last_octet}.{Colors.NC}"
                )
                return False

        print(f"{Colors.GREEN}Found {len(self.devices)} device(s):{Colors.NC}")
        for i, device in enumerate(self.devices):
            shown_ip = self.get_ip_for_index(i) if self.config_mode else "(Existing IP)"
            print(f" [{i:2d}] {device:14s} → {shown_ip}")
        return True

    def get_ip_for_index(self, index):
        """Return the static IP for device ``index`` (start IP + index)."""
        return f"{self.ip_base}.{self.ip_start + index}"

    def _target_ip(self, index):
        """Return the IP label reported for a device in the current mode."""
        return self.get_ip_for_index(index) if self.config_mode else "Existing IP"

    @staticmethod
    def _write_log(log_file, log_lines):
        """Write the per-device log; a failure here must not fail the device."""
        try:
            # Explicit UTF-8: the log contains check/cross marks that a
            # non-UTF-8 locale default encoding could not represent.
            log_file.write_text('\n'.join(log_lines), encoding='utf-8')
        except OSError as e:
            print(f"Warning: could not write log file {log_file}: {e}")

    def _flash(self, device, log):
        """Flash one device with esptool.

        Returns:
            None on success, otherwise the failure status
            ('FAILED' or 'TIMEOUT').
        """
        cmd = [
            'esptool.py',
            '-p', device,
            '-b', str(self.baud_rate),
            '--before', 'default_reset',
            '--after', 'hard_reset',
            'write_flash',
            f"@{self._FLASH_ARGS_FILE}",
        ]
        try:
            # Run from build_dir so the relative paths in flash_args resolve.
            subprocess.run(
                cmd,
                cwd=self.build_dir,
                check=True,
                capture_output=True,
                timeout=300,
            )
        except subprocess.CalledProcessError as e:
            captured = e.stderr or e.stdout
            detail = captured.decode(errors='replace') if captured else 'Unknown error'
            log(f"✗ Flash failed: {detail}")
            return 'FAILED'
        except subprocess.TimeoutExpired:
            log("✗ Flash timeout")
            return 'TIMEOUT'
        except OSError as e:
            # esptool.py not on PATH, build directory missing, ...
            log(f"✗ Flash failed: {e}")
            return 'FAILED'
        log("✓ Flash successful")
        return None

    def _send_config(self, device, target_ip):
        """Send the static WiFi configuration block over the serial port."""
        config = (
            "CFG\n"
            f"SSID:{self.ssid}\n"
            f"PASS:{self.password}\n"
            f"IP:{target_ip}\n"
            f"MASK:{self.netmask}\n"
            f"GW:{self.gateway}\n"
            "DHCP:0\n"
            "END\n"
        )
        with serial.Serial(device, self._CONFIG_BAUD, timeout=2, write_timeout=2) as ser:
            ser.reset_input_buffer()
            ser.write(config.encode('utf-8'))
            ser.flush()

    @staticmethod
    def _ping(target_ip, log):
        """Ping ``target_ip`` twice; return True when ping exits with 0."""
        try:
            res = subprocess.run(
                ['ping', '-c', '2', '-W', '3', target_ip],
                capture_output=True,
                timeout=10,
            )
        except (subprocess.SubprocessError, OSError) as e:
            log(f"✗ Ping error: {e}")
            return False
        if res.returncode == 0:
            log("✓ Ping successful")
            return True
        log("✗ Ping failed")
        return False

    def flash_and_configure(self, index, device):
        """Flash one device and, in config mode, configure and verify it.

        The whole sequence is repeated up to ``max_retries`` times (at least
        once) until it succeeds. The log is written to
        ``<log_dir>/esp32_deploy_<index>.log``.

        Args:
            index: Position of the device in ``self.devices``; selects its IP.
            device: Serial port path.

        Returns:
            A dict with the keys 'index', 'device', 'ip', 'status'
            ('SUCCESS', 'FAILED' or 'TIMEOUT') and 'log' (list of log lines).
        """
        target_ip = self._target_ip(index)
        log_file = self.log_dir / f"esp32_deploy_{index}.log"
        log_lines = []

        def log(msg):
            log_lines.append(msg)
            # In quiet parallel mode interleaved output would be unreadable.
            if self.verbose or not self.parallel:
                print(f"[{index}] {msg}")

        def finish(status):
            self._write_log(log_file, log_lines)
            return {'index': index, 'device': device, 'ip': target_ip,
                    'status': status, 'log': log_lines}

        # Always make at least one attempt, even for max_retries <= 0.
        attempts = max(1, self.max_retries)
        for attempt in range(1, attempts + 1):
            last_attempt = attempt == attempts
            log(f"=== Device {index}: {device} (Attempt {attempt}/{attempts}) ===")

            # --- FLASHING ---
            log("Flashing via esptool...")
            flash_failure = self._flash(device, log)
            if flash_failure is not None:
                if last_attempt:
                    return finish(flash_failure)
                time.sleep(2)
                continue

            # --- CONFIGURATION ---
            log("Waiting for boot (3s)...")
            time.sleep(3)
            if not self.config_mode:
                log("Configuration skipped (Preserving NVS)")
                return finish('SUCCESS')

            log(f"Configuring WiFi ({target_ip})...")
            try:
                self._send_config(device, target_ip)
            except Exception as e:
                # Per-device boundary: any serial problem is logged and the
                # device is retried instead of aborting the deployment.
                log(f"✗ Config error: {e}")
            else:
                log("✓ Config sent")
                if not self.verify_ping:
                    return finish('SUCCESS')
                log("Waiting for network (6s)...")
                time.sleep(6)
                log(f"Pinging {target_ip}...")
                if self._ping(target_ip, log):
                    return finish('SUCCESS')

            if not last_attempt:
                time.sleep(2)

        return finish('FAILED')

    def _run_device(self, index, device):
        """Run flash_and_configure, turning an unexpected error into FAILED."""
        try:
            return self.flash_and_configure(index, device)
        except Exception as e:
            # One broken device must not abort the rest of the deployment.
            return {'index': index, 'device': device, 'ip': self._target_ip(index),
                    'status': 'FAILED', 'log': [f"✗ Unexpected error: {e}"]}

    def deploy_all_parallel(self):
        """Flash all detected devices concurrently, printing results as they finish."""
        print()
        print(f"{Colors.YELLOW}[3/4] Flashing (parallel)...{Colors.NC}")
        if not self.devices:
            # ThreadPoolExecutor rejects max_workers=0.
            return
        max_workers = min(self._MAX_WORKERS, len(self.devices))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self._run_device, i, device)
                for i, device in enumerate(self.devices)
            ]
            for future in as_completed(futures):
                result = future.result()
                self.results[result['index']] = result
                self.print_device_status(result)

    def deploy_all_sequential(self):
        """Flash all detected devices one after another."""
        print()
        print(f"{Colors.YELLOW}[3/4] Flashing (sequential)...{Colors.NC}")
        total = len(self.devices)
        for i, device in enumerate(self.devices):
            print(f"\n{Colors.BLUE}--- Device {i+1}/{total} ---{Colors.NC}")
            result = self._run_device(i, device)
            self.results[result['index']] = result
            self.print_device_status(result)

    def print_device_status(self, result):
        """Print a one-line, color-coded status for one device result."""
        # 'NO_PING' is mapped here, but flash_and_configure currently reports
        # a failed ping as 'FAILED'.
        status_color = {
            'SUCCESS': Colors.GREEN, 'NO_PING': Colors.YELLOW,
            'FAILED': Colors.RED, 'TIMEOUT': Colors.RED,
        }.get(result['status'], Colors.RED)
        print(
            f"{status_color}[Device {result['index']:2d}] {result['device']:14s} → "
            f"{result['ip']:15s} [{result['status']}]{Colors.NC}"
        )

    def deploy_all(self):
        """Deploy to all devices in the configured (parallel/sequential) mode."""
        if self.parallel:
            self.deploy_all_parallel()
        else:
            self.deploy_all_sequential()

    def print_summary(self):
        """Print the per-device outcome and totals.

        Returns:
            The number of devices whose status is not 'SUCCESS'.
        """
        rule = '=' * 70
        print()
        print(f"{Colors.YELLOW}[4/4] Deployment Summary{Colors.NC}")
        print(f"{Colors.BLUE}{rule}{Colors.NC}")
        outcomes = list(self.results.values())
        success = sum(1 for r in outcomes if r['status'] == 'SUCCESS')
        # Anything that is not an explicit success counts as a failure.
        failed = len(outcomes) - success
        for i, _ in enumerate(self.devices):
            r = self.results.get(i)
            if r is None:
                continue
            if r['status'] == 'SUCCESS':
                icon = f"{Colors.GREEN}✓{Colors.NC}"
            else:
                icon = f"{Colors.RED}✗{Colors.NC}"
            print(f"{icon} {r['device']:14s} → {r['ip']}")
        print(f"{Colors.BLUE}{rule}{Colors.NC}")
        print(f"Total: {len(self.devices)}")
        print(f"Success: {success}")
        print(f"Failed: {failed}")
        return failed
def main():
    """Parse the command line, run build -> detect -> deploy, and exit.

    The process exits with 1 on invalid arguments, a failed build or when no
    device is found; otherwise the exit status is the number of devices that
    did not deploy successfully (capped at 255).
    """
    parser = argparse.ArgumentParser(description='ESP32 Mass Deployment Tool')
    parser.add_argument('-d', '--dir', default=os.getcwd(), help='ESP-IDF project dir')
    parser.add_argument('-s', '--ssid', help='WiFi SSID')
    parser.add_argument('-p', '--password', help='WiFi Password')
    parser.add_argument('--start-ip', default='192.168.1.51', help='Starting IP')
    parser.add_argument('-n', '--num-devices', type=int, default=30, help='Max devices')
    parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP')
    parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')
    parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud rate')
    parser.add_argument('-r', '--retries', type=int, default=2, help='Retries')
    parser.add_argument('--no-verify', action='store_true', help='Skip ping check')
    parser.add_argument('--sequential', action='store_true', help='Run sequentially')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose')
    args = parser.parse_args()

    # Compare against None rather than truthiness: DeviceDeployer enables
    # config mode when both values are not None, and an empty password is a
    # legitimate value for an open network.
    if (args.ssid is None) != (args.password is None):
        print(f"{Colors.RED}ERROR: Provide both SSID and Password for config, or neither for flash-only.{Colors.NC}")
        sys.exit(1)

    try:
        deployer = DeviceDeployer(
            project_dir=args.dir, ssid=args.ssid, password=args.password,
            start_ip=args.start_ip, netmask=args.netmask, gateway=args.gateway,
            baud_rate=args.baud, max_retries=args.retries, verify_ping=not args.no_verify,
            num_devices=args.num_devices if args.num_devices > 0 else None,
            verbose=args.verbose, parallel=not args.sequential
        )
    except (ValueError, IndexError) as e:
        # A malformed --start-ip surfaces while the deployer parses it.
        print(f"{Colors.RED}ERROR: Invalid settings (check --start-ip '{args.start_ip}'): {e}{Colors.NC}")
        sys.exit(1)

    deployer.print_banner()
    if not deployer.build_firmware():
        sys.exit(1)
    if not deployer.detect_devices():
        sys.exit(1)
    deployer.deploy_all()
    failed_count = deployer.print_summary()
    # Exit statuses are taken modulo 256 by the OS, so an unclamped count of
    # 256 failures would be reported as success.
    sys.exit(min(failed_count, 255))


if __name__ == '__main__':
    main()