221 lines
6.7 KiB
Python
Executable File
221 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ESP32 Async Fleet Monitor & Recovery Tool
|
|
Audits 30+ devices concurrently in seconds.
|
|
"""
|
|
|
|
import asyncio
|
|
import serial_asyncio
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import re
|
|
import detect_esp32
|
|
|
|
class Colors:
    """ANSI escape sequences used to colorize the console report."""

    # Bright foreground colors.
    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'

    # Switches the terminal back to its default attributes.
    RESET = '\033[0m'
|
|
|
|
# --- Logic: Single Device Audit ---
|
|
async def audit_device(port):
    """
    Reset one device over serial, send ``mode_status`` and parse the reply.

    Args:
        port: Serial port URL/path handed to
            ``serial_asyncio.open_serial_connection``.

    Returns:
        A dict with the keys ``port``, ``mode``, ``status``, ``led``, ``ip``
        and ``failed``. Serial and parsing errors are reported through
        ``status``/``failed`` instead of being raised, so one bad device
        cannot abort a concurrent audit of many devices.
    """
    result = {
        'port': port,
        'mode': 'Unknown',
        'status': 'Unknown',
        'led': 'Unknown',
        'ip': None,
        'failed': False,
    }

    try:
        reader, writer = await serial_asyncio.open_serial_connection(
            url=port, baudrate=115200
        )
    except Exception:
        # The port could not be opened; there is nothing to audit.
        result['status'] = 'Serial Error'
        result['failed'] = True
        return result

    try:
        # 1. Reset (DTR/RTS) to ensure fresh state
        uart = writer.transport.serial
        uart.dtr = False
        uart.rts = True
        await asyncio.sleep(0.1)
        uart.rts = False
        await asyncio.sleep(0.1)
        uart.dtr = True

        # 2. Wait for Boot (2s is usually enough for app_main to start)
        await asyncio.sleep(2.0)

        # Clear buffer. Only a timeout (nothing pending) is expected here;
        # cancellation and real I/O errors must not be hidden.
        try:
            await asyncio.wait_for(reader.read(10000), timeout=0.1)
        except asyncio.TimeoutError:
            pass

        # 3. Send Command
        writer.write(b'\nmode_status\n')
        await writer.drain()

        # 4. Read Response (1.5s overall budget)
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 1.5
        raw = bytearray()
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                # Never wait past the overall deadline.
                chunk = await asyncio.wait_for(
                    reader.read(1024), timeout=min(0.5, remaining)
                )
            except asyncio.TimeoutError:
                continue
            if not chunk:
                # EOF: read() would now return b'' immediately every time,
                # so looping on would only spin until the deadline.
                break
            raw += chunk
            if b"GPS synced" in raw:
                break

        # Decode once so a multi-byte character split across two reads is
        # not dropped by errors='ignore'.
        response_buffer = raw.decode('utf-8', errors='ignore')

        # 5. Parse Output
        _parse_status_response(response_buffer, result)

    except Exception as e:
        result['status'] = f"Error: {e}"
        result['failed'] = True
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except Exception:
            # A failure while closing must not replace the audit result
            # that has already been collected for this port.
            pass

    return result


def _parse_status_response(response_buffer, result):
    """Fill *result* in place from the text received after ``mode_status``."""
    # Mode
    m_mode = re.search(r"Current mode: (.*)", response_buffer)
    if m_mode:
        result['mode'] = m_mode.group(1).strip()

    # LED
    m_led = re.search(r"LED state: (.*)", response_buffer)
    if m_led:
        result['led'] = m_led.group(1).strip()

    # Connection
    m_conn = re.search(r"WiFi connected: (.*)", response_buffer)
    if m_conn:
        if "Yes" in m_conn.group(1):
            result['status'] = "Connected"
        else:
            result['status'] = "Disconnected"
            # A device in STA mode without WiFi counts as failed.
            if result['mode'] == 'STA':
                result['failed'] = True

    # IP
    m_ip = re.search(r"ip:(\d+\.\d+\.\d+\.\d+)", response_buffer, re.IGNORECASE)
    if m_ip:
        result['ip'] = m_ip.group(1)

    # Failure Logic
    if "Red" in result['led'] or "Failed" in result['led']:
        result['failed'] = True

    if result['mode'] == 'Unknown' and not result['ip']:
        # Only mark as failed if we got absolutely nothing intelligible
        if len(response_buffer) < 10:
            result['status'] = "No Response"
            result['failed'] = True
        else:
            result['status'] = "Logs Only (No Status)"
|
|
|
|
# --- Logic: Recovery Actions ---
|
|
async def send_command_async(port, command):
    """
    Open *port*, send *command* framed by newlines, and close the port.

    Args:
        port: Serial port URL/path to open at 115200 baud.
        command: Text command to send; it is UTF-8 encoded.

    Returns:
        True if the command was written and the port closed without error,
        False if any step failed.
    """
    try:
        _reader, writer = await serial_asyncio.open_serial_connection(
            url=port, baudrate=115200
        )
        try:
            writer.write(f"\n{command}\n".encode('utf-8'))
            await writer.drain()
        finally:
            # Release the port even when the write fails.
            writer.close()
            await writer.wait_closed()
        return True
    except Exception:
        # Best effort: report failure to the caller, but let cancellation
        # and KeyboardInterrupt propagate (they are not Exception subclasses).
        return False
|
|
|
|
async def reboot_async(port):
    """
    Open *port* and toggle DTR/RTS to hardware-reset the attached device.

    Args:
        port: Serial port URL/path to open at 115200 baud.

    Returns:
        True if the line toggle sequence completed and the port closed
        without error, False if any step failed.
    """
    try:
        _reader, writer = await serial_asyncio.open_serial_connection(
            url=port, baudrate=115200
        )
        try:
            uart = writer.transport.serial
            uart.dtr = False
            uart.rts = True
            await asyncio.sleep(0.1)
            uart.rts = False
            await asyncio.sleep(0.1)
            uart.dtr = True
        finally:
            # Release the port even when toggling the control lines fails.
            writer.close()
            await writer.wait_closed()
        return True
    except Exception:
        # Best effort: report failure to the caller, but let cancellation
        # and KeyboardInterrupt propagate (they are not Exception subclasses).
        return False
|
|
|
|
# --- Main ---
|
|
async def main_async():
    """
    Audit every detected device concurrently, print a report, offer recovery.

    Devices come from ``detect_esp32.detect_esp32_devices()``; each entry's
    ``.device`` attribute is used as the serial port. After the table is
    printed, ports whose audit was marked failed can be sent ``mode_sta`` or
    be hardware-rebooted, and any port on which that action failed is listed.
    """
    banner = f"{Colors.BLUE}{'='*80}{Colors.RESET}"
    print(banner)
    print("ESP32 Async Fleet Auditor")
    print(banner)

    devices = detect_esp32.detect_esp32_devices()
    if not devices:
        print("No devices found.")
        return

    # Sort so that e.g. ttyUSB2 comes before ttyUSB10.
    def natural_keys(d):
        return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]

    devices.sort(key=natural_keys)

    print(f"Scanning {len(devices)} devices concurrently...")

    # Run Audit
    results = await asyncio.gather(*(audit_device(dev.device) for dev in devices))

    # Print Table
    print(f"\n{'PORT':<15} {'MODE':<10} {'STATUS':<25} {'LED STATE':<25}")
    print("-" * 80)

    failed_list = []

    for res in results:
        # Colorize: failure overrides the connected/other distinction.
        if res['failed']:
            c_stat = Colors.RED
        elif res['status'] == 'Connected':
            c_stat = Colors.GREEN
        else:
            c_stat = Colors.YELLOW

        c_led = Colors.RESET
        if "Red" in res['led']:
            c_led = Colors.RED
        elif "Green" in res['led']:
            c_led = Colors.GREEN
        elif "Blue" in res['led']:
            c_led = Colors.BLUE

        status_txt = res['status']
        if res['ip']:
            status_txt += f" ({res['ip']})"

        print(f"{res['port']:<15} {res['mode']:<10} {c_stat}{status_txt:<25}{Colors.RESET} {c_led}{res['led']:<25}{Colors.RESET}")

        if res['failed']:
            failed_list.append(res['port'])

    # Recovery Menu
    if not failed_list:
        print(f"\n{Colors.GREEN}✓ All systems nominal.{Colors.RESET}")
        return

    print(f"\n{Colors.RED}Found {len(failed_list)} failed device(s).{Colors.RESET}")
    print("Options:")
    print(" [1] Send 'mode_sta' (Soft Reconnect)")
    print(" [2] Hardware Reboot")
    print(" [3] Exit")

    # Note: Input is blocking, but that's fine for a menu at the end
    try:
        choice = input("\nEnter choice: ").strip()
    except EOFError:
        # stdin is closed (non-interactive run): treat it like "Exit".
        print()
        return

    if choice == '1':
        print(f"Sending 'mode_sta' to {len(failed_list)} devices...")
        outcomes = await asyncio.gather(
            *(send_command_async(p, "mode_sta") for p in failed_list)
        )
    elif choice == '2':
        print(f"Rebooting {len(failed_list)} devices...")
        outcomes = await asyncio.gather(*(reboot_async(p) for p in failed_list))
    else:
        return

    # Both recovery helpers return False on failure; surface those ports
    # instead of reporting success unconditionally.
    unreachable = [p for p, ok in zip(failed_list, outcomes) if not ok]
    if unreachable:
        print(f"{Colors.RED}Action failed on: {', '.join(unreachable)}{Colors.RESET}")
    print("Done.")
|
|
|
|
if __name__ == '__main__':
    # No explicit event-loop setup is done here: asyncio.run() always creates
    # and installs its own loop, so a loop installed beforehand with
    # asyncio.set_event_loop() would never run main_async() and would be left
    # unclosed. On Windows the proactor loop is asyncio's default since
    # Python 3.8.
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        print("\nExit.")
|