From d1fc5b5a26eb979fb0a0833ee162c85cbd20773f Mon Sep 17 00:00:00 2001 From: Bob Date: Sun, 7 Dec 2025 20:49:30 -0800 Subject: [PATCH] more on batching --- async_batch_config_pro.py | 229 ++++++++++++++++++++++++++++++++++++++ mass_deploy.py | 43 +++---- 2 files changed, 244 insertions(+), 28 deletions(-) create mode 100644 async_batch_config_pro.py diff --git a/async_batch_config_pro.py b/async_batch_config_pro.py new file mode 100644 index 0000000..14027c1 --- /dev/null +++ b/async_batch_config_pro.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +""" +ESP32 Async Batch Configuration Tool (Pro Version) +Incorporates architectural patterns from flows.py: +- Regex-based state detection +- Event-driven synchronization +- Context-aware logging +""" + +import asyncio +import serial_asyncio +import sys +import os +import argparse +import ipaddress +import re +import time +import logging + +# Ensure detection script is available +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +try: + import detect_esp32 +except ImportError: + print("Error: 'detect_esp32.py' not found.") + sys.exit(1) + +# --- Logging Setup (Borrowed concept from flows.py) --- +class DeviceLoggerAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') +logger = logging.getLogger("BatchConfig") + +class Esp32Configurator: + """ + Manages the lifecycle of configuring a single ESP32 device. + Uses regex patterns similar to iperf_client in flows.py. + """ + def __init__(self, port, target_ip, config_args): + self.port = port + self.target_ip = target_ip + self.args = config_args + self.log = DeviceLoggerAdapter(logger, {'connid': port}) + + # Regex Patterns (Inspired by flows.py lines 350+) + # We pre-compile these for efficiency + self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) + self.regex_wifi_connected = re.compile(r'WiFi connected: Yes', re.IGNORECASE) + self.regex_config_saved = re.compile(r'Config saved', re.IGNORECASE) + self.regex_ready_prompt = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) + self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE) + + async def run(self): + """Main coroutine for this device""" + try: + reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) + except Exception as e: + self.log.error(f"Failed to open port: {e}") + return False + + try: + # 1. Hardware Reset via DTR/RTS + self.log.info("Resetting...") + writer.transport.serial.dtr = False + writer.transport.serial.rts = True + await asyncio.sleep(0.1) + writer.transport.serial.rts = False + await asyncio.sleep(0.1) + writer.transport.serial.dtr = True + + # 2. Monitor Boot Stream + # We wait until the device settles or we see a prompt + await self._wait_for_boot(reader) + + # 3. Send Configuration + await self._send_config(writer) + + # 4. Verification Loop + success = await self._verify_connection(reader) + return success + + except Exception as e: + self.log.error(f"Process Exception: {e}") + return False + finally: + writer.close() + await writer.wait_closed() + + async def _wait_for_boot(self, reader): + """Consumes boot logs until device looks ready""" + self.log.info("Waiting for boot...") + # Give it a max of 5 seconds to settle or show a prompt + end_time = time.time() + 5 + + while time.time() < end_time: + try: + line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5) + line = line_bytes.decode('utf-8', errors='ignore').strip() + if not line: continue + + # Check if device is ready to accept commands + if self.regex_ready_prompt.search(line): + self.log.info("Device ready detected.") + return + except asyncio.TimeoutError: + # If silence for 0.5s, it's probably waiting + continue + + async def _send_config(self, writer): + """Constructs and writes the config block""" + self.log.info(f"Sending config for IP {self.target_ip}...") + + config_str = ( + f"CFG\n" + f"SSID:{self.args.ssid}\n" + f"PASS:{self.args.password}\n" + f"IP:{self.target_ip}\n" + f"MASK:{self.args.netmask}\n" + f"GW:{self.args.gateway}\n" + f"DHCP:0\n" + f"BAND:{self.args.band}\n" + f"BW:{self.args.bandwidth}\n" + f"POWERSAVE:{self.args.powersave}\n" + f"MODE:{self.args.mode}\n" + f"MON_CH:{self.args.monitor_channel}\n" + f"END\n" + ) + + # Flush input buffer before writing to ensure clean state + # (Note: asyncio streams don't have a direct flush_input, relies on OS) + writer.write(config_str.encode('utf-8')) + await writer.drain() + + async def _verify_connection(self, reader): + """Reads stream verifying IP assignment""" + self.log.info("Verifying configuration...") + end_time = time.time() + 15 # 15s Timeout + + while time.time() < end_time: + try: + line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0) + line = line_bytes.decode('utf-8', errors='ignore').strip() + if not line: continue + + # Regex Checks + m_ip = self.regex_got_ip.search(line) + if m_ip: + got_ip = m_ip.group(1) + if got_ip == self.target_ip: + self.log.info(f"SUCCESS: Assigned {got_ip}") + return True + else: + self.log.warning(f"MISMATCH: Wanted {self.target_ip}, got {got_ip}") + + if self.regex_error.search(line): + self.log.warning(f"Device reported error: {line}") + + except asyncio.TimeoutError: + continue + + self.log.error("Timeout waiting for IP confirmation.") + return False + +async def main_async(): + parser = argparse.ArgumentParser(description='Async ESP32 Batch Config (Pro)') + parser.add_argument('--start-ip', required=True, help='Start IP') + parser.add_argument('-s', '--ssid', default='ClubHouse2G') + parser.add_argument('-P', '--password', default='ez2remember') + parser.add_argument('-g', '--gateway', default='192.168.1.1') + parser.add_argument('-m', '--netmask', default='255.255.255.0') + parser.add_argument('-b', '--band', default='2.4G') + parser.add_argument('-B', '--bandwidth', default='HT20') + parser.add_argument('-ps', '--powersave', default='NONE') + parser.add_argument('-M', '--mode', default='STA') + parser.add_argument('-mc', '--monitor-channel', type=int, default=36) + + args = parser.parse_args() + + # 1. Detect + print("Scanning devices...") + devices = detect_esp32.detect_esp32_devices() + if not devices: + print("No devices found.") + return + + # Sort naturally + def natural_keys(d): + return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] + devices.sort(key=natural_keys) + + # 2. Parse IP + try: + start_ip_obj = ipaddress.IPv4Address(args.start_ip) + except: + print("Invalid IP") + return + + # 3. Create Tasks + tasks = [] + print(f"Configuring {len(devices)} devices concurrently...") + + for i, dev in enumerate(devices): + current_ip = str(start_ip_obj + i) + configurator = Esp32Configurator(dev.device, current_ip, args) + tasks.append(configurator.run()) + + # 4. Run All + results = await asyncio.gather(*tasks) + + # 5. Summary + success_count = results.count(True) + print("\n" + "="*40) + print(f"Total: {len(devices)}") + print(f"Success: {success_count}") + print(f"Failed: {len(devices) - success_count}") + print("="*40) + +if __name__ == '__main__': + try: + # Windows/Linux loop compatibility handling (borrowed from ssh_node.py lines 60-65) + if os.name == 'nt': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + + asyncio.run(main_async()) + except KeyboardInterrupt: + print("\nCancelled.") diff --git a/mass_deploy.py b/mass_deploy.py index e5abe39..9cf3f87 100755 --- a/mass_deploy.py +++ b/mass_deploy.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -ESP32 Mass Deployment Tool (Fixed for Parallel Flashing) -Uses esptool.py directly to bypass CMake locking issues. +ESP32 Mass Deployment Tool (Fixed for Parallel Flashing & Path Issues) +Uses esptool.py from the build directory to resolve relative paths correctly. """ import os @@ -27,7 +27,9 @@ class DeviceDeployer: baud_rate=460800, max_retries=2, verify_ping=True, num_devices=None, verbose=False, parallel=True): - self.project_dir = Path(project_dir) + self.project_dir = Path(project_dir).resolve() # Absolute path is safer + self.build_dir = self.project_dir / 'build' + self.ssid = ssid self.password = password self.start_ip = start_ip @@ -40,7 +42,6 @@ class DeviceDeployer: self.verbose = verbose self.parallel = parallel - # Mode detection self.config_mode = (self.ssid is not None and self.password is not None) if self.start_ip: @@ -61,6 +62,7 @@ class DeviceDeployer: print("ESP32 Mass Deployment Tool") print(f"{'='*70}{Colors.NC}") print(f"Project: {self.project_dir}") + print(f"Build Dir: {self.build_dir}") if self.config_mode: print(f"Mode: {Colors.YELLOW}FLASH + CONFIGURE{Colors.NC}") print(f"SSID: {self.ssid}") @@ -76,11 +78,9 @@ class DeviceDeployer: print(f"{Colors.BLUE}{'='*70}{Colors.NC}") def build_firmware(self): - """Build firmware once to generate flash_args""" print() print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}") try: - # We run build to ensure binary and flash_args exist subprocess.run( ['idf.py', 'build'], cwd=self.project_dir, @@ -88,13 +88,12 @@ class DeviceDeployer: capture_output=not self.verbose ) - # Verify flash_args exists (critical for parallel flashing) - flash_args_path = self.project_dir / 'build' / 'flash_args' + flash_args_path = self.build_dir / 'flash_args' if not flash_args_path.exists(): print(f"{Colors.RED}Error: build/flash_args not found.{Colors.NC}") return False - print(f"{Colors.GREEN}✓ Build complete (ready for parallel flash){Colors.NC}") + print(f"{Colors.GREEN}✓ Build complete{Colors.NC}") return True except subprocess.CalledProcessError as e: print(f"{Colors.RED}✗ Build failed!{Colors.NC}") @@ -129,7 +128,7 @@ class DeviceDeployer: target_ip = self.get_ip_for_index(index) if self.config_mode else "Existing IP" log_file = self.log_dir / f"esp32_deploy_{index}.log" log_lines = [] - flash_args_file = self.project_dir / 'build' / 'flash_args' + flash_args_file = 'flash_args' # Relative to build_dir def log(msg): log_lines.append(msg) @@ -139,10 +138,9 @@ class DeviceDeployer: for attempt in range(1, self.max_retries + 1): log(f"=== Device {index}: {device} (Attempt {attempt}/{self.max_retries}) ===") - # --- FLASHING (Using esptool.py directly) --- + # --- FLASHING --- log("Flashing via esptool...") try: - # Construct command: esptool.py -p PORT -b BAUD --before default_reset --after hard_reset write_flash @build/flash_args cmd = [ 'esptool.py', '-p', device, @@ -153,9 +151,10 @@ class DeviceDeployer: f"@{flash_args_file}" ] + # CRITICAL FIX: Run from build_dir so relative paths in flash_args are valid result = subprocess.run( cmd, - cwd=self.project_dir, # Run from project dir so relative paths in flash_args work + cwd=self.build_dir, check=True, capture_output=True, timeout=300 @@ -212,7 +211,6 @@ class DeviceDeployer: return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines} else: log("✗ Ping failed") - # Fall through to retry loop or fail except: log("✗ Ping error") else: @@ -228,23 +226,18 @@ class DeviceDeployer: time.sleep(2) - # Final failure return with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) return {'index': index, 'device': device, 'ip': target_ip, 'status': 'FAILED', 'log': log_lines} def deploy_all_parallel(self): print() print(f"{Colors.YELLOW}[3/4] Flashing (parallel)...{Colors.NC}") - - # 10 workers is a safe limit for USB hubs max_workers = min(10, len(self.devices)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(self.flash_and_configure, i, device): (i, device) for i, device in enumerate(self.devices) } - for future in as_completed(futures): result = future.result() self.results[result['index']] = result @@ -264,9 +257,7 @@ class DeviceDeployer: 'SUCCESS': Colors.GREEN, 'NO_PING': Colors.YELLOW, 'FAILED': Colors.RED, 'TIMEOUT': Colors.RED }.get(result['status'], Colors.RED) - - print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s} → " - f"{result['ip']:15s} [{result['status']}]{Colors.NC}") + print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s} → {result['ip']:15s} [{result['status']}]{Colors.NC}") def deploy_all(self): if self.parallel: self.deploy_all_parallel() @@ -278,13 +269,11 @@ class DeviceDeployer: print(f"{Colors.BLUE}{'='*70}{Colors.NC}") success = sum(1 for r in self.results.values() if r['status'] == 'SUCCESS') failed = sum(1 for r in self.results.values() if r['status'] in ['FAILED', 'TIMEOUT']) - for i in range(len(self.devices)): if i in self.results: r = self.results[i] icon = f"{Colors.GREEN}✓{Colors.NC}" if r['status'] == 'SUCCESS' else f"{Colors.RED}✗{Colors.NC}" print(f"{icon} {r['device']:14s} → {r['ip']}") - print(f"{Colors.BLUE}{'='*70}{Colors.NC}") print(f"Total: {len(self.devices)}") print(f"Success: {success}") @@ -294,8 +283,8 @@ class DeviceDeployer: def main(): parser = argparse.ArgumentParser(description='ESP32 Mass Deployment Tool') parser.add_argument('-d', '--dir', default=os.getcwd(), help='ESP-IDF project dir') - parser.add_argument('-s', '--ssid', help='WiFi SSID (Optional)') - parser.add_argument('-p', '--password', help='WiFi Password (Optional)') + parser.add_argument('-s', '--ssid', help='WiFi SSID') + parser.add_argument('-p', '--password', help='WiFi Password') parser.add_argument('--start-ip', default='192.168.1.51', help='Starting IP') parser.add_argument('-n', '--num-devices', type=int, default=30, help='Max devices') parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP') @@ -305,10 +294,8 @@ def main(): parser.add_argument('--no-verify', action='store_true', help='Skip ping check') parser.add_argument('--sequential', action='store_true', help='Run sequentially') parser.add_argument('-v', '--verbose', action='store_true', help='Verbose') - args = parser.parse_args() - # Validate arguments: Must have both SSID+Pass or neither if (args.ssid and not args.password) or (args.password and not args.ssid): print(f"{Colors.RED}ERROR: Provide both SSID and Password for config, or neither for flash-only.{Colors.NC}") sys.exit(1)