more on build

This commit is contained in:
Bob 2025-12-11 16:18:00 -08:00
parent a62790cbb4
commit 62b26be138
2 changed files with 121 additions and 36 deletions

View File

@ -3,11 +3,10 @@
ESP32 Unified Deployment Tool (esp32_deploy) ESP32 Unified Deployment Tool (esp32_deploy)
Combines firmware flashing and device configuration with full control. Combines firmware flashing and device configuration with full control.
Updates: Updates:
- FIXED: Indentation errors in class methods - FIXED: Reset logic (DTR=False) to ensure App boot instead of Bootloader
- FIXED: Overlap error caused by swapping ota_data_initial.bin with main app - AUTO-DETECT: Prioritizes /dev/esp_port_* (udev rules)
- '--target auto' support for mixed-device flashing - ROBUSTNESS: Merged "poking" and "retry" logic
- 'target all' support (Build 12 configurations) - 'target all' support
- Unique binary naming and 'firmware/' persistence
""" """
import asyncio import asyncio
@ -20,6 +19,7 @@ import re
import time import time
import shutil import shutil
import logging import logging
import glob
from pathlib import Path from pathlib import Path
# Ensure detection script is available # Ensure detection script is available
@ -64,6 +64,19 @@ def generate_config_suffix(target, csi, ampdu):
ampdu_str = "ampdu_on" if ampdu else "ampdu_off" ampdu_str = "ampdu_on" if ampdu else "ampdu_off"
return f"{target}_{csi_str}_{ampdu_str}" return f"{target}_{csi_str}_{ampdu_str}"
def auto_detect_devices():
"""Prioritizes static udev paths (/dev/esp_port_XX) if they exist."""
try:
ports = glob.glob('/dev/esp_port_*')
if ports:
# Sort by suffix number
ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0)
print(f"{Colors.CYAN}Auto-detected {len(ports)} devices using static udev rules.{Colors.RESET}")
return [type('obj', (object,), {'device': p}) for p in ports]
except Exception:
pass
return detect_esp32.detect_esp32_devices()
class UnifiedDeployWorker: class UnifiedDeployWorker:
def __init__(self, port, target_ip, args, project_dir, flash_sem): def __init__(self, port, target_ip, args, project_dir, flash_sem):
self.port = port self.port = port
@ -74,7 +87,7 @@ class UnifiedDeployWorker:
self.log = DeviceLoggerAdapter(logger, {'connid': port}) self.log = DeviceLoggerAdapter(logger, {'connid': port})
self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)')
# We look for "Entering idle loop" (App Ready) or the prompt # Matches the log from your updated main.c
self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE) self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE)
self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
self.regex_csi_saved = re.compile(r'CSI enable state saved|Config saved', re.IGNORECASE) self.regex_csi_saved = re.compile(r'CSI enable state saved|Config saved', re.IGNORECASE)
@ -86,12 +99,12 @@ class UnifiedDeployWorker:
if self.args.flash_erase: if self.args.flash_erase:
if not await self._erase_flash(): return False if not await self._erase_flash(): return False
if not await self._flash_firmware(): return False if not await self._flash_firmware(): return False
# Wait for flash tool to release port # Give it a moment to stabilize after flash reset
await asyncio.sleep(1.0) await asyncio.sleep(2.0)
if not self.args.flash_only: if not self.args.flash_only:
if self.args.ssid and self.args.password: if self.args.ssid and self.args.password:
# RETRY LOOP # Retry logic
success = False success = False
for attempt in range(1, 4): for attempt in range(1, 4):
self.log.info(f"Configuring (Attempt {attempt}/3)...") self.log.info(f"Configuring (Attempt {attempt}/3)...")
@ -217,30 +230,34 @@ class UnifiedDeployWorker:
try: try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e: except Exception as e:
self.log.error(f"Serial open failed: {e}")
return False return False
try: try:
# 1. Reset (Standard Sequence: DTR=0, RTS=0 -> Idle) # 1. Reset
# Assert Reset (EN=L)
writer.transport.serial.dtr = False writer.transport.serial.dtr = False
writer.transport.serial.rts = True writer.transport.serial.rts = True
await asyncio.sleep(0.2) await asyncio.sleep(0.1)
# Release Reset (EN=H)
writer.transport.serial.rts = False writer.transport.serial.rts = False
# Ensure DTR is Idle (IO0=H) # FIX: DTR Must be False to allow Booting (True=Low=Bootloader Mode)
writer.transport.serial.dtr = False writer.transport.serial.dtr = False
# 2. Wait for App (Active Poking) # 2. Robust Wait (with Poke)
if not await self._wait_for_boot(reader, writer): if not await self._wait_for_boot(reader, writer):
self.log.warning("Boot prompt missed (sending blindly)...") self.log.warning("Boot prompt missed (sending blindly)...")
# 3. Send Config # 3. Send
await self._send_config(writer) await self._send_config(writer)
# 4. Verify # 4. Verify
if await self._verify_configuration(reader): is_configured = await self._verify_configuration(reader)
if is_configured:
self.log.info(f"{Colors.GREEN}Config verified.{Colors.RESET}") self.log.info(f"{Colors.GREEN}Config verified.{Colors.RESET}")
# Final Reset to apply
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
return True return True
else: else:
self.log.error(f"{Colors.RED}Config verification failed.{Colors.RESET}") self.log.error(f"{Colors.RED}Config verification failed.{Colors.RESET}")
@ -254,29 +271,24 @@ class UnifiedDeployWorker:
await writer.wait_closed() await writer.wait_closed()
async def _wait_for_boot(self, reader, writer): async def _wait_for_boot(self, reader, writer):
# Timeout 20s to cover GPS delay # Timeout covers GPS delay (~3.5s) + boot overhead
end_time = time.time() + 20 end_time = time.time() + 12
last_poke = time.time() last_poke = time.time()
self.log.info("Waiting for boot logs...")
while time.time() < end_time: while time.time() < end_time:
try: try:
# Poke every 2 seconds to wake up console # Poke every 1.5 seconds if we haven't seen the prompt
if time.time() - last_poke > 2.0: if time.time() - last_poke > 1.5:
writer.write(b'\r\n') writer.write(b'\n')
await writer.drain() await writer.drain()
last_poke = time.time() last_poke = time.time()
# Read with short timeout
try: try:
# Short timeout to allow polling loop
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1) line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1)
line = line_bytes.decode('utf-8', errors='ignore').strip() line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue if not line: continue
# DEBUG LOG: Show us what the device is saying!
# print(f"[{self.port}] [Log] {line}")
if self.regex_ready.search(line): if self.regex_ready.search(line):
return True return True
except asyncio.TimeoutError: except asyncio.TimeoutError:
@ -288,9 +300,11 @@ class UnifiedDeployWorker:
return False return False
async def _send_config(self, writer): async def _send_config(self, writer):
# 1. Clear buffer with a newline # Wait a moment for any last boot logs to clear
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
writer.write(b'\r\n')
# Wake up console
writer.write(b'\n')
await writer.drain() await writer.drain()
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
@ -324,12 +338,12 @@ class UnifiedDeployWorker:
"END" "END"
] ]
# 2. Send Line-by-Line to prevent FIFO Overflow (128 bytes max) # CHANGED: Send line-by-line with a small delay to prevent UART FIFO overflow
for line in config_lines: for line in config_lines:
cmd = line + "\r\n" cmd = line + "\r\n"
writer.write(cmd.encode('utf-8')) writer.write(cmd.encode('utf-8'))
await writer.drain() await writer.drain()
# 50ms delay between lines allows the ESP32 task to empty the FIFO # 50ms delay allows the ESP32 (running at 115200 baud) to process the line
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
async def _verify_configuration(self, reader): async def _verify_configuration(self, reader):
@ -340,8 +354,6 @@ class UnifiedDeployWorker:
line = line_bytes.decode('utf-8', errors='ignore').strip() line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue if not line: continue
# print(f"[{self.port}] [Verify] {line}") # Debug
if self.regex_csi_saved.search(line): return True if self.regex_csi_saved.search(line): return True
m = self.regex_got_ip.search(line) m = self.regex_got_ip.search(line)
if m and m.group(1) == self.target_ip: return True if m and m.group(1) == self.target_ip: return True
@ -549,8 +561,13 @@ async def run_deployment(args):
if args.devices: if args.devices:
devs = [type('obj', (object,), {'device': d.strip()}) for d in args.devices.split(',')] devs = [type('obj', (object,), {'device': d.strip()}) for d in args.devices.split(',')]
else: else:
devs = detect_esp32.detect_esp32_devices() # Use AUTO DETECT first (for static names), then standard fallback
devs = auto_detect_devices()
if not devs: print("No devices found"); return if not devs: print("No devices found"); return
# Sort naturally (esp_port_01 before esp_port_10)
# We rely on the internal sort of auto_detect or detect_esp32,
# but a final sort by string length/digits helps with mixing types.
devs.sort(key=lambda d: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]) devs.sort(key=lambda d: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)])
print(f"\n{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}") print(f"\n{Colors.GREEN}Found {len(devs)} devices{Colors.RESET}")

68
gen_udev_rules.py Executable file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import os
import pyudev
def generate_rules():
context = pyudev.Context()
# Find all TTY devices driven by usb-serial drivers (CP210x, FTDI, etc.)
devices = []
for device in context.list_devices(subsystem='tty'):
if 'ID_VENDOR_ID' in device.properties and 'ID_MODEL_ID' in device.properties:
# We filter for USB serial devices only
parent = device.find_parent('usb', 'usb_interface')
if parent:
devices.append(device)
# Sort by their physical path so they are ordered by hub port
# The 'DEVPATH' usually looks like .../usb1/1-2/1-2.3/1-2.3.4...
devices.sort(key=lambda x: x.properties.get('DEVPATH', ''))
print(f"# Detected {len(devices)} devices. Generating rules...\n")
rules = []
hub_counter = 1
port_counter = 1
last_parent_path = None
for dev in devices:
# Get the unique physical path identifier (KERNELS)
# We need the parent USB interface kernel name (e.g., '1-1.2:1.0')
parent = dev.find_parent('usb', 'usb_interface')
if not parent: continue
kernels_path = parent.device_path.split('/')[-1]
# Simple heuristic to detect a new hub (big jump in path length or numbering)
# You might need to manually tweak the hub numbers in the file later.
# Generate the rule
# KERNELS matches the physical port.
# SYMLINK creates the static alias.
rule = f'SUBSYSTEM=="tty", KERNELS=="{kernels_path}", SYMLINK+="esp_port_{len(rules)+1:02d}"'
rules.append(rule)
print(f"Mapped {dev.device_node} ({kernels_path}) -> /dev/esp_port_{len(rules):02d}")
# Write to file
with open("99-esp32-static.rules", "w") as f:
f.write("# Persistent USB Serial mapping for ESP32 Hubs\n")
f.write("# Generated automatically. Do not edit unless topology changes.\n\n")
for r in rules:
f.write(r + "\n")
print(f"\n{'-'*60}")
print(f"SUCCESS: Generated {len(rules)} rules in '99-esp32-static.rules'.")
print(f"To install:\n 1. Review the file to ensure order is correct.")
print(f" 2. sudo cp 99-esp32-static.rules /etc/udev/rules.d/")
print(f" 3. sudo udevadm control --reload-rules")
print(f" 4. sudo udevadm trigger")
print(f"{'-'*60}")
if __name__ == '__main__':
# Requires 'pyudev'. Install with: sudo dnf install python3-pyudev (or pip install pyudev)
try:
import pyudev
generate_rules()
except ImportError:
print("Error: This script requires 'pyudev'.")
print("Install it via: pip install pyudev")