#!/usr/bin/env python3 """ ESP32 Unified Deployment Tool (esp32_deploy) Combines firmware flashing and device configuration with full control. Updates: - FIXED: Reset logic (DTR=False) to ensure App boot instead of Bootloader - AUTO-DETECT: Prioritizes /dev/esp_port_* (udev rules) - ROBUSTNESS: Merged "poking" and "retry" logic - 'target all' support """ import asyncio import serial_asyncio import sys import os import argparse import ipaddress import re import time import shutil import logging import glob from pathlib import Path # Ensure detection script is available sys.path.append(os.path.dirname(os.path.abspath(__file__))) try: import detect_esp32 except ImportError: print("Error: 'detect_esp32.py' not found.") sys.exit(1) # --- Configuration --- DEFAULT_MAX_CONCURRENT_FLASH = 4 class Colors: GREEN = '\033[92m' RED = '\033[91m' YELLOW = '\033[93m' BLUE = '\033[94m' CYAN = '\033[96m' RESET = '\033[0m' class DeviceLoggerAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): return '[%s] %s' % (self.extra['connid'], msg), kwargs logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') logger = logging.getLogger("Deploy") def get_project_binary_name(build_dir): ignored = {'bootloader.bin', 'partition-table.bin', 'ota_data_initial.bin'} found = [] try: for f in os.listdir(build_dir): if f.endswith('.bin') and f not in ignored and 'partition' not in f: found.append(f) except FileNotFoundError: return None return found[0] if found else None def generate_config_suffix(target, csi, ampdu): csi_str = "csi_on" if csi else "csi_off" ampdu_str = "ampdu_on" if ampdu else "ampdu_off" return f"{target}_{csi_str}_{ampdu_str}" def auto_detect_devices(): """Prioritizes static udev paths (/dev/esp_port_XX) if they exist.""" try: ports = glob.glob('/dev/esp_port_*') if ports: # Sort by suffix number ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0) print(f"{Colors.CYAN}Auto-detected {len(ports)} devices using static udev rules.{Colors.RESET}") return [type('obj', (object,), {'device': p}) for p in ports] except Exception: pass return detect_esp32.detect_esp32_devices() class UnifiedDeployWorker: def __init__(self, port, target_ip, args, project_dir, flash_sem): self.port = port self.target_ip = target_ip self.args = args self.project_dir = Path(project_dir) self.flash_sem = flash_sem self.log = DeviceLoggerAdapter(logger, {'connid': port}) self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') # Matches the log from your updated main.c self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_csi_saved = re.compile(r'CSI enable state saved|Config saved', re.IGNORECASE) async def run(self): try: if not self.args.config_only: async with self.flash_sem: if self.args.flash_erase: if not await self._erase_flash(): return False if not await self._flash_firmware(): return False # Give it a moment to stabilize after flash reset await asyncio.sleep(2.0) if not self.args.flash_only: if self.args.ssid and self.args.password: # Retry logic success = False for attempt in range(1, 4): self.log.info(f"Configuring (Attempt {attempt}/3)...") if await self._configure_device(): success = True break self.log.warning(f"Config failed on attempt {attempt}. Retrying...") await asyncio.sleep(2.0) if not success: self.log.error(f"{Colors.RED}Config verify failed after 3 attempts.{Colors.RESET}") return False else: self.log.warning("No SSID/Password provided, skipping config") if self.args.config_only: return False return True except Exception as e: self.log.error(f"Worker Exception: {e}") return False async def _identify_chip(self): cmd = ['esptool.py', '-p', self.port, 'chip_id'] proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await proc.communicate() output = stdout.decode() + stderr.decode() match = self.regex_chip_type.search(output) if match: return match.group(1).lower().replace('-', '') return None async def _erase_flash(self): cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash'] proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) _, stderr = await proc.communicate() if proc.returncode == 0: return True self.log.error(f"Erase failed: {stderr.decode()}") return False async def _flash_firmware(self): detected_target = None if self.args.target == 'auto': detected_target = await self._identify_chip() if not detected_target: self.log.error("Failed to auto-detect chip type.") return False self.log.info(f"Auto-detected: {Colors.CYAN}{detected_target}{Colors.RESET}") target_to_use = detected_target else: target_to_use = self.args.target suffix = generate_config_suffix(target_to_use, self.args.csi_enable, self.args.ampdu) firmware_dir = self.project_dir / "firmware" unique_app = None if firmware_dir.exists(): for f in os.listdir(firmware_dir): if f.endswith(f"_{suffix}.bin") and not f.startswith("bootloader") and not f.startswith("partition") and not f.startswith("ota_data") and not f.startswith("phy_init"): unique_app = f break if not unique_app: self.log.error(f"Binary for config '{suffix}' not found in firmware/.") return False unique_boot = f"bootloader_{suffix}.bin" unique_part = f"partition-table_{suffix}.bin" unique_ota = f"ota_data_initial_{suffix}.bin" unique_args_file = f"flash_args_{suffix}" flash_args_path = firmware_dir / unique_args_file if not flash_args_path.exists(): self.log.error(f"flash_args for {suffix} not found") return False try: with open(flash_args_path, 'r') as f: content = f.read().replace('\n', ' ').strip() raw_args = [x for x in content.split(' ') if x] final_args = [] for arg in raw_args: if arg.endswith('bootloader.bin'): final_args.append(str(firmware_dir / unique_boot)) elif arg.endswith('partition-table.bin'): final_args.append(str(firmware_dir / unique_part)) elif arg.endswith('ota_data_initial.bin'): if (firmware_dir / unique_ota).exists(): final_args.append(str(firmware_dir / unique_ota)) else: continue elif arg.endswith('phy_init_data.bin'): final_args.append(arg) elif arg.endswith('.bin'): final_args.append(str(firmware_dir / unique_app)) else: final_args.append(arg) cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud), '--before', 'default_reset', '--after', 'hard_reset', 'write_flash'] + final_args full_path = firmware_dir / unique_app self.log.info(f"Flashing {full_path}...") proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) except asyncio.TimeoutError: proc.kill() return False if proc.returncode == 0: return True self.log.error(f"Flash failed: {stderr.decode()}") return False except Exception as e: self.log.error(f"Flash Prep Error: {e}") return False async def _configure_device(self): try: reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) except Exception as e: return False try: # 1. Reset writer.transport.serial.dtr = False writer.transport.serial.rts = True await asyncio.sleep(0.1) writer.transport.serial.rts = False # FIX: DTR Must be False to allow Booting (True=Low=Bootloader Mode) writer.transport.serial.dtr = False # 2. Robust Wait (with Poke) if not await self._wait_for_boot(reader, writer): self.log.warning("Boot prompt missed (sending blindly)...") # 3. Send await self._send_config(writer) # 4. Verify is_configured = await self._verify_configuration(reader) if is_configured: self.log.info(f"{Colors.GREEN}Config verified.{Colors.RESET}") # Final Reset to apply writer.transport.serial.dtr = False writer.transport.serial.rts = True await asyncio.sleep(0.1) writer.transport.serial.rts = False return True else: self.log.error(f"{Colors.RED}Config verification failed.{Colors.RESET}") return False except Exception as e: self.log.error(f"Config Error: {e}") return False finally: writer.close() await writer.wait_closed() async def _wait_for_boot(self, reader, writer): # Timeout covers GPS delay (~3.5s) + boot overhead end_time = time.time() + 12 last_poke = time.time() while time.time() < end_time: try: # Poke every 1.5 seconds if we haven't seen the prompt if time.time() - last_poke > 1.5: writer.write(b'\n') await writer.drain() last_poke = time.time() try: # Short timeout to allow polling loop line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1) line = line_bytes.decode('utf-8', errors='ignore').strip() if not line: continue if self.regex_ready.search(line): return True except asyncio.TimeoutError: continue except Exception as e: self.log.error(f"Read error: {e}") return False return False async def _send_config(self, writer): # Wait a moment for any last boot logs to clear await asyncio.sleep(0.5) # Wake up console writer.write(b'\n') await writer.drain() await asyncio.sleep(0.2) csi_val = '1' if self.args.csi_enable else '0' role_str = "SERVER" if self.args.iperf_server else "CLIENT" iperf_enable_val = '0' if self.args.no_iperf else '1' period_us = int(self.args.iperf_period * 1000000) config_lines = [ "CFG", f"SSID:{self.args.ssid}", f"PASS:{self.args.password}", f"IP:{self.target_ip}", f"MASK:{self.args.netmask}", f"GW:{self.args.gateway}", f"DHCP:0", f"BAND:{self.args.band}", f"BW:{self.args.bandwidth}", f"POWERSAVE:{self.args.powersave}", f"MODE:{self.args.mode}", f"MON_CH:{self.args.monitor_channel}", f"CSI:{csi_val}", f"IPERF_PERIOD_US:{period_us}", f"IPERF_ROLE:{role_str}", f"IPERF_PROTO:{self.args.iperf_proto}", f"IPERF_DST_IP:{self.args.iperf_dest_ip}", f"IPERF_PORT:{self.args.iperf_port}", f"IPERF_BURST:{self.args.iperf_burst}", f"IPERF_LEN:{self.args.iperf_len}", f"IPERF_ENABLED:{iperf_enable_val}", "END" ] # CHANGED: Send line-by-line with a delay to prevent UART FIFO overflow for line in config_lines: cmd = line + "\r\n" writer.write(cmd.encode('utf-8')) await writer.drain() # 50ms delay allows the ESP32 (running at 115200 baud) to process the line await asyncio.sleep(0.1) async def _verify_configuration(self, reader): timeout = time.time() + 15 while time.time() < timeout: try: line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0) line = line_bytes.decode('utf-8', errors='ignore').strip() if not line: continue if self.regex_csi_saved.search(line): return True m = self.regex_got_ip.search(line) if m and m.group(1) == self.target_ip: return True except asyncio.TimeoutError: continue return False def parse_args(): parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool') parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options') parser.add_argument('--target', choices=['esp32', 'esp32s3', 'esp32c5', 'all', 'auto'], help="Target Chip") parser.add_argument('--ampdu', action='store_true', help='Enable AMPDU in build') parser.add_argument('--no-ampdu', action='store_false', dest='ampdu', help='Disable AMPDU') parser.set_defaults(ampdu=True) parser.add_argument('--config-only', action='store_true') parser.add_argument('--flash-only', action='store_true') parser.add_argument('--flash-erase', action='store_true') parser.add_argument('-d', '--dir', default=os.getcwd()) parser.add_argument('-b', '--baud', type=int, default=460800) parser.add_argument('--devices', type=str) parser.add_argument('--max-concurrent', type=int, default=None) parser.add_argument('--start-ip', help='Start IP (Required unless --target all)') parser.add_argument('-s', '--ssid', default='ClubHouse2G') parser.add_argument('-P', '--password', default='ez2remember') parser.add_argument('-g', '--gateway', default='192.168.1.1') parser.add_argument('-m', '--netmask', default='255.255.255.0') parser.add_argument('--band', default='2.4G') parser.add_argument('-B', '--bandwidth', default='HT20') parser.add_argument('-ps', '--powersave', default='NONE') parser.add_argument('--iperf-period', type=float, default=0.01) parser.add_argument('--iperf-burst', type=int, default=1) parser.add_argument('--iperf-len', type=int, default=1470) parser.add_argument('--iperf-proto', default='UDP') parser.add_argument('--iperf-dest-ip', default='192.168.1.50') parser.add_argument('--iperf-port', type=int, default=5001) parser.add_argument('--no-iperf', action='store_true') parser.add_argument('--iperf-client', action='store_true') parser.add_argument('--iperf-server', action='store_true') parser.add_argument('-M', '--mode', default='STA') parser.add_argument('-mc', '--monitor-channel', type=int, default=36) parser.add_argument('--csi', dest='csi_enable', action='store_true') args = parser.parse_args() if args.target != 'all' and not args.start_ip: parser.error("the following arguments are required: --start-ip") if args.config_only and args.flash_only: parser.error("Conflicting modes") if not args.config_only and not args.flash_only and args.target != 'all': if not args.ssid or not args.password: parser.error("SSID/PASS required") return args def extract_device_number(device_path): match = re.search(r'(\d+)$', device_path) return int(match.group(1)) if match else 0 def ask_user(prompt, default=None, choices=None): choice_str = f" [{'|'.join(choices)}]" if choices else "" default_str = f" [{default}]" if default else "" while True: val = input(f"{Colors.CYAN}{prompt}{choice_str}{default_str}: {Colors.RESET}").strip() if not val and default: return default if choices: if val in choices: return val print(f"{Colors.RED}Invalid choice.{Colors.RESET}") else: return val def ask_bool(prompt, default=True): choice_str = " [Y/n]" if default else " [y/N]" val = input(f"{Colors.CYAN}{prompt}{choice_str}: {Colors.RESET}").strip().lower() if not val: return default return val.startswith('y') def get_sdkconfig_defaults(target, csi_enabled, ampdu_enabled): defaults = ["sdkconfig.defaults"] suffix = "csi" if csi_enabled else "" defaults.append(f"sdkconfig.defaults.{target}{suffix}") defaults.append("sdkconfig.defaults.ampdu" if ampdu_enabled else "sdkconfig.defaults.noampdu") return ";".join(defaults) async def build_task(project_dir, target, csi, ampdu, current_step=None, total_steps=None): defaults_str = get_sdkconfig_defaults(target, csi, ampdu) desc = f"Target={target}, CSI={'ON' if csi else 'OFF'}, AMPDU={'ON' if ampdu else 'OFF'}" prefix = f"[{current_step}/{total_steps}] " if current_step else "" print(f" {prefix}Building [{desc}] ... ", end='', flush=True) try: output_dir = project_dir / "firmware" output_dir.mkdir(exist_ok=True) sdkconfig_path = project_dir / "sdkconfig" build_path = project_dir / "build" if sdkconfig_path.exists(): os.remove(sdkconfig_path) if build_path.exists(): shutil.rmtree(build_path) proc = await asyncio.create_subprocess_exec('idf.py', 'set-target', target, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) _, stderr = await proc.communicate() if proc.returncode != 0: print(f"{Colors.RED}FAIL (Set Target){Colors.RESET}") return False, f"Set Target Failed", 0 start_time = time.time() build_cmd = ['idf.py', '-D', f'SDKCONFIG_DEFAULTS={defaults_str}', 'build'] proc = await asyncio.create_subprocess_exec(*build_cmd, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) _, stderr = await proc.communicate() duration = time.time() - start_time if proc.returncode != 0: print(f"{Colors.RED}FAIL{Colors.RESET}") return False, f"Build Failed", duration build_dir = project_dir / 'build' suffix = generate_config_suffix(target, csi, ampdu) unique_app_name = "Unknown" project_bin = get_project_binary_name(build_dir) if project_bin: unique_app_name = f"{os.path.splitext(project_bin)[0]}_{suffix}.bin" shutil.copy2(build_dir / project_bin, output_dir / unique_app_name) boot_src = build_dir / "bootloader" / "bootloader.bin" if boot_src.exists(): shutil.copy2(boot_src, output_dir / f"bootloader_{suffix}.bin") part_src = build_dir / "partition_table" / "partition-table.bin" if part_src.exists(): shutil.copy2(part_src, output_dir / f"partition-table_{suffix}.bin") # Fix: Save OTA data binary if it exists ota_src = build_dir / "ota_data_initial.bin" if ota_src.exists(): shutil.copy2(ota_src, output_dir / f"ota_data_initial_{suffix}.bin") flash_src = build_dir / "flash_args" if flash_src.exists(): shutil.copy2(flash_src, output_dir / f"flash_args_{suffix}") full_path = output_dir / unique_app_name print(f"{Colors.GREEN}OK ({duration:.1f}s) -> {full_path}{Colors.RESET}") return True, "Success", duration except Exception as e: print(f"{Colors.RED}ERROR: {e}{Colors.RESET}") return False, str(e), 0 async def run_deployment(args): print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}\n ESP32 Unified Deployment Tool\n{Colors.BLUE}{'='*60}{Colors.RESET}") project_dir = Path(args.dir).resolve() # --- Target 'ALL' Mode --- if args.target == 'all': print(f"{Colors.YELLOW}Starting Batch Build Verification (12 Combinations){Colors.RESET}") # SAFETY: Wipe firmware dir to ensure no stale binaries exist firmware_dir = project_dir / "firmware" if firmware_dir.exists(): try: shutil.rmtree(firmware_dir) print(f"{Colors.YELLOW} [Clean] Removed old firmware/ directory.{Colors.RESET}") except Exception as e: print(f"{Colors.RED} [Error] Could not clean firmware dir: {e}{Colors.RESET}") return # Re-create it fresh firmware_dir.mkdir(exist_ok=True) print("") # Spacer targets = ['esp32', 'esp32s3', 'esp32c5'] booleans = [False, True] results = [] total_steps = len(targets) * len(booleans) * len(booleans) current_step = 0 for target in targets: for csi in booleans: for ampdu in booleans: current_step += 1 success, msg, dur = await build_task(project_dir, target, csi, ampdu, current_step, total_steps) results.append({"cfg": f"{target.ljust(9)} CSI:{'ON ' if csi else 'OFF'} AMPDU:{'ON ' if ampdu else 'OFF'}", "ok": success, "dur": dur}) print(f"\n{Colors.BLUE}Batch Summary:{Colors.RESET}") for r in results: status = f"{Colors.GREEN}PASS{Colors.RESET}" if r['ok'] else f"{Colors.RED}FAIL{Colors.RESET}" print(f" {r['cfg']} : {status} ({r['dur']:.1f}s)") return # --- Single Build Configuration --- # Skip build if we are in AUTO mode (we assume binaries exist in firmware/) if not args.config_only and args.target != 'auto': target = args.target if args.target else 'esp32s3' csi = args.csi_enable ampdu = args.ampdu if args.interactive: print(f"\n{Colors.YELLOW}--- Build Configuration ---{Colors.RESET}") target = ask_user("Target Chip", default=target, choices=['esp32', 'esp32s3', 'esp32c5']) csi = ask_bool(f"Enable CSI Support?", default=csi) ampdu = ask_bool(f"Enable AMPDU Aggregation?", default=ampdu) args.csi_enable = csi args.target = target args.ampdu = ampdu success, msg, _ = await build_task(project_dir, target, csi, ampdu, 1, 1) if not success: print(f"{Colors.RED}{msg}{Colors.RESET}") return elif args.target == 'auto' and not args.config_only: print(f"{Colors.YELLOW}Target 'auto' selected. Skipping build step (assuming artifacts in firmware/).{Colors.RESET}") # --- Device Detection & Flash --- if args.devices: devs = [type('obj', (object,), {'device': d.strip()}) for d in args.devices.split(',')] else: # Use AUTO DETECT first (for static names), then standard fallback devs = auto_detect_devices() if not devs: print("No devices found"); return # Sort naturally (esp_port_01 before esp_port_10) # We rely on the internal sort of auto_detect or detect_esp32, # but a final sort by string length/digits helps with mixing types. devs.sort(key=lambda d: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]) print(f"\n{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}") start_ip = ipaddress.IPv4Address(args.start_ip) max_c = args.max_concurrent if args.max_concurrent else (1 if args.devices and not args.config_only else DEFAULT_MAX_CONCURRENT_FLASH) flash_sem = asyncio.Semaphore(max_c) tasks = [] for i, dev in enumerate(devs): offset = extract_device_number(dev.device) if args.devices else i target_ip = str(start_ip + offset) tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, project_dir, flash_sem).run()) results = await asyncio.gather(*tasks) success = results.count(True) print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}") def main(): if os.name == 'nt': asyncio.set_event_loop(asyncio.ProactorEventLoop()) try: asyncio.run(run_deployment(parse_args())) except KeyboardInterrupt: sys.exit(1) if __name__ == '__main__': main()