From 6b8498ad459ff1868b2fa53a52a5afc0447e4089 Mon Sep 17 00:00:00 2001 From: Bob Date: Thu, 11 Dec 2025 10:52:18 -0800 Subject: [PATCH] flashing work --- esp32_deploy.py | 203 ++++++++++++++++++++++++++---------------------- 1 file changed, 110 insertions(+), 93 deletions(-) diff --git a/esp32_deploy.py b/esp32_deploy.py index d961691..ccc414f 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 """ -ESP32 Unified Deployment Tool +ESP32 Unified Deployment Tool (esp32_deploy) Combines firmware flashing and device configuration with full control. Updates: + - '--target auto' support for mixed-device flashing - 'target all' support (Build 12 configurations) - - Unique binary naming for Main, Bootloader, and Partition Table + - Unique binary naming and 'firmware/' persistence - Safer flashing for mixed environments - - Progress counter [1/12] for batch builds """ import asyncio @@ -48,10 +48,6 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefm logger = logging.getLogger("Deploy") def get_project_binary_name(build_dir): - """ - Heuristic to find the main project binary in the build folder. - Excludes standard ESP-IDF binaries. - """ ignored = {'bootloader.bin', 'partition-table.bin', 'ota_data_initial.bin'} found = [] try: @@ -68,14 +64,17 @@ def generate_config_suffix(target, csi, ampdu): return f"{target}_{csi_str}_{ampdu_str}" class UnifiedDeployWorker: - def __init__(self, port, target_ip, args, build_dir, flash_sem): + def __init__(self, port, target_ip, args, project_dir, flash_sem): self.port = port self.target_ip = target_ip self.args = args - self.build_dir = build_dir + self.project_dir = Path(project_dir) self.flash_sem = flash_sem self.log = DeviceLoggerAdapter(logger, {'connid': port}) + # Regex for chip detection (e.g. "Detecting chip type... ESP32-S3") + self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') + self.regex_ready = re.compile(r'Initialization complete|GPS synced|GPS initialization aborted|No Config Found', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE) @@ -102,6 +101,23 @@ class UnifiedDeployWorker: self.log.error(f"Worker Exception: {e}") return False + async def _identify_chip(self): + """ + Runs esptool to auto-detect the connected chip type. + Returns: normalized target string (e.g., 'esp32', 'esp32s3', 'esp32c5') or None. + """ + cmd = ['esptool.py', '-p', self.port, 'chip_id'] + proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + stdout, stderr = await proc.communicate() + + output = stdout.decode() + stderr.decode() + match = self.regex_chip_type.search(output) + if match: + # Normalize names: ESP32-S3 -> esp32s3, ESP32 -> esp32 + raw_type = match.group(1).lower().replace('-', '') + return raw_type + return None + async def _erase_flash(self): cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash'] proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) @@ -110,27 +126,43 @@ class UnifiedDeployWorker: self.log.error(f"Erase failed: {stderr.decode()}") return False - async def _flash_firmware(self): - """ - Parses flash_args to inject UNIQUE binary filenames for App, Bootloader, and Partitions. - """ - suffix = generate_config_suffix(self.args.target, self.args.csi_enable, self.args.ampdu) +async def _flash_firmware(self): + # 1. Determine Target (Auto-Detect vs Argument) + detected_target = None + if self.args.target == 'auto': + detected_target = await self._identify_chip() + if not detected_target: + self.log.error("Failed to auto-detect chip type.") + return False + self.log.info(f"Auto-detected: {Colors.CYAN}{detected_target}{Colors.RESET}") + target_to_use = detected_target + else: + target_to_use = self.args.target - # 1. Identify Main Binary - project_bin = get_project_binary_name(self.build_dir) - if not project_bin: - self.log.error("Could not determine project binary name") + # 2. Locate Artifacts in 'firmware/' + suffix = generate_config_suffix(target_to_use, self.args.csi_enable, self.args.ampdu) + firmware_dir = self.project_dir / "firmware" + + # Find unique binary for this specific target config + unique_app = None + if firmware_dir.exists(): + for f in os.listdir(firmware_dir): + if f.endswith(f"_{suffix}.bin") and not f.startswith("bootloader") and not f.startswith("partition"): + unique_app = f + break + + if not unique_app: + self.log.error(f"Binary for config '{suffix}' not found in firmware/. Run --target all first?") return False - # Define expected unique names (Files should exist in build_dir root) - unique_app = f"{os.path.splitext(project_bin)[0]}_{suffix}.bin" unique_boot = f"bootloader_{suffix}.bin" unique_part = f"partition-table_{suffix}.bin" + unique_args_file = f"flash_args_{suffix}" - # 2. Read flash_args - flash_args_path = self.build_dir / "flash_args" + # 3. Read flash_args + flash_args_path = firmware_dir / unique_args_file if not flash_args_path.exists(): - self.log.error("flash_args not found") + self.log.error(f"flash_args for {suffix} not found") return False try: @@ -140,26 +172,26 @@ class UnifiedDeployWorker: raw_args = [x for x in content.split(' ') if x] final_args = [] - # 3. Swap standard paths for unique paths + # 4. Construct Flash Command (Swap paths) for arg in raw_args: if arg.endswith('bootloader.bin'): - # Check if unique exists, else fallback - final_args.append(unique_boot if (self.build_dir / unique_boot).exists() else arg) + final_args.append(str(firmware_dir / unique_boot)) elif arg.endswith('partition-table.bin'): - final_args.append(unique_part if (self.build_dir / unique_part).exists() else arg) - elif arg.endswith(project_bin): # Main binary match - final_args.append(unique_app if (self.build_dir / unique_app).exists() else arg) + final_args.append(str(firmware_dir / unique_part)) + elif arg.endswith('.bin') and 'partition' not in arg and 'bootloader' not in arg: + final_args.append(str(firmware_dir / unique_app)) else: final_args.append(arg) - # 4. Flash cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud), '--before', 'default_reset', '--after', 'hard_reset', 'write_flash'] + final_args - self.log.info(f"Flashing {unique_app}...") + # CHANGED: Log the full absolute path + full_path = firmware_dir / unique_app + self.log.info(f"Flashing {full_path}...") - proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.build_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) except asyncio.TimeoutError: @@ -267,67 +299,49 @@ class UnifiedDeployWorker: def parse_args(): parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool') - # --- Interactive Mode --- parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options') - # --- Build Options --- - parser.add_argument('--target', choices=['esp32', 'esp32s3', 'esp32c5', 'all'], help="Target Chip (use 'all' to build all variants)") + # Updated choices to include 'auto' + parser.add_argument('--target', choices=['esp32', 'esp32s3', 'esp32c5', 'all', 'auto'], + help="Target Chip (Use 'all' to build library, 'auto' to flash by detection)") + parser.add_argument('--ampdu', action='store_true', help='Enable AMPDU in build') parser.add_argument('--no-ampdu', action='store_false', dest='ampdu', help='Disable AMPDU in build') parser.set_defaults(ampdu=True) - # Operation Mode - parser.add_argument('--config-only', action='store_true', help='Configure only') - parser.add_argument('--flash-only', action='store_true', help='Flash only') - parser.add_argument('--flash-erase', action='store_true', help='Erase flash first') - - # Build/Flash - parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir') - parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud') - parser.add_argument('--devices', type=str, help='Device list /dev/ttyUSB0,/dev/ttyUSB1') - parser.add_argument('--max-concurrent', type=int, default=None, help='Max concurrent flash') - - # Network - # CHANGE: Removed required=True + parser.add_argument('--config-only', action='store_true') + parser.add_argument('--flash-only', action='store_true') + parser.add_argument('--flash-erase', action='store_true') + parser.add_argument('-d', '--dir', default=os.getcwd()) + parser.add_argument('-b', '--baud', type=int, default=460800) + parser.add_argument('--devices', type=str) + parser.add_argument('--max-concurrent', type=int, default=None) parser.add_argument('--start-ip', help='Start IP (Required unless --target all)') - - parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='SSID') - parser.add_argument('-P', '--password', default='ez2remember', help='Password') - parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway') - parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') - - # WiFi - parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'], help='Band') - parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'], help='BW') - parser.add_argument('-ps', '--powersave', default='NONE', help='Power save') - - # Iperf - parser.add_argument('--iperf-period', type=float, default=0.01, help='Seconds between bursts') - parser.add_argument('--iperf-burst', type=int, default=1, help='Packets/tick') - parser.add_argument('--iperf-len', type=int, default=1470, help='Payload len') - parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'], help='Proto') - parser.add_argument('--iperf-dest-ip', default='192.168.1.50', help='Dest IP') - parser.add_argument('--iperf-port', type=int, default=5001, help='Dest Port') - parser.add_argument('--no-iperf', action='store_true', help='Disable Iperf start') - - g = parser.add_mutually_exclusive_group() - g.add_argument('--iperf-client', action='store_true') - g.add_argument('--iperf-server', action='store_true') - - # Mode - parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR']) + parser.add_argument('-s', '--ssid', default='ClubHouse2G') + parser.add_argument('-P', '--password', default='ez2remember') + parser.add_argument('-g', '--gateway', default='192.168.1.1') + parser.add_argument('-m', '--netmask', default='255.255.255.0') + parser.add_argument('--band', default='2.4G') + parser.add_argument('-B', '--bandwidth', default='HT20') + parser.add_argument('-ps', '--powersave', default='NONE') + parser.add_argument('--iperf-period', type=float, default=0.01) + parser.add_argument('--iperf-burst', type=int, default=1) + parser.add_argument('--iperf-len', type=int, default=1470) + parser.add_argument('--iperf-proto', default='UDP') + parser.add_argument('--iperf-dest-ip', default='192.168.1.50') + parser.add_argument('--iperf-port', type=int, default=5001) + parser.add_argument('--no-iperf', action='store_true') + parser.add_argument('--iperf-client', action='store_true') + parser.add_argument('--iperf-server', action='store_true') + parser.add_argument('-M', '--mode', default='STA') parser.add_argument('-mc', '--monitor-channel', type=int, default=36) - parser.add_argument('--csi', dest='csi_enable', action='store_true', help="Enable CSI (Runtime & Build if Interactive)") + parser.add_argument('--csi', dest='csi_enable', action='store_true') args = parser.parse_args() - # --- VALIDATION LOGIC --- - - # 1. Enforce Start IP for normal operations (flashing/configuring) if args.target != 'all' and not args.start_ip: parser.error("the following arguments are required: --start-ip") - # 2. Existing checks if args.config_only and args.flash_only: parser.error("Conflicting modes") if not args.config_only and not args.flash_only and args.target != 'all': if not args.ssid or not args.password: @@ -364,9 +378,6 @@ def get_sdkconfig_defaults(target, csi_enabled, ampdu_enabled): return ";".join(defaults) async def build_task(project_dir, target, csi, ampdu, current_step=None, total_steps=None): - """ - Builds firmware with a full clean to prevent target conflicts. - """ defaults_str = get_sdkconfig_defaults(target, csi, ampdu) desc = f"Target={target}, CSI={'ON' if csi else 'OFF'}, AMPDU={'ON' if ampdu else 'OFF'}" @@ -377,7 +388,9 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s print(f" {prefix}Building [{desc}] ... ", end='', flush=True) try: - # 1. FULL CLEAN (Critical for switching targets) + output_dir = project_dir / "firmware" + output_dir.mkdir(exist_ok=True) + sdkconfig_path = project_dir / "sdkconfig" build_path = project_dir / "build" @@ -387,7 +400,6 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s if build_path.exists(): shutil.rmtree(build_path) - # 2. Set Target proc = await asyncio.create_subprocess_exec( 'idf.py', 'set-target', target, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE @@ -398,7 +410,6 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s print(f"{Colors.RED} >> {stderr.decode().strip()[-500:]}{Colors.RESET}") return False, f"Set Target Failed", 0 - # 3. Build start_time = time.time() build_cmd = ['idf.py', '-D', f'SDKCONFIG_DEFAULTS={defaults_str}', 'build'] proc = await asyncio.create_subprocess_exec( @@ -412,7 +423,6 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s print(f"{Colors.RED} >> {stderr.decode().strip()[-500:]}{Colors.RESET}") return False, f"Build Failed", duration - # 4. Create Unique Artifacts build_dir = project_dir / 'build' suffix = generate_config_suffix(target, csi, ampdu) unique_app_name = "Unknown" @@ -420,17 +430,22 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s project_bin = get_project_binary_name(build_dir) if project_bin: unique_app_name = f"{os.path.splitext(project_bin)[0]}_{suffix}.bin" - shutil.copy2(build_dir / project_bin, build_dir / unique_app_name) + shutil.copy2(build_dir / project_bin, output_dir / unique_app_name) boot_src = build_dir / "bootloader" / "bootloader.bin" if boot_src.exists(): - shutil.copy2(boot_src, build_dir / f"bootloader_{suffix}.bin") + shutil.copy2(boot_src, output_dir / f"bootloader_{suffix}.bin") part_src = build_dir / "partition_table" / "partition-table.bin" if part_src.exists(): - shutil.copy2(part_src, build_dir / f"partition-table_{suffix}.bin") + shutil.copy2(part_src, output_dir / f"partition-table_{suffix}.bin") - print(f"{Colors.GREEN}OK ({duration:.1f}s) -> {unique_app_name}{Colors.RESET}") + flash_src = build_dir / "flash_args" + if flash_src.exists(): + shutil.copy2(flash_src, output_dir / f"flash_args_{suffix}") + + full_path = output_dir / unique_app_name + print(f"{Colors.GREEN}OK ({duration:.1f}s) -> {full_path}{Colors.RESET}") return True, "Success", duration except Exception as e: @@ -440,7 +455,6 @@ async def build_task(project_dir, target, csi, ampdu, current_step=None, total_s async def run_deployment(args): print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}\n ESP32 Unified Deployment Tool\n{Colors.BLUE}{'='*60}{Colors.RESET}") project_dir = Path(args.dir).resolve() - build_dir = project_dir / 'build' # --- Target 'ALL' Mode --- if args.target == 'all': @@ -466,7 +480,8 @@ async def run_deployment(args): return # --- Single Build Configuration --- - if not args.config_only: + # Skip build if we are in AUTO mode (we assume binaries exist in firmware/) + if not args.config_only and args.target != 'auto': target = args.target if args.target else 'esp32s3' csi = args.csi_enable ampdu = args.ampdu @@ -484,6 +499,8 @@ async def run_deployment(args): if not success: print(f"{Colors.RED}{msg}{Colors.RESET}") return + elif args.target == 'auto' and not args.config_only: + print(f"{Colors.YELLOW}Target 'auto' selected. Skipping build step (assuming artifacts in firmware/).{Colors.RESET}") # --- Device Detection & Flash --- if args.devices: @@ -502,7 +519,7 @@ async def run_deployment(args): for i, dev in enumerate(devs): offset = extract_device_number(dev.device) if args.devices else i target_ip = str(start_ip + offset) - tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, build_dir, flash_sem).run()) + tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, project_dir, flash_sem).run()) results = await asyncio.gather(*tasks) success = results.count(True)