ESP32/mass_deploy.py

504 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ESP32 Mass Deployment Tool
Parallel flashing and WiFi configuration for multiple ESP32 devices
"""
import os
import sys
import subprocess
import glob
import time
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
class Colors:
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
NC = '\033[0m' # No Color
class DeviceDeployer:
def __init__(self, project_dir, ssid, password, start_ip="192.168.1.51",
netmask="255.255.255.0", gateway="192.168.1.1",
baud_rate=460800, max_retries=2, verify_ping=True,
num_devices=None, verbose=False, parallel=True):
self.project_dir = Path(project_dir)
self.ssid = ssid
self.password = password
self.start_ip = start_ip
self.netmask = netmask
self.gateway = gateway
self.baud_rate = baud_rate
self.max_retries = max_retries
self.verify_ping = verify_ping
self.num_devices = num_devices
self.verbose = verbose
self.parallel = parallel
# Parse IP address
ip_parts = start_ip.split('.')
self.ip_base = '.'.join(ip_parts[:3])
self.ip_start = int(ip_parts[3])
self.devices = []
self.results = {}
self.log_dir = Path('/tmp')
def print_banner(self):
"""Print deployment configuration"""
print()
print(f"{Colors.BLUE}{'='*70}")
print("ESP32 Mass Deployment Tool")
print(f"{'='*70}{Colors.NC}")
print(f"Project: {self.project_dir}")
print(f"SSID: {self.ssid}")
print(f"Password: {'*' * len(self.password)}")
print(f"Start IP: {self.start_ip}")
print(f"Gateway: {self.gateway}")
print(f"Netmask: {self.netmask}")
print(f"Baud Rate: {self.baud_rate}")
print(f"Max Retries: {self.max_retries}")
print(f"Verify Ping: {self.verify_ping}")
print(f"Mode: {'Parallel' if self.parallel else 'Sequential'}")
if self.num_devices:
print(f"Max Devices: {self.num_devices}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
def build_firmware(self):
"""Build the firmware using idf.py"""
print()
print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}")
try:
result = subprocess.run(
['idf.py', 'build'],
cwd=self.project_dir,
check=True,
capture_output=not self.verbose
)
print(f"{Colors.GREEN}✓ Build complete{Colors.NC}")
return True
except subprocess.CalledProcessError as e:
print(f"{Colors.RED}✗ Build failed!{Colors.NC}")
if self.verbose:
print(e.stderr.decode() if e.stderr else "")
return False
def detect_devices(self):
"""Detect connected ESP32 devices"""
print()
print(f"{Colors.YELLOW}[2/4] Detecting ESP32 devices...{Colors.NC}")
# Find all USB serial devices
self.devices = sorted(glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*'))
if not self.devices:
print(f"{Colors.RED}ERROR: No devices found!{Colors.NC}")
print("Connect ESP32 devices via USB and try again.")
return False
# Limit to num_devices if specified
if self.num_devices and len(self.devices) > self.num_devices:
print(f"Limiting to first {self.num_devices} devices")
self.devices = self.devices[:self.num_devices]
print(f"{Colors.GREEN}Found {len(self.devices)} device(s):{Colors.NC}")
for i, device in enumerate(self.devices):
ip = self.get_ip_for_index(i)
print(f" [{i:2d}] {device:14s}{ip}")
return True
def get_ip_for_index(self, index):
"""Calculate IP address for device index"""
return f"{self.ip_base}.{self.ip_start + index}"
def flash_and_configure(self, index, device):
"""Flash and configure a single device"""
ip_addr = self.get_ip_for_index(index)
log_file = self.log_dir / f"esp32_deploy_{index}.log"
log_lines = []
def log(msg):
log_lines.append(msg)
if self.verbose or not self.parallel:
print(f"[{index}] {msg}")
for attempt in range(1, self.max_retries + 1):
log(f"=== Device {index}: {device} (Attempt {attempt}/{self.max_retries}) ===")
log(f"Target IP: {ip_addr}")
# Flash firmware
log("Flashing...")
try:
result = subprocess.run(
['idf.py', '-p', device, '-b', str(self.baud_rate), 'flash'],
cwd=self.project_dir,
check=True,
capture_output=True,
timeout=300 # 5 minute timeout
)
log("✓ Flash successful")
except subprocess.CalledProcessError as e:
log(f"✗ Flash failed on attempt {attempt}")
if attempt == self.max_retries:
# Write log file
with open(log_file, 'w') as f:
f.write('\n'.join(log_lines))
return {
'index': index,
'device': device,
'ip': ip_addr,
'status': 'FAILED',
'log': log_lines
}
time.sleep(2)
continue
except subprocess.TimeoutExpired:
log(f"✗ Flash timeout on attempt {attempt}")
if attempt == self.max_retries:
with open(log_file, 'w') as f:
f.write('\n'.join(log_lines))
return {
'index': index,
'device': device,
'ip': ip_addr,
'status': 'TIMEOUT',
'log': log_lines
}
continue
# Wait for device to boot
log("Waiting for boot...")
time.sleep(3)
# Configure WiFi
log("Configuring WiFi...")
try:
config = (
f"CFG\n"
f"SSID:{self.ssid}\n"
f"PASS:{self.password}\n"
f"IP:{ip_addr}\n"
f"MASK:{self.netmask}\n"
f"GW:{self.gateway}\n"
f"DHCP:0\n"
f"END\n"
)
with open(device, 'w') as f:
f.write(config)
log("✓ Config sent")
except Exception as e:
log(f"✗ Config error: {e}")
# Wait for network to initialize
log("Waiting for network...")
time.sleep(5)
# Verify connectivity
if self.verify_ping:
log("Verifying connectivity...")
try:
result = subprocess.run(
['ping', '-c', '2', '-W', '3', ip_addr],
capture_output=True,
timeout=10
)
if result.returncode == 0:
log("✓ Ping successful")
# Write log file
with open(log_file, 'w') as f:
f.write('\n'.join(log_lines))
return {
'index': index,
'device': device,
'ip': ip_addr,
'status': 'SUCCESS',
'log': log_lines
}
else:
log(f"✗ Ping failed on attempt {attempt}")
if attempt == self.max_retries:
with open(log_file, 'w') as f:
f.write('\n'.join(log_lines))
return {
'index': index,
'device': device,
'ip': ip_addr,
'status': 'NO_PING',
'log': log_lines
}
except subprocess.TimeoutExpired:
log(f"✗ Ping timeout on attempt {attempt}")
else:
# Write log file
with open(log_file, 'w') as f:
f.write('\n'.join(log_lines))
return {
'index': index,
'device': device,
'ip': ip_addr,
'status': 'SUCCESS',
'log': log_lines
}
time.sleep(2)
# If we get here, all retries failed
with open(log_file, 'w') as f:
f.write('\n'.join(log_lines))
return {
'index': index,
'device': device,
'ip': ip_addr,
'status': 'FAILED',
'log': log_lines
}
def deploy_all_parallel(self):
"""Deploy to all devices in parallel"""
print()
print(f"{Colors.YELLOW}[3/4] Flashing and configuring (parallel)...{Colors.NC}")
print()
# Clean old log files
for f in self.log_dir.glob('esp32_deploy_*.log'):
f.unlink()
# Use ThreadPoolExecutor for parallel execution
max_workers = min(32, len(self.devices))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all jobs
futures = {
executor.submit(self.flash_and_configure, i, device): (i, device)
for i, device in enumerate(self.devices)
}
# Collect results as they complete
for future in as_completed(futures):
result = future.result()
self.results[result['index']] = result
# Print immediate status
self.print_device_status(result)
def deploy_all_sequential(self):
"""Deploy to devices one at a time (sequential)"""
print()
print(f"{Colors.YELLOW}[3/4] Flashing and configuring (sequential)...{Colors.NC}")
print()
# Clean old log files
for f in self.log_dir.glob('esp32_deploy_*.log'):
f.unlink()
for i, device in enumerate(self.devices):
print(f"\n{Colors.BLUE}--- Device {i+1}/{len(self.devices)} ---{Colors.NC}")
result = self.flash_and_configure(i, device)
self.results[result['index']] = result
# Print status after each device
self.print_device_status(result)
print()
def print_device_status(self, result):
"""Print status for a single device"""
status_color = {
'SUCCESS': Colors.GREEN,
'NO_PING': Colors.YELLOW,
'FAILED': Colors.RED,
'TIMEOUT': Colors.RED
}.get(result['status'], Colors.RED)
status_text = {
'SUCCESS': 'OK',
'NO_PING': 'FLASHED, NO PING',
'FAILED': 'FAILED',
'TIMEOUT': 'TIMEOUT'
}.get(result['status'], 'ERROR')
print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s}"
f"{result['ip']:15s} [{status_text}]{Colors.NC}")
def deploy_all(self):
"""Deploy to all devices (parallel or sequential)"""
if self.parallel:
self.deploy_all_parallel()
else:
self.deploy_all_sequential()
def print_summary(self):
"""Print deployment summary"""
print()
print(f"{Colors.YELLOW}[4/4] Deployment Summary{Colors.NC}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
# Count statuses
success_count = sum(1 for r in self.results.values() if r['status'] == 'SUCCESS')
no_ping_count = sum(1 for r in self.results.values() if r['status'] == 'NO_PING')
failed_count = sum(1 for r in self.results.values() if r['status'] in ['FAILED', 'TIMEOUT'])
# Print all devices
for i in range(len(self.devices)):
if i in self.results:
result = self.results[i]
status_icon = {
'SUCCESS': f"{Colors.GREEN}{Colors.NC}",
'NO_PING': f"{Colors.YELLOW}{Colors.NC}",
'FAILED': f"{Colors.RED}{Colors.NC}",
'TIMEOUT': f"{Colors.RED}{Colors.NC}"
}.get(result['status'], f"{Colors.RED}?{Colors.NC}")
status_msg = {
'NO_PING': " (no ping response)",
'FAILED': " (failed)",
'TIMEOUT': " (timeout)"
}.get(result['status'], "")
print(f"{status_icon} {result['device']:14s}{result['ip']}{status_msg}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
print(f"Total: {len(self.devices)} devices")
print(f"{Colors.GREEN}Success: {success_count}{Colors.NC}")
if no_ping_count > 0:
print(f"{Colors.YELLOW}Warning: {no_ping_count} (flashed but no ping){Colors.NC}")
if failed_count > 0:
print(f"{Colors.RED}Failed: {failed_count}{Colors.NC}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
# Print log location
print()
print(f"Logs: /tmp/esp32_deploy_*.log")
# Print test commands
print()
print("Test commands:")
print(f" # Test first device")
print(f" iperf -c {self.get_ip_for_index(0)}")
print()
print(f" # Ping all devices")
ip_range = f"{self.ip_start}..{self.ip_start + len(self.devices) - 1}"
print(f" for i in {{{ip_range}}}; do ping -c 1 {self.ip_base}.$i & done; wait")
print()
return failed_count
def main():
parser = argparse.ArgumentParser(
description='ESP32 Mass Deployment Tool',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Deploy to first 30 devices (default)
%(prog)s -s ClubHouse2G -p mypassword
# Deploy sequentially (easier debugging)
%(prog)s -s ClubHouse2G -p mypassword --sequential
# Deploy to all connected devices
%(prog)s -s ClubHouse2G -p mypassword -n 0
# Custom IP range
%(prog)s -s ClubHouse2G -p mypassword --start-ip 10.0.0.100
# From environment variables
export SSID="MyNetwork"
export PASSWORD="secret123"
export START_IP="192.168.1.51"
%(prog)s
# Verbose sequential mode (see everything)
%(prog)s -s ClubHouse2G -p mypassword --sequential -v
"""
)
parser.add_argument('-d', '--dir', default=os.getcwd(),
help='ESP-IDF project directory (default: current dir)')
parser.add_argument('-s', '--ssid',
default=os.environ.get('SSID', 'ClubHouse2G'),
help='WiFi SSID (default: ClubHouse2G or $SSID)')
parser.add_argument('-p', '--password',
default=os.environ.get('PASSWORD', ''),
help='WiFi password (default: $PASSWORD)')
parser.add_argument('--start-ip',
default=os.environ.get('START_IP', '192.168.1.51'),
help='Starting IP address (default: 192.168.1.51 or $START_IP)')
parser.add_argument('-n', '--num-devices', type=int, default=30,
help='Number of devices to deploy (default: 30, use 0 for all)')
parser.add_argument('-g', '--gateway',
default=os.environ.get('GATEWAY', '192.168.1.1'),
help='Gateway IP (default: 192.168.1.1 or $GATEWAY)')
parser.add_argument('-m', '--netmask',
default=os.environ.get('NETMASK', '255.255.255.0'),
help='Network mask (default: 255.255.255.0 or $NETMASK)')
parser.add_argument('-b', '--baud', type=int,
default=int(os.environ.get('BAUD_RATE', 460800)),
help='Baud rate for flashing (default: 460800 or $BAUD_RATE)')
parser.add_argument('-r', '--retries', type=int,
default=int(os.environ.get('MAX_RETRIES', 2)),
help='Max retries per device (default: 2 or $MAX_RETRIES)')
parser.add_argument('--no-verify', action='store_true',
help='Skip ping verification')
parser.add_argument('--sequential', action='store_true',
help='Flash devices sequentially instead of parallel')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
args = parser.parse_args()
# Validate password
if not args.password:
print(f"{Colors.RED}ERROR: WiFi password not set!{Colors.NC}")
print()
print("Provide password via:")
print(" 1. Command line: -p 'your_password'")
print(" 2. Environment: export PASSWORD='your_password'")
print()
print("Example:")
print(f" {sys.argv[0]} -s MyWiFi -p mypassword")
sys.exit(1)
# Create deployer
deployer = DeviceDeployer(
project_dir=args.dir,
ssid=args.ssid,
password=args.password,
start_ip=args.start_ip,
netmask=args.netmask,
gateway=args.gateway,
baud_rate=args.baud,
max_retries=args.retries,
verify_ping=not args.no_verify,
num_devices=args.num_devices if args.num_devices > 0 else None,
verbose=args.verbose,
parallel=not args.sequential
)
# Run deployment
deployer.print_banner()
if not deployer.build_firmware():
sys.exit(1)
if not deployer.detect_devices():
sys.exit(1)
deployer.deploy_all()
failed_count = deployer.print_summary()
sys.exit(failed_count)
if __name__ == '__main__':
main()