ESP32/esp32_deploy.py

518 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ESP32 Unified Deployment Tool
Combines firmware flashing and device configuration with full control.
Updates:
- 'target all' support (Build 12 configurations)
- Unique binary naming for Main, Bootloader, and Partition Table
- Safer flashing for mixed environments
- Progress counter [1/12] for batch builds
"""
import asyncio
import serial_asyncio
import sys
import os
import argparse
import ipaddress
import re
import time
import shutil
import logging
from pathlib import Path
# Ensure detection script is available
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
except ImportError:
print("Error: 'detect_esp32.py' not found.")
sys.exit(1)
# --- Configuration ---
# Size of the flash semaphore when --max-concurrent is not given; an explicit
# --devices list being flashed defaults to 1 instead (see run_deployment).
DEFAULT_MAX_CONCURRENT_FLASH = 4
class Colors:
    """ANSI escape sequences used to colour console output."""

    RESET = '\x1b[0m'
    RED = '\x1b[91m'
    GREEN = '\x1b[92m'
    YELLOW = '\x1b[93m'
    BLUE = '\x1b[94m'
    CYAN = '\x1b[96m'
class DeviceLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that tags each message with the adapter's ``connid``."""

    def process(self, msg, kwargs):
        """Return ``msg`` prefixed with ``[<connid>]``; ``kwargs`` pass through unchanged."""
        tagged = f"[{self.extra['connid']}] {msg}"
        return tagged, kwargs
# Root logging is configured at import time: HH:MM:SS timestamp followed by the message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
# Base logger; UnifiedDeployWorker wraps it in a DeviceLoggerAdapter keyed by port.
logger = logging.getLogger("Deploy")
# Matches the per-configuration copies that build_task() drops into the build
# root, e.g. "app_esp32_csi_on_ampdu_off.bin" or "bootloader_esp32s3_csi_off_ampdu_on.bin".
_DERIVED_ARTIFACT_RE = re.compile(r'_csi_(?:on|off)_ampdu_(?:on|off)\.bin$')


def get_project_binary_name(build_dir):
    """
    Heuristic to find the main project binary in the build folder.

    Standard ESP-IDF binaries, anything with 'partition' in its name, and the
    suffixed per-configuration copies created by build_task() are skipped so
    that a derived artifact is never mistaken for the application image.

    Args:
        build_dir: Path (str or Path) of the ESP-IDF build directory.

    Returns:
        The file name (not the full path) of the first candidate in sorted
        order, or None when the directory is missing or holds no candidate.
    """
    ignored = {'bootloader.bin', 'partition-table.bin', 'ota_data_initial.bin'}
    try:
        # Sort so the choice does not depend on filesystem enumeration order.
        names = sorted(os.listdir(build_dir))
    except FileNotFoundError:
        return None
    for name in names:
        if not name.endswith('.bin'):
            continue
        if name in ignored or 'partition' in name:
            continue
        if _DERIVED_ARTIFACT_RE.search(name):
            continue
        return name
    return None
def generate_config_suffix(target, csi, ampdu):
    """Return the ``<target>_csi_<on|off>_ampdu_<on|off>`` tag for a build variant."""
    csi_state = "on" if csi else "off"
    ampdu_state = "on" if ampdu else "off"
    return "_".join((f"{target}", f"csi_{csi_state}", f"ampdu_{ampdu_state}"))
