diff --git a/esp32_deploy.py b/esp32_deploy.py index 2cec440..f30d772 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -3,11 +3,10 @@ ESP32 Unified Deployment Tool (esp32_deploy) Combines firmware flashing and device configuration with full control. Updates: - - FIXED: Indentation errors in class methods - - FIXED: Overlap error caused by swapping ota_data_initial.bin with main app - - '--target auto' support for mixed-device flashing - - 'target all' support (Build 12 configurations) - - Unique binary naming and 'firmware/' persistence + - FIXED: Reset logic (DTR=False) to ensure App boot instead of Bootloader + - AUTO-DETECT: Prioritizes /dev/esp_port_* (udev rules) + - ROBUSTNESS: Merged "poking" and "retry" logic + - 'target all' support """ import asyncio @@ -20,6 +19,7 @@ import re import time import shutil import logging +import glob from pathlib import Path # Ensure detection script is available @@ -64,6 +64,19 @@ def generate_config_suffix(target, csi, ampdu): ampdu_str = "ampdu_on" if ampdu else "ampdu_off" return f"{target}_{csi_str}_{ampdu_str}" +def auto_detect_devices(): + """Prioritizes static udev paths (/dev/esp_port_XX) if they exist.""" + try: + ports = glob.glob('/dev/esp_port_*') + if ports: + # Sort by suffix number + ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0) + print(f"{Colors.CYAN}Auto-detected {len(ports)} devices using static udev rules.{Colors.RESET}") + return [type('obj', (object,), {'device': p}) for p in ports] + except Exception: + pass + return detect_esp32.detect_esp32_devices() + class UnifiedDeployWorker: def __init__(self, port, target_ip, args, project_dir, flash_sem): self.port = port @@ -74,7 +87,7 @@ class UnifiedDeployWorker: self.log = DeviceLoggerAdapter(logger, {'connid': port}) self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') - # We look for "Entering idle loop" (App Ready) or the prompt + # Matches the log from your updated main.c self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_csi_saved = re.compile(r'CSI enable state saved|Config saved', re.IGNORECASE) @@ -86,12 +99,12 @@ class UnifiedDeployWorker: if self.args.flash_erase: if not await self._erase_flash(): return False if not await self._flash_firmware(): return False - # Wait for flash tool to release port - await asyncio.sleep(1.0) + # Give it a moment to stabilize after flash reset + await asyncio.sleep(2.0) if not self.args.flash_only: if self.args.ssid and self.args.password: - # RETRY LOOP + # Retry logic success = False for attempt in range(1, 4): self.log.info(f"Configuring (Attempt {attempt}/3)...") @@ -217,30 +230,34 @@ class UnifiedDeployWorker: try: reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) except Exception as e: - self.log.error(f"Serial open failed: {e}") return False try: - # 1. Reset (Standard Sequence: DTR=0, RTS=0 -> Idle) - # Assert Reset (EN=L) + # 1. Reset writer.transport.serial.dtr = False writer.transport.serial.rts = True - await asyncio.sleep(0.2) - # Release Reset (EN=H) + await asyncio.sleep(0.1) writer.transport.serial.rts = False - # Ensure DTR is Idle (IO0=H) + # FIX: DTR Must be False to allow Booting (True=Low=Bootloader Mode) writer.transport.serial.dtr = False - # 2. Wait for App (Active Poking) + # 2. Robust Wait (with Poke) if not await self._wait_for_boot(reader, writer): self.log.warning("Boot prompt missed (sending blindly)...") - # 3. Send Config + # 3. Send await self._send_config(writer) # 4. Verify - if await self._verify_configuration(reader): + is_configured = await self._verify_configuration(reader) + + if is_configured: self.log.info(f"{Colors.GREEN}Config verified.{Colors.RESET}") + # Final Reset to apply + writer.transport.serial.dtr = False + writer.transport.serial.rts = True + await asyncio.sleep(0.1) + writer.transport.serial.rts = False return True else: self.log.error(f"{Colors.RED}Config verification failed.{Colors.RESET}") @@ -254,29 +271,24 @@ class UnifiedDeployWorker: await writer.wait_closed() async def _wait_for_boot(self, reader, writer): - # Timeout 20s to cover GPS delay - end_time = time.time() + 20 + # Timeout covers GPS delay (~3.5s) + boot overhead + end_time = time.time() + 12 last_poke = time.time() - self.log.info("Waiting for boot logs...") - while time.time() < end_time: try: - # Poke every 2 seconds to wake up console - if time.time() - last_poke > 2.0: - writer.write(b'\r\n') + # Poke every 1.5 seconds if we haven't seen the prompt + if time.time() - last_poke > 1.5: + writer.write(b'\n') await writer.drain() last_poke = time.time() - # Read with short timeout try: + # Short timeout to allow polling loop line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1) line = line_bytes.decode('utf-8', errors='ignore').strip() if not line: continue - # DEBUG LOG: Show us what the device is saying! - # print(f"[{self.port}] [Log] {line}") - if self.regex_ready.search(line): return True except asyncio.TimeoutError: @@ -288,9 +300,11 @@ class UnifiedDeployWorker: return False async def _send_config(self, writer): - # 1. Clear buffer with a newline + # Wait a moment for any last boot logs to clear await asyncio.sleep(0.5) - writer.write(b'\r\n') + + # Wake up console + writer.write(b'\n') await writer.drain() await asyncio.sleep(0.2) @@ -324,12 +338,12 @@ class UnifiedDeployWorker: "END" ] - # 2. Send Line-by-Line to prevent FIFO Overflow (128 bytes max) + # CHANGED: Send line-by-line with a small delay to prevent UART FIFO overflow for line in config_lines: cmd = line + "\r\n" writer.write(cmd.encode('utf-8')) await writer.drain() - # 50ms delay between lines allows the ESP32 task to empty the FIFO + # 50ms delay allows the ESP32 (running at 115200 baud) to process the line await asyncio.sleep(0.05) async def _verify_configuration(self, reader): @@ -340,8 +354,6 @@ class UnifiedDeployWorker: line = line_bytes.decode('utf-8', errors='ignore').strip() if not line: continue - # print(f"[{self.port}] [Verify] {line}") # Debug - if self.regex_csi_saved.search(line): return True m = self.regex_got_ip.search(line) if m and m.group(1) == self.target_ip: return True @@ -549,8 +561,13 @@ async def run_deployment(args): if args.devices: devs = [type('obj', (object,), {'device': d.strip()}) for d in args.devices.split(',')] else: - devs = detect_esp32.detect_esp32_devices() + # Use AUTO DETECT first (for static names), then standard fallback + devs = auto_detect_devices() if not devs: print("No devices found"); return + + # Sort naturally (esp_port_01 before esp_port_10) + # We rely on the internal sort of auto_detect or detect_esp32, + # but a final sort by string length/digits helps with mixing types. devs.sort(key=lambda d: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]) print(f"\n{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}") diff --git a/gen_udev_rules.py b/gen_udev_rules.py new file mode 100755 index 0000000..3cb913d --- /dev/null +++ b/gen_udev_rules.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import os +import pyudev + +def generate_rules(): + context = pyudev.Context() + + # Find all TTY devices driven by usb-serial drivers (CP210x, FTDI, etc.) + devices = [] + for device in context.list_devices(subsystem='tty'): + if 'ID_VENDOR_ID' in device.properties and 'ID_MODEL_ID' in device.properties: + # We filter for USB serial devices only + parent = device.find_parent('usb', 'usb_interface') + if parent: + devices.append(device) + + # Sort by their physical path so they are ordered by hub port + # The 'DEVPATH' usually looks like .../usb1/1-2/1-2.3/1-2.3.4... + devices.sort(key=lambda x: x.properties.get('DEVPATH', '')) + + print(f"# Detected {len(devices)} devices. Generating rules...\n") + + rules = [] + hub_counter = 1 + port_counter = 1 + last_parent_path = None + + for dev in devices: + # Get the unique physical path identifier (KERNELS) + # We need the parent USB interface kernel name (e.g., '1-1.2:1.0') + parent = dev.find_parent('usb', 'usb_interface') + if not parent: continue + + kernels_path = parent.device_path.split('/')[-1] + + # Simple heuristic to detect a new hub (big jump in path length or numbering) + # You might need to manually tweak the hub numbers in the file later. + + # Generate the rule + # KERNELS matches the physical port. + # SYMLINK creates the static alias. + rule = f'SUBSYSTEM=="tty", KERNELS=="{kernels_path}", SYMLINK+="esp_port_{len(rules)+1:02d}"' + rules.append(rule) + print(f"Mapped {dev.device_node} ({kernels_path}) -> /dev/esp_port_{len(rules):02d}") + + # Write to file + with open("99-esp32-static.rules", "w") as f: + f.write("# Persistent USB Serial mapping for ESP32 Hubs\n") + f.write("# Generated automatically. Do not edit unless topology changes.\n\n") + for r in rules: + f.write(r + "\n") + + print(f"\n{'-'*60}") + print(f"SUCCESS: Generated {len(rules)} rules in '99-esp32-static.rules'.") + print(f"To install:\n 1. Review the file to ensure order is correct.") + print(f" 2. sudo cp 99-esp32-static.rules /etc/udev/rules.d/") + print(f" 3. sudo udevadm control --reload-rules") + print(f" 4. sudo udevadm trigger") + print(f"{'-'*60}") + +if __name__ == '__main__': + # Requires 'pyudev'. Install with: sudo dnf install python3-pyudev (or pip install pyudev) + try: + import pyudev + generate_rules() + except ImportError: + print("Error: This script requires 'pyudev'.") + print("Install it via: pip install pyudev")