ESP32/async_mass_deploy.py

277 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ESP32 Async Mass Deployment Tool
Combines parallel flashing (via esptool) with async configuration.
Features:
- Semaphore-limited flashing (prevents USB hub crashes)
- Regex-based boot detection (faster/reliable config)
- Parallel verification
"""
import asyncio
import serial_asyncio
import sys
import os
import argparse
import ipaddress
import re
import time
import logging
from pathlib import Path
# Ensure detection script is available: it is expected to live next to this
# file, so that directory is added to the module search path.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
    import detect_esp32
except ImportError as exc:
    # ImportError also fires when detect_esp32.py exists but one of its own
    # imports is missing, so report the underlying reason instead of
    # claiming the file was not found.
    print(f"Error: cannot import 'detect_esp32.py': {exc}", file=sys.stderr)
    sys.exit(1)

# --- Configuration ---
MAX_CONCURRENT_FLASH = 8  # Limit active flashes to prevent USB brownouts
class Colors:
    """ANSI escape sequences used to colour terminal output.

    Wrap text as ``f"{Colors.RED}text{Colors.RESET}"``; ``RESET`` restores the
    terminal's default attributes.
    """

    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    RESET = '\033[0m'
# Logger Adapter for context
class DeviceLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with a device identifier.

    The identifier is read from ``extra['connid']`` supplied at construction.
    """

    def process(self, msg, kwargs):
        """Return ``msg`` prefixed with ``[<connid>]`` and ``kwargs`` unchanged."""
        return f"[{self.extra['connid']}] {msg}", kwargs
