From 0a4bce5bf61bdfe1fbecab971fc8243c67684db5 Mon Sep 17 00:00:00 2001 From: Bob Date: Sun, 7 Dec 2025 20:57:08 -0800 Subject: [PATCH] add asyncio based scripts --- async_batch_config_pro.py | 0 async_find_failed.py | 220 ++++++++++++++++++++++++++++++ async_mass_config.py | 224 +++++++++++++++++++++++++++++++ async_mass_deploy.py | 276 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 720 insertions(+) mode change 100644 => 100755 async_batch_config_pro.py create mode 100755 async_find_failed.py create mode 100755 async_mass_config.py create mode 100755 async_mass_deploy.py diff --git a/async_batch_config_pro.py b/async_batch_config_pro.py old mode 100644 new mode 100755 diff --git a/async_find_failed.py b/async_find_failed.py new file mode 100755 index 0000000..13635bf --- /dev/null +++ b/async_find_failed.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +""" +ESP32 Async Fleet Monitor & Recovery Tool +Audits 30+ devices concurrently in seconds. +""" + +import asyncio +import serial_asyncio +import sys +import os +import argparse +import re +import detect_esp32 + +class Colors: + GREEN = '\033[92m' + RED = '\033[91m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + RESET = '\033[0m' + +# --- Logic: Single Device Audit --- +async def audit_device(port): + """ + Connects to a device, resets it, asks for status, and parses output. + """ + result = { + 'port': port, + 'mode': 'Unknown', + 'status': 'Unknown', + 'led': 'Unknown', + 'ip': None, + 'failed': False + } + + try: + reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=115200) + except Exception as e: + result['status'] = 'Serial Error' + result['failed'] = True + return result + + try: + # 1. Reset (DTR/RTS) to ensure fresh state + writer.transport.serial.dtr = False; writer.transport.serial.rts = True + await asyncio.sleep(0.1) + writer.transport.serial.rts = False + await asyncio.sleep(0.1) + writer.transport.serial.dtr = True + + # 2. Wait for Boot (2s is usually enough for app_main to start) + await asyncio.sleep(2.0) + + # Clear buffer + try: await asyncio.wait_for(reader.read(10000), timeout=0.1) + except: pass + + # 3. Send Command + writer.write(b'\nmode_status\n') + await writer.drain() + + # 4. Read Response (Timeout 1.5s) + response_buffer = "" + end_time = asyncio.get_event_loop().time() + 1.5 + + while asyncio.get_event_loop().time() < end_time: + try: + data = await asyncio.wait_for(reader.read(1024), timeout=0.5) + response_buffer += data.decode('utf-8', errors='ignore') + if "GPS synced" in response_buffer: break + except asyncio.TimeoutError: + continue + + # 5. Parse Output + # Mode + m_mode = re.search(r"Current mode: (.*)", response_buffer) + if m_mode: result['mode'] = m_mode.group(1).strip() + + # LED + m_led = re.search(r"LED state: (.*)", response_buffer) + if m_led: result['led'] = m_led.group(1).strip() + + # Connection + m_conn = re.search(r"WiFi connected: (.*)", response_buffer) + if m_conn: + if "Yes" in m_conn.group(1): + result['status'] = "Connected" + else: + result['status'] = "Disconnected" + if result['mode'] == 'STA': result['failed'] = True + + # IP + m_ip = re.search(r"ip:(\d+\.\d+\.\d+\.\d+)", response_buffer, re.IGNORECASE) + if m_ip: result['ip'] = m_ip.group(1) + + # Failure Logic + if "Red" in result['led'] or "Failed" in result['led']: + result['failed'] = True + if result['mode'] == 'Unknown' and not result['ip']: + # Only mark as failed if we got absolutely nothing intelligible + if len(response_buffer) < 10: + result['status'] = "No Response" + result['failed'] = True + else: + result['status'] = "Logs Only (No Status)" + + except Exception as e: + result['status'] = f"Error: {e}" + result['failed'] = True + finally: + writer.close() + await writer.wait_closed() + + return result + +# --- Logic: Recovery Actions --- +async def send_command_async(port, command): + try: + reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=115200) + writer.write(f"\n{command}\n".encode('utf-8')) + await writer.drain() + writer.close() + await writer.wait_closed() + return True + except: + return False + +async def reboot_async(port): + try: + reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=115200) + writer.transport.serial.dtr = False; writer.transport.serial.rts = True + await asyncio.sleep(0.1) + writer.transport.serial.rts = False + await asyncio.sleep(0.1) + writer.transport.serial.dtr = True + writer.close() + await writer.wait_closed() + return True + except: + return False + +# --- Main --- +async def main_async(): + print(f"{Colors.BLUE}{'='*80}{Colors.RESET}") + print(f"ESP32 Async Fleet Auditor") + print(f"{Colors.BLUE}{'='*80}{Colors.RESET}") + + devices = detect_esp32.detect_esp32_devices() + if not devices: + print("No devices found.") + return + + # Sort + def natural_keys(d): + return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] + devices.sort(key=natural_keys) + + print(f"Scanning {len(devices)} devices concurrently...") + + # Run Audit + tasks = [audit_device(dev.device) for dev in devices] + results = await asyncio.gather(*tasks) + + # Print Table + print(f"\n{'PORT':<15} {'MODE':<10} {'STATUS':<25} {'LED STATE':<25}") + print("-" * 80) + + failed_list = [] + + for res in results: + # Colorize + c_stat = Colors.GREEN if res['status'] == 'Connected' else Colors.YELLOW + if res['failed']: c_stat = Colors.RED + + c_led = Colors.RESET + if "Red" in res['led']: c_led = Colors.RED + elif "Green" in res['led']: c_led = Colors.GREEN + elif "Blue" in res['led']: c_led = Colors.BLUE + + status_txt = res['status'] + if res['ip']: status_txt += f" ({res['ip']})" + + print(f"{res['port']:<15} {res['mode']:<10} {c_stat}{status_txt:<25}{Colors.RESET} {c_led}{res['led']:<25}{Colors.RESET}") + + if res['failed']: + failed_list.append(res['port']) + + # Recovery Menu + if not failed_list: + print(f"\n{Colors.GREEN}✓ All systems nominal.{Colors.RESET}") + return + + print(f"\n{Colors.RED}Found {len(failed_list)} failed device(s).{Colors.RESET}") + print("Options:") + print(" [1] Send 'mode_sta' (Soft Reconnect)") + print(" [2] Hardware Reboot") + print(" [3] Exit") + + # Note: Input is blocking, but that's fine for a menu at the end + choice = input("\nEnter choice: ").strip() + + if choice == '1': + print(f"Sending 'mode_sta' to {len(failed_list)} devices...") + tasks = [send_command_async(p, "mode_sta") for p in failed_list] + await asyncio.gather(*tasks) + print("Done.") + + elif choice == '2': + print(f"Rebooting {len(failed_list)} devices...") + tasks = [reboot_async(p) for p in failed_list] + await asyncio.gather(*tasks) + print("Done.") + +if __name__ == '__main__': + try: + if os.name == 'nt': + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + asyncio.run(main_async()) + except KeyboardInterrupt: + print("\nExit.") diff --git a/async_mass_config.py b/async_mass_config.py new file mode 100755 index 0000000..30ebf41 --- /dev/null +++ b/async_mass_config.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python3 +""" +ESP32 Async Batch Configuration Tool +Configures 30+ ESP32 devices concurrently using non-blocking I/O. +Features: +- Concurrent execution (configure 30 devices in <20 seconds) +- Regex-based state detection (Robust against noise) +- Context-aware logging +""" + +import asyncio +import serial_asyncio +import sys +import os +import argparse +import ipaddress +import re +import time +import logging + +# Ensure detection script is available +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +try: + import detect_esp32 +except ImportError: + print("Error: 'detect_esp32.py' not found.") + sys.exit(1) + +# --- Logging Setup --- +class DeviceLoggerAdapter(logging.LoggerAdapter): + """Prefixes log messages with the device port name""" + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + +# Configure clean logging format +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') +logger = logging.getLogger("BatchConfig") + +class AsyncConfigurator: + """ + Manages the lifecycle of configuring a single ESP32 device via Async Serial. + """ + def __init__(self, port, target_ip, args): + self.port = port + self.target_ip = target_ip + self.args = args + self.log = DeviceLoggerAdapter(logger, {'connid': port}) + + # Pre-compile Regex patterns for efficiency (Parsing logic) + self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) + self.regex_config_saved = re.compile(r'Config saved|saved to NVS', re.IGNORECASE) + # Prompts that indicate the device is alive and listening + self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) + self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE) + + async def run(self): + """Main execution workflow for this device""" + try: + reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) + except Exception as e: + self.log.error(f"Failed to open port: {e}") + return False + + try: + # 1. Hardware Reset + self.log.info("Resetting...") + writer.transport.serial.dtr = False + writer.transport.serial.rts = True + await asyncio.sleep(0.1) + writer.transport.serial.rts = False + await asyncio.sleep(0.1) + writer.transport.serial.dtr = True + + # 2. Wait for Boot (Regex detection) + if not await self._wait_for_boot(reader): + self.log.warning("Boot prompt missed, sending config anyway...") + + # 3. Send Configuration + await self._send_config(writer) + + # 4. Verify Success + return await self._verify_configuration(reader) + + except Exception as e: + self.log.error(f"Exception: {e}") + return False + finally: + writer.close() + await writer.wait_closed() + + async def _wait_for_boot(self, reader): + """Reads stream until a known 'ready' prompt appears""" + self.log.info("Waiting for boot...") + timeout = time.time() + 5 # 5 second boot timeout + + while time.time() < timeout: + try: + # Read line with a short timeout to keep checking total time + line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5) + line = line_bytes.decode('utf-8', errors='ignore').strip() + if not line: continue + + if self.regex_ready.search(line): + return True + except asyncio.TimeoutError: + continue + return False + + async def _send_config(self, writer): + """Builds and transmits the configuration command""" + self.log.info(f"Sending config for IP {self.target_ip}...") + + config_str = ( + f"CFG\n" + f"SSID:{self.args.ssid}\n" + f"PASS:{self.args.password}\n" + f"IP:{self.target_ip}\n" + f"MASK:{self.args.netmask}\n" + f"GW:{self.args.gateway}\n" + f"DHCP:0\n" + f"BAND:{self.args.band}\n" + f"BW:{self.args.bandwidth}\n" + f"POWERSAVE:{self.args.powersave}\n" + f"MODE:{self.args.mode}\n" + f"MON_CH:{self.args.monitor_channel}\n" + f"END\n" + ) + + writer.write(config_str.encode('utf-8')) + await writer.drain() + + async def _verify_configuration(self, reader): + """Monitors output for confirmation of IP assignment""" + self.log.info("Verifying configuration...") + timeout = time.time() + 15 # 15s connection timeout + + while time.time() < timeout: + try: + line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0) + line = line_bytes.decode('utf-8', errors='ignore').strip() + if not line: continue + + # Check for IP assignment + m_ip = self.regex_got_ip.search(line) + if m_ip: + got_ip = m_ip.group(1) + if got_ip == self.target_ip: + self.log.info(f"SUCCESS: Assigned {got_ip}") + return True + else: + self.log.warning(f"MISMATCH: Wanted {self.target_ip}, got {got_ip}") + + # Check for errors + if self.regex_error.search(line): + self.log.warning(f"Device Error: {line}") + + except asyncio.TimeoutError: + continue + + self.log.error("Timeout: Device did not report IP address.") + return False + +async def main_async(): + parser = argparse.ArgumentParser(description='Async ESP32 Batch Config') + + # Arguments matching your existing tools + parser.add_argument('--start-ip', required=True, help='Starting Static IP') + parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='WiFi SSID') + parser.add_argument('-P', '--password', default='ez2remember', help='WiFi password') + parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP') + parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') + parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G']) + parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80']) + parser.add_argument('-ps', '--powersave', default='NONE') + parser.add_argument('-M', '--mode', default='STA') + parser.add_argument('-mc', '--monitor-channel', type=int, default=36) + + args = parser.parse_args() + + # 1. Detect + print("Step 1: Detecting Devices...") + devices = detect_esp32.detect_esp32_devices() + if not devices: + print("No devices found.") + return + + # Sort naturally (ttyUSB2 before ttyUSB10) + def natural_keys(d): + return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] + devices.sort(key=natural_keys) + + try: + start_ip_obj = ipaddress.IPv4Address(args.start_ip) + except: + print(f"Invalid IP: {args.start_ip}") + return + + # 2. Configure Concurrently + print(f"Step 2: Configuring {len(devices)} devices concurrently...") + tasks = [] + + for i, dev in enumerate(devices): + current_ip = str(start_ip_obj + i) + configurator = AsyncConfigurator(dev.device, current_ip, args) + tasks.append(configurator.run()) + + # Run everything at once + results = await asyncio.gather(*tasks) + + # 3. Report + success_count = results.count(True) + print("\n" + "="*40) + print(f"Total Devices: {len(devices)}") + print(f"Success: {success_count}") + print(f"Failed: {len(devices) - success_count}") + print("="*40) + +if __name__ == '__main__': + try: + if os.name == 'nt': + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + asyncio.run(main_async()) + except KeyboardInterrupt: + print("\nAborted by user.") diff --git a/async_mass_deploy.py b/async_mass_deploy.py new file mode 100755 index 0000000..10008e1 --- /dev/null +++ b/async_mass_deploy.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +""" +ESP32 Async Mass Deployment Tool +Combines parallel flashing (via esptool) with async configuration. +Features: +- Semaphore-limited flashing (prevents USB hub crashes) +- Regex-based boot detection (faster/reliable config) +- Parallel verification +""" + +import asyncio +import serial_asyncio +import sys +import os +import argparse +import ipaddress +import re +import time +import logging +from pathlib import Path + +# Ensure detection script is available +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +try: + import detect_esp32 +except ImportError: + print("Error: 'detect_esp32.py' not found.") + sys.exit(1) + +# --- Configuration --- +MAX_CONCURRENT_FLASH = 8 # Limit active flashes to prevent USB brownouts + +class Colors: + GREEN = '\033[92m' + RED = '\033[91m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + RESET = '\033[0m' + +# Logger Adapter for context +class DeviceLoggerAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') +logger = logging.getLogger("Deploy") + +class DeployWorker: + def __init__(self, port, target_ip, args, build_dir, flash_sem): + self.port = port + self.target_ip = target_ip + self.args = args + self.build_dir = build_dir + self.flash_sem = flash_sem + self.log = DeviceLoggerAdapter(logger, {'connid': port}) + + # Regex Patterns + self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) + self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) + + async def run(self): + """Main deployment workflow""" + try: + # 1. FLASHING PHASE (Protected by Semaphore) + async with self.flash_sem: + if self.args.erase: + if not await self._erase_flash(): return False + + if not await self._flash_firmware(): return False + + # 2. CONFIG PHASE (Serial interaction) + # We assume flash resets device. We open serial immediately to catch boot. + # Note: We wait a tiny bit to let esptool release the port handle + await asyncio.sleep(0.5) + + if self.args.ssid and self.args.password: + if not await self._configure_device(): return False + else: + self.log.info(f"{Colors.GREEN}Flash Complete (NVS Preserved){Colors.RESET}") + + return True + + except Exception as e: + self.log.error(f"Worker Exception: {e}") + return False + + async def _erase_flash(self): + self.log.info("Erasing flash...") + cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash'] + + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + if proc.returncode == 0: + self.log.info("Erase successful.") + return True + else: + self.log.error(f"Erase failed: {stderr.decode()}") + return False + + async def _flash_firmware(self): + self.log.info("Flashing firmware...") + # Use relative path for flash_args (must be run from build_dir) + cmd = [ + 'esptool.py', '-p', self.port, '-b', str(self.args.baud), + '--before', 'default_reset', '--after', 'hard_reset', + 'write_flash', '@flash_args' + ] + + proc = await asyncio.create_subprocess_exec( + *cmd, + cwd=self.build_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + # Wait with timeout + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) + except asyncio.TimeoutError: + proc.kill() + self.log.error("Flash timed out.") + return False + + if proc.returncode == 0: + self.log.info("Flash successful.") + return True + else: + self.log.error(f"Flash failed: {stderr.decode()}") + return False + + async def _configure_device(self): + """Connects via Serial, waits for boot, sends config""" + self.log.info("Connecting to console...") + try: + reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) + except Exception as e: + self.log.error(f"Serial open failed: {e}") + return False + + try: + # A. Wait for Boot + self.log.info("Waiting for boot...") + booted = False + end_time = time.time() + 8 # 8s boot timeout + + while time.time() < end_time: + try: + line_b = await asyncio.wait_for(reader.readline(), timeout=0.5) + line = line_b.decode('utf-8', errors='ignore').strip() + if self.regex_ready.search(line): + booted = True + break + except asyncio.TimeoutError: + continue + + if not booted: + # Even if we didn't see the specific line, we might still be able to config + self.log.warning("Boot prompt not detected, trying to config anyway...") + + # B. Send Config + self.log.info(f"Sending config for {self.target_ip}...") + config_str = ( + f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\n" + f"IP:{self.target_ip}\nMASK:{self.args.netmask}\nGW:{self.args.gateway}\n" + f"DHCP:0\nEND\n" + ) + writer.write(config_str.encode('utf-8')) + await writer.drain() + + # C. Verify + self.log.info("Verifying IP...") + start_verify = time.time() + while time.time() < start_verify + 10: + try: + line_b = await asyncio.wait_for(reader.readline(), timeout=1.0) + line = line_b.decode('utf-8', errors='ignore') + m = self.regex_got_ip.search(line) + if m: + if m.group(1) == self.target_ip: + self.log.info(f"{Colors.GREEN}SUCCESS: Configured & Connected{Colors.RESET}") + return True + except asyncio.TimeoutError: + continue + + self.log.error("Config sent, but no IP confirmation received.") + return False + + except Exception as e: + self.log.error(f"Config error: {e}") + return False + finally: + writer.close() + await writer.wait_closed() + +async def main_async(): + parser = argparse.ArgumentParser(description='Async ESP32 Mass Deployment') + parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir') + parser.add_argument('-s', '--ssid', help='WiFi SSID') + parser.add_argument('-p', '--password', help='WiFi Password') + parser.add_argument('--start-ip', default='192.168.1.51', help='Start IP') + parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud') + parser.add_argument('--erase', action='store_true', help='Full erase first') + parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway') + parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') + + args = parser.parse_args() + + project_dir = Path(args.dir).resolve() + build_dir = project_dir / 'build' + + # 1. Build Firmware (Sync blocking) + print(f"{Colors.YELLOW}[1/3] Building Firmware...{Colors.RESET}") + proc = await asyncio.create_subprocess_exec( + 'idf.py', 'build', cwd=project_dir, + stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE + ) + _, stderr = await proc.communicate() + if proc.returncode != 0: + print(f"{Colors.RED}Build Failed:\n{stderr.decode()}{Colors.RESET}") + return + + if not (build_dir / 'flash_args').exists(): + print(f"{Colors.RED}Error: build/flash_args missing.{Colors.RESET}") + return + print(f"{Colors.GREEN}Build Complete.{Colors.RESET}") + + # 2. Detect Devices + print(f"{Colors.YELLOW}[2/3] Scanning Devices...{Colors.RESET}") + devices = detect_esp32.detect_esp32_devices() + if not devices: + print(f"{Colors.RED}No devices found.{Colors.RESET}") + return + + # Sort naturally + def natural_keys(d): + return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] + devices.sort(key=natural_keys) + + # 3. Deploy + print(f"{Colors.YELLOW}[3/3] Deploying to {len(devices)} devices...{Colors.RESET}") + print(f"Max Concurrent Flashes: {MAX_CONCURRENT_FLASH}") + + try: + start_ip_obj = ipaddress.IPv4Address(args.start_ip) + except: + print("Invalid Start IP") + return + + flash_sem = asyncio.Semaphore(MAX_CONCURRENT_FLASH) + tasks = [] + + for i, dev in enumerate(devices): + target_ip = str(start_ip_obj + i) + worker = DeployWorker(dev.device, target_ip, args, build_dir, flash_sem) + tasks.append(worker.run()) + + results = await asyncio.gather(*tasks) + + # 4. Summary + success = results.count(True) + print(f"\n{Colors.BLUE}{'='*40}{Colors.RESET}") + print(f"Total: {len(devices)}") + print(f"Success: {Colors.GREEN}{success}{Colors.RESET}") + print(f"Failed: {Colors.RED}{len(devices) - success}{Colors.RESET}") + print(f"{Colors.BLUE}{'='*40}{Colors.RESET}") + +if __name__ == '__main__': + try: + # Windows loop fix if needed + if os.name == 'nt': + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + asyncio.run(main_async()) + except KeyboardInterrupt: + print("\nCancelled.")