#!/usr/bin/env python3
"""
ESP32 Unified Deployment Tool
Combines firmware flashing and device configuration with full control.
"""

import argparse
import asyncio
import ipaddress
import logging
import os
import re
import sys
import time
from pathlib import Path
from types import SimpleNamespace

import serial_asyncio

# Ensure detection script is available
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
    import detect_esp32
except ImportError:
    print("Error: 'detect_esp32.py' not found.")
    sys.exit(1)

# --- Configuration ---
DEFAULT_MAX_CONCURRENT_FLASH = 4


class Colors:
    """ANSI escape sequences used to colorize console output."""

    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    RESET = '\033[0m'


class DeviceLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with ``[<connid>]``."""

    def process(self, msg, kwargs):
        """Return ``msg`` prefixed with the adapter's ``connid`` extra."""
        return '[%s] %s' % (self.extra['connid'], msg), kwargs


logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger("Deploy")


class UnifiedDeployWorker:
    """Flash and/or configure a single device attached to one serial port.

    Args:
        port: Serial port path of the device; also used as the log prefix.
        target_ip: Static IP address (string) sent to the device in the
            configuration block.
        args: Parsed command-line namespace as produced by ``parse_args``.
        build_dir: Directory from which ``esptool.py`` is run so that
            ``@flash_args`` resolves.
        flash_sem: ``asyncio.Semaphore`` limiting concurrent erase/flash jobs.
    """

    def __init__(self, port, target_ip, args, build_dir, flash_sem):
        self.port = port
        self.target_ip = target_ip
        self.args = args
        self.build_dir = build_dir
        self.flash_sem = flash_sem
        self.log = DeviceLoggerAdapter(logger, {'connid': port})

        # Patterns matched against lines read from the device's serial output.
        self.regex_ready = re.compile(r'Initialization complete|GPS synced|GPS initialization aborted|No Config Found', re.IGNORECASE)
        self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
        self.regex_monitor_success = re.compile(r'Monitor mode active', re.IGNORECASE)
        self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE)
        self.regex_status_connected = re.compile(r'WiFi connected: Yes', re.IGNORECASE)
        # Not referenced by any method in this file; kept for external users.
        self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE)

    async def run(self):
        """Run the flash and/or configure steps selected by ``self.args``.

        Returns:
            True on success, False on failure. Any exception is logged and
            reported as False so one device cannot abort the whole batch.
        """
        try:
            if not self.args.config_only:
                # Only the erase/flash phase is throttled by the semaphore.
                async with self.flash_sem:
                    if self.args.flash_erase and not await self._erase_flash():
                        return False
                    if not await self._flash_firmware():
                        return False
                await asyncio.sleep(1.0)

            if not self.args.flash_only:
                if self.args.ssid and self.args.password:
                    if not await self._configure_device():
                        if self.args.config_only:
                            # Nothing was flashed in this mode, so a failed
                            # configuration cannot be reported as success.
                            self.log.error(f"{Colors.RED}Config verify failed.{Colors.RESET}")
                            return False
                        self.log.warning(f"{Colors.YELLOW}Config verify failed. Marking SUCCESS (Flash OK).{Colors.RESET}")
                else:
                    self.log.warning("No SSID/Password provided, skipping config")
                    if self.args.config_only:
                        return False
            return True
        except Exception as e:
            self.log.error(f"Worker Exception: {e}")
            return False

    async def _erase_flash(self):
        """Run ``esptool.py erase_flash`` on this port; return True on exit code 0."""
        cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash']
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        _, stderr = await proc.communicate()
        if proc.returncode == 0:
            return True
        # errors='replace': tool output is not guaranteed to be valid UTF-8.
        self.log.error(f"Erase failed: {stderr.decode(errors='replace')}")
        return False

    async def _flash_firmware(self):
        """Run ``esptool.py write_flash @flash_args`` from the build directory.

        Returns:
            True on exit code 0; False on a non-zero exit code or if the
            tool does not finish within 300 seconds.
        """
        cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud),
               '--before', 'default_reset', '--after', 'hard_reset',
               'write_flash', '@flash_args']
        proc = await asyncio.create_subprocess_exec(
            *cmd, cwd=self.build_dir,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        try:
            _, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # Process exited between the timeout and the kill.
            # Reap the child so it is not left behind as a zombie.
            await proc.wait()
            self.log.error("Flash failed: timed out after 300s")
            return False
        if proc.returncode == 0:
            return True
        self.log.error(f"Flash failed: {stderr.decode(errors='replace')}")
        return False

    async def _configure_device(self):
        """Open the serial port, send the configuration and verify it.

        In config-only mode the DTR/RTS lines are toggled before waiting for
        boot output. A missed boot prompt is only a warning; the
        configuration is sent regardless.

        Returns:
            The result of ``_verify_configuration``, or False if the port
            cannot be opened or any step raises.
        """
        try:
            reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
        except Exception as e:
            self.log.error(f"Failed to open serial port: {e}")
            return False

        try:
            if self.args.config_only:
                ser = writer.transport.serial
                ser.dtr = False
                ser.rts = True
                await asyncio.sleep(0.1)
                ser.rts = False
                await asyncio.sleep(0.1)
                ser.dtr = True

            if not await self._wait_for_boot(reader):
                self.log.warning("Boot prompt missed...")

            await self._send_config(writer)
            return await self._verify_configuration(reader)
        except Exception as e:
            self.log.error(f"Configuration failed: {e}")
            return False
        finally:
            writer.close()
            await writer.wait_closed()

    async def _wait_for_boot(self, reader):
        """Wait up to 10 seconds for a line matching ``regex_ready``.

        Returns:
            True if a matching line was read; False on deadline or EOF.
        """
        # Monotonic clock: the deadline must not move if wall time is adjusted.
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            try:
                raw = await asyncio.wait_for(reader.readline(), timeout=0.5)
            except asyncio.TimeoutError:
                continue
            if not raw:
                # readline() returns b'' only at EOF; every later call would
                # return immediately, turning this loop into a busy spin.
                return False
            line = raw.decode('utf-8', errors='ignore').strip()
            if self.regex_ready.search(line):
                return True
        return False

    async def _send_config(self, writer):
        """Write the ``CFG`` ... ``END`` configuration block to the device."""
        csi_val = '1' if self.args.csi_enable else '0'
        # CLIENT is the default role when neither iperf flag is given.
        role_str = "SERVER" if self.args.iperf_server else "CLIENT"
        # Enable Logic: 1=Yes, 0=No
        iperf_enable_val = '0' if self.args.no_iperf else '1'

        config_str = (
            f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\nIP:{self.target_ip}\n"
            f"MASK:{self.args.netmask}\nGW:{self.args.gateway}\nDHCP:0\nBAND:{self.args.band}\n"
            f"BW:{self.args.bandwidth}\nPOWERSAVE:{self.args.powersave}\nMODE:{self.args.mode}\n"
            f"MON_CH:{self.args.monitor_channel}\nCSI:{csi_val}\n"
            f"IPERF_RATE:{self.args.iperf_rate}\nIPERF_ROLE:{role_str}\n"
            f"IPERF_PROTO:{self.args.iperf_proto}\nIPERF_DEST_IP:{self.args.iperf_dest_ip}\n"
            f"IPERF_BURST:{self.args.iperf_burst}\nIPERF_LEN:{self.args.iperf_len}\n"
            f"IPERF_ENABLED:{iperf_enable_val}\n"
            f"END\n"
        )
        writer.write(config_str.encode('utf-8'))
        await writer.drain()

    async def _verify_configuration(self, reader):
        """Wait up to 20 seconds for output confirming the configuration.

        Success is any line matching the CSI-saved, monitor-active or
        WiFi-connected patterns, or a ``got ip:`` line whose address equals
        ``self.target_ip``.

        Returns:
            True on confirmation; False on deadline or EOF.
        """
        deadline = time.monotonic() + 20
        while time.monotonic() < deadline:
            try:
                raw = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            if not raw:
                # EOF: stop instead of spinning until the deadline.
                return False
            line = raw.decode('utf-8', errors='ignore').strip()
            if not line:
                continue
            if (self.regex_csi_saved.search(line)
                    or self.regex_monitor_success.search(line)
                    or self.regex_status_connected.search(line)):
                return True
            m = self.regex_got_ip.search(line)
            if m and m.group(1) == self.target_ip:
                return True
        return False


def parse_args(argv=None):
    """Parse and validate command-line arguments.

    Args:
        argv: Optional argument list; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace``. Invalid combinations or values
        terminate through ``parser.error``.
    """
    parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool')

    # Operation Mode
    parser.add_argument('--config-only', action='store_true', help='Configure only')
    parser.add_argument('--flash-only', action='store_true', help='Flash only')
    parser.add_argument('--flash-erase', action='store_true', help='Erase flash first')

    # Build/Flash
    parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir')
    parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud')
    parser.add_argument('--devices', type=str, help='Device list /dev/ttyUSB0,/dev/ttyUSB1')
    parser.add_argument('--max-concurrent', type=int, default=None, help='Max concurrent flash')

    # Network
    parser.add_argument('--start-ip', required=True, help='Start IP')
    # NOTE(review): default WiFi credentials are hard-coded here; consider
    # sourcing them from the environment or requiring them explicitly.
    parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='SSID')
    parser.add_argument('-P', '--password', default='ez2remember', help='Password')
    parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway')
    parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')

    # WiFi
    parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'], help='Band')
    parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'], help='BW')
    parser.add_argument('-ps', '--powersave', default='NONE', help='Power save')

    # Iperf
    parser.add_argument('--iperf-rate', type=int, default=10, help='Mbps')
    parser.add_argument('--iperf-burst', type=int, default=1, help='Packets/tick')
    parser.add_argument('--iperf-len', type=int, default=1470, help='Payload len')
    parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'], help='Proto')
    parser.add_argument('--iperf-dest-ip', default='192.168.1.50', help='Dest IP')
    parser.add_argument('--no-iperf', action='store_true', help='Disable Iperf start')
    g = parser.add_mutually_exclusive_group()
    g.add_argument('--iperf-client', action='store_true')
    g.add_argument('--iperf-server', action='store_true')

    # Mode
    parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'])
    parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
    parser.add_argument('--csi', dest='csi_enable', action='store_true')

    args = parser.parse_args(argv)

    if args.config_only and args.flash_only:
        parser.error("Conflicting modes")
    if not args.config_only and not args.flash_only and (not args.ssid or not args.password):
        parser.error("SSID/PASS required")

    # Validate up front so a typo fails before the firmware build rather
    # than as a traceback after it.
    try:
        ipaddress.IPv4Address(args.start_ip)
    except ValueError:
        parser.error(f"Invalid --start-ip: {args.start_ip!r}")

    # A negative value would make asyncio.Semaphore raise ValueError later;
    # 0 keeps its existing meaning of "use the default".
    if args.max_concurrent is not None and args.max_concurrent < 0:
        parser.error("--max-concurrent must not be negative")

    return args