class UnifiedDeployWorker:
    """Flash and/or configure one ESP32 attached to a single serial port.

    Args:
        port: Serial device name handed to esptool.py and to serial_asyncio.
        target_ip: IPv4 address (string) written into the device configuration.
        args: Parsed command-line namespace (see ``parse_args``).
        build_dir: ``pathlib.Path`` of the build directory that holds
            ``flash_args`` and the binaries.
        flash_sem: ``asyncio.Semaphore`` bounding concurrent flash operations.
    """

    FLASH_TIMEOUT_S = 300    # upper bound for one esptool write_flash run
    BOOT_TIMEOUT_S = 10      # how long to wait for a boot/ready banner
    VERIFY_TIMEOUT_S = 20    # how long to wait for a config acknowledgement

    def __init__(self, port, target_ip, args, build_dir, flash_sem):
        self.port = port
        self.target_ip = target_ip
        self.args = args
        self.build_dir = build_dir
        self.flash_sem = flash_sem
        self.log = DeviceLoggerAdapter(logger, {'connid': port})
        self.regex_ready = re.compile(r'Initialization complete|GPS synced|GPS initialization aborted|No Config Found', re.IGNORECASE)
        self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
        self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE)

    async def run(self):
        """Run the requested erase/flash/configure steps for this device.

        Returns:
            bool: True when the requested work succeeded. After a successful
            flash, a failed configuration is only a warning (still True); in
            config-only mode a failed or skipped configuration is False.
        """
        try:
            if not self.args.config_only:
                async with self.flash_sem:
                    if self.args.flash_erase and not await self._erase_flash():
                        return False
                    if not await self._flash_firmware():
                        return False
                    # Presumably lets the chip come out of the post-flash hard reset.
                    await asyncio.sleep(1.0)
            if self.args.flash_only:
                return True
            if not (self.args.ssid and self.args.password):
                self.log.warning("No SSID/Password provided, skipping config")
                return not self.args.config_only
            if await self._configure_device():
                return True
            if self.args.config_only:
                # Nothing was flashed, so there is no partial success to report.
                self.log.error(f"{Colors.RED}Config verify failed.{Colors.RESET}")
                return False
            self.log.warning(f"{Colors.YELLOW}Config verify failed. Marking SUCCESS (Flash OK).{Colors.RESET}")
            return True
        except Exception as e:
            # Worker boundary: one failing device must not abort the whole batch.
            self.log.error(f"Worker Exception: {e}")
            return False

    async def _erase_flash(self):
        """Run ``esptool.py erase_flash`` on this port; return True on exit code 0."""
        cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash']
        proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        _, stderr = await proc.communicate()
        if proc.returncode == 0:
            return True
        self.log.error(f"Erase failed: {stderr.decode(errors='replace')}")
        return False

    def _swap_in_unique_binaries(self, raw_args, project_bin, suffix):
        """Replace standard binary paths in ``raw_args`` with their suffixed copies.

        A replacement is made only when the suffixed file exists in
        ``build_dir``; otherwise the original argument is kept.
        """
        replacements = (
            ('bootloader.bin', f"bootloader_{suffix}.bin"),
            ('partition-table.bin', f"partition-table_{suffix}.bin"),
            (project_bin, f"{os.path.splitext(project_bin)[0]}_{suffix}.bin"),
        )
        final_args = []
        for arg in raw_args:
            chosen = arg
            for ending, unique_name in replacements:
                if arg.endswith(ending):
                    if (self.build_dir / unique_name).exists():
                        chosen = unique_name
                    break
            final_args.append(chosen)
        return final_args

    async def _flash_firmware(self):
        """
        Parses flash_args to inject UNIQUE binary filenames for App, Bootloader, and Partitions.

        Returns:
            bool: True when esptool exits with code 0; False on a missing
            input, a timeout, or a non-zero exit.
        """
        suffix = generate_config_suffix(self.args.target, self.args.csi_enable, self.args.ampdu)
        project_bin = get_project_binary_name(self.build_dir)
        if not project_bin:
            self.log.error("Could not determine project binary name")
            return False
        unique_app = f"{os.path.splitext(project_bin)[0]}_{suffix}.bin"

        flash_args_path = self.build_dir / "flash_args"
        if not flash_args_path.exists():
            self.log.error("flash_args not found")
            return False
        try:
            with open(flash_args_path, 'r') as f:
                raw_args = f.read().split()
            final_args = self._swap_in_unique_binaries(raw_args, project_bin, suffix)
            cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud),
                   '--before', 'default_reset', '--after', 'hard_reset',
                   'write_flash'] + final_args
            # Name the image that is really going to be written.
            app_image = unique_app if (self.build_dir / unique_app).exists() else project_bin
            self.log.info(f"Flashing {app_image}...")
            proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.build_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
            try:
                _, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.FLASH_TIMEOUT_S)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()  # reap the child so it does not linger
                self.log.error(f"Flash timed out after {self.FLASH_TIMEOUT_S}s")
                return False
            if proc.returncode == 0:
                return True
            self.log.error(f"Flash failed: {stderr.decode(errors='replace')}")
            return False
        except OSError as e:
            # Unreadable flash_args or esptool.py could not be started.
            self.log.error(f"Flash Prep Error: {e}")
            return False

    async def _configure_device(self):
        """Open the serial port, send the config block and wait for confirmation.

        Returns:
            bool: True when the device acknowledged the configuration.
        """
        try:
            reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
        except Exception as e:
            self.log.error(f"Could not open serial port: {e}")
            return False
        try:
            if self.args.config_only:
                # No flash preceded this step, so pulse RTS to reboot the chip.
                writer.transport.serial.dtr = False
                writer.transport.serial.rts = True
                await asyncio.sleep(0.1)
                writer.transport.serial.rts = False
                await asyncio.sleep(0.1)
                writer.transport.serial.dtr = True
            if not await self._wait_for_boot(reader):
                self.log.warning("Boot prompt missed...")
            await self._send_config(writer)
            if await self._verify_configuration(reader):
                self.log.info(f"{Colors.GREEN}Config verified.{Colors.RESET}")
                await self._perform_reset(writer)
                return True
            self.log.error(f"{Colors.RED}Config verification failed.{Colors.RESET}")
            return False
        except Exception as e:
            self.log.error(f"Config Error: {e}")
            return False
        finally:
            writer.close()
            await writer.wait_closed()

    async def _perform_reset(self, writer):
        """Pulse RTS (with DTR low) on the open port; failures are logged, not raised."""
        try:
            writer.transport.serial.dtr = False
            writer.transport.serial.rts = True
            await asyncio.sleep(0.2)
            writer.transport.serial.rts = False
            await asyncio.sleep(0.1)
        except Exception as e:
            self.log.error(f"Failed to reset device: {e}")

    async def _wait_for_boot(self, reader):
        """Return True once a line matching ``regex_ready`` is read.

        Returns False when ``BOOT_TIMEOUT_S`` elapses or the stream hits EOF.
        """
        deadline = time.monotonic() + self.BOOT_TIMEOUT_S
        while time.monotonic() < deadline:
            try:
                raw = await asyncio.wait_for(reader.readline(), timeout=0.5)
            except asyncio.TimeoutError:
                continue
            if not raw:
                # EOF: readline() would keep returning b'' and spin until the deadline.
                return False
            if self.regex_ready.search(raw.decode('utf-8', errors='ignore').strip()):
                return True
        return False

    async def _send_config(self, writer):
        """Write the ``CFG`` ... ``END`` configuration block to the device."""
        csi_val = '1' if self.args.csi_enable else '0'
        role_str = "SERVER" if self.args.iperf_server else "CLIENT"
        iperf_enable_val = '0' if self.args.no_iperf else '1'
        # Round rather than truncate so float error cannot drop a microsecond.
        period_us = int(round(self.args.iperf_period * 1000000))
        config_str = (
            f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\nIP:{self.target_ip}\n"
            f"MASK:{self.args.netmask}\nGW:{self.args.gateway}\nDHCP:0\nBAND:{self.args.band}\n"
            f"BW:{self.args.bandwidth}\nPOWERSAVE:{self.args.powersave}\nMODE:{self.args.mode}\n"
            f"MON_CH:{self.args.monitor_channel}\nCSI:{csi_val}\n"
            f"IPERF_PERIOD_US:{period_us}\n"
            f"IPERF_ROLE:{role_str}\n"
            f"IPERF_PROTO:{self.args.iperf_proto}\n"
            f"IPERF_DEST_IP:{self.args.iperf_dest_ip}\n"
            f"IPERF_PORT:{self.args.iperf_port}\n"
            f"IPERF_BURST:{self.args.iperf_burst}\n"
            f"IPERF_LEN:{self.args.iperf_len}\n"
            f"IPERF_ENABLED:{iperf_enable_val}\n"
            f"END\n"
        )
        writer.write(config_str.encode('utf-8'))
        await writer.drain()

    async def _verify_configuration(self, reader):
        """Return True once the device confirms the configuration.

        Confirmation is a line containing "Config saved", a line matching
        ``regex_csi_saved``, or a "got ip" line whose address equals
        ``target_ip``. Returns False when ``VERIFY_TIMEOUT_S`` elapses or the
        stream hits EOF.
        """
        deadline = time.monotonic() + self.VERIFY_TIMEOUT_S
        while time.monotonic() < deadline:
            try:
                raw = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            if not raw:
                # EOF, as opposed to a blank line (which still carries b'\n').
                return False
            line = raw.decode('utf-8', errors='ignore').strip()
            if not line:
                continue
            if "Config saved" in line or self.regex_csi_saved.search(line):
                return True
            m = self.regex_got_ip.search(line)
            if m and m.group(1) == self.target_ip:
                return True
        return False
