From ddc0ab185f9b73b4c78e8d2da3126ca574c95f7d Mon Sep 17 00:00:00 2001 From: Bob Date: Thu, 11 Dec 2025 11:07:45 -0800 Subject: [PATCH] more on flashing using auto type --- esp32_deploy.py | 100 ++++++++++++++++-------------------------------- 1 file changed, 33 insertions(+), 67 deletions(-) diff --git a/esp32_deploy.py b/esp32_deploy.py index ccc414f..12d3e66 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -3,10 +3,10 @@ ESP32 Unified Deployment Tool (esp32_deploy) Combines firmware flashing and device configuration with full control. Updates: + - FIXED: Overlap error caused by swapping ota_data_initial.bin with main app - '--target auto' support for mixed-device flashing - 'target all' support (Build 12 configurations) - Unique binary naming and 'firmware/' persistence - - Safer flashing for mixed environments """ import asyncio @@ -72,9 +72,7 @@ class UnifiedDeployWorker: self.flash_sem = flash_sem self.log = DeviceLoggerAdapter(logger, {'connid': port}) - # Regex for chip detection (e.g. "Detecting chip type... ESP32-S3") self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') - self.regex_ready = re.compile(r'Initialization complete|GPS synced|GPS initialization aborted|No Config Found', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE) @@ -102,20 +100,13 @@ class UnifiedDeployWorker: return False async def _identify_chip(self): - """ - Runs esptool to auto-detect the connected chip type. - Returns: normalized target string (e.g., 'esp32', 'esp32s3', 'esp32c5') or None. - """ cmd = ['esptool.py', '-p', self.port, 'chip_id'] proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await proc.communicate() - output = stdout.decode() + stderr.decode() match = self.regex_chip_type.search(output) if match: - # Normalize names: ESP32-S3 -> esp32s3, ESP32 -> esp32 - raw_type = match.group(1).lower().replace('-', '') - return raw_type + return match.group(1).lower().replace('-', '') return None async def _erase_flash(self): @@ -126,8 +117,8 @@ class UnifiedDeployWorker: self.log.error(f"Erase failed: {stderr.decode()}") return False -async def _flash_firmware(self): - # 1. Determine Target (Auto-Detect vs Argument) + async def _flash_firmware(self): + # 1. Determine Target detected_target = None if self.args.target == 'auto': detected_target = await self._identify_chip() @@ -139,11 +130,11 @@ async def _flash_firmware(self): else: target_to_use = self.args.target - # 2. Locate Artifacts in 'firmware/' + # 2. Locate Artifacts suffix = generate_config_suffix(target_to_use, self.args.csi_enable, self.args.ampdu) firmware_dir = self.project_dir / "firmware" - # Find unique binary for this specific target config + # Find unique binary unique_app = None if firmware_dir.exists(): for f in os.listdir(firmware_dir): @@ -152,11 +143,12 @@ async def _flash_firmware(self): break if not unique_app: - self.log.error(f"Binary for config '{suffix}' not found in firmware/. Run --target all first?") + self.log.error(f"Binary for config '{suffix}' not found in firmware/.") return False unique_boot = f"bootloader_{suffix}.bin" unique_part = f"partition-table_{suffix}.bin" + unique_ota = f"ota_data_initial_{suffix}.bin" unique_args_file = f"flash_args_{suffix}" # 3. Read flash_args @@ -172,13 +164,21 @@ async def _flash_firmware(self): raw_args = [x for x in content.split(' ') if x] final_args = [] - # 4. Construct Flash Command (Swap paths) + # 4. Construct Flash Command (Swap paths safely) for arg in raw_args: if arg.endswith('bootloader.bin'): final_args.append(str(firmware_dir / unique_boot)) elif arg.endswith('partition-table.bin'): final_args.append(str(firmware_dir / unique_part)) - elif arg.endswith('.bin') and 'partition' not in arg and 'bootloader' not in arg: + elif arg.endswith('ota_data_initial.bin'): + # Fix: Handle OTA data specifically so it doesn't get swapped with app + if (firmware_dir / unique_ota).exists(): + final_args.append(str(firmware_dir / unique_ota)) + else: + # Fallback to standard if unique doesn't exist (though it should) + final_args.append(arg) + elif arg.endswith('.bin'): + # This catch-all must exclude partition/bootloader/ota final_args.append(str(firmware_dir / unique_app)) else: final_args.append(arg) @@ -187,7 +187,6 @@ async def _flash_firmware(self): '--before', 'default_reset', '--after', 'hard_reset', 'write_flash'] + final_args - # CHANGED: Log the full absolute path full_path = firmware_dir / unique_app self.log.info(f"Flashing {full_path}...") @@ -298,17 +297,11 @@ async def _flash_firmware(self): def parse_args(): parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool') - parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options') - - # Updated choices to include 'auto' - parser.add_argument('--target', choices=['esp32', 'esp32s3', 'esp32c5', 'all', 'auto'], - help="Target Chip (Use 'all' to build library, 'auto' to flash by detection)") - + parser.add_argument('--target', choices=['esp32', 'esp32s3', 'esp32c5', 'all', 'auto'], help="Target Chip") parser.add_argument('--ampdu', action='store_true', help='Enable AMPDU in build') - parser.add_argument('--no-ampdu', action='store_false', dest='ampdu', help='Disable AMPDU in build') + parser.add_argument('--no-ampdu', action='store_false', dest='ampdu', help='Disable AMPDU') parser.set_defaults(ampdu=True) - parser.add_argument('--config-only', action='store_true') parser.add_argument('--flash-only', action='store_true') parser.add_argument('--flash-erase', action='store_true') @@ -338,15 +331,12 @@ def parse_args(): parser.add_argument('--csi', dest='csi_enable', action='store_true') args = parser.parse_args() - if args.target != 'all' and not args.start_ip: parser.error("the following arguments are required: --start-ip") - if args.config_only and args.flash_only: parser.error("Conflicting modes") if not args.config_only and not args.flash_only and args.target != 'all': if not args.ssid or not args.password: parser.error("SSID/PASS required") - return args def extract_device_number(device_path): @@ -380,11 +370,7 @@ def get_sdkconfig_defaults(target, csi_enabled, ampdu_enabled): async def build_task(project_dir, target, csi, ampdu, current_step=None, total_steps=None): defaults_str = get_sdkconfig_defaults(target, csi, ampdu) desc = f"Target={target}, CSI={'ON' if csi else 'OFF'}, AMPDU={'ON' if ampdu else 'OFF'}" - - prefix = "" - if current_step is not None and total_steps is not None: - prefix = f"[{current_step}/{total_steps}] " - + prefix = f"[{current_step}/{total_steps}] " if current_step else "" print(f" {prefix}Building [{desc}] ... ", end='', flush=True) try: @@ -393,34 +379,22 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s sdkconfig_path = project_dir / "sdkconfig" build_path = project_dir / "build" + if sdkconfig_path.exists(): os.remove(sdkconfig_path) + if build_path.exists(): shutil.rmtree(build_path) - if sdkconfig_path.exists(): - os.remove(sdkconfig_path) - - if build_path.exists(): - shutil.rmtree(build_path) - - proc = await asyncio.create_subprocess_exec( - 'idf.py', 'set-target', target, - cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) + proc = await asyncio.create_subprocess_exec('idf.py', 'set-target', target, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) _, stderr = await proc.communicate() if proc.returncode != 0: print(f"{Colors.RED}FAIL (Set Target){Colors.RESET}") - print(f"{Colors.RED} >> {stderr.decode().strip()[-500:]}{Colors.RESET}") return False, f"Set Target Failed", 0 start_time = time.time() build_cmd = ['idf.py', '-D', f'SDKCONFIG_DEFAULTS={defaults_str}', 'build'] - proc = await asyncio.create_subprocess_exec( - *build_cmd, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) + proc = await asyncio.create_subprocess_exec(*build_cmd, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) _, stderr = await proc.communicate() duration = time.time() - start_time - if proc.returncode != 0: print(f"{Colors.RED}FAIL{Colors.RESET}") - print(f"{Colors.RED} >> {stderr.decode().strip()[-500:]}{Colors.RESET}") return False, f"Build Failed", duration build_dir = project_dir / 'build' @@ -433,16 +407,17 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s shutil.copy2(build_dir / project_bin, output_dir / unique_app_name) boot_src = build_dir / "bootloader" / "bootloader.bin" - if boot_src.exists(): - shutil.copy2(boot_src, output_dir / f"bootloader_{suffix}.bin") + if boot_src.exists(): shutil.copy2(boot_src, output_dir / f"bootloader_{suffix}.bin") part_src = build_dir / "partition_table" / "partition-table.bin" - if part_src.exists(): - shutil.copy2(part_src, output_dir / f"partition-table_{suffix}.bin") + if part_src.exists(): shutil.copy2(part_src, output_dir / f"partition-table_{suffix}.bin") + + # Fix: Save OTA data binary if it exists + ota_src = build_dir / "ota_data_initial.bin" + if ota_src.exists(): shutil.copy2(ota_src, output_dir / f"ota_data_initial_{suffix}.bin") flash_src = build_dir / "flash_args" - if flash_src.exists(): - shutil.copy2(flash_src, output_dir / f"flash_args_{suffix}") + if flash_src.exists(): shutil.copy2(flash_src, output_dir / f"flash_args_{suffix}") full_path = output_dir / unique_app_name print(f"{Colors.GREEN}OK ({duration:.1f}s) -> {full_path}{Colors.RESET}") @@ -456,36 +431,29 @@ async def run_deployment(args): print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}\n ESP32 Unified Deployment Tool\n{Colors.BLUE}{'='*60}{Colors.RESET}") project_dir = Path(args.dir).resolve() - # --- Target 'ALL' Mode --- if args.target == 'all': print(f"{Colors.YELLOW}Starting Batch Build Verification (12 Combinations){Colors.RESET}\n") targets = ['esp32', 'esp32s3', 'esp32c5'] booleans = [False, True] results = [] - - total_steps = len(targets) * len(booleans) * len(booleans) + total_steps = len(targets) * 4 current_step = 0 - for target in targets: for csi in booleans: for ampdu in booleans: current_step += 1 success, msg, dur = await build_task(project_dir, target, csi, ampdu, current_step, total_steps) results.append({"cfg": f"{target.ljust(9)} CSI:{'ON ' if csi else 'OFF'} AMPDU:{'ON ' if ampdu else 'OFF'}", "ok": success, "dur": dur}) - print(f"\n{Colors.BLUE}Batch Summary:{Colors.RESET}") for r in results: status = f"{Colors.GREEN}PASS{Colors.RESET}" if r['ok'] else f"{Colors.RED}FAIL{Colors.RESET}" print(f" {r['cfg']} : {status} ({r['dur']:.1f}s)") return - # --- Single Build Configuration --- - # Skip build if we are in AUTO mode (we assume binaries exist in firmware/) if not args.config_only and args.target != 'auto': target = args.target if args.target else 'esp32s3' csi = args.csi_enable ampdu = args.ampdu - if args.interactive: print(f"\n{Colors.YELLOW}--- Build Configuration ---{Colors.RESET}") target = ask_user("Target Chip", default=target, choices=['esp32', 'esp32s3', 'esp32c5']) @@ -494,7 +462,6 @@ async def run_deployment(args): args.csi_enable = csi args.target = target args.ampdu = ampdu - success, msg, _ = await build_task(project_dir, target, csi, ampdu, 1, 1) if not success: print(f"{Colors.RED}{msg}{Colors.RESET}") @@ -502,7 +469,6 @@ async def run_deployment(args): elif args.target == 'auto' and not args.config_only: print(f"{Colors.YELLOW}Target 'auto' selected. Skipping build step (assuming artifacts in firmware/).{Colors.RESET}") - # --- Device Detection & Flash --- if args.devices: devs = [type('obj', (object,), {'device': d.strip()}) for d in args.devices.split(',')] else: