#!/usr/bin/env python3
"""
ESP32 Async Fleet Monitor & Recovery Tool
Audits 30+ devices concurrently in seconds.
"""

import argparse
import asyncio
import os
import re
import sys

import serial_asyncio

import detect_esp32

# Serial settings and timing used for every device.
BAUD_RATE = 115200
RESET_PULSE_S = 0.1       # hold time for each step of the DTR/RTS reset pulse
BOOT_WAIT_S = 2.0         # 2s is usually enough for app_main to start
DRAIN_TIMEOUT_S = 0.1     # how long to wait when discarding buffered boot output
RESPONSE_WINDOW_S = 1.5   # total time allowed for the status reply
READ_TIMEOUT_S = 0.5      # per-read timeout inside the response window

# Reading stops early once this text shows up in the reply.
# NOTE(review): presumably the last line of the 'mode_status' report -- confirm
# against the firmware.
END_MARKER = b"GPS synced"

_MODE_RE = re.compile(r"Current mode: (.*)")
_LED_RE = re.compile(r"LED state: (.*)")
_WIFI_RE = re.compile(r"WiFi connected: (.*)")
_IP_RE = re.compile(r"ip:(\d+\.\d+\.\d+\.\d+)", re.IGNORECASE)


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    RESET = '\033[0m'


# --- Helpers ---

async def _pulse_reset(writer):
    """Toggle DTR/RTS on the writer's serial port to reset the device."""
    port = writer.transport.serial
    port.dtr = False
    port.rts = True
    await asyncio.sleep(RESET_PULSE_S)
    port.rts = False
    await asyncio.sleep(RESET_PULSE_S)
    port.dtr = True


async def _close_writer(writer):
    """Close the writer and wait for shutdown, ignoring I/O errors on close."""
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # A port that errors while closing is already unusable; nothing to undo.
        pass