def parse_args(argv=None):
    """Parse and validate the command line.

    Args:
        argv: Optional list of argument strings; ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with all options.

    Raises:
        SystemExit: via ``parser.error`` (exit status 2) on invalid or
            conflicting options.
    """
    parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool')
    # --- Interactive Mode ---
    parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options')
    # --- Build Options ---
    parser.add_argument('--target', choices=['esp32', 'esp32s3', 'esp32c5', 'all'], help="Target Chip (use 'all' to build all variants)")
    parser.add_argument('--ampdu', action='store_true', help='Enable AMPDU in build')
    parser.add_argument('--no-ampdu', action='store_false', dest='ampdu', help='Disable AMPDU in build')
    parser.set_defaults(ampdu=True)
    # Operation Mode
    parser.add_argument('--config-only', action='store_true', help='Configure only')
    parser.add_argument('--flash-only', action='store_true', help='Flash only')
    parser.add_argument('--flash-erase', action='store_true', help='Erase flash first')
    # Build/Flash
    parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir')
    parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud')
    parser.add_argument('--devices', type=str, help='Device list /dev/ttyUSB0,/dev/ttyUSB1')
    parser.add_argument('--max-concurrent', type=int, default=None, help='Max concurrent flash')
    # Network
    # --start-ip is not marked required here because '--target all' only
    # builds; the requirement is enforced in the validation step below.
    parser.add_argument('--start-ip', help='Start IP (Required unless --target all)')
    parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='SSID')
    parser.add_argument('-P', '--password', default='ez2remember', help='Password')
    parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway')
    parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')
    # WiFi
    parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'], help='Band')
    parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'], help='BW')
    parser.add_argument('-ps', '--powersave', default='NONE', help='Power save')
    # Iperf
    parser.add_argument('--iperf-period', type=float, default=0.01, help='Seconds between bursts')
    parser.add_argument('--iperf-burst', type=int, default=1, help='Packets/tick')
    parser.add_argument('--iperf-len', type=int, default=1470, help='Payload len')
    parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'], help='Proto')
    parser.add_argument('--iperf-dest-ip', default='192.168.1.50', help='Dest IP')
    parser.add_argument('--iperf-port', type=int, default=5001, help='Dest Port')
    parser.add_argument('--no-iperf', action='store_true', help='Disable Iperf start')
    g = parser.add_mutually_exclusive_group()
    g.add_argument('--iperf-client', action='store_true')
    g.add_argument('--iperf-server', action='store_true')
    # Mode
    parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'])
    parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
    parser.add_argument('--csi', dest='csi_enable', action='store_true', help="Enable CSI (Runtime & Build if Interactive)")
    args = parser.parse_args(argv)

    # --- VALIDATION LOGIC ---
    batch_build = args.target == 'all'
    # 1. Enforce Start IP for normal operations (flashing/configuring)
    if not batch_build:
        if not args.start_ip:
            parser.error("the following arguments are required: --start-ip")
        # Reject a malformed address now instead of after a full build.
        try:
            ipaddress.IPv4Address(args.start_ip)
        except ValueError:
            parser.error(f"invalid --start-ip: {args.start_ip!r} is not an IPv4 address")
    # A negative value would make asyncio.Semaphore raise later on.
    if args.max_concurrent is not None and args.max_concurrent < 0:
        parser.error("--max-concurrent must not be negative")
    # 2. Existing checks
    if args.config_only and args.flash_only:
        parser.error("Conflicting modes")
    if not args.config_only and not args.flash_only and not batch_build:
        if not args.ssid or not args.password:
            parser.error("SSID/PASS required")
    return args
