ESP32/esp32_deploy.py

675 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import os
import asyncio
import serial_asyncio
import argparse
import ipaddress
import re
import time
import shutil
import logging
import glob
import random
from pathlib import Path
from serial.tools import list_ports
import subprocess
# Ensure detection script is available
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
except ImportError:
print("Error: 'detect_esp32.py' not found.")
sys.exit(1)
# --- Configuration ---
DEFAULT_MAX_CONCURRENT_FLASH = 4
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
class DeviceLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger("Deploy")
def get_project_binary_name(build_dir):
ignored = {'bootloader.bin', 'partition-table.bin', 'ota_data_initial.bin'}
found = []
try:
for f in os.listdir(build_dir):
if f.endswith('.bin') and f not in ignored and 'partition' not in f:
found.append(f)
except FileNotFoundError:
return None
return found[0] if found else None
def generate_config_suffix(target, csi, ampdu):
csi_str = "csi_on" if csi else "csi_off"
ampdu_str = "ampdu_on" if ampdu else "ampdu_off"
return f"{target}_{csi_str}_{ampdu_str}"
def auto_detect_devices():
"""Prioritizes static udev paths (/dev/esp_port_XX) and removes duplicates."""
try:
ports = glob.glob('/dev/esp_port_*')
if ports:
# --- New Deduplication Logic ---
unique_map = {}
for p in ports:
try:
# Resolve symlink (e.g., /dev/esp_port_01 -> /dev/ttyUSB0)
real_path = os.path.realpath(p)
if real_path not in unique_map:
unique_map[real_path] = p
else:
# Conflict! We have both esp_port_1 and esp_port_01.
# Keep the "shorter" one (esp_port_1) to match your new scheme.
current_alias = unique_map[real_path]
if len(p) < len(current_alias):
unique_map[real_path] = p
except OSError:
continue
# Use the filtered list
ports = list(unique_map.values())
# -------------------------------
# Sort by suffix number
ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0)
print(f"{Colors.CYAN}Auto-detected {len(ports)} devices (filtered from {len(unique_map) + (len(glob.glob('/dev/esp_port_*')) - len(unique_map))} aliases).{Colors.RESET}")
return [type('obj', (object,), {'device': p}) for p in ports]
except Exception:
pass
return detect_esp32.detect_esp32_devices()
class UnifiedDeployWorker:
def __init__(self, port, target_ip, args, project_dir, flash_sem, total_devs):
self.port = port
self.target_ip = target_ip
self.args = args
self.project_dir = Path(project_dir)
self.flash_sem = flash_sem
self.total_devs = total_devs
self.log = DeviceLoggerAdapter(logger, {'connid': port})
self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)')
# Updated regex to look for the Shell Prompt
self.regex_prompt = re.compile(r'esp32>', re.IGNORECASE)
self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
self.regex_version = re.compile(r'APP_VERSION:\s*([0-9\.]+)', re.IGNORECASE)
async def run(self):
try:
if self.args.check_version:
return await self._query_version()
# --- Acquire Semaphore EARLY to protect Chip ID Detection ---
async with self.flash_sem:
detected_target = None
if self.args.target == 'auto' and not self.args.config_only:
detected_target = await self._identify_chip()
if not detected_target:
self.log.error("Failed to auto-detect chip type.")
return False
self.log.info(f"Auto-detected: {Colors.CYAN}{detected_target}{Colors.RESET}")
target_to_use = detected_target
else:
target_to_use = self.args.target
if not self.args.config_only:
if self.args.flash_erase:
if not await self._erase_flash(): return False
self.target_for_flash = target_to_use
if not await self._flash_firmware(): return False
# --- Semaphore Released Here ---
await asyncio.sleep(2.0)
if not self.args.flash_only:
if self.args.ssid and self.args.password:
# Thundering Herd Mitigation
if self.total_devs > 1:
delay = random.uniform(0, self.total_devs * 0.5)
self.log.info(f"Staggering config start by {delay:.1f}s...")
await asyncio.sleep(delay)
success = False
for attempt in range(1, 4):
self.log.info(f"Configuring (Attempt {attempt}/3)...")
if await self._configure_device():
success = True
break
self.log.warning(f"Config failed on attempt {attempt}. Retrying...")
await asyncio.sleep(2.0)
if not success:
self.log.error(f"{Colors.RED}Config verify failed after 3 attempts.{Colors.RESET}")
return False
else:
self.log.warning("No SSID/Password provided, skipping config")
return True
except Exception as e:
self.log.error(f"Worker Exception: {e}")
return False
async def _configure_device(self):
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e:
return False
try:
# Reset DTR/RTS logic
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
writer.transport.serial.dtr = False
# 1. Wait for Shell Prompt
if not await self._wait_for_prompt(reader, writer, timeout=15):
self.log.error("Shell prompt not detected.")
return False
# 2. Send Configuration via CLI
# Command: wifi_config -s "SSID" -p "PASS" -i "IP"
# Note: The Shell will auto-reboot after this command.
cmd = f'wifi_config -s "{self.args.ssid}" -p "{self.args.password}" -i "{self.target_ip}"'
if not self.args.iperf_client and not self.args.iperf_server:
# If just connecting, maybe we want DHCP?
# But if target_ip is set, we force static.
pass
self.log.info(f"Sending: {cmd}")
writer.write(f"{cmd}\n".encode())
await writer.drain()
# 3. Wait for the reboot and new prompt
# The device prints "Rebooting..." then restarts.
self.log.info("Waiting for reboot...")
await asyncio.sleep(3.0) # Give it time to actually reset
if not await self._wait_for_prompt(reader, writer, timeout=20):
self.log.error("Device did not return to prompt after reboot.")
return False
self.log.info(f"{Colors.GREEN}Reboot complete. Shell Ready.{Colors.RESET}")
# 4. (Optional) Start iperf if requested
# The new firmware does not auto-start iperf on boot unless commanded.
if not self.args.no_iperf:
self.log.info("Starting iperf listener...")
writer.write(b"iperf start\n")
await writer.drain()
await asyncio.sleep(0.5)
return True
except Exception as e:
self.log.error(f"Config Error: {e}")
return False
finally:
writer.close()
await writer.wait_closed()
async def _wait_for_prompt(self, reader, writer, timeout):
end_time = time.time() + timeout
last_poke = time.time()
while time.time() < end_time:
# Poke 'enter' occasionally to solicit a prompt
if time.time() - last_poke > 1.0:
writer.write(b'\n')
await writer.drain()
last_poke = time.time()
try:
line_bytes = await asyncio.wait_for(reader.read(1024), timeout=0.1)
output = line_bytes.decode('utf-8', errors='ignore')
if "esp32>" in output:
return True
except asyncio.TimeoutError:
continue
except Exception:
break
return False
# [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware AS IS]
async def _query_version(self):
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
writer.transport.serial.dtr = False
writer.transport.serial.rts = False
writer.write(b'\n')
await writer.drain()
await asyncio.sleep(0.1)
writer.write(b'version\n')
await writer.drain()
found_version = "Unknown"
timeout = time.time() + 2.0
while time.time() < timeout:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5)
line = line_bytes.decode('utf-8', errors='ignore').strip()
m = self.regex_version.search(line)
if m:
found_version = m.group(1)
break
except asyncio.TimeoutError:
continue
writer.close()
await writer.wait_closed()
return found_version
except Exception as e:
self.log.error(f"Version Check Error: {e}")
return "Error"
async def _identify_chip(self):
for attempt in range(1, 4):
cmd = ['esptool.py', '-p', self.port, 'chip_id']
try:
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
output = stdout.decode() + stderr.decode()
match = self.regex_chip_type.search(output)
if match:
return match.group(1).lower().replace('-', '')
if attempt < 3: await asyncio.sleep(1.0)
except Exception as e:
self.log.warning(f"Chip ID check exception (Attempt {attempt}): {e}")
return None
async def _erase_flash(self):
cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash']
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
_, stderr = await proc.communicate()
if proc.returncode == 0: return True
self.log.error(f"Erase failed: {stderr.decode()}")
return False
async def _flash_firmware(self):
target_to_use = getattr(self, 'target_for_flash', self.args.target)
if target_to_use == 'auto':
self.log.error("Logic Error: Target is still 'auto' inside flash firmware.")
return False
suffix = generate_config_suffix(target_to_use, self.args.csi_enable, self.args.ampdu)
firmware_dir = self.project_dir / "firmware"
unique_app = None
if firmware_dir.exists():
for f in os.listdir(firmware_dir):
if f.endswith(f"_{suffix}.bin") and not f.startswith("bootloader") and not f.startswith("partition") and not f.startswith("ota_data") and not f.startswith("phy_init"):
unique_app = f
break
if not unique_app:
self.log.error(f"Binary for config '{suffix}' not found in firmware/.")
return False
unique_boot = f"bootloader_{suffix}.bin"
unique_part = f"partition-table_{suffix}.bin"
unique_ota = f"ota_data_initial_{suffix}.bin"
flash_args_path = firmware_dir / f"flash_args_{suffix}"
if not flash_args_path.exists():
self.log.error(f"flash_args for {suffix} not found")
return False
try:
with open(flash_args_path, 'r') as f:
content = f.read().replace('\n', ' ').strip()
raw_args = [x for x in content.split(' ') if x]
final_args = []
for arg in raw_args:
if arg.endswith('bootloader.bin'): final_args.append(str(firmware_dir / unique_boot))
elif arg.endswith('partition-table.bin'): final_args.append(str(firmware_dir / unique_part))
elif arg.endswith('ota_data_initial.bin'):
if (firmware_dir / unique_ota).exists(): final_args.append(str(firmware_dir / unique_ota))
else: continue
elif arg.endswith('phy_init_data.bin'): final_args.append(arg)
elif arg.endswith('.bin'): final_args.append(str(firmware_dir / unique_app))
else: final_args.append(arg)
cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud),
'--before', 'default_reset', '--after', 'hard_reset',
'write_flash'] + final_args
self.log.info(f"Flashing {firmware_dir / unique_app}...")
proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
except asyncio.TimeoutError:
proc.kill()
return False
if proc.returncode == 0: return True
self.log.error(f"Flash failed: {stderr.decode()}")
return False
except Exception as e:
self.log.error(f"Flash Prep Error: {e}")
return False
def update_udev_map(dry_run=False):
"""
Scans all USB serial devices, sorts them by physical topology (Bus/Port),
and generates a udev rule file to map them to /dev/esp_port_XX.
"""
print(f"{Colors.BLUE}Scanning USB topology to generate stable port maps...{Colors.RESET}")
# Get all USB serial devices
devices = list(list_ports.grep("USB|ACM|CP210|FT232"))
if not devices:
print(f"{Colors.RED}No devices found.{Colors.RESET}")
return
# Sort by "location" (Physical USB path: e.g., 1-1.2.3)
# This guarantees esp_port_01 is always the first physical port.
devices.sort(key=lambda x: x.location if x.location else x.device)
generated_rules = []
print(f"{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}")
print("-" * 65)
for i, dev in enumerate(devices):
port_num = i + 1
symlink = f"esp_port_{port_num}" # e.g., esp_port_1
# Get detailed udev info to find the stable physical path ID
try:
cmd = ['udevadm', 'info', '--name', dev.device, '--query=property']
proc = subprocess.run(cmd, capture_output=True, text=True)
props = dict(line.split('=', 1) for line in proc.stdout.splitlines() if '=' in line)
# ID_PATH is the robust physical identifier (e.g., pci-0000:00:14.0-usb-0:1.4.3:1.0)
dev_path = props.get('ID_PATH', '')
if not dev_path:
print(f"{Colors.YELLOW}Skipping {dev.device} (No ID_PATH found){Colors.RESET}")
continue
# Generate the rule
rule = f'SUBSYSTEM=="tty", ENV{{ID_PATH}}=="{dev_path}", SYMLINK+="{symlink}"'
generated_rules.append(rule)
print(f"{dev.location:<20} | {dev.device:<15} | {symlink}")
except Exception as e:
print(f"Error inspecting {dev.device}: {e}")
print("-" * 65)
rules_content = "# Auto-generated by esp32_deploy.py\n" + "\n".join(generated_rules) + "\n"
rule_file = "/etc/udev/rules.d/99-esp32-stable.rules"
if dry_run:
print(f"\n{Colors.YELLOW}--- DRY RUN: Rules that would be written to {rule_file} ---{Colors.RESET}")
print(rules_content)
else:
if os.geteuid() != 0:
print(f"\n{Colors.RED}ERROR: Root privileges required to write udev rules.{Colors.RESET}")
print(f"Run: sudo ./esp32_deploy.py --map-ports")
return
print(f"\nWriting rules to {rule_file}...")
try:
with open(rule_file, 'w') as f:
f.write(rules_content)
print("Reloading udev rules...")
subprocess.run(['udevadm', 'control', '--reload-rules'], check=True)
subprocess.run(['udevadm', 'trigger'], check=True)
print(f"{Colors.GREEN}Success! Devices re-mapped.{Colors.RESET}")
except Exception as e:
print(f"{Colors.RED}Failed to write rules: {e}{Colors.RESET}")
def parse_args():
parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool')
parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options')
parser.add_argument('--target', default='auto', choices=['esp32', 'esp32s3', 'esp32c5', 'all', 'auto'], help="Target Chip")
parser.add_argument('--ampdu', action='store_true', help='Enable AMPDU in build')
parser.add_argument('--no-ampdu', action='store_false', dest='ampdu', help='Disable AMPDU')
parser.set_defaults(ampdu=True)
parser.add_argument('--config-only', action='store_true')
parser.add_argument('--flash-only', action='store_true')
parser.add_argument('--flash-erase', action='store_true')
parser.add_argument('--check-version', action='store_true', help='Check version of connected devices')
parser.add_argument('-d', '--dir', default=os.getcwd())
parser.add_argument('-b', '--baud', type=int, default=460800)
parser.add_argument('--devices', type=str)
parser.add_argument('--max-concurrent', type=int, default=None)
parser.add_argument('--start-ip', help='Start IP (Required unless --target all or --check-version)')
parser.add_argument('--ip-device-based', action='store_true', help="Use /dev/ttyUSBx number as IP offset")
parser.add_argument('-s', '--ssid', default='ClubHouse2G')
parser.add_argument('-P', '--password', default='ez2remember')
parser.add_argument('-g', '--gateway', default='192.168.1.1')
parser.add_argument('-m', '--netmask', default='255.255.255.0')
parser.add_argument('--band', default='2.4G')
parser.add_argument('-B', '--bandwidth', default='HT20')
parser.add_argument('-ps', '--powersave', default='NONE')
parser.add_argument('--iperf-period', type=float, default=0.01)
parser.add_argument('--iperf-burst', type=int, default=1)
parser.add_argument('--iperf-len', type=int, default=1470)
parser.add_argument('--iperf-proto', default='UDP')
parser.add_argument('--iperf-dest-ip', default='192.168.1.50')
parser.add_argument('--iperf-port', type=int, default=5001)
parser.add_argument('--no-iperf', action='store_true')
parser.add_argument('--iperf-client', action='store_true')
parser.add_argument('--iperf-server', action='store_true')
parser.add_argument('-M', '--mode', default='STA')
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
parser.add_argument('--csi', dest='csi_enable', action='store_true')
parser.add_argument('--map-ports', action='store_true', help="Rescan USB topology and generate udev rules for esp_port_xx")
args = parser.parse_args()
if args.target != 'all' and not args.start_ip and not args.check_version and not args.map_ports:
parser.error("the following arguments are required: --start-ip")
if args.config_only and args.flash_only: parser.error("Conflicting modes")
return args
def extract_device_number(device_path):
match = re.search(r'(\d+)$', device_path)
return int(match.group(1)) if match else 0
def ask_user(prompt, default=None, choices=None):
choice_str = f" [{'|'.join(choices)}]" if choices else ""
default_str = f" [{default}]" if default else ""
while True:
val = input(f"{Colors.CYAN}{prompt}{choice_str}{default_str}: {Colors.RESET}").strip()
if not val and default: return default
if choices:
if val in choices: return val
print(f"{Colors.RED}Invalid choice.{Colors.RESET}")
else: return val
def ask_bool(prompt, default=True):
choice_str = " [Y/n]" if default else " [y/N]"
val = input(f"{Colors.CYAN}{prompt}{choice_str}: {Colors.RESET}").strip().lower()
if not val: return default
return val.startswith('y')
def get_sdkconfig_defaults(target, csi_enabled, ampdu_enabled):
defaults = ["sdkconfig.defaults"]
suffix = "csi" if csi_enabled else ""
defaults.append(f"sdkconfig.defaults.{target}{suffix}")
defaults.append("sdkconfig.defaults.ampdu" if ampdu_enabled else "sdkconfig.defaults.noampdu")
return ";".join(defaults)
async def build_task(project_dir, target, csi, ampdu, current_step=None, total_steps=None):
defaults_str = get_sdkconfig_defaults(target, csi, ampdu)
desc = f"Target={target}, CSI={'ON' if csi else 'OFF'}, AMPDU={'ON' if ampdu else 'OFF'}"
prefix = f"[{current_step}/{total_steps}] " if current_step else ""
print(f" {prefix}Building [{desc}] ... ", end='', flush=True)
try:
output_dir = project_dir / "firmware"
output_dir.mkdir(exist_ok=True)
sdkconfig_path = project_dir / "sdkconfig"
build_path = project_dir / "build"
if sdkconfig_path.exists(): os.remove(sdkconfig_path)
if build_path.exists(): shutil.rmtree(build_path)
proc = await asyncio.create_subprocess_exec('idf.py', 'set-target', target, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
_, stderr = await proc.communicate()
if proc.returncode != 0:
print(f"{Colors.RED}FAIL (Set Target){Colors.RESET}")
return False, f"Set Target Failed", 0
start_time = time.time()
build_cmd = ['idf.py', '-D', f'SDKCONFIG_DEFAULTS={defaults_str}', 'build']
proc = await asyncio.create_subprocess_exec(*build_cmd, cwd=project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
_, stderr = await proc.communicate()
duration = time.time() - start_time
if proc.returncode != 0:
print(f"{Colors.RED}FAIL{Colors.RESET}")
return False, f"Build Failed", duration
build_dir = project_dir / 'build'
suffix = generate_config_suffix(target, csi, ampdu)
unique_app_name = "Unknown"
project_bin = get_project_binary_name(build_dir)
if project_bin:
unique_app_name = f"{os.path.splitext(project_bin)[0]}_{suffix}.bin"
shutil.copy2(build_dir / project_bin, output_dir / unique_app_name)
boot_src = build_dir / "bootloader" / "bootloader.bin"
if boot_src.exists(): shutil.copy2(boot_src, output_dir / f"bootloader_{suffix}.bin")
part_src = build_dir / "partition_table" / "partition-table.bin"
if part_src.exists(): shutil.copy2(part_src, output_dir / f"partition-table_{suffix}.bin")
ota_src = build_dir / "ota_data_initial.bin"
if ota_src.exists(): shutil.copy2(ota_src, output_dir / f"ota_data_initial_{suffix}.bin")
flash_src = build_dir / "flash_args"
if flash_src.exists(): shutil.copy2(flash_src, output_dir / f"flash_args_{suffix}")
full_path = output_dir / unique_app_name
print(f"{Colors.GREEN}OK ({duration:.1f}s) -> {full_path}{Colors.RESET}")
return True, "Success", duration
except Exception as e:
print(f"{Colors.RED}ERROR: {e}{Colors.RESET}")
return False, str(e), 0
async def run_deployment(args):
print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}\n ESP32 Unified Deployment Tool\n{Colors.BLUE}{'='*60}{Colors.RESET}")
project_dir = Path(args.dir).resolve()
if args.target == 'all':
print(f"{Colors.YELLOW}Starting Batch Build Verification (12 Combinations){Colors.RESET}")
firmware_dir = project_dir / "firmware"
if firmware_dir.exists():
try: shutil.rmtree(firmware_dir)
except Exception as e: return
firmware_dir.mkdir(exist_ok=True)
targets = ['esp32', 'esp32s3', 'esp32c5']
booleans = [False, True]
results = []
total_steps = len(targets) * len(booleans) * len(booleans)
current_step = 0
for target in targets:
for csi in booleans:
for ampdu in booleans:
current_step += 1
success, msg, dur = await build_task(project_dir, target, csi, ampdu, current_step, total_steps)
results.append({"cfg": f"{target.ljust(9)} CSI:{'ON ' if csi else 'OFF'} AMPDU:{'ON ' if ampdu else 'OFF'}", "ok": success, "dur": dur})
print(f"\n{Colors.BLUE}Batch Summary:{Colors.RESET}")
for r in results:
status = f"{Colors.GREEN}PASS{Colors.RESET}" if r['ok'] else f"{Colors.RED}FAIL{Colors.RESET}"
print(f" {r['cfg']} : {status} ({r['dur']:.1f}s)")
return
if not args.config_only and args.target != 'auto' and not args.check_version:
target = args.target if args.target else 'esp32s3'
csi = args.csi_enable
ampdu = args.ampdu
if args.interactive:
print(f"\n{Colors.YELLOW}--- Build Configuration ---{Colors.RESET}")
target = ask_user("Target Chip", default=target, choices=['esp32', 'esp32s3', 'esp32c5'])
csi = ask_bool(f"Enable CSI Support?", default=csi)
ampdu = ask_bool(f"Enable AMPDU Aggregation?", default=ampdu)
args.csi_enable = csi
args.target = target
args.ampdu = ampdu
success, msg, _ = await build_task(project_dir, target, csi, ampdu, 1, 1)
if not success:
print(f"{Colors.RED}{msg}{Colors.RESET}")
return
elif args.target == 'auto' and not args.config_only and not args.check_version:
print(f"{Colors.YELLOW}Target 'auto' selected. Skipping build step (assuming artifacts in firmware/).{Colors.RESET}")
if args.devices and args.devices.lower() != 'all':
devs = [type('obj', (object,), {'device': d.strip()}) for d in args.devices.split(',')]
else:
devs = auto_detect_devices()
if not devs: print("No devices found"); return
devs.sort(key=lambda d: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)])
print(f"\n{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}")
start_ip = ipaddress.IPv4Address(args.start_ip) if args.start_ip else ipaddress.IPv4Address('0.0.0.0')
max_c = args.max_concurrent if args.max_concurrent else (1 if args.devices and not args.config_only else DEFAULT_MAX_CONCURRENT_FLASH)
flash_sem = asyncio.Semaphore(max_c)
tasks = []
for i, dev in enumerate(devs):
raw_port_number = extract_device_number(dev.device)
if args.ip_device_based:
if "esp_port" in dev.device:
offset = raw_port_number - 1
else:
offset = raw_port_number
target_ip = str(start_ip + offset)
if not args.check_version:
print(f" [{dev.device}] Device-based IP: {target_ip} (Raw: {raw_port_number}, Offset: {offset})")
else:
offset = i
target_ip = str(start_ip + offset)
if not args.check_version:
print(f" [{dev.device}] Sequential IP: {target_ip} (Offset: +{offset})")
tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, project_dir, flash_sem, len(devs)).run())
results = await asyncio.gather(*tasks)
if args.check_version:
print(f"\n{Colors.BLUE}--- FIRMWARE VERSION AUDIT ---{Colors.RESET}")
print(f"{'Device':<20} | {'Version':<15}")
print("-" * 40)
for dev, res in zip(devs, results):
ver_color = Colors.GREEN if res != "Unknown" and res != "Error" else Colors.RED
print(f"{dev.device:<20} | {ver_color}{res:<15}{Colors.RESET}")
return
success = results.count(True)
print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}")
def main():
args = parse_args()
# --- INTERCEPT --map-ports HERE ---
if args.map_ports:
# Run synchronously, no async loop needed
update_udev_map(dry_run=False)
sys.exit(0)
# Standard async deployment flow
if os.name == 'nt':
asyncio.set_event_loop(asyncio.ProactorEventLoop())
try:
asyncio.run(run_deployment(args))
except KeyboardInterrupt:
sys.exit(1)
if __name__ == '__main__':
main()