async def _read_response(reader):
    """Collect the device's reply for up to RESPONSE_WINDOW_S seconds.

    Stops early when END_MARKER has been received or the stream hits EOF.
    Returns the reply decoded as UTF-8 with undecodable bytes dropped.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + RESPONSE_WINDOW_S
    raw = bytearray()
    while loop.time() < deadline:
        try:
            chunk = await asyncio.wait_for(reader.read(1024), timeout=READ_TIMEOUT_S)
        except asyncio.TimeoutError:
            continue
        if not chunk:
            # EOF: read() would keep returning b'' immediately, so stop
            # instead of spinning until the deadline.
            break
        raw += chunk
        if END_MARKER in raw:
            break
    # Decode once at the end so multi-byte characters split across chunks survive.
    return raw.decode('utf-8', errors='ignore')


def _parse_status(response, result):
    """Fill ``result`` in place from the text of a 'mode_status' reply.

    Sets 'mode', 'led', 'status' and 'ip' from the matching lines, and sets
    'failed' when the device is disconnected in STA mode, when the LED state
    mentions "Red" or "Failed", or when fewer than 10 characters came back
    with neither a mode nor an IP address.
    """
    mode_match = _MODE_RE.search(response)
    if mode_match:
        result['mode'] = mode_match.group(1).strip()

    led_match = _LED_RE.search(response)
    if led_match:
        result['led'] = led_match.group(1).strip()

    wifi_match = _WIFI_RE.search(response)
    if wifi_match:
        if "Yes" in wifi_match.group(1):
            result['status'] = "Connected"
        else:
            result['status'] = "Disconnected"
            # Being disconnected only counts as a failure in station mode.
            if result['mode'] == 'STA':
                result['failed'] = True

    ip_match = _IP_RE.search(response)
    if ip_match:
        result['ip'] = ip_match.group(1)

    if "Red" in result['led'] or "Failed" in result['led']:
        result['failed'] = True

    if result['mode'] == 'Unknown' and not result['ip']:
        # Only mark as failed if we got absolutely nothing intelligible.
        if len(response) < 10:
            result['status'] = "No Response"
            result['failed'] = True
        else:
            result['status'] = "Logs Only (No Status)"


# --- Logic: Single Device Audit ---

async def audit_device(port):
    """Connect to a device, reset it, ask for status, and parse the output.

    Args:
        port: Serial port URL/path of the device.

    Returns:
        A dict with keys 'port', 'mode', 'status', 'led', 'ip' and 'failed'.
        Errors are reported through 'status' and 'failed' rather than raised,
        so one bad device does not abort a concurrent audit.
    """
    result = {
        'port': port,
        'mode': 'Unknown',
        'status': 'Unknown',
        'led': 'Unknown',
        'ip': None,
        'failed': False,
    }

    try:
        reader, writer = await serial_asyncio.open_serial_connection(
            url=port, baudrate=BAUD_RATE)
    except Exception:
        # Per-device boundary: any failure to open is reported, not raised.
        result['status'] = 'Serial Error'
        result['failed'] = True
        return result

    try:
        # 1. Reset (DTR/RTS) to ensure fresh state
        await _pulse_reset(writer)

        # 2. Wait for boot, then discard whatever was buffered meanwhile
        await asyncio.sleep(BOOT_WAIT_S)
        try:
            await asyncio.wait_for(reader.read(10000), timeout=DRAIN_TIMEOUT_S)
        except (asyncio.TimeoutError, OSError):
            # Best effort: an empty or unreadable buffer is not an error here.
            pass

        # 3. Send command
        writer.write(b'\nmode_status\n')
        await writer.drain()

        # 4. Read response, 5. parse it
        response = await _read_response(reader)
        _parse_status(response, result)
    except Exception as exc:
        result['status'] = f"Error: {exc}"
        result['failed'] = True
    finally:
        await _close_writer(writer)

    return result


# --- Logic: Recovery Actions ---

async def send_command_async(port, command):
    """Send ``command`` (wrapped in newlines) to the device on ``port``.

    Returns:
        True if the command was written and flushed, False on any error.
    """
    try:
        _, writer = await serial_asyncio.open_serial_connection(
            url=port, baudrate=BAUD_RATE)
    except Exception:
        return False

    try:
        writer.write(f"\n{command}\n".encode('utf-8'))
        await writer.drain()
        return True
    except Exception:
        return False
    finally:
        # Always release the port, even when the write failed.
        await _close_writer(writer)


async def reboot_async(port):
    """Hardware-reset the device on ``port`` via the DTR/RTS lines.

    Returns:
        True if the reset pulse was issued, False on any error.
    """
    try:
        _, writer = await serial_asyncio.open_serial_connection(
            url=port, baudrate=BAUD_RATE)
    except Exception:
        return False

    try:
        await _pulse_reset(writer)
        return True
    except Exception:
        return False
    finally:
        # Always release the port, even when the reset failed.
        await _close_writer(writer)


# --- Main ---

def _natural_keys(dev):
    """Sort key that orders port names numerically (ttyUSB2 before ttyUSB10)."""
    return [int(part) if part.isdigit() else part
            for part in re.split(r'(\d+)', dev.device)]


def _print_row(res):
    """Print one colorized table row for an audit result."""
    status_color = Colors.GREEN if res['status'] == 'Connected' else Colors.YELLOW
    if res['failed']:
        status_color = Colors.RED

    led_color = Colors.RESET
    if "Red" in res['led']:
        led_color = Colors.RED
    elif "Green" in res['led']:
        led_color = Colors.GREEN
    elif "Blue" in res['led']:
        led_color = Colors.BLUE

    status_txt = res['status']
    if res['ip']:
        status_txt += f" ({res['ip']})"

    print(f"{res['port']:<15} {res['mode']:<10} "
          f"{status_color}{status_txt:<25}{Colors.RESET} "
          f"{led_color}{res['led']:<25}{Colors.RESET}")


async def _run_recovery(action, ports):
    """Run ``action(port)`` on all ports concurrently and report the outcome."""
    outcomes = await asyncio.gather(*(action(p) for p in ports))
    for port, ok in zip(ports, outcomes):
        if not ok:
            print(f"{Colors.RED}  {port}: action failed{Colors.RESET}")
    print("Done.")


async def main_async():
    """Audit every detected ESP32, print a status table, and offer recovery."""
    print(f"{Colors.BLUE}{'='*80}{Colors.RESET}")
    print("ESP32 Async Fleet Auditor")
    print(f"{Colors.BLUE}{'='*80}{Colors.RESET}")

    devices = detect_esp32.detect_esp32_devices()
    if not devices:
        print("No devices found.")
        return

    devices.sort(key=_natural_keys)

    print(f"Scanning {len(devices)} devices concurrently...")

    # Run audit
    results = await asyncio.gather(*(audit_device(dev.device) for dev in devices))

    # Print table
    print(f"\n{'PORT':<15} {'MODE':<10} {'STATUS':<25} {'LED STATE':<25}")
    print("-" * 80)

    for res in results:
        _print_row(res)
    failed_list = [res['port'] for res in results if res['failed']]

    # Recovery menu
    if not failed_list:
        print(f"\n{Colors.GREEN}✓ All systems nominal.{Colors.RESET}")
        return

    print(f"\n{Colors.RED}Found {len(failed_list)} failed device(s).{Colors.RESET}")
    print("Options:")
    print("  [1] Send 'mode_sta' (Soft Reconnect)")
    print("  [2] Hardware Reboot")
    print("  [3] Exit")

    # Note: input is blocking, but that's fine for a menu at the end
    try:
        choice = input("\nEnter choice: ").strip()
    except EOFError:
        # No interactive stdin (e.g. piped run): treat as Exit.
        return

    if choice == '1':
        print(f"Sending 'mode_sta' to {len(failed_list)} devices...")
        await _run_recovery(lambda p: send_command_async(p, "mode_sta"), failed_list)
    elif choice == '2':
        print(f"Rebooting {len(failed_list)} devices...")
        await _run_recovery(reboot_async, failed_list)


if __name__ == '__main__':
    try:
        # asyncio.run() always creates its own event loop, so installing a
        # ProactorEventLoop beforehand had no effect (and leaked a loop).
        # The proactor loop is already the Windows default on Python 3.8+.
        asyncio.run(main_async())
    except KeyboardInterrupt:
        print("\nExit.")