ESP32/async_batch_config_pro.py

230 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
ESP32 Async Batch Configuration Tool (Pro Version)
Incorporates architectural patterns from flows.py:
- Regex-based state detection
- Event-driven synchronization
- Context-aware logging
"""
import asyncio
import serial_asyncio
import sys
import os
import argparse
import ipaddress
import re
import time
import logging
# Ensure detection script is available
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
except ImportError:
print("Error: 'detect_esp32.py' not found.")
sys.exit(1)
# --- Logging Setup (Borrowed concept from flows.py) ---
class DeviceLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger("BatchConfig")
class Esp32Configurator:
"""
Manages the lifecycle of configuring a single ESP32 device.
Uses regex patterns similar to iperf_client in flows.py.
"""
def __init__(self, port, target_ip, config_args):
self.port = port
self.target_ip = target_ip
self.args = config_args
self.log = DeviceLoggerAdapter(logger, {'connid': port})
# Regex Patterns (Inspired by flows.py lines 350+)
# We pre-compile these for efficiency
self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
self.regex_wifi_connected = re.compile(r'WiFi connected: Yes', re.IGNORECASE)
self.regex_config_saved = re.compile(r'Config saved', re.IGNORECASE)
self.regex_ready_prompt = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE)
self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE)
async def run(self):
"""Main coroutine for this device"""
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e:
self.log.error(f"Failed to open port: {e}")
return False
try:
# 1. Hardware Reset via DTR/RTS
self.log.info("Resetting...")
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
await asyncio.sleep(0.1)
writer.transport.serial.dtr = True
# 2. Monitor Boot Stream
# We wait until the device settles or we see a prompt
await self._wait_for_boot(reader)
# 3. Send Configuration
await self._send_config(writer)
# 4. Verification Loop
success = await self._verify_connection(reader)
return success
except Exception as e:
self.log.error(f"Process Exception: {e}")
return False
finally:
writer.close()
await writer.wait_closed()
async def _wait_for_boot(self, reader):
"""Consumes boot logs until device looks ready"""
self.log.info("Waiting for boot...")
# Give it a max of 5 seconds to settle or show a prompt
end_time = time.time() + 5
while time.time() < end_time:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
# Check if device is ready to accept commands
if self.regex_ready_prompt.search(line):
self.log.info("Device ready detected.")
return
except asyncio.TimeoutError:
# If silence for 0.5s, it's probably waiting
continue
async def _send_config(self, writer):
"""Constructs and writes the config block"""
self.log.info(f"Sending config for IP {self.target_ip}...")
config_str = (
f"CFG\n"
f"SSID:{self.args.ssid}\n"
f"PASS:{self.args.password}\n"
f"IP:{self.target_ip}\n"
f"MASK:{self.args.netmask}\n"
f"GW:{self.args.gateway}\n"
f"DHCP:0\n"
f"BAND:{self.args.band}\n"
f"BW:{self.args.bandwidth}\n"
f"POWERSAVE:{self.args.powersave}\n"
f"MODE:{self.args.mode}\n"
f"MON_CH:{self.args.monitor_channel}\n"
f"END\n"
)
# Flush input buffer before writing to ensure clean state
# (Note: asyncio streams don't have a direct flush_input, relies on OS)
writer.write(config_str.encode('utf-8'))
await writer.drain()
async def _verify_connection(self, reader):
"""Reads stream verifying IP assignment"""
self.log.info("Verifying configuration...")
end_time = time.time() + 15 # 15s Timeout
while time.time() < end_time:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
# Regex Checks
m_ip = self.regex_got_ip.search(line)
if m_ip:
got_ip = m_ip.group(1)
if got_ip == self.target_ip:
self.log.info(f"SUCCESS: Assigned {got_ip}")
return True
else:
self.log.warning(f"MISMATCH: Wanted {self.target_ip}, got {got_ip}")
if self.regex_error.search(line):
self.log.warning(f"Device reported error: {line}")
except asyncio.TimeoutError:
continue
self.log.error("Timeout waiting for IP confirmation.")
return False
async def main_async():
parser = argparse.ArgumentParser(description='Async ESP32 Batch Config (Pro)')
parser.add_argument('--start-ip', required=True, help='Start IP')
parser.add_argument('-s', '--ssid', default='ClubHouse2G')
parser.add_argument('-P', '--password', default='ez2remember')
parser.add_argument('-g', '--gateway', default='192.168.1.1')
parser.add_argument('-m', '--netmask', default='255.255.255.0')
parser.add_argument('-b', '--band', default='2.4G')
parser.add_argument('-B', '--bandwidth', default='HT20')
parser.add_argument('-ps', '--powersave', default='NONE')
parser.add_argument('-M', '--mode', default='STA')
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
args = parser.parse_args()
# 1. Detect
print("Scanning devices...")
devices = detect_esp32.detect_esp32_devices()
if not devices:
print("No devices found.")
return
# Sort naturally
def natural_keys(d):
return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]
devices.sort(key=natural_keys)
# 2. Parse IP
try:
start_ip_obj = ipaddress.IPv4Address(args.start_ip)
except:
print("Invalid IP")
return
# 3. Create Tasks
tasks = []
print(f"Configuring {len(devices)} devices concurrently...")
for i, dev in enumerate(devices):
current_ip = str(start_ip_obj + i)
configurator = Esp32Configurator(dev.device, current_ip, args)
tasks.append(configurator.run())
# 4. Run All
results = await asyncio.gather(*tasks)
# 5. Summary
success_count = results.count(True)
print("\n" + "="*40)
print(f"Total: {len(devices)}")
print(f"Success: {success_count}")
print(f"Failed: {len(devices) - success_count}")
print("="*40)
if __name__ == '__main__':
try:
# Windows/Linux loop compatibility handling (borrowed from ssh_node.py lines 60-65)
if os.name == 'nt':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
asyncio.run(main_async())
except KeyboardInterrupt:
print("\nCancelled.")