def extract_device_number(device_path):
    """Return the trailing integer of ``device_path``, or 0 if there is none."""
    match = re.search(r'(\d+)$', device_path)
    return int(match.group(1)) if match else 0


def _natural_sort_key(device_path):
    """Split a path into text and integer runs so 'USB10' sorts after 'USB2'."""
    return [int(part) if part.isdigit() else part
            for part in re.split(r'(\d+)', device_path)]


async def run_deployment(args):
    """Build the firmware (unless config-only), then deploy to every device.

    Devices come from ``--devices`` when given, otherwise from
    ``detect_esp32.detect_esp32_devices()``. One ``UnifiedDeployWorker`` runs
    per device, and a summary line is printed at the end.

    Args:
        args: Parsed namespace from ``parse_args``.
    """
    print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}\n   ESP32 Unified Deployment Tool\n{Colors.BLUE}{'='*60}{Colors.RESET}")

    project_dir = Path(args.dir).resolve()
    build_dir = project_dir / 'build'

    if not args.config_only:
        print(f"{Colors.YELLOW}Building Firmware...{Colors.RESET}")
        try:
            proc = await asyncio.create_subprocess_exec(
                'idf.py', 'build', cwd=project_dir,
                stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE)
        except FileNotFoundError as e:
            # idf.py is not on PATH or the project directory does not exist.
            print(f"{Colors.RED}Build Failed:\n{e}{Colors.RESET}")
            return
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            print(f"{Colors.RED}Build Failed:\n{stderr.decode(errors='replace')}{Colors.RESET}")
            return
        print(f"{Colors.GREEN}Build Complete{Colors.RESET}")

    # Detect Devices
    if args.devices:
        # Skip empty entries (e.g. from a trailing comma) instead of creating
        # a worker for an empty port path.
        devs = [SimpleNamespace(device=name.strip())
                for name in args.devices.split(',') if name.strip()]
    else:
        devs = detect_esp32.detect_esp32_devices()
    if not devs:
        print("No devices found")
        return

    devs.sort(key=lambda d: _natural_sort_key(d.device))
    print(f"{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}")

    start_ip = ipaddress.IPv4Address(args.start_ip)

    # Concurrency: an explicit device list defaults to one flash at a time.
    if args.max_concurrent:
        max_c = args.max_concurrent
    elif args.devices and not args.config_only:
        max_c = 1
    else:
        max_c = DEFAULT_MAX_CONCURRENT_FLASH
    flash_sem = asyncio.Semaphore(max_c)

    tasks = []
    for i, dev in enumerate(devs):
        # With an explicit list the IP offset follows the port number;
        # otherwise it follows the position in the sorted list.
        offset = extract_device_number(dev.device) if args.devices else i
        target_ip = str(start_ip + offset)
        tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, build_dir, flash_sem).run())

    results = await asyncio.gather(*tasks)
    success = results.count(True)
    print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}")


def main():
    """Script entry point: parse arguments and run the deployment."""
    # asyncio.run() always creates its own event loop, so installing a loop
    # with set_event_loop() beforehand has no effect. The Proactor loop is
    # needed for subprocess support on Windows and is already the default
    # from Python 3.8 on; older versions need the policy set explicitly.
    if os.name == 'nt' and sys.version_info < (3, 8):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    try:
        asyncio.run(run_deployment(parse_args()))
    except KeyboardInterrupt:
        sys.exit(1)


if __name__ == '__main__':
    main()