# Root logging configuration: INFO level, each record rendered as
# "<HH:MM:SS> <message>".  The module-wide logger is named "Deploy".
logging.basicConfig(
    format='%(asctime)s %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger("Deploy")
class DeployWorker:
    """Flash one ESP32 on a serial port and optionally push a static WiFi config.

    ``run()`` erases (when requested) and flashes the device while holding the
    shared flash semaphore.  When both an SSID and a password were supplied it
    then opens the serial console, sends a ``CFG ... END`` block and waits for
    the device to report the expected IP address.

    Args:
        port: Serial device name passed to esptool and to the serial console.
        target_ip: Static IPv4 address (string) to assign to this device.
        args: Parsed CLI namespace; reads ``erase``, ``baud``, ``ssid``,
            ``password``, ``netmask`` and ``gateway``.
        build_dir: Directory containing ``flash_args``; esptool runs from it.
        flash_sem: ``asyncio.Semaphore`` bounding concurrent erase/flash jobs.
    """

    # Patterns are identical for every worker, so compile them once here
    # instead of once per instance.
    # Console output that marks the firmware as booted and ready for config.
    regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE)
    # Console output carrying the address the device obtained.
    regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)

    ESPTOOL_TIMEOUT = 300  # seconds allowed for one esptool invocation
    BOOT_TIMEOUT = 8       # seconds to wait for a "ready" console line
    VERIFY_TIMEOUT = 10    # seconds to wait for the IP confirmation line

    def __init__(self, port, target_ip, args, build_dir, flash_sem):
        self.port = port
        self.target_ip = target_ip
        self.args = args
        self.build_dir = build_dir
        self.flash_sem = flash_sem
        self.log = DeviceLoggerAdapter(logger, {'connid': port})

    async def run(self):
        """Main deployment workflow.

        Returns:
            True when every requested phase succeeded, False otherwise.  Any
            unexpected exception is logged and reported as False so that one
            failing device never aborts the other workers.
        """
        try:
            # 1. FLASHING PHASE (Protected by Semaphore)
            async with self.flash_sem:
                if self.args.erase and not await self._erase_flash():
                    return False
                if not await self._flash_firmware():
                    return False

            # 2. CONFIG PHASE (Serial interaction)
            # The flash ends with a hard reset, so the device is booting now.
            # Wait briefly so esptool has released the port handle.
            await asyncio.sleep(0.5)
            if self.args.ssid and self.args.password:
                return await self._configure_device()

            if self.args.erase:
                # A full erase was performed, so claiming NVS was preserved
                # would be wrong.
                self.log.info(f"{Colors.GREEN}Flash Complete (flash erased, no config sent){Colors.RESET}")
            else:
                self.log.info(f"{Colors.GREEN}Flash Complete (NVS Preserved){Colors.RESET}")
            return True
        except Exception as e:
            self.log.error(f"Worker Exception: {e}")
            return False

    @staticmethod
    def _format_output(stdout, stderr, max_lines=15):
        """Merge captured stderr and the tail of stdout into one printable string."""
        parts = []
        for raw in (stderr, stdout):
            # errors='replace' keeps a stray non-UTF-8 byte from hiding the
            # real failure behind a UnicodeDecodeError.
            text = raw.decode('utf-8', errors='replace').strip() if raw else ''
            if text:
                # splitlines() also splits on '\r', which separates progress
                # updates; only the tail is useful in a log line.
                parts.append("\n".join(text.splitlines()[-max_lines:]))
        return "\n".join(parts)

    async def _run_esptool(self, cmd, action, cwd=None):
        """Run one esptool command and log the outcome under the name ``action``.

        Returns:
            True when the process exits with status 0; False when it cannot be
            started, exceeds ``ESPTOOL_TIMEOUT`` or exits non-zero.
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                cwd=cwd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except OSError as e:
            # Missing executable or missing working directory.
            self.log.error(f"{action} failed: could not launch {cmd[0]}: {e}")
            return False

        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.ESPTOOL_TIMEOUT)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited between the timeout and the kill
            await proc.wait()  # reap the child so no zombie is left behind
            self.log.error(f"{action} timed out.")
            return False

        if proc.returncode == 0:
            self.log.info(f"{action} successful.")
            return True
        # NOTE(review): esptool appears to print its "A fatal error occurred"
        # message on stdout in at least some versions, so both streams are
        # included - confirm against the installed esptool.
        self.log.error(f"{action} failed: {self._format_output(stdout, stderr)}")
        return False

    async def _erase_flash(self):
        """Erase the whole flash with ``esptool.py erase_flash``; return success."""
        self.log.info("Erasing flash...")
        cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash']
        return await self._run_esptool(cmd, "Erase")

    async def _flash_firmware(self):
        """Write the firmware described by ``build_dir/flash_args``; return success."""
        self.log.info("Flashing firmware...")
        # Use relative path for flash_args (must be run from build_dir)
        cmd = [
            'esptool.py', '-p', self.port, '-b', str(self.args.baud),
            '--before', 'default_reset', '--after', 'hard_reset',
            'write_flash', '@flash_args',
        ]
        return await self._run_esptool(cmd, "Flash", cwd=self.build_dir)

    async def _read_line(self, reader, deadline):
        """Read one console line before ``deadline`` (a ``time.monotonic()`` value).

        Returns:
            The decoded line, or None when the deadline passes first.

        Raises:
            EOFError: the stream reached end-of-file, i.e. the port went away.
        """
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        try:
            raw = await asyncio.wait_for(reader.readline(), timeout=remaining)
        except asyncio.TimeoutError:
            return None
        if not raw:
            # readline() returns b'' only at EOF; looping on it would spin
            # until the deadline without ever receiving data.
            raise EOFError("serial port closed while reading")
        return raw.decode('utf-8', errors='ignore')

    async def _wait_for_boot(self, reader):
        """Return True once a "ready" line is seen, False after ``BOOT_TIMEOUT``."""
        deadline = time.monotonic() + self.BOOT_TIMEOUT
        while True:
            line = await self._read_line(reader, deadline)
            if line is None:
                return False
            if self.regex_ready.search(line.strip()):
                return True

    async def _wait_for_ip(self, reader):
        """Return True once the device reports ``target_ip``, False on timeout."""
        deadline = time.monotonic() + self.VERIFY_TIMEOUT
        while True:
            line = await self._read_line(reader, deadline)
            if line is None:
                return False
            m = self.regex_got_ip.search(line)
            if not m:
                continue
            if m.group(1) == self.target_ip:
                return True
            # Keep waiting, but leave a trace of what the device said.
            self.log.warning(f"Device reported IP {m.group(1)}, expected {self.target_ip}")

    async def _configure_device(self):
        """Connects via Serial, waits for boot, sends config.

        Returns:
            True when the device confirms ``target_ip``; False when the port
            cannot be opened, the link drops, or no confirmation arrives.
        """
        self.log.info("Connecting to console...")
        try:
            reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
        except Exception as e:
            self.log.error(f"Serial open failed: {e}")
            return False

        try:
            # A. Wait for Boot
            self.log.info("Waiting for boot...")
            if not await self._wait_for_boot(reader):
                # Even if we didn't see the specific line, we might still be able to config
                self.log.warning("Boot prompt not detected, trying to config anyway...")

            # B. Send Config
            self.log.info(f"Sending config for {self.target_ip}...")
            config_str = (
                f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\n"
                f"IP:{self.target_ip}\nMASK:{self.args.netmask}\nGW:{self.args.gateway}\n"
                f"DHCP:0\nEND\n"
            )
            writer.write(config_str.encode('utf-8'))
            await writer.drain()

            # C. Verify
            self.log.info("Verifying IP...")
            if await self._wait_for_ip(reader):
                self.log.info(f"{Colors.GREEN}SUCCESS: Configured & Connected{Colors.RESET}")
                return True
            self.log.error("Config sent, but no IP confirmation received.")
            return False
        except Exception as e:
            self.log.error(f"Config error: {e}")
            return False
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as e:
                # An exception raised here would replace the result returned
                # above and turn a successful configuration into a failure.
                self.log.debug(f"Ignoring error while closing serial port: {e}")
async def main_async():
    """Build the firmware, detect attached ESP32 boards and deploy to all of them.

    Steps: parse the CLI arguments, run ``idf.py build`` in the project
    directory, enumerate devices through ``detect_esp32``, then run one
    ``DeployWorker`` per device concurrently and print a summary.  Devices are
    sorted in natural port order and device N receives ``start-ip + N``.

    Returns:
        None.  Every failure is reported on stdout before returning.
    """
    parser = argparse.ArgumentParser(description='Async ESP32 Mass Deployment')
    parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir')
    parser.add_argument('-s', '--ssid', help='WiFi SSID')
    parser.add_argument('-p', '--password', help='WiFi Password')
    parser.add_argument('--start-ip', default='192.168.1.51', help='Start IP')
    parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud')
    parser.add_argument('--erase', action='store_true', help='Full erase first')
    parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway')
    parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')
    args = parser.parse_args()

    # Validate cheap inputs first so a typo does not cost a full build.
    try:
        start_ip_obj = ipaddress.IPv4Address(args.start_ip)
    except ValueError:  # AddressValueError is a ValueError subclass
        print("Invalid Start IP")
        return
    if bool(args.ssid) != bool(args.password):
        # Configuration only happens when both are present; make that visible
        # instead of silently flashing without configuring.
        print(f"{Colors.YELLOW}Warning: --ssid and --password must be given together; "
              f"devices will be flashed but not configured.{Colors.RESET}")

    project_dir = Path(args.dir).resolve()
    build_dir = project_dir / 'build'

    # 1. Build Firmware (awaited to completion before anything else starts)
    print(f"{Colors.YELLOW}[1/3] Building Firmware...{Colors.RESET}")
    try:
        proc = await asyncio.create_subprocess_exec(
            'idf.py', 'build', cwd=project_dir,
            stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE
        )
    except OSError as e:
        # idf.py not on PATH, or the project directory does not exist.
        print(f"{Colors.RED}Build Failed:\n{e}{Colors.RESET}")
        return
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        print(f"{Colors.RED}Build Failed:\n{stderr.decode('utf-8', errors='replace')}{Colors.RESET}")
        return
    if not (build_dir / 'flash_args').exists():
        print(f"{Colors.RED}Error: build/flash_args missing.{Colors.RESET}")
        return
    print(f"{Colors.GREEN}Build Complete.{Colors.RESET}")

    # 2. Detect Devices
    print(f"{Colors.YELLOW}[2/3] Scanning Devices...{Colors.RESET}")
    devices = detect_esp32.detect_esp32_devices()
    if not devices:
        print(f"{Colors.RED}No devices found.{Colors.RESET}")
        return

    # Sort naturally, so that e.g. a port ending in 2 sorts before one ending in 10.
    def natural_keys(d):
        return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]
    devices.sort(key=natural_keys)

    # Adding an offset raises once the result leaves the IPv4 range; compute
    # every address before touching any device.
    try:
        target_ips = [str(start_ip_obj + i) for i in range(len(devices))]
    except ipaddress.AddressValueError:
        print(f"{Colors.RED}Start IP leaves no room for {len(devices)} addresses.{Colors.RESET}")
        return

    # 3. Deploy
    print(f"{Colors.YELLOW}[3/3] Deploying to {len(devices)} devices...{Colors.RESET}")
    print(f"Max Concurrent Flashes: {MAX_CONCURRENT_FLASH}")
    flash_sem = asyncio.Semaphore(MAX_CONCURRENT_FLASH)
    tasks = [
        DeployWorker(dev.device, target_ip, args, build_dir, flash_sem).run()
        for dev, target_ip in zip(devices, target_ips)
    ]
    results = await asyncio.gather(*tasks)

    # 4. Summary
    success = results.count(True)
    print(f"\n{Colors.BLUE}{'='*40}{Colors.RESET}")
    print(f"Total: {len(devices)}")
    print(f"Success: {Colors.GREEN}{success}{Colors.RESET}")
    print(f"Failed: {Colors.RED}{len(devices) - success}{Colors.RESET}")
    print(f"{Colors.BLUE}{'='*40}{Colors.RESET}")
if __name__ == '__main__':
    # Subprocess support on Windows needs the proactor event loop.
    # asyncio.run() always creates its own loop through the active policy, so
    # installing a loop with set_event_loop() beforehand has no effect (and
    # leaves that loop unclosed).  Select the policy instead; the proactor
    # loop is already the Windows default from Python 3.8 on, so this is
    # only needed on older interpreters.
    if os.name == 'nt' and sys.version_info < (3, 8):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        print("\nCancelled.")