def extract_device_number(device_path):
    """Return the integer formed by the digits ending ``device_path``, or 0 if there are none."""
    trailing_digits = re.search(r'(\d+)$', device_path)
    if trailing_digits is None:
        return 0
    return int(trailing_digits.group(1))
def ask_user(prompt, default=None, choices=None):
    """Prompt on stdin until an acceptable answer is given.

    An empty answer returns ``default`` when one is set. With ``choices`` the
    answer must be one of them (otherwise the prompt repeats); without
    ``choices`` any answer is returned as typed (stripped).
    """
    hint = ""
    if choices:
        hint += " [" + "|".join(choices) + "]"
    if default:
        hint += f" [{default}]"
    question = f"{Colors.CYAN}{prompt}{hint}: {Colors.RESET}"
    while True:
        answer = input(question).strip()
        if default and not answer:
            return default
        if not choices or answer in choices:
            return answer
        print(f"{Colors.RED}Invalid choice.{Colors.RESET}")
def ask_bool(prompt, default=True):
    """Ask a yes/no question; an empty answer yields ``default``, else True iff it starts with 'y'."""
    hint = " [Y/n]" if default else " [y/N]"
    reply = input(f"{Colors.CYAN}{prompt}{hint}: {Colors.RESET}").strip().lower()
    if reply:
        return reply.startswith('y')
    return default
def get_sdkconfig_defaults(target, csi_enabled, ampdu_enabled):
    """Return the ';'-joined list of sdkconfig defaults files for a build variant.

    The list is the base file, the target file (with ``csi`` appended to the
    target name when CSI is enabled) and the AMPDU or no-AMPDU file.
    """
    target_file = f"sdkconfig.defaults.{target}" + ("csi" if csi_enabled else "")
    if ampdu_enabled:
        ampdu_file = "sdkconfig.defaults.ampdu"
    else:
        ampdu_file = "sdkconfig.defaults.noampdu"
    return ";".join(("sdkconfig.defaults", target_file, ampdu_file))
async def _run_idf(project_dir, *idf_args):
    """Run ``idf.py <idf_args>`` in ``project_dir``; return (returncode, raw stderr)."""
    proc = await asyncio.create_subprocess_exec(
        'idf.py', *idf_args,
        cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _, err = await proc.communicate()
    return proc.returncode, err


async def build_task(project_dir, target, csi, ampdu, current_step=None, total_steps=None):
    """
    Builds firmware with a full clean to prevent target conflicts.

    Removes ``sdkconfig`` and ``build``, runs ``idf.py set-target`` and
    ``idf.py build``, then copies the app, bootloader and partition-table
    binaries to suffixed names in the build directory.

    Returns:
        tuple: ``(ok, message, build_seconds)``; the duration is 0 when the
        build step itself was never reached or an exception occurred.
    """
    sdk_defaults = get_sdkconfig_defaults(target, csi, ampdu)
    csi_label = 'ON' if csi else 'OFF'
    ampdu_label = 'ON' if ampdu else 'OFF'
    summary = f"Target={target}, CSI={csi_label}, AMPDU={ampdu_label}"
    have_counter = current_step is not None and total_steps is not None
    counter = f"[{current_step}/{total_steps}] " if have_counter else ""
    print(f" {counter}Building [{summary}] ... ", end='', flush=True)
    try:
        # Start from a clean tree; leftovers from another target break the build.
        stale_config = project_dir / "sdkconfig"
        out_dir = project_dir / "build"
        if stale_config.exists():
            os.remove(stale_config)
        if out_dir.exists():
            shutil.rmtree(out_dir)

        rc, err = await _run_idf(project_dir, 'set-target', target)
        if rc != 0:
            print(f"{Colors.RED}FAIL (Set Target){Colors.RESET}")
            print(f"{Colors.RED} >> {err.decode().strip()[-500:]}{Colors.RESET}")
            return False, "Set Target Failed", 0

        started = time.time()
        rc, err = await _run_idf(project_dir, '-D', f'SDKCONFIG_DEFAULTS={sdk_defaults}', 'build')
        duration = time.time() - started
        if rc != 0:
            print(f"{Colors.RED}FAIL{Colors.RESET}")
            print(f"{Colors.RED} >> {err.decode().strip()[-500:]}{Colors.RESET}")
            return False, "Build Failed", duration

        # Keep per-configuration copies so later builds do not overwrite them by name.
        tag = generate_config_suffix(target, csi, ampdu)
        app_copy = "Unknown"
        app_bin = get_project_binary_name(out_dir)
        if app_bin:
            app_copy = f"{os.path.splitext(app_bin)[0]}_{tag}.bin"
            shutil.copy2(out_dir / app_bin, out_dir / app_copy)
        for subdir, stem in (("bootloader", "bootloader"), ("partition_table", "partition-table")):
            src = out_dir / subdir / f"{stem}.bin"
            if src.exists():
                shutil.copy2(src, out_dir / f"{stem}_{tag}.bin")
        print(f"{Colors.GREEN}OK ({duration:.1f}s) -> {app_copy}{Colors.RESET}")
        return True, "Success", duration
    except Exception as e:
        print(f"{Colors.RED}ERROR: {e}{Colors.RESET}")
        return False, str(e), 0
async def run_deployment(args):
    """Top-level workflow: batch build, or build + flash/configure devices.

    With ``args.target == 'all'`` every target/CSI/AMPDU combination is built
    and a summary is printed. Otherwise the selected configuration is built
    (unless ``args.config_only``), devices are taken from ``args.devices`` or
    auto-detected, and one ``UnifiedDeployWorker`` runs per device.

    Args:
        args: Namespace produced by ``parse_args``.

    Returns:
        None. Results are reported on stdout.
    """
    import itertools
    from types import SimpleNamespace

    rule = f"{Colors.BLUE}{'='*60}{Colors.RESET}"
    print(f"\n{rule}\n ESP32 Unified Deployment Tool\n{rule}")
    project_dir = Path(args.dir).resolve()
    build_dir = project_dir / 'build'

    # --- Target 'ALL' Mode ---
    if args.target == 'all':
        targets = ['esp32', 'esp32s3', 'esp32c5']
        booleans = [False, True]
        # Same order as nested loops: target outermost, AMPDU innermost.
        combos = list(itertools.product(targets, booleans, booleans))
        print(f"{Colors.YELLOW}Starting Batch Build Verification ({len(combos)} Combinations){Colors.RESET}\n")
        results = []
        for step, (target, csi, ampdu) in enumerate(combos, start=1):
            success, _msg, dur = await build_task(project_dir, target, csi, ampdu, step, len(combos))
            results.append({"cfg": f"{target.ljust(9)} CSI:{'ON ' if csi else 'OFF'} AMPDU:{'ON ' if ampdu else 'OFF'}", "ok": success, "dur": dur})
        print(f"\n{Colors.BLUE}Batch Summary:{Colors.RESET}")
        for r in results:
            status = f"{Colors.GREEN}PASS{Colors.RESET}" if r['ok'] else f"{Colors.RED}FAIL{Colors.RESET}"
            print(f" {r['cfg']} : {status} ({r['dur']:.1f}s)")
        return

    # Fail fast: a malformed start IP should not cost a full build first.
    try:
        start_ip = ipaddress.IPv4Address(args.start_ip)
    except ValueError as exc:
        print(f"{Colors.RED}Invalid --start-ip {args.start_ip!r}: {exc}{Colors.RESET}")
        return

    # --- Single Build Configuration ---
    if not args.config_only:
        target = args.target if args.target else 'esp32s3'
        csi = args.csi_enable
        ampdu = args.ampdu
        if args.interactive:
            print(f"\n{Colors.YELLOW}--- Build Configuration ---{Colors.RESET}")
            target = ask_user("Target Chip", default=target, choices=['esp32', 'esp32s3', 'esp32c5'])
            csi = ask_bool("Enable CSI Support?", default=csi)
            ampdu = ask_bool("Enable AMPDU Aggregation?", default=ampdu)
        # Workers derive the binary suffix from args, so record what was
        # really built (including the defaulted target).
        args.csi_enable = csi
        args.target = target
        args.ampdu = ampdu
        success, msg, _ = await build_task(project_dir, target, csi, ampdu, 1, 1)
        if not success:
            print(f"{Colors.RED}{msg}{Colors.RESET}")
            return

    # --- Device Detection & Flash ---
    if args.devices:
        # Skip empty entries such as the one produced by a trailing comma.
        devs = [SimpleNamespace(device=name.strip()) for name in args.devices.split(',') if name.strip()]
    else:
        devs = detect_esp32.detect_esp32_devices()
    if not devs:
        print("No devices found")
        return
    # Natural sort so e.g. ttyUSB2 comes before ttyUSB10.
    devs.sort(key=lambda d: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)])
    print(f"\n{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}")

    if args.max_concurrent:
        max_c = args.max_concurrent
    elif args.devices and not args.config_only:
        max_c = 1
    else:
        max_c = DEFAULT_MAX_CONCURRENT_FLASH
    flash_sem = asyncio.Semaphore(max_c)

    tasks = []
    for i, dev in enumerate(devs):
        # Explicit device lists map the port number to the IP offset;
        # auto-detected devices are numbered by sorted position.
        offset = extract_device_number(dev.device) if args.devices else i
        target_ip = str(start_ip + offset)
        tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, build_dir, flash_sem).run())
    results = await asyncio.gather(*tasks)
    success = results.count(True)
    print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}")
def main():
    """Script entry point: parse the command line and run the deployment.

    Exits with status 1 when interrupted with Ctrl-C.
    """
    running_on_windows = os.name == 'nt'
    if running_on_windows:
        # NOTE(review): asyncio.run() creates its own event loop, so this loop
        # may never actually be used — confirm whether it is still needed.
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
    try:
        cli_args = parse_args()
        asyncio.run(run_deployment(cli_args))
    except KeyboardInterrupt:
        sys.exit(1)


if __name__ == '__main__':
    main()