From feb0d4d142dc964d634e4786be19fcdf902a6625 Mon Sep 17 00:00:00 2001 From: Robert McMahon Date: Sat, 27 Dec 2025 16:42:09 -0800 Subject: [PATCH] Remove deprecated Python scripts and clean up repository - Removed 13 deprecated deployment/configuration scripts superseded by esp32_deploy.py: * flash_all.py, flash_all_parallel.py, flash_all_serial_config.py * flash_and_config.py, mass_deploy.py, async_mass_deploy.py * async_batch_config.py, batch_config.py, config_device.py * esp32_reconfig.py, reconfig_simple.py, reconfig_simple_nextip.py * map_usb_to_ip.py - Updated .gitignore to exclude: * Emacs backup files (#filename#) * firmware/ directory and flash_args_* build artifacts - Repository now contains only active scripts: * esp32_deploy.py (main unified deployment tool) * detect_esp32.py, control_iperf.py, parse_csi.py * add_license.py, gen_udev_rules.py, identiy_port.py * async_find_failed.py (diagnostic utility) --- .gitignore | 4 + async_batch_config.py | 267 ----------------------- async_mass_deploy.py | 280 ------------------------ batch_config.py | 158 -------------- config_device.py | 389 --------------------------------- dependencies.lock | 21 ++ esp32_deploy.py | 162 +++++++++++--- esp32_reconfig.py | 235 -------------------- flash_all.py | 187 ---------------- flash_all_parallel.py | 432 ------------------------------------- flash_all_serial_config.py | 72 ------- flash_and_config.py | 198 ----------------- gen_udev_rules.py | 285 +++++++++++++++++++----- leddiff.txt | 188 ++++++++++++++++ main/idf_component.yml | 17 ++ map_usb_to_ip.py | 244 --------------------- mass_deploy.py | 319 --------------------------- new_rules.part | 21 ++ reconfig_simple.py | 167 -------------- reconfig_simple_nextip.py | 315 --------------------------- 20 files changed, 619 insertions(+), 3342 deletions(-) delete mode 100755 async_batch_config.py delete mode 100755 async_mass_deploy.py delete mode 100755 batch_config.py delete mode 100755 config_device.py create mode 100644 dependencies.lock delete mode 100755 esp32_reconfig.py delete mode 100755 flash_all.py delete mode 100755 flash_all_parallel.py delete mode 100644 flash_all_serial_config.py delete mode 100755 flash_and_config.py create mode 100644 leddiff.txt create mode 100644 main/idf_component.yml delete mode 100755 map_usb_to_ip.py delete mode 100755 mass_deploy.py create mode 100644 new_rules.part delete mode 100755 reconfig_simple.py delete mode 100755 reconfig_simple_nextip.py diff --git a/.gitignore b/.gitignore index c172444..f9dbee9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ sdkconfig.old *.bin *.elf *.map +firmware/ +flash_args_* # IDE .vscode/ @@ -16,6 +18,8 @@ sdkconfig.old *.swp *.swo *~ +# Emacs backup files +\#*\# # Dependencies dependencies/ diff --git a/async_batch_config.py b/async_batch_config.py deleted file mode 100755 index 2d5d3d1..0000000 --- a/async_batch_config.py +++ /dev/null @@ -1,267 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Async Batch Configuration Tool -The definitive parallel configuration tool. -Configures 30+ ESP32 devices concurrently using non-blocking I/O. - -Features: -- Concurrent execution (configure 30 devices in <20 seconds) -- Robust Regex-based state detection -- Supports verifying both Station Mode (IP check) and Monitor Mode -- Context-aware logging -- CSI enable/disable control -""" - -import asyncio -import serial_asyncio -import sys -import os -import argparse -import ipaddress -import re -import time -import logging - -# Ensure detection script is available -sys.path.append(os.path.dirname(os.path.abspath(__file__))) -try: - import detect_esp32 -except ImportError: - print("Error: 'detect_esp32.py' not found.") - sys.exit(1) - -# --- Logging Setup --- -class DeviceLoggerAdapter(logging.LoggerAdapter): - """Prefixes log messages with the device port name""" - def process(self, msg, kwargs): - return '[%s] %s' % (self.extra['connid'], msg), kwargs - -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') -logger = logging.getLogger("BatchConfig") - -class Esp32Configurator: - """ - Manages the lifecycle of configuring a single ESP32 device via Async Serial. - """ - def __init__(self, port, target_ip, args): - self.port = port - self.target_ip = target_ip - self.args = args - self.log = DeviceLoggerAdapter(logger, {'connid': port}) - - # --- Regex Patterns --- - # Success indicators - self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) - self.regex_monitor_success = re.compile(r'Monitor mode active', re.IGNORECASE) - self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE) - - # Prompts indicating device is booting/ready - self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) - - # Error indicators - self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE) - - async def run(self): - """Main execution workflow for this device""" - try: - reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) - except Exception as e: - self.log.error(f"Failed to open port: {e}") - return False - - try: - # 1. Hardware Reset (DTR/RTS) - self.log.info("Resetting...") - writer.transport.serial.dtr = False - writer.transport.serial.rts = True - await asyncio.sleep(0.1) - writer.transport.serial.rts = False - await asyncio.sleep(0.1) - writer.transport.serial.dtr = True - - # 2. Wait for Boot - # We assume the device is ready when we see logs or a prompt - if not await self._wait_for_boot(reader): - self.log.warning("Boot prompt missed, attempting config anyway...") - - # 3. Send Configuration - await self._send_config(writer) - - # 4. Verify Success - return await self._verify_configuration(reader) - - except Exception as e: - self.log.error(f"Exception: {e}") - return False - finally: - writer.close() - await writer.wait_closed() - - async def _wait_for_boot(self, reader): - """Reads stream until a known 'ready' prompt appears""" - self.log.info("Waiting for boot...") - timeout = time.time() + 5 # 5 second boot timeout - - while time.time() < timeout: - try: - line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5) - line = line_bytes.decode('utf-8', errors='ignore').strip() - if not line: continue - - if self.regex_ready.search(line): - return True - except asyncio.TimeoutError: - continue - return False - - async def _send_config(self, writer): - """Builds and transmits the configuration command""" - csi_val = '1' if self.args.csi_enable else '0' - self.log.info(f"Sending config for IP {self.target_ip} (CSI:{csi_val})...") - - # Construct command block - config_str = ( - f"CFG\n" - f"SSID:{self.args.ssid}\n" - f"PASS:{self.args.password}\n" - f"IP:{self.target_ip}\n" - f"MASK:{self.args.netmask}\n" - f"GW:{self.args.gateway}\n" - f"DHCP:0\n" - f"BAND:{self.args.band}\n" - f"BW:{self.args.bandwidth}\n" - f"POWERSAVE:{self.args.powersave}\n" - f"MODE:{self.args.mode}\n" - f"MON_CH:{self.args.monitor_channel}\n" - f"CSI:{csi_val}\n" - f"END\n" - ) - - writer.write(config_str.encode('utf-8')) - await writer.drain() - - async def _verify_configuration(self, reader): - """Monitors output for confirmation of Success""" - self.log.info("Verifying configuration...") - timeout = time.time() + 15 # 15s verification timeout - csi_saved = False - - while time.time() < timeout: - try: - line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0) - line = line_bytes.decode('utf-8', errors='ignore').strip() - if not line: continue - - # Check for CSI save confirmation - if self.regex_csi_saved.search(line): - csi_saved = True - - # Check for Station Mode Success (IP Address) - m_ip = self.regex_got_ip.search(line) - if m_ip: - got_ip = m_ip.group(1) - if got_ip == self.target_ip: - csi_status = "CSI saved" if csi_saved else "" - self.log.info(f"SUCCESS: Assigned {got_ip} {csi_status}") - return True - else: - self.log.warning(f"MISMATCH: Wanted {self.target_ip}, got {got_ip}") - - # Check for Monitor Mode Success - if self.regex_monitor_success.search(line): - self.log.info("SUCCESS: Monitor Mode Active") - return True - - # Check for errors - if self.regex_error.search(line): - self.log.warning(f"Device Reported Error: {line}") - - except asyncio.TimeoutError: - continue - - self.log.error("Timeout: Device did not confirm configuration.") - return False - -async def main_async(): - parser = argparse.ArgumentParser( - description='Async ESP32 Batch Config with CSI Control', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Configure 20 iperf baseline devices (NO CSI) - %(prog)s --start-ip 192.168.1.81 - - # Configure devices WITH CSI enabled - %(prog)s --start-ip 192.168.1.111 --csi - - # Configure for monitor mode on channel 36 - %(prog)s --start-ip 192.168.1.90 -M MONITOR -mc 36 - - # 5GHz with 40MHz bandwidth - %(prog)s --start-ip 192.168.1.81 -b 5G -B HT40 - """ - ) - - # Arguments - parser.add_argument('--start-ip', required=True, help='Starting Static IP') - parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='WiFi SSID') - parser.add_argument('-P', '--password', default='ez2remember', help='WiFi password') - parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP') - parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') - parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G']) - parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80']) - parser.add_argument('-ps', '--powersave', default='NONE') - parser.add_argument('-M', '--mode', default='STA') - parser.add_argument('-mc', '--monitor-channel', type=int, default=36) - parser.add_argument('--csi', dest='csi_enable', action='store_true', - help='Enable CSI capture (default: disabled)') - - args = parser.parse_args() - - # 1. Detect - print("Step 1: Detecting Devices...") - devices = detect_esp32.detect_esp32_devices() - if not devices: - print("No devices found.") - return - - # Sort naturally (ttyUSB2 before ttyUSB10) - def natural_keys(d): - return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] - devices.sort(key=natural_keys) - - try: - start_ip_obj = ipaddress.IPv4Address(args.start_ip) - except: - print(f"Invalid IP: {args.start_ip}") - return - - # 2. Configure Concurrently - csi_status = "ENABLED" if args.csi_enable else "DISABLED" - print(f"Step 2: Configuring {len(devices)} devices concurrently (CSI: {csi_status})...") - tasks = [] - - for i, dev in enumerate(devices): - current_ip = str(start_ip_obj + i) - configurator = Esp32Configurator(dev.device, current_ip, args) - tasks.append(configurator.run()) - - # Run everything at once - results = await asyncio.gather(*tasks) - - # 3. Report - success_count = results.count(True) - print("\n" + "="*40) - print(f"Total Devices: {len(devices)}") - print(f"Success: {success_count}") - print(f"Failed: {len(devices) - success_count}") - print(f"CSI Setting: {csi_status}") - print("="*40) - -if __name__ == '__main__': - try: - if os.name == 'nt': - asyncio.set_event_loop(asyncio.ProactorEventLoop()) - asyncio.run(main_async()) - except KeyboardInterrupt: - print("\nAborted by user.") diff --git a/async_mass_deploy.py b/async_mass_deploy.py deleted file mode 100755 index 3fd3cab..0000000 --- a/async_mass_deploy.py +++ /dev/null @@ -1,280 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Async Mass Deployment Tool -Combines parallel flashing (via esptool) with async configuration. -""" -import asyncio -import serial_asyncio -import sys -import os -import argparse -import ipaddress -import re -import time -import logging -from pathlib import Path - -# Ensure detection script is available -sys.path.append(os.path.dirname(os.path.abspath(__file__))) -try: - import detect_esp32 -except ImportError: - print("Error: 'detect_esp32.py' not found.") - sys.exit(1) - -# --- Configuration --- -MAX_CONCURRENT_FLASH = 8 - -class Colors: - GREEN = '\033[92m' - RED = '\033[91m' - YELLOW = '\033[93m' - BLUE = '\033[94m' - RESET = '\033[0m' - -class DeviceLoggerAdapter(logging.LoggerAdapter): - def process(self, msg, kwargs): - return '[%s] %s' % (self.extra['connid'], msg), kwargs - -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S') -logger = logging.getLogger("Deploy") - -class DeployWorker: - def __init__(self, port, target_ip, args, build_dir, flash_sem): - self.port = port - self.target_ip = target_ip - self.args = args - self.build_dir = build_dir - self.flash_sem = flash_sem - self.log = DeviceLoggerAdapter(logger, {'connid': port}) - - # Regex Patterns - self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE) - # Match output from 'mode_status' command - self.regex_status_connected = re.compile(r'WiFi connected: Yes', re.IGNORECASE) - self.regex_status_ip = re.compile(r'Got IP: (\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) - - async def run(self): - try: - # 1. Flash Phase - async with self.flash_sem: - if self.args.erase: - if not await self._erase_flash(): return False - if not await self._flash_firmware(): return False - - # 2. Config Phase - await asyncio.sleep(1.0) # Wait for port to stabilize after flash reset - - if self.args.ssid and self.args.password: - if not await self._configure_device(): return False - else: - self.log.info(f"{Colors.GREEN}Flash Complete (NVS Preserved){Colors.RESET}") - return True - except Exception as e: - self.log.error(f"Worker Exception: {e}") - return False - - async def _erase_flash(self): - self.log.info("Erasing flash...") - cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash'] - proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - stdout, stderr = await proc.communicate() - if proc.returncode == 0: - self.log.info("Erase successful.") - return True - self.log.error(f"Erase failed: {stderr.decode()}") - return False - - async def _flash_firmware(self): - self.log.info("Flashing firmware...") - cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud), - '--before', 'default_reset', '--after', 'hard_reset', - 'write_flash', '@flash_args'] - - proc = await asyncio.create_subprocess_exec( - *cmd, cwd=self.build_dir, - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) - except asyncio.TimeoutError: - proc.kill() - self.log.error("Flash timed out.") - return False - - if proc.returncode == 0: - self.log.info("Flash successful.") - return True - - self.log.error(f"Flash failed: {stderr.decode()}") - return False - - async def _configure_device(self): - self.log.info("Connecting to console...") - try: - reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) - except Exception as e: - self.log.error(f"Serial open failed: {e}") - return False - - try: - # A. Boot Wait - self.log.info("Waiting for boot...") - booted = False - end_time = time.time() + 10 - while time.time() < end_time: - try: - line_b = await asyncio.wait_for(reader.readline(), timeout=0.5) - line = line_b.decode('utf-8', errors='ignore').strip() - if self.regex_ready.search(line): - booted = True - break - except asyncio.TimeoutError: - continue - - if not booted: - self.log.warning("Boot prompt missed, attempting config anyway...") - - # B. Send Config - self.log.info(f"Sending config for {self.target_ip}...") - config_str = (f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\n" - f"IP:{self.target_ip}\nMASK:{self.args.netmask}\nGW:{self.args.gateway}\n" - f"DHCP:0\nEND\n") - writer.write(config_str.encode('utf-8')) - await writer.drain() - - # C. Active Polling Verification - self.log.info("Verifying configuration (Polling)...") - start_verify = time.time() - - while time.time() < start_verify + 30: - # 1. Clear buffer - try: - while True: - await asyncio.wait_for(reader.read(1024), timeout=0.01) - except asyncio.TimeoutError: - pass - - # 2. Send status request - writer.write(b"\nmode_status\n") - await writer.drain() - - # 3. Read response for ~2 seconds - poll_end = time.time() + 2.0 - while time.time() < poll_end: - try: - line_b = await asyncio.wait_for(reader.readline(), timeout=0.5) - line = line_b.decode('utf-8', errors='ignore').strip() - - # Check for success indicators in status output - if self.regex_status_connected.search(line): - self.log.info(f"{Colors.GREEN}SUCCESS: Connected{Colors.RESET}") - return True - - # Also catch passive "Got IP" logs if they appear - m = self.regex_status_ip.search(line) - if m: - if m.group(1) == self.target_ip: - self.log.info(f"{Colors.GREEN}SUCCESS: IP Confirmed ({m.group(1)}){Colors.RESET}") - return True - - except asyncio.TimeoutError: - break - - # Wait a bit before next poll - await asyncio.sleep(1.0) - - self.log.error("Timeout: Device did not confirm connection.") - return False - - except Exception as e: - self.log.error(f"Config error: {e}") - return False - finally: - writer.close() - await writer.wait_closed() - -def parse_args(): - parser = argparse.ArgumentParser(description='Async ESP32 Mass Deployment') - parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir') - parser.add_argument('-s', '--ssid', help='WiFi SSID') - parser.add_argument('-P', '--password', help='WiFi Password') - parser.add_argument('--start-ip', default='192.168.1.51', help='Start IP') - parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud') - parser.add_argument('--erase', action='store_true', help='Full erase first') - parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway') - parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') - return parser.parse_args() - -async def run_deployment(args): - project_dir = Path(args.dir).resolve() - build_dir = project_dir / 'build' - - # 1. Build Firmware - print(f"{Colors.YELLOW}[1/3] Building Firmware...{Colors.RESET}") - proc = await asyncio.create_subprocess_exec( - 'idf.py', 'build', - cwd=project_dir, - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.PIPE - ) - _, stderr = await proc.communicate() - - if proc.returncode != 0: - print(f"{Colors.RED}Build Failed:\n{stderr.decode()}{Colors.RESET}") - return - - if not (build_dir / 'flash_args').exists(): - print(f"{Colors.RED}Error: build/flash_args missing.{Colors.RESET}") - return - print(f"{Colors.GREEN}Build Complete.{Colors.RESET}") - - # 2. Detect Devices - print(f"{Colors.YELLOW}[2/3] Scanning Devices...{Colors.RESET}") - devices = detect_esp32.detect_esp32_devices() - if not devices: - print(f"{Colors.RED}No devices found.{Colors.RESET}") - return - - def natural_keys(d): - return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)] - devices.sort(key=natural_keys) - - # 3. Deploy - print(f"{Colors.YELLOW}[3/3] Deploying to {len(devices)} devices...{Colors.RESET}") - try: - start_ip_obj = ipaddress.IPv4Address(args.start_ip) - except: - print("Invalid Start IP") - return - - flash_sem = asyncio.Semaphore(MAX_CONCURRENT_FLASH) - tasks = [] - - for i, dev in enumerate(devices): - target_ip = str(start_ip_obj + i) - worker = DeployWorker(dev.device, target_ip, args, build_dir, flash_sem) - tasks.append(worker.run()) - - results = await asyncio.gather(*tasks) - - # 4. Summary - success = results.count(True) - print(f"\n{Colors.BLUE}{'='*40}{Colors.RESET}") - print(f"Total: {len(devices)}") - print(f"Success: {Colors.GREEN}{success}{Colors.RESET}") - print(f"Failed: {Colors.RED}{len(devices) - success}{Colors.RESET}") - print(f"{Colors.BLUE}{'='*40}{Colors.RESET}") - -def main(): - args = parse_args() - if os.name == 'nt': - asyncio.set_event_loop(asyncio.ProactorEventLoop()) - try: - asyncio.run(run_deployment(args)) - except KeyboardInterrupt: - print("\nCancelled.") - -if __name__ == '__main__': - main() diff --git a/batch_config.py b/batch_config.py deleted file mode 100755 index 0694172..0000000 --- a/batch_config.py +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Batch Configuration Tool -Detects all connected ESP32s and configures them with sequential Static IPs. -Requires: detect_esp32.py and config_device.py in the same directory. -""" - -import sys -import os -import argparse -import time -import ipaddress -import re - -# Ensure we can import the other scripts -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -try: - import detect_esp32 - import config_device -except ImportError as e: - print(f"Error: Could not import required modules ({e}).") - print("Make sure 'detect_esp32.py' and 'config_device.py' are in the same folder.") - sys.exit(1) - -def natural_sort_key(device_obj): - """ - Sorts ports naturally (ttyUSB2 comes before ttyUSB10) - """ - s = device_obj.device - # Split string into a list of integers and non-integers - return [int(text) if text.isdigit() else text.lower() - for text in re.split('([0-9]+)', s)] - -def main(): - parser = argparse.ArgumentParser( - description='Batch Config: Detects all ESP32s and configures sequential IPs', - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - - # Arguments matching config_device.py options - parser.add_argument('--start-ip', required=True, - help='Starting Static IP (e.g., 192.168.1.101). Will increment for each device.') - parser.add_argument('-s', '--ssid', default='ClubHouse2G', - help='WiFi SSID') - parser.add_argument('-P', '--password', default='ez2remember', - help='WiFi password') - parser.add_argument('-g', '--gateway', default='192.168.1.1', - help='Gateway IP') - parser.add_argument('-m', '--netmask', default='255.255.255.0', - help='Netmask') - parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G'], - help='WiFi band') - parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'], - help='Channel bandwidth') - parser.add_argument('-ps', '--powersave', default='NONE', - choices=['NONE', 'MIN', 'MIN_MODEM', 'MAX', 'MAX_MODEM'], - help='Power save mode') - parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'], - help='Operating mode') - parser.add_argument('-mc', '--monitor-channel', type=int, default=36, - help='Monitor mode channel') - parser.add_argument('-r', '--no-reboot', action='store_true', - help='Do NOT reboot devices after configuration') - parser.add_argument('-v', '--verbose', action='store_true', - help='Enable verbose output') - - args = parser.parse_args() - - # 1. Detect Devices - print(f"{'='*60}") - print("Step 1: Detecting ESP32 Devices...") - print(f"{'='*60}") - - devices = detect_esp32.detect_esp32_devices() - - if not devices: - print("No ESP32 devices found! Check USB connections.") - sys.exit(1) - - # Sort devices naturally so IPs are assigned in order (USB0, USB1, USB2...) - devices.sort(key=natural_sort_key) - - print(f"Found {len(devices)} devices:") - for d in devices: - print(f" - {d.device} ({d.description})") - print() - - # 2. Parse Starting IP - try: - start_ip_obj = ipaddress.IPv4Address(args.start_ip) - except ipaddress.AddressValueError: - print(f"Error: Invalid IP address format: {args.start_ip}") - sys.exit(1) - - # 3. Configure Each Device - print(f"{'='*60}") - print("Step 2: Configuring Devices Sequentially") - print(f"{'='*60}") - - success_count = 0 - fail_count = 0 - failed_devices = [] - - for index, device in enumerate(devices): - # Calculate current IP - current_ip = str(start_ip_obj + index) - port = device.device - - print(f"\n[{index+1}/{len(devices)}] Configuring {port} with IP {current_ip}...") - - # Call the config function from your existing script - result = config_device.config_device( - port=port, - ip=current_ip, - ssid=args.ssid, - password=args.password, - gateway=args.gateway, - netmask=args.netmask, - band=args.band, - bandwidth=args.bandwidth, - powersave=args.powersave, - mode=args.mode, - monitor_channel=args.monitor_channel, - reboot=not args.no_reboot, - verbose=args.verbose - ) - - if result: - print(f"✓ Success: {port} -> {current_ip}") - success_count += 1 - else: - print(f"✗ Failed: {port}") - fail_count += 1 - failed_devices.append(port) - - # Small delay to prevent USB power spikes if multiple devices reboot simultaneously - if not args.no_reboot and index < len(devices) - 1: - time.sleep(1.0) - - # 4. Summary - print(f"\n{'='*60}") - print("Batch Configuration Complete") - print(f"{'='*60}") - print(f"Total Devices: {len(devices)}") - print(f"Successful: {success_count}") - print(f"Failed: {fail_count}") - - if failed_devices: - print("\nFailed Ports:") - for p in failed_devices: - print(f" - {p}") - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("\nBatch process interrupted by user.") diff --git a/config_device.py b/config_device.py deleted file mode 100755 index 3ae3b16..0000000 --- a/config_device.py +++ /dev/null @@ -1,389 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 WiFi Configuration Tool - Static IP with auto-disable DHCP and CSI control -""" - -import serial -import time -import sys -import argparse - -def log_verbose(message, verbose=False): - """Print message only if verbose is enabled""" - if verbose: - print(f"[VERBOSE] {message}") - -def config_device(port, ip, ssid="ClubHouse2G", password="ez2remember", - gateway="192.168.1.1", netmask="255.255.255.0", - band="2.4G", bandwidth="HT20", powersave="NONE", - mode="STA", monitor_channel=36, csi_enable=False, - reboot=True, verbose=False): - """Configure ESP32 device via serial with static IP and CSI control""" - - print(f"\n{'='*70}") - print(f"ESP32 WiFi Configuration (Static IP + Mode + CSI)") - print(f"{'='*70}") - print(f"Port: {port}") - print(f"SSID: {ssid}") - print(f"Password: {'*' * len(password)}") - print(f"IP: {ip} (DHCP disabled)") - print(f"Gateway: {gateway}") - print(f"Netmask: {netmask}") - print(f"Mode: {mode}") - if mode == "MONITOR": - print(f"Mon Ch: {monitor_channel}") - print(f"Band: {band}") - print(f"Bandwidth: {bandwidth}") - print(f"PowerSave: {powersave}") - print(f"CSI: {'ENABLED' if csi_enable else 'DISABLED'}") - print(f"Reboot: {'Yes' if reboot else 'No'}") - print(f"Verbose: {verbose}") - print(f"{'='*70}\n") - - try: - # Open serial connection - log_verbose(f"Opening serial port {port} at 115200 baud...", verbose) - ser = serial.Serial(port, 115200, timeout=0.5, write_timeout=0.5) - log_verbose(f"Serial port opened successfully", verbose) - log_verbose(f"Port settings: {ser}", verbose) - - time.sleep(0.2) - - # Check if there's any data waiting - if ser.in_waiting: - log_verbose(f"{ser.in_waiting} bytes waiting in buffer", verbose) - existing = ser.read(ser.in_waiting).decode('utf-8', errors='ignore') - log_verbose(f"Existing data: {existing[:100]}", verbose) - - # Build config message - # DHCP is always disabled (0) when IP address is provided - config_lines = [ - "CFG", - f"SSID:{ssid}", - f"PASS:{password}", - f"IP:{ip}", - f"MASK:{netmask}", - f"GW:{gateway}", - "DHCP:0", # Always disabled for static IP - f"BAND:{band}", - f"BW:{bandwidth}", - f"POWERSAVE:{powersave}", - f"MODE:{mode}", - f"MON_CH:{monitor_channel}", - f"CSI:{'1' if csi_enable else '0'}", - "END" - ] - - config = '\n'.join(config_lines) + '\n' - - log_verbose(f"Config message size: {len(config)} bytes", verbose) - if verbose: - print("[VERBOSE] Config message:") - for line in config_lines: - display_line = line if not line.startswith("PASS:") else "PASS:********" - print(f"[VERBOSE] {display_line}") - - # Send config - print("Sending configuration...") - print("\nConfiguration being sent:") - for line in config_lines: - display_line = line if not line.startswith("PASS:") else "PASS:********" - print(f" {display_line}") - print() - - start_time = time.time() - - bytes_written = ser.write(config.encode('utf-8')) - ser.flush() - - send_time = time.time() - start_time - log_verbose(f"Wrote {bytes_written} bytes in {send_time:.3f}s", verbose) - print(f"Sent {bytes_written} bytes") - - print("Waiting for response...") - time.sleep(3) - - # Read response - if ser.in_waiting: - response_size = ser.in_waiting - print(f"\n✓ Response received: {response_size} bytes") - - response = ser.read(response_size).decode('utf-8', errors='ignore') - - print("\nDevice response:") - print("-" * 70) - for line in response.split('\n')[:30]: - if line.strip(): - print(f" {line}") - print("-" * 70) - - # Check for key indicators - success_indicators = [] - warning_indicators = [] - - if "OK" in response: - success_indicators.append("✓ Configuration acknowledged (OK)") - if "Config saved" in response or "saved to NVS" in response: - success_indicators.append("✓ Config saved to NVS") - if "CSI enable state saved" in response: - csi_state = "ENABLED" if csi_enable else "DISABLED" - success_indicators.append(f"✓ CSI {csi_state} saved to NVS") - if "got ip:" in response.lower(): - success_indicators.append("✓ Device connected to WiFi!") - import re - ip_match = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', response, re.IGNORECASE) - if ip_match: - received_ip = ip_match.group(1) - success_indicators.append(f" Assigned IP: {received_ip}") - if received_ip != ip: - warning_indicators.append(f"⚠ Warning: Device got {received_ip} instead of configured {ip}") - warning_indicators.append(" This might indicate DHCP is still enabled") - if "connected" in response.lower(): - success_indicators.append("✓ WiFi connection established") - - if "failed" in response.lower() or "disconnect" in response.lower(): - warning_indicators.append("⚠ WiFi connection may have failed") - if "error" in response.lower(): - warning_indicators.append("⚠ Error detected in response") - - if success_indicators: - print("\nStatus indicators:") - for indicator in success_indicators: - print(f" {indicator}") - - if warning_indicators: - print("\nWarnings:") - for warning in warning_indicators: - print(f" {warning}") - else: - print("\n⚠ No response from device") - print(" This could mean:") - print(" - Device is not running config handler") - print(" - Wrong serial port") - print(" - Baud rate mismatch") - - # Reboot device if requested - if reboot: - print("\n" + "="*70) - print("Rebooting device...") - print("="*70) - log_verbose("Performing hardware reset via DTR/RTS", verbose) - - ser.dtr = False - ser.rts = True - time.sleep(0.1) - - ser.rts = False - time.sleep(0.1) - - ser.dtr = True - - print("✓ Reset signal sent - waiting for boot...") - - time.sleep(3) - - if ser.in_waiting: - boot_msg = ser.read(ser.in_waiting).decode('utf-8', errors='ignore') - - print("\nBoot messages:") - print("-" * 70) - for line in boot_msg.split('\n')[:40]: - if line.strip(): - print(f" {line}") - print("-" * 70) - - # Check boot status - boot_success = [] - boot_warnings = [] - - if "WiFi config loaded from NVS" in boot_msg: - boot_success.append("✓ Config successfully loaded from NVS") - elif "No WiFi config" in boot_msg or "YELLOW LED" in boot_msg: - boot_warnings.append("✗ NO CONFIG found in NVS") - boot_warnings.append(" Device does not see saved config") - - # Check CSI status - if "CSI Capture: ENABLED" in boot_msg: - boot_success.append("✓ CSI capture is ENABLED") - elif "CSI Capture: DISABLED" in boot_msg: - if csi_enable: - boot_warnings.append("⚠ CSI is DISABLED but was configured as ENABLED") - else: - boot_success.append("✓ CSI capture is DISABLED (as configured)") - - # Check if device got the correct static IP - import re - ip_match = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', boot_msg, re.IGNORECASE) - if ip_match: - received_ip = ip_match.group(1) - if received_ip == ip: - boot_success.append(f"✓ Device got correct static IP: {ip}") - else: - boot_warnings.append(f"⚠ Device got {received_ip} instead of {ip}") - boot_warnings.append(" DHCP may still be enabled or IP conflict exists") - - if "WiFi CONNECTED" in boot_msg: - boot_success.append("✓ WiFi connection confirmed") - - if boot_success: - print("\nBoot Status - SUCCESS:") - for msg in boot_success: - print(f" {msg}") - - if boot_warnings: - print("\nBoot Status - ISSUES:") - for msg in boot_warnings: - print(f" {msg}") - else: - print("\n⚠ No boot messages received") - print(" Device may still be booting...") - - if verbose: - log_verbose(f"Input buffer: {ser.in_waiting} bytes", verbose) - log_verbose(f"Output buffer empty: {ser.out_waiting == 0}", verbose) - - ser.close() - log_verbose("Serial port closed", verbose) - - print(f"\n{'='*70}") - print("Configuration Summary") - print(f"{'='*70}") - print(f"Port: {port}") - print(f"Static IP: {ip}") - print(f"SSID: {ssid}") - print(f"Mode: {mode}") - print(f"Band: {band}") - print(f"Bandwidth: {bandwidth}") - print(f"PowerSave: {powersave}") - print(f"CSI: {'ENABLED' if csi_enable else 'DISABLED'}") - print(f"DHCP: Disabled (static IP mode)") - print(f"{'='*70}") - print("\nNext steps:") - print(f" 1. Test connection:") - print(f" ping {ip}") - print(f" iperf -c {ip}") - print(f"\n 2. Verify device has correct IP:") - print(f" idf.py -p {port} monitor") - print(f" Look for: 'got ip:{ip}'") - if csi_enable: - print(f"\n 3. Verify CSI is capturing:") - print(f" Look for: 'CSI Capture: ENABLED'") - print(f" 'Captured X CSI packets'") - - return True - - except serial.SerialException as e: - print(f"\n✗ Serial error: {e}") - log_verbose(f"Serial exception details: {type(e).__name__}", verbose) - print(" Is another program using this port?") - return False - except KeyboardInterrupt: - print("\n\nConfiguration cancelled by user") - if 'ser' in locals() and ser.is_open: - ser.close() - log_verbose("Serial port closed after interrupt", verbose) - return False - except Exception as e: - print(f"\n✗ Error: {e}") - if verbose: - import traceback - print("\n[VERBOSE] Full traceback:") - traceback.print_exc() - return False - -def main(): - parser = argparse.ArgumentParser( - description='Configure ESP32 WiFi with static IP (DHCP automatically disabled) and CSI control', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Configure device #1 for STA mode with CSI DISABLED (baseline testing) - %(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA - - # Configure device #1 for STA mode with CSI ENABLED - %(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA --csi - - # Configure device #25 for MONITOR mode (collapse detection, CSI not needed) - %(prog)s -p /dev/ttyUSB1 -i 192.168.1.90 -M MONITOR -mc 36 - - # STA mode with CSI for iperf + CSI correlation testing - %(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA --csi -ps NONE - - # Monitor mode on 2.4GHz channel 6 - %(prog)s -p /dev/ttyUSB0 -i 192.168.1.91 -M MONITOR -mc 6 -b 2.4G - - # STA mode on 5GHz with 40MHz bandwidth and CSI - %(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA -b 5G -B HT40 --csi - - # With verbose output - %(prog)s -p /dev/ttyUSB0 -i 192.168.1.51 -v - -Note: - - Mode and CSI enable state are saved to NVS - - Device will auto-start in configured mode on boot - - CSI defaults to DISABLED unless --csi flag is used - - DHCP is always disabled when using this script (static IP mode) - """ - ) - - parser.add_argument('-p', '--port', required=True, - help='Serial port (e.g., /dev/ttyUSB0)') - parser.add_argument('-i', '--ip', required=True, - help='Static IP address (DHCP will be disabled)') - parser.add_argument('-s', '--ssid', default='ClubHouse2G', - help='WiFi SSID (default: ClubHouse2G)') - parser.add_argument('-P', '--password', default='ez2remember', - help='WiFi password (default: ez2remember)') - parser.add_argument('-g', '--gateway', default='192.168.1.1', - help='Gateway IP (default: 192.168.1.1)') - parser.add_argument('-m', '--netmask', default='255.255.255.0', - help='Netmask (default: 255.255.255.0)') - parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G'], - help='WiFi band: 2.4G or 5G (default: 2.4G)') - parser.add_argument('-B', '--bandwidth', default='HT20', - choices=['HT20', 'HT40', 'VHT80'], - help='Channel bandwidth: HT20 (20MHz), HT40 (40MHz), VHT80 (80MHz, 5GHz only) (default: HT20)') - parser.add_argument('-ps', '--powersave', default='NONE', - choices=['NONE', 'MIN', 'MIN_MODEM', 'MAX', 'MAX_MODEM'], - help='Power save mode: NONE (no PS, best for CSI), MIN/MIN_MODEM, MAX/MAX_MODEM (default: NONE)') - parser.add_argument('-M', '--mode', default='STA', - choices=['STA', 'MONITOR'], - help='Operating mode: STA (connect to AP, CSI+iperf) or MONITOR (promiscuous, collapse detection) (default: STA)') - parser.add_argument('-mc', '--monitor-channel', type=int, default=36, - help='Monitor mode channel (1-11 for 2.4GHz, 36-165 for 5GHz) (default: 36)') - parser.add_argument('--csi', action='store_true', - help='Enable CSI capture (default: disabled). Use for devices that need CSI data collection.') - parser.add_argument('-r', '--no-reboot', action='store_true', - help='Do NOT reboot device after configuration') - parser.add_argument('-v', '--verbose', action='store_true', - help='Enable verbose output') - - args = parser.parse_args() - - # Validate bandwidth selection - if args.bandwidth == 'VHT80' and args.band == '2.4G': - print("\n✗ Error: VHT80 (80MHz) is only supported on 5GHz band") - print(" Either use -b 5G or choose HT20/HT40 bandwidth") - sys.exit(1) - - success = config_device( - port=args.port, - ip=args.ip, - ssid=args.ssid, - password=args.password, - gateway=args.gateway, - netmask=args.netmask, - band=args.band, - bandwidth=args.bandwidth, - powersave=args.powersave, - mode=args.mode, - monitor_channel=args.monitor_channel, - csi_enable=args.csi, - reboot=not args.no_reboot, - verbose=args.verbose - ) - - sys.exit(0 if success else 1) - -if __name__ == '__main__': - main() diff --git a/dependencies.lock b/dependencies.lock new file mode 100644 index 0000000..7e51cc5 --- /dev/null +++ b/dependencies.lock @@ -0,0 +1,21 @@ +dependencies: + espressif/led_strip: + component_hash: 28c6509a727ef74925b372ed404772aeedf11cce10b78c3f69b3c66799095e2d + dependencies: + - name: idf + require: private + version: '>=4.4' + source: + registry_url: https://components.espressif.com/ + type: service + version: 2.5.5 + idf: + source: + type: idf + version: 6.0.0 +direct_dependencies: +- espressif/led_strip +- idf +manifest_hash: cfead66889b7175cc6aa9a766fd00dc94649d6800986f3fcc4645dc58723ed39 +target: esp32c5 +version: 2.0.0 diff --git a/esp32_deploy.py b/esp32_deploy.py index f238c0b..680888d 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -135,7 +135,8 @@ class UnifiedDeployWorker: # --- Semaphore Released Here --- - await asyncio.sleep(2.0) + # Brief pause after flash to allow device to start booting (hardware timing) + await asyncio.sleep(1.0) if not self.args.flash_only: if self.args.ssid and self.args.password: @@ -171,10 +172,10 @@ class UnifiedDeployWorker: except Exception as e: return False try: - # Reset DTR/RTS logic + # Reset DTR/RTS logic (hardware reset timing - must use sleep) writer.transport.serial.dtr = False writer.transport.serial.rts = True - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) # Hardware timing requirement, not event-based writer.transport.serial.rts = False writer.transport.serial.dtr = False @@ -184,36 +185,100 @@ class UnifiedDeployWorker: return False # 2. Send Configuration via CLI - # Command: wifi_config -s "SSID" -p "PASS" -i "IP" - # Note: The Shell will auto-reboot after this command. - cmd = f'wifi_config -s "{self.args.ssid}" -p "{self.args.password}" -i "{self.target_ip}"' - if not self.args.iperf_client and not self.args.iperf_server: - # If just connecting, maybe we want DHCP? - # But if target_ip is set, we force static. - pass - - self.log.info(f"Sending: {cmd}") - writer.write(f"{cmd}\n".encode()) + # Use separate commands: wifi connect and ip set + # First, connect to WiFi (saves credentials to NVS) + # Password is optional, only include if provided + if self.args.password: + wifi_cmd = f'wifi connect "{self.args.ssid}" "{self.args.password}"' + else: + wifi_cmd = f'wifi connect "{self.args.ssid}"' + self.log.info(f"Sending: {wifi_cmd}") + writer.write(f"{wifi_cmd}\n".encode()) await writer.drain() + + # Wait for "Connecting to..." message (command acknowledged) + match_idx, output = await self._wait_for_pattern(reader, ["Connecting to"], timeout=2.0) + if match_idx is None: + self.log.warning("WiFi connect command response not detected, continuing...") + + # Second, set static IP address (if IP is provided) + if self.target_ip: + ip_cmd = f'ip set {self.target_ip} {self.args.netmask} {self.args.gateway}' + self.log.info(f"Sending: {ip_cmd}") + writer.write(f"{ip_cmd}\n".encode()) + await writer.drain() + + # Wait for IP set confirmation + match_idx, output = await self._wait_for_pattern( + reader, + ["Static IP set", "Saved. Will apply on next init", "Invalid IP format"], + timeout=3.0 + ) + if match_idx is None: + self.log.warning("IP set command response not detected, continuing...") + elif match_idx == 2: # "Invalid IP format" + self.log.error("IP configuration failed: Invalid IP format") + return False - # 3. Wait for the reboot and new prompt - # The device prints "Rebooting..." then restarts. - self.log.info("Waiting for reboot...") - await asyncio.sleep(3.0) # Give it time to actually reset + # 3. Wait for prompt to ensure command completed (may take a moment for WiFi to start connecting) + # WiFi connection is async, so we just verify the prompt returns + if not await self._wait_for_prompt(reader, writer, timeout=5.0): + self.log.warning("Prompt check failed after configuration, but continuing...") - if not await self._wait_for_prompt(reader, writer, timeout=20): - self.log.error("Device did not return to prompt after reboot.") - return False + self.log.info(f"{Colors.GREEN}Configuration complete.{Colors.RESET}") - self.log.info(f"{Colors.GREEN}Reboot complete. Shell Ready.{Colors.RESET}") - - # 4. (Optional) Start iperf if requested - # The new firmware does not auto-start iperf on boot unless commanded. + # 4. (Optional) Configure and start iperf if requested if not self.args.no_iperf: - self.log.info("Starting iperf listener...") + # Configure iperf parameters if specified + iperf_params = [] + if self.args.iperf_dest_ip: + iperf_params.append(f'--client {self.args.iperf_dest_ip}') + if self.args.iperf_port: + iperf_params.append(f'--port {self.args.iperf_port}') + if self.args.iperf_len: + iperf_params.append(f'--len {self.args.iperf_len}') + if self.args.iperf_burst: + iperf_params.append(f'--burst {self.args.iperf_burst}') + # Note: iperf-period not directly supported, would need PPS calculation + + if iperf_params: + iperf_set_cmd = f"iperf set {' '.join(iperf_params)}\n" + self.log.info(f"Configuring iperf: {iperf_set_cmd.strip()}") + writer.write(iperf_set_cmd.encode()) + await writer.drain() + + # Wait for iperf set confirmation + match_idx, output = await self._wait_for_pattern( + reader, + ["Configuration updated", "No changes specified"], + timeout=2.0 + ) + if match_idx is None: + self.log.warning("iperf set response not detected, continuing...") + + # Save configuration to NVS + self.log.info("Saving iperf config to NVS...") + writer.write(b"iperf save\n") + await writer.drain() + + # Wait for iperf save confirmation + match_idx, output = await self._wait_for_pattern( + reader, + ["Configuration saved to NVS", "No changes to save", "Error saving"], + timeout=2.0 + ) + if match_idx is None: + self.log.warning("iperf save response not detected, continuing...") + elif match_idx == 2: # "Error saving" + self.log.error("iperf save failed, but continuing...") + + # Start iperf (no specific output expected, just wait for prompt) + self.log.info("Starting iperf...") writer.write(b"iperf start\n") await writer.drain() - await asyncio.sleep(0.5) + + # Wait a brief moment for command to be processed, then check for prompt + await self._wait_for_prompt(reader, writer, timeout=2.0) return True @@ -246,6 +311,48 @@ class UnifiedDeployWorker: break return False + async def _wait_for_pattern(self, reader, patterns, timeout=5.0, consume=True): + """ + Wait for one of the given patterns to appear in serial output. + patterns: list of regex patterns or simple strings to search for + timeout: maximum time to wait + consume: if True, read and discard data; if False, only peek + Returns: tuple (matched_pattern_index, buffer) or (None, buffer) if timeout + """ + end_time = time.time() + timeout + buffer = "" + + # Compile patterns + compiled_patterns = [] + for p in patterns: + if isinstance(p, str): + compiled_patterns.append(re.compile(re.escape(p), re.IGNORECASE)) + else: + compiled_patterns.append(p) + + while time.time() < end_time: + try: + chunk = await asyncio.wait_for(reader.read(1024), timeout=0.1) + if chunk: + buffer += chunk.decode('utf-8', errors='ignore') + # Keep buffer size manageable (last 4KB) + if len(buffer) > 4096: + buffer = buffer[-4096:] + + # Check each pattern + for i, pattern in enumerate(compiled_patterns): + if pattern.search(buffer): + if not consume: + # Put data back (not easily possible with async, so we'll consume) + pass + return (i, buffer) + except asyncio.TimeoutError: + continue + except Exception: + break + + return (None, buffer) + # [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware AS IS] async def _query_version(self): try: @@ -254,7 +361,8 @@ class UnifiedDeployWorker: writer.transport.serial.rts = False writer.write(b'\n') await writer.drain() - await asyncio.sleep(0.1) + # Brief pause for prompt, then send command + await asyncio.sleep(0.1) # Hardware timing for prompt to appear writer.write(b'version\n') await writer.drain() diff --git a/esp32_reconfig.py b/esp32_reconfig.py deleted file mode 100755 index 3acf1dc..0000000 --- a/esp32_reconfig.py +++ /dev/null @@ -1,235 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Reconfiguration Tool -Iterates through connected devices and updates their settings (SSID, IP, etc.) -without reflashing firmware. Runs sequentially for reliability. -""" - -import sys -import os -import argparse -import ipaddress -import re -import time -import serial -from serial.tools import list_ports - -# --- Configuration --- -BAUD_RATE = 115200 -TIMEOUT = 0.5 # Serial read timeout - -class Colors: - GREEN = '\033[92m' - RED = '\033[91m' - YELLOW = '\033[93m' - BLUE = '\033[94m' - CYAN = '\033[96m' - RESET = '\033[0m' - -def detect_devices(): - """Returns a sorted list of ESP32 USB serial ports.""" - candidates = list(list_ports.grep("CP210|FT232|USB Serial|10C4:EA60")) - ports = [p.device for p in candidates] - ports.sort(key=lambda x: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', x)]) - return ports - -def extract_device_number(device_path): - match = re.search(r'(\d+)$', device_path) - return int(match.group(1)) if match else 0 - -def configure_device(port, target_ip, args): - """ - Connects to a single device, resets it, and injects the config. - Returns True if verification succeeds. - """ - try: - ser = serial.Serial(port, BAUD_RATE, timeout=0.1) - except Exception as e: - print(f"[{port}] {Colors.RED}Connection Failed: {e}{Colors.RESET}") - return False - - try: - # 1. Reset Device - ser.dtr = False - ser.rts = True - time.sleep(0.1) - ser.rts = False - ser.dtr = True - - # 2. Wait for App to Settle (Handle GPS delay) - print(f"[{port}] Waiting for App (GPS timeout ~3s)...", end='', flush=True) - start_time = time.time() - - # We wait until we see the "esp32>" prompt OR specific log lines - # The prompt is the safest indicator that the console is ready. - prompt_detected = False - buffer = "" - - while time.time() - start_time < 10.0: - try: - # Read char by char to catch prompts that don't end in newline - chunk = ser.read(ser.in_waiting or 1).decode('utf-8', errors='ignore') - if chunk: - buffer += chunk - # Check for prompt or end of init - if "esp32>" in buffer or "Entering console loop" in buffer: - prompt_detected = True - break - # Keep buffer size manageable - if len(buffer) > 1000: buffer = buffer[-1000:] - except Exception: - pass - time.sleep(0.05) - - if not prompt_detected: - print(f" {Colors.YELLOW}Timeout waiting for prompt (continuing anyway){Colors.RESET}") - else: - print(" OK") - - # 3. Clear Buffers & Wakeup - ser.reset_input_buffer() - ser.write(b'\n') # Send an Enter to clear any partial commands - time.sleep(0.2) - - # 4. Construct Config String (Using CRLF \r\n for safety) - csi_val = '1' if args.csi_enable else '0' - role_str = "SERVER" if args.iperf_server else "CLIENT" - iperf_enable_val = '0' if args.no_iperf else '1' - period_us = int(args.iperf_period * 1000000) - - # Note: We send \r\n explicitly - config_lines = [ - "CFG", - f"SSID:{args.ssid}", - f"PASS:{args.password}", - f"IP:{target_ip}", - f"MASK:{args.netmask}", - f"GW:{args.gateway}", - f"DHCP:0", - f"BAND:{args.band}", - f"BW:{args.bandwidth}", - f"POWERSAVE:{args.powersave}", - f"MODE:{args.mode}", - f"MON_CH:{args.monitor_channel}", - f"CSI:{csi_val}", - f"IPERF_PERIOD_US:{period_us}", - f"IPERF_ROLE:{role_str}", - f"IPERF_PROTO:{args.iperf_proto}", - f"IPERF_DEST_IP:{args.iperf_dest_ip}", - f"IPERF_PORT:{args.iperf_port}", - f"IPERF_BURST:{args.iperf_burst}", - f"IPERF_LEN:{args.iperf_len}", - f"IPERF_ENABLED:{iperf_enable_val}", - "END" - ] - - config_payload = "\r\n".join(config_lines) + "\r\n" - - # 5. Send Config - print(f"[{port}] Sending Config ({target_ip})...", end='', flush=True) - ser.write(config_payload.encode('utf-8')) - ser.flush() - - # 6. Verify - verify_start = time.time() - verified = False - ser.timeout = 0.5 # Increase timeout for line reading - - while time.time() - verify_start < 8.0: - line = ser.readline().decode('utf-8', errors='ignore').strip() - if not line: continue - - # Check for success indicators - if "Config saved" in line or "CSI enable state saved" in line: - verified = True - break - - # Check for IP confirmation - if f"got ip:{target_ip}" in line: - verified = True - break - - if verified: - print(f" {Colors.GREEN}SUCCESS{Colors.RESET}") - # Final Reset to apply settings cleanly - ser.dtr = False - ser.rts = True - time.sleep(0.1) - ser.rts = False - return True - else: - print(f" {Colors.RED}FAILED (Verify Timeout){Colors.RESET}") - return False - - except Exception as e: - print(f"[{port}] Error: {e}") - return False - finally: - if ser.is_open: - ser.close() - -def main(): - parser = argparse.ArgumentParser(description='ESP32 Sequential Reconfiguration Tool') - - parser.add_argument('--start-ip', required=True, help='Start IP (e.g., 192.168.1.51)') - parser.add_argument('-s', '--ssid', default='ClubHouse2G') - parser.add_argument('-P', '--password', default='ez2remember') - parser.add_argument('-g', '--gateway', default='192.168.1.1') - parser.add_argument('-m', '--netmask', default='255.255.255.0') - parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G']) - parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80']) - parser.add_argument('-ps', '--powersave', default='NONE') - parser.add_argument('--iperf-period', type=float, default=0.01) - parser.add_argument('--iperf-burst', type=int, default=1) - parser.add_argument('--iperf-len', type=int, default=1470) - parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP']) - parser.add_argument('--iperf-dest-ip', default='192.168.1.50') - parser.add_argument('--iperf-port', type=int, default=5001) - parser.add_argument('--no-iperf', action='store_true') - parser.add_argument('--iperf-client', action='store_true') - parser.add_argument('--iperf-server', action='store_true') - parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR']) - parser.add_argument('-mc', '--monitor-channel', type=int, default=36) - parser.add_argument('--csi', dest='csi_enable', action='store_true') - parser.add_argument('--retries', type=int, default=3, help="Retry attempts per device") - - args = parser.parse_args() - - print(f"{Colors.BLUE}{'='*60}{Colors.RESET}") - print(f" ESP32 Sequential Reconfig Tool") - print(f"{Colors.BLUE}{'='*60}{Colors.RESET}") - - devices = detect_devices() - if not devices: - print(f"{Colors.RED}No devices found.{Colors.RESET}") - sys.exit(1) - - print(f"Found {len(devices)} devices. Starting reconfiguration...\n") - - start_ip = ipaddress.IPv4Address(args.start_ip) - - for i, port in enumerate(devices): - offset = extract_device_number(port) - target_ip = str(start_ip + offset) - - print(f"Device {i+1}/{len(devices)}: {Colors.CYAN}{port}{Colors.RESET} -> {Colors.YELLOW}{target_ip}{Colors.RESET}") - - success = False - for attempt in range(1, args.retries + 1): - if attempt > 1: - print(f" Retry {attempt}/{args.retries}...") - - if configure_device(port, target_ip, args): - success = True - break - time.sleep(1.0) - - if not success: - print(f"{Colors.RED} [ERROR] Failed to configure {port} after {args.retries} attempts.{Colors.RESET}\n") - else: - print("") - - print(f"{Colors.BLUE}Done.{Colors.RESET}") - -if __name__ == '__main__': - main() diff --git a/flash_all.py b/flash_all.py deleted file mode 100755 index 77540cd..0000000 --- a/flash_all.py +++ /dev/null @@ -1,187 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Mass Flash Script (filtered & robust) - -Changes in this version: - - Filters out non-USB system serials (e.g., /dev/ttyS*) - - Only enumerates typical USB serial ports: /dev/ttyUSB* and /dev/ttyACM* - - Further filters to known USB-serial vendor IDs by default: - * FTDI 0x0403 - * SiliconLabs/CP210x 0x10C4 - * QinHeng/CH34x 0x1A86 - * Prolific PL2303 0x067B - * Espressif native 0x303A - - Adds --ports to override selection (glob or comma-separated patterns) - - Uses detect_esp32.detect_chip_type(port) for exact chip string - - Maps to correct idf.py target before flashing - -Example: - python3 flash_all.py --project /path/to/project --start-ip 192.168.1.50 - python3 flash_all.py --project . --ports '/dev/ttyUSB*,/dev/ttyACM*' -""" -import argparse -import fnmatch -import glob -import os -import subprocess -import sys -from pathlib import Path -from typing import List, Dict - -try: - import serial.tools.list_ports as list_ports -except Exception: - print("pyserial is required: pip install pyserial") - raise - -SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, SCRIPT_DIR) -try: - import detect_esp32 -except Exception as e: - print("Error: detect_esp32.py must be in the same directory and importable.") - print(f"Import error: {e}") - sys.exit(1) - -# Known USB VIDs commonly used for ESP32 dev boards/adapters -KNOWN_VIDS = {0x0403, 0x10C4, 0x1A86, 0x067B, 0x303A} - -def map_chip_to_idf_target(chip_str: str) -> str: - if not chip_str or chip_str == 'Unknown': - return 'unknown' - s = chip_str.upper() - if s.startswith('ESP32-S3'): - return 'esp32s3' - if s.startswith('ESP32-S2'): - return 'esp32s2' - if s.startswith('ESP32-C3'): - return 'esp32c3' - if s.startswith('ESP32-C6'): - return 'esp32c6' - if s.startswith('ESP32-H2'): - return 'esp32h2' - if s.startswith('ESP32'): - return 'esp32' - return 'unknown' - -def run(cmd: List[str], cwd: str = None, check: bool = True) -> int: - print(">>", " ".join(cmd)) - proc = subprocess.run(cmd, cwd=cwd) - if check and proc.returncode != 0: - raise RuntimeError(f"Command failed with code {proc.returncode}: {' '.join(cmd)}") - return proc.returncode - -def ensure_target(project_dir: str, target: str): - if not target or target == 'unknown': - raise ValueError("Unknown IDF target; cannot set-target.") - run(['idf.py', 'set-target', target], cwd=project_dir, check=True) - -def flash_device(project_dir: str, port: str, idf_target: str, baud: int = 460800) -> bool: - try: - ensure_target(project_dir, idf_target) - run(['idf.py', '-p', port, '-b', str(baud), 'flash'], cwd=project_dir, check=True) - return True - except Exception as e: - print(f" Flash failed on {port}: {e}") - return False - -def match_any(path: str, patterns: List[str]) -> bool: - return any(fnmatch.fnmatch(path, pat) for pat in patterns) - -def list_ports_filtered(patterns: List[str] = None) -> List[object]: - """Return a filtered list of pyserial list_ports items.""" - ports = list(list_ports.comports()) - filtered = [] - for p in ports: - dev = p.device - # Default pattern filter: only /dev/ttyUSB* and /dev/ttyACM* - if patterns: - if not match_any(dev, patterns): - continue - else: - if not (dev.startswith('/dev/ttyUSB') or dev.startswith('/dev/ttyACM')): - continue - # VID filter (allow if vid is known or missing (some systems omit it), but exclude obvious non-USB) - vid = getattr(p, 'vid', None) - if vid is not None and vid not in KNOWN_VIDS: - # Skip unknown vendor to reduce noise; user can override with --ports - continue - filtered.append(p) - return filtered - -def main(): - ap = argparse.ArgumentParser(description="Mass flash multiple ESP32 devices with proper chip detection.") - ap.add_argument('--project', required=True, help='Path to the ESP-IDF project to flash') - ap.add_argument('--ssid', help='WiFi SSID (optional)') - ap.add_argument('--password', help='WiFi password (optional)') - ap.add_argument('--start-ip', default='192.168.1.50', help='Base IP address for plan display') - ap.add_argument('--baud', type=int, default=460800, help='Flashing baud rate') - ap.add_argument('--dry-run', action='store_true', help='Plan only; do not flash') - ap.add_argument('--ports', help='Comma-separated glob(s), e.g. "/dev/ttyUSB*,/dev/ttyACM*" to override selection') - args = ap.parse_args() - - project_dir = os.path.abspath(args.project) - if not os.path.isdir(project_dir): - print(f"Project directory not found: {project_dir}") - sys.exit(1) - - patterns = None - if args.ports: - patterns = [pat.strip() for pat in args.ports.split(',') if pat.strip()] - - devices = list_ports_filtered(patterns) - print(f"Found {len(devices)} USB serial device(s) after filtering") - if not devices: - print("No candidate USB serial ports found. Try --ports '/dev/ttyUSB*,/dev/ttyACM*' or check permissions.") - - device_list: List[Dict] = [] - for idx, dev in enumerate(devices, 1): - port = dev.device - print(f"Probing {port} for exact chip...") - raw_chip = detect_esp32.detect_chip_type(port) - idf_target = map_chip_to_idf_target(raw_chip) - if idf_target == 'unknown': - print(f" WARNING: Could not determine idf.py target for {port} (got '{raw_chip}')") - device_list.append({ - 'number': idx, - 'port': port, - 'raw_chip': raw_chip, - 'idf_target': idf_target, - 'info': dev, - }) - - # Plan output - base = args.start_ip.split('.') - try: - base0, base1, base2, base3 = int(base[0]), int(base[1]), int(base[2]), int(base[3]) - except Exception: - base0, base1, base2, base3 = 192, 168, 1, 50 - - print("\nFlash plan:") - for d in device_list: - ip_last = base3 + d['number'] - 1 - ip = f"{base0}.{base1}.{base2}.{ip_last}" - print(f" Device {d['number']:2d}: {d['port']} -> {d['raw_chip']} [{d['idf_target']}] -> {ip}") - - if args.dry_run: - print("\nDry run: not flashing any devices.") - return - - failed = [] - for d in device_list: - if d['idf_target'] == 'unknown': - print(f"\n ERROR: Unknown IDF target for {d['port']} (raw chip '{d['raw_chip']}'). Skipping.") - failed.append(d['number']) - continue - print(f"\nFlashing {d['port']} as target {d['idf_target']}...") - ok = flash_device(project_dir, d['port'], d['idf_target'], baud=args.baud) - if not ok: - failed.append(d['number']) - - if failed: - print(f"\nCompleted with failures on devices: {failed}") - sys.exit(2) - print("\nAll devices flashed successfully.") - -if __name__ == '__main__': - main() diff --git a/flash_all_parallel.py b/flash_all_parallel.py deleted file mode 100755 index 9bf63eb..0000000 --- a/flash_all_parallel.py +++ /dev/null @@ -1,432 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Parallel Mass Flash Script -Build and flash multiple ESP32 devices concurrently for much faster deployment -""" - -import subprocess -import sys -import os -import time -import argparse -import shutil -from pathlib import Path -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed -from multiprocessing import cpu_count - -# Import the detection script -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -try: - import detect_esp32 -except ImportError: - print("Error: detect_esp32.py must be in the same directory") - sys.exit(1) - - -def detect_device_type(port_info): - """Detect ESP32 variant based on USB chip""" - if port_info.vid == 0x303A: - return 'esp32s3' - return 'esp32' - - -def probe_chip_type(port): - """Probe the actual chip type using esptool.py""" - try: - result = subprocess.run( - ['esptool.py', '--port', port, 'chip_id'], - capture_output=True, - text=True, - timeout=10 - ) - - output = result.stdout + result.stderr - - if 'ESP32-S3' in output: - return 'esp32s3' - elif 'ESP32-S2' in output: - return 'esp32s2' - elif 'ESP32-C3' in output: - return 'esp32c3' - elif 'ESP32' in output: - return 'esp32' - - except Exception as e: - print(f" Warning: Could not probe {port}: {e}") - - return 'esp32' - - -def create_sdkconfig(build_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'): - """Create sdkconfig.defaults file with WiFi and IP configuration""" - sdkconfig_path = os.path.join(build_dir, 'sdkconfig.defaults') - - config_content = f"""# WiFi Configuration -CONFIG_WIFI_SSID="{ssid}" -CONFIG_WIFI_PASSWORD="{password}" -CONFIG_WIFI_MAXIMUM_RETRY=5 - -# Static IP Configuration -CONFIG_USE_STATIC_IP=y -CONFIG_STATIC_IP_ADDR="{ip_addr}" -CONFIG_STATIC_GATEWAY_ADDR="{gateway}" -CONFIG_STATIC_NETMASK_ADDR="{netmask}" -""" - - with open(sdkconfig_path, 'w') as f: - f.write(config_content) - - -def build_firmware(device_info, project_dir, build_dir, ssid, password): - """Build firmware for a single device with unique configuration""" - dev_num = device_info['number'] - chip_type = device_info['chip'] - ip_addr = device_info['ip'] - port = device_info['port'] - - print(f"[Device {dev_num}] [{port}] Chip: {chip_type.upper()} | Building with IP {ip_addr}") - - try: - # Create build directory - os.makedirs(build_dir, exist_ok=True) - - # Copy project files to build directory - for item in ['main', 'CMakeLists.txt']: - src = os.path.join(project_dir, item) - dst = os.path.join(build_dir, item) - if os.path.isdir(src): - if os.path.exists(dst): - shutil.rmtree(dst) - shutil.copytree(src, dst) - else: - shutil.copy2(src, dst) - - # Create sdkconfig.defaults - create_sdkconfig(build_dir, ssid, password, ip_addr) - - # Set target - result = subprocess.run( - ['idf.py', 'set-target', chip_type], - cwd=build_dir, - capture_output=True, - text=True - ) - - if result.returncode != 0: - return { - 'success': False, - 'device': dev_num, - 'error': f"Set target failed: {result.stderr[:200]}" - } - - # Build - result = subprocess.run( - ['idf.py', 'build'], - cwd=build_dir, - capture_output=True, - text=True - ) - - if result.returncode != 0: - return { - 'success': False, - 'device': dev_num, - 'error': f"Build failed: {result.stderr[-500:]}" - } - - print(f"[Device {dev_num}] ✓ Build complete ({chip_type.upper()})") - - return { - 'success': True, - 'device': dev_num, - 'build_dir': build_dir - } - - except Exception as e: - return { - 'success': False, - 'device': dev_num, - 'error': str(e) - } - - -def flash_device(device_info, build_dir): - """Flash a single device""" - dev_num = device_info['number'] - port = device_info['port'] - ip_addr = device_info['ip'] - chip_type = device_info['chip'] - - print(f"[Device {dev_num}] [{port}] {chip_type.upper()} | Flashing -> {ip_addr}") - - try: - result = subprocess.run( - ['idf.py', '-p', port, 'flash'], - cwd=build_dir, - capture_output=True, - text=True, - timeout=120 - ) - - if result.returncode != 0: - return { - 'success': False, - 'device': dev_num, - 'port': port, - 'error': f"Flash failed: {result.stderr[-500:]}" - } - - print(f"[Device {dev_num}] ✓ Flash complete ({chip_type.upper()}) at {ip_addr}") - - return { - 'success': True, - 'device': dev_num, - 'port': port, - 'ip': ip_addr - } - - except subprocess.TimeoutExpired: - return { - 'success': False, - 'device': dev_num, - 'port': port, - 'error': "Flash timeout" - } - except Exception as e: - return { - 'success': False, - 'device': dev_num, - 'port': port, - 'error': str(e) - } - - -def build_and_flash(device_info, project_dir, work_dir, ssid, password): - """Combined build and flash for a single device""" - dev_num = device_info['number'] - build_dir = os.path.join(work_dir, f'build_device_{dev_num}') - - # Build - build_result = build_firmware(device_info, project_dir, build_dir, ssid, password) - if not build_result['success']: - return build_result - - # Flash - flash_result = flash_device(device_info, build_dir) - - # Clean up build directory to save space - try: - shutil.rmtree(build_dir) - except: - pass - - return flash_result - - -def main(): - parser = argparse.ArgumentParser(description='Parallel mass flash ESP32 devices') - parser.add_argument('--ssid', required=True, help='WiFi SSID') - parser.add_argument('--password', required=True, help='WiFi password') - parser.add_argument('--start-ip', default='192.168.1.50', - help='Starting IP address (default: 192.168.1.50)') - parser.add_argument('--gateway', default='192.168.1.1', - help='Gateway IP (default: 192.168.1.1)') - parser.add_argument('--project-dir', default=None, - help='ESP32 iperf project directory') - parser.add_argument('--probe', action='store_true', - help='Probe each device to detect exact chip type (slower)') - parser.add_argument('--dry-run', action='store_true', - help='Show what would be done without building/flashing') - parser.add_argument('--build-parallel', type=int, default=None, - help='Number of parallel builds (default: CPU cores)') - parser.add_argument('--flash-parallel', type=int, default=8, - help='Number of parallel flash operations (default: 8)') - parser.add_argument('--strategy', choices=['build-then-flash', 'build-and-flash'], - default='build-and-flash', - help='Deployment strategy (default: build-and-flash)') - - args = parser.parse_args() - - # Determine parallelism - if args.build_parallel is None: - args.build_parallel = max(1, cpu_count() - 1) - - # Find project directory - if args.project_dir: - project_dir = args.project_dir - else: - script_dir = os.path.dirname(os.path.abspath(__file__)) - project_dir = script_dir - - if not os.path.exists(os.path.join(project_dir, 'main')): - project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf') - - if not os.path.exists(project_dir): - print(f"ERROR: Project directory not found: {project_dir}") - sys.exit(1) - - # Create work directory for builds - work_dir = os.path.join(project_dir, '.builds') - os.makedirs(work_dir, exist_ok=True) - - print(f"Using project directory: {project_dir}") - print(f"Work directory: {work_dir}") - - # Detect devices - print("\nDetecting ESP32 devices...") - devices = detect_esp32.detect_esp32_devices() - - if not devices: - print("No ESP32 devices detected!") - sys.exit(1) - - print(f"Found {len(devices)} device(s)") - - # Prepare device list with IPs - base_parts = args.start_ip.split('.') - device_list = [] - - for idx, device in enumerate(devices, 1): - if args.probe: - print(f"Probing {device.device}...") - chip_type = probe_chip_type(device.device) - else: - chip_type = detect_device_type(device) - - ip_last = int(base_parts[3]) + idx - 1 - ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last}" - - device_list.append({ - 'number': idx, - 'port': device.device, - 'chip': chip_type, - 'ip': ip, - 'info': device - }) - - # Display plan - print(f"\n{'='*70}") - print("PARALLEL FLASH PLAN") - print(f"{'='*70}") - print(f"SSID: {args.ssid}") - print(f"Strategy: {args.strategy}") - print(f"Build parallelism: {args.build_parallel}") - print(f"Flash parallelism: {args.flash_parallel}") - print() - - for dev in device_list: - print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {dev['ip']}") - - if args.dry_run: - print("\nDry run - no devices will be built or flashed") - return - - # Confirm - print(f"\n{'='*70}") - response = input("Proceed with parallel flashing? (yes/no): ").strip().lower() - if response != 'yes': - print("Aborted.") - return - - print(f"\n{'='*70}") - print("STARTING PARALLEL DEPLOYMENT") - print(f"{'='*70}\n") - - start_time = time.time() - - if args.strategy == 'build-then-flash': - # Strategy 1: Build all, then flash all - print(f"Phase 1: Building {len(device_list)} configurations with {args.build_parallel} parallel builds...") - - build_results = [] - with ProcessPoolExecutor(max_workers=args.build_parallel) as executor: - futures = {} - for dev in device_list: - build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}') - future = executor.submit( - build_firmware, dev, project_dir, build_dir, args.ssid, args.password - ) - futures[future] = dev - - for future in as_completed(futures): - result = future.result() - build_results.append(result) - if not result['success']: - print(f"[Device {result['device']}] ✗ Build failed: {result['error']}") - - # Flash phase - successful_builds = [r for r in build_results if r['success']] - print(f"\nPhase 2: Flashing {len(successful_builds)} devices with {args.flash_parallel} parallel operations...") - - flash_results = [] - with ThreadPoolExecutor(max_workers=args.flash_parallel) as executor: - futures = {} - for result in successful_builds: - dev = device_list[result['device'] - 1] - build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}') - future = executor.submit(flash_device, dev, build_dir) - futures[future] = dev - - for future in as_completed(futures): - result = future.result() - flash_results.append(result) - if not result['success']: - print(f"[Device {result['device']}] ✗ Flash failed: {result['error']}") - - # Cleanup - print("\nCleaning up build directories...") - try: - shutil.rmtree(work_dir) - except: - pass - - final_results = flash_results - - else: - # Strategy 2: Build and flash together (limited parallelism) - print(f"Building and flashing with {args.build_parallel} parallel operations...") - - final_results = [] - with ProcessPoolExecutor(max_workers=args.build_parallel) as executor: - futures = {} - for dev in device_list: - future = executor.submit( - build_and_flash, dev, project_dir, work_dir, args.ssid, args.password - ) - futures[future] = dev - - for future in as_completed(futures): - result = future.result() - final_results.append(result) - if not result['success']: - print(f"[Device {result['device']}] ✗ Failed: {result['error']}") - - # Summary - elapsed_time = time.time() - start_time - success_count = sum(1 for r in final_results if r['success']) - failed_devices = [r['device'] for r in final_results if not r['success']] - - print(f"\n{'='*70}") - print("DEPLOYMENT SUMMARY") - print(f"{'='*70}") - print(f"Successfully deployed: {success_count}/{len(device_list)} devices") - print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)") - print(f"Average time per device: {elapsed_time/len(device_list):.1f} seconds") - - if failed_devices: - print(f"\nFailed devices: {', '.join(map(str, failed_devices))}") - - print(f"{'='*70}") - - -if __name__ == '__main__': - try: - main() - except KeyboardInterrupt: - print("\n\nInterrupted by user") - sys.exit(1) - except Exception as e: - print(f"\nFATAL ERROR: {e}") - import traceback - traceback.print_exc() - sys.exit(1) diff --git a/flash_all_serial_config.py b/flash_all_serial_config.py deleted file mode 100644 index 47cb423..0000000 --- a/flash_all_serial_config.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -import argparse -import serial -import time -from pathlib import Path - -def send_cfg(port, ssid, password, dhcp, start_ip, mask, gw): - print(f"Connecting to {port}...") - with serial.Serial(port, baudrate=115200, timeout=2) as ser: - time.sleep(0.5) - ser.reset_input_buffer() - ser.write(b"CFG\n") - ser.write(f"SSID:{ssid}\n".encode()) - ser.write(f"PASS:{password}\n".encode()) - if dhcp == 0: - ser.write(f"IP:{start_ip}\n".encode()) - ser.write(f"MASK:{mask}\n".encode()) - ser.write(f"GW:{gw}\n".encode()) - ser.write(f"DHCP:{dhcp}\n".encode()) - ser.write(b"END\n") - time.sleep(0.3) - - print("\nDevice response:") - while ser.in_waiting: - sys.stdout.write(ser.read(ser.in_waiting).decode(errors='ignore')) - sys.stdout.flush() - time.sleep(0.1) - print("\n✅ Configuration sent successfully.") - -def main(): - parser = argparse.ArgumentParser(description="Configure ESP32 Wi-Fi settings over serial") - parser.add_argument("--project", help="ESP-IDF project path (defaults to current working directory)") - parser.add_argument("--ssid", required=True) - parser.add_argument("--password", required=True) - parser.add_argument("--start-ip", help="Static IP address") - parser.add_argument("--mask", default="255.255.255.0") - parser.add_argument("--gw", help="Gateway address") - parser.add_argument("--dhcp", type=int, choices=[0,1], default=1) - parser.add_argument("--baud", type=int, default=460800) - parser.add_argument("--cfg-baud", type=int, default=115200) - parser.add_argument("--ports", nargs='+', help="Serial port(s), e.g., /dev/ttyUSB0 /dev/ttyUSB1") - parser.add_argument("--port", help="Single serial port (shorthand for --ports PORT)") - parser.add_argument("--dry-run", action="store_true") - - args = parser.parse_args() - - # Default to current working directory - project_path = args.project or os.getcwd() - print(f"Using project directory: {project_path}") - - # Resolve ports - ports = [] - if args.port: - ports.append(args.port) - elif args.ports: - ports = args.ports - else: - print("❌ No serial port specified. Use --port or --ports.") - sys.exit(1) - - # Apply configuration - for port in ports: - if args.dry_run: - print(f"[DRY RUN] Would send Wi-Fi config to {port}") - else: - send_cfg(port, args.ssid, args.password, args.dhcp, args.start_ip, args.mask, args.gw) - -if __name__ == "__main__": - main() - diff --git a/flash_and_config.py b/flash_and_config.py deleted file mode 100755 index 2f67ea0..0000000 --- a/flash_and_config.py +++ /dev/null @@ -1,198 +0,0 @@ -#!/usr/bin/env python3 -""" -Flash uniform firmware to all ESP32 devices, then reconfigure via serial -This is much faster than building unique firmwares for each device -""" - -import os -import sys -import subprocess -import argparse -import glob -from pathlib import Path -import time - -def detect_devices(): - """Detect all ESP32 devices""" - devices = sorted(glob.glob('/dev/ttyUSB*')) - return devices - -def build_firmware(project_dir): - """Build the firmware once""" - print("=" * 60) - print("Building firmware (one time)...") - print("=" * 60) - - result = subprocess.run( - ['idf.py', 'build'], - cwd=project_dir, - capture_output=False - ) - - if result.returncode != 0: - print("✗ Build failed!") - return False - - print("✓ Build complete") - return True - -def flash_device(port, project_dir): - """Flash a single device""" - try: - result = subprocess.run( - ['idf.py', '-p', port, 'flash'], - cwd=project_dir, - capture_output=True, - text=True, - timeout=120 - ) - return result.returncode == 0 - except Exception as e: - return False - -def flash_all_devices(devices, project_dir): - """Flash all devices with the same firmware""" - print(f"\nFlashing {len(devices)} devices...") - - success_count = 0 - for idx, dev in enumerate(devices, 1): - print(f"[{idx:2d}/{len(devices)}] Flashing {dev}...", end='', flush=True) - - if flash_device(dev, project_dir): - print(" ✓") - success_count += 1 - else: - print(" ✗") - - print(f"\nFlashed: {success_count}/{len(devices)}") - return success_count - -def reconfigure_devices(ssid, password, start_ip, gateway="192.168.1.1"): - """Reconfigure devices using the reconfig script""" - script_path = os.path.join(os.path.dirname(__file__), 'reconfig_simple.py') - - if not os.path.exists(script_path): - print(f"Error: {script_path} not found!") - return False - - print("\n" + "=" * 60) - print("Reconfiguring WiFi settings via serial...") - print("=" * 60) - - cmd = [ - 'python3', script_path, - '-s', ssid, - '-p', password, - '--start-ip', start_ip, - '-g', gateway - ] - - result = subprocess.run(cmd) - return result.returncode == 0 - -def main(): - parser = argparse.ArgumentParser( - description='Flash and configure all ESP32 devices', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -This script: - 1. Builds firmware ONCE - 2. Flashes the SAME firmware to all devices (fast!) - 3. Reconfigures each device via serial with unique IP - -Much faster than building 32 different firmwares! - -Examples: - # Basic usage - %(prog)s --ssid MyWiFi --password mypass - - # Custom IP range - %(prog)s --ssid MyWiFi --password mypass --start-ip 192.168.1.100 - - # Build only (no flash) - %(prog)s --build-only - - # Reconfigure only (no flash) - %(prog)s --reconfig-only --ssid MyWiFi --password mypass - """ - ) - - parser.add_argument('--ssid', help='WiFi SSID') - parser.add_argument('--password', help='WiFi password') - parser.add_argument('--start-ip', default='192.168.1.50', - help='Starting IP address (default: 192.168.1.50)') - parser.add_argument('--gateway', default='192.168.1.1', - help='Gateway IP (default: 192.168.1.1)') - parser.add_argument('--project-dir', default=None, - help='ESP32 project directory (default: current dir)') - parser.add_argument('--build-only', action='store_true', - help='Only build, do not flash') - parser.add_argument('--reconfig-only', action='store_true', - help='Only reconfigure, do not build/flash') - parser.add_argument('--skip-build', action='store_true', - help='Skip build, use existing firmware') - - args = parser.parse_args() - - # Determine project directory - if args.project_dir: - project_dir = args.project_dir - else: - project_dir = os.path.dirname(os.path.abspath(__file__)) - - if not os.path.exists(os.path.join(project_dir, 'CMakeLists.txt')): - print(f"Error: Not an ESP-IDF project directory: {project_dir}") - return 1 - - # Detect devices - devices = detect_devices() - if not devices and not args.build_only: - print("Error: No devices found!") - return 1 - - print(f"Found {len(devices)} device(s)") - - # Reconfigure only mode - if args.reconfig_only: - if not args.ssid or not args.password: - print("Error: --ssid and --password required for reconfigure mode") - return 1 - - return 0 if reconfigure_devices(args.ssid, args.password, args.start_ip, args.gateway) else 1 - - # Build firmware - if not args.skip_build: - if not build_firmware(project_dir): - return 1 - - if args.build_only: - print("\n Build complete. Use --skip-build to flash later.") - return 0 - - # Flash all devices - flash_count = flash_all_devices(devices, project_dir) - - if flash_count == 0: - print("✗ No devices flashed successfully") - return 1 - - # Reconfigure if credentials provided - if args.ssid and args.password: - print("\nWaiting for devices to boot...") - time.sleep(5) - - if not reconfigure_devices(args.ssid, args.password, args.start_ip, args.gateway): - print("✗ Reconfiguration failed") - return 1 - else: - print("\n" + "=" * 60) - print("Flashing complete!") - print("=" * 60) - print("\nTo configure WiFi settings, run:") - print(f" python3 reconfig_simple.py -s YourSSID -p YourPassword --start-ip {args.start_ip}") - - print("\n✓ Done!") - return 0 - -if __name__ == '__main__': - sys.exit(main()) diff --git a/gen_udev_rules.py b/gen_udev_rules.py index 3cb913d..01084f7 100755 --- a/gen_udev_rules.py +++ b/gen_udev_rules.py @@ -1,68 +1,249 @@ #!/usr/bin/env python3 +""" +Unified udev rules generator for ESP32 devices. +Combines functionality from gen_udev_rules.py and append_new_rules.py. + +Features: +- Generate rules from scratch (full scan) +- Append new rules to existing file (incremental update) +- Uses ID_PATH for stable device identification +- Sorts devices by physical topology for consistent ordering +""" + import os -import pyudev +import re +import sys +import subprocess +from pathlib import Path +from serial.tools import list_ports -def generate_rules(): - context = pyudev.Context() +class Colors: + GREEN = '\033[92m' + RED = '\033[91m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + CYAN = '\033[96m' + RESET = '\033[0m' - # Find all TTY devices driven by usb-serial drivers (CP210x, FTDI, etc.) +# Default paths +DEFAULT_RULES_FILE = "/etc/udev/rules.d/99-esp32-stable.rules" +TEMP_RULES_FILE = "new_rules.part" + +def get_existing_rules(rules_path): + """ + Parse existing rules file to extract: + - Set of ID_PATH values already mapped + - Maximum port number used + Returns: (known_paths, max_port) + """ + known_paths = set() + max_port = 0 + + if not os.path.exists(rules_path): + return known_paths, 0 + + regex_path = re.compile(r'ENV\{ID_PATH\}=="([^"]+)"') + regex_port = re.compile(r'esp_port_(\d+)') + + try: + with open(rules_path, 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + + path_match = regex_path.search(line) + port_match = regex_port.search(line) + + if path_match: + known_paths.add(path_match.group(1)) + if port_match: + port_num = int(port_match.group(1)) + if port_num > max_port: + max_port = port_num + except Exception as e: + print(f"{Colors.RED}Error reading rules file: {e}{Colors.RESET}") + return known_paths, 0 + + return known_paths, max_port + +def scan_usb_devices(): + """ + Scan all USB serial devices and return list of (device, id_path, location) tuples. + Uses udevadm to get stable ID_PATH identifiers. + """ devices = [] - for device in context.list_devices(subsystem='tty'): - if 'ID_VENDOR_ID' in device.properties and 'ID_MODEL_ID' in device.properties: - # We filter for USB serial devices only - parent = device.find_parent('usb', 'usb_interface') - if parent: - devices.append(device) - # Sort by their physical path so they are ordered by hub port - # The 'DEVPATH' usually looks like .../usb1/1-2/1-2.3/1-2.3.4... - devices.sort(key=lambda x: x.properties.get('DEVPATH', '')) + # Get all USB serial devices + usb_devices = list(list_ports.grep("USB|ACM|CP210|FT232")) - print(f"# Detected {len(devices)} devices. Generating rules...\n") + if not usb_devices: + return devices + # Sort by physical location for consistent ordering + usb_devices.sort(key=lambda x: x.location if x.location else x.device) + + for dev in usb_devices: + try: + # Get ID_PATH via udevadm (most stable identifier) + cmd = ['udevadm', 'info', '--name', dev.device, '--query=property'] + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=2) + + if proc.returncode != 0: + continue + + props = dict(line.split('=', 1) for line in proc.stdout.splitlines() if '=' in line) + id_path = props.get('ID_PATH', '') + + if not id_path: + continue + + devices.append({ + 'device': dev.device, + 'id_path': id_path, + 'location': dev.location or '', + 'serial': props.get('ID_SERIAL_SHORT', '') + }) + except Exception as e: + print(f"{Colors.YELLOW}Warning: Could not inspect {dev.device}: {e}{Colors.RESET}") + continue + + return devices + +def generate_rules(devices, start_port=1): + """ + Generate udev rules for the given devices. + Returns list of rule strings. + """ rules = [] - hub_counter = 1 - port_counter = 1 - last_parent_path = None + port_num = start_port for dev in devices: - # Get the unique physical path identifier (KERNELS) - # We need the parent USB interface kernel name (e.g., '1-1.2:1.0') - parent = dev.find_parent('usb', 'usb_interface') - if not parent: continue - - kernels_path = parent.device_path.split('/')[-1] - - # Simple heuristic to detect a new hub (big jump in path length or numbering) - # You might need to manually tweak the hub numbers in the file later. - - # Generate the rule - # KERNELS matches the physical port. - # SYMLINK creates the static alias. - rule = f'SUBSYSTEM=="tty", KERNELS=="{kernels_path}", SYMLINK+="esp_port_{len(rules)+1:02d}"' + symlink = f"esp_port_{port_num}" + rule = f'SUBSYSTEM=="tty", ENV{{ID_PATH}}=="{dev["id_path"]}", SYMLINK+="{symlink}"' rules.append(rule) - print(f"Mapped {dev.device_node} ({kernels_path}) -> /dev/esp_port_{len(rules):02d}") + port_num += 1 - # Write to file - with open("99-esp32-static.rules", "w") as f: - f.write("# Persistent USB Serial mapping for ESP32 Hubs\n") - f.write("# Generated automatically. Do not edit unless topology changes.\n\n") - for r in rules: - f.write(r + "\n") + return rules - print(f"\n{'-'*60}") - print(f"SUCCESS: Generated {len(rules)} rules in '99-esp32-static.rules'.") - print(f"To install:\n 1. Review the file to ensure order is correct.") - print(f" 2. sudo cp 99-esp32-static.rules /etc/udev/rules.d/") - print(f" 3. sudo udevadm control --reload-rules") - print(f" 4. sudo udevadm trigger") - print(f"{'-'*60}") +def main(): + import argparse + + parser = argparse.ArgumentParser( + description='Generate or update udev rules for ESP32 devices', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Generate rules from scratch (overwrites existing) + %(prog)s --full + + # Append only new devices to existing rules + %(prog)s --append + + # Dry run (show what would be generated) + %(prog)s --append --dry-run + """ + ) + parser.add_argument('--full', action='store_true', + help='Generate complete rules file from scratch') + parser.add_argument('--append', action='store_true', + help='Append only new devices to existing rules') + parser.add_argument('--rules-file', default=DEFAULT_RULES_FILE, + help=f'Path to udev rules file (default: {DEFAULT_RULES_FILE})') + parser.add_argument('--dry-run', action='store_true', + help='Show what would be done without writing files') + parser.add_argument('--output', help='Write to custom file instead of rules file') + + args = parser.parse_args() + + # Default to append mode if neither specified + if not args.full and not args.append: + args.append = True + + print(f"{Colors.BLUE}{'='*60}{Colors.RESET}") + print(f" ESP32 Udev Rules Generator") + print(f"{Colors.BLUE}{'='*60}{Colors.RESET}\n") + + # Scan connected devices + print(f"{Colors.CYAN}Scanning USB topology...{Colors.RESET}") + all_devices = scan_usb_devices() + + if not all_devices: + print(f"{Colors.RED}No USB serial devices found.{Colors.RESET}") + return 1 + + print(f"Found {len(all_devices)} USB serial devices\n") + + if args.full: + # Generate complete rules file + print(f"{Colors.CYAN}Generating complete rules file...{Colors.RESET}") + rules = generate_rules(all_devices, start_port=1) + output_file = args.output or args.rules_file + + print(f"\n{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}") + print("-" * 65) + for i, dev in enumerate(all_devices): + port_num = i + 1 + symlink = f"esp_port_{port_num}" + print(f"{dev['location']:<20} | {dev['device']:<15} | {symlink}") + + if not args.dry_run: + rules_content = "# Auto-generated by gen_udev_rules.py\n" + rules_content += "# Stable port mapping based on USB physical topology (ID_PATH)\n\n" + rules_content += "\n".join(rules) + "\n" + + if output_file == args.rules_file and os.geteuid() != 0: + # Write to temp file for user to copy + temp_file = "99-esp32-stable.rules" + with open(temp_file, 'w') as f: + f.write(rules_content) + print(f"\n{Colors.YELLOW}Rules written to: {temp_file}{Colors.RESET}") + print(f"To install, run:") + print(f" {Colors.GREEN}sudo cp {temp_file} {args.rules_file}{Colors.RESET}") + else: + with open(output_file, 'w') as f: + f.write(rules_content) + print(f"\n{Colors.GREEN}Rules written to: {output_file}{Colors.RESET}") + else: + print(f"\n{Colors.YELLOW}DRY RUN: Would generate {len(rules)} rules{Colors.RESET}") + + else: # append mode + # Read existing rules + known_paths, max_port = get_existing_rules(args.rules_file) + print(f"Existing rules: {len(known_paths)} devices mapped (Max Port: {max_port})") + + # Find new devices + new_devices = [d for d in all_devices if d['id_path'] not in known_paths] + + if not new_devices: + print(f"\n{Colors.GREEN}All connected devices are already mapped. No changes needed.{Colors.RESET}") + return 0 + + print(f"\n{Colors.CYAN}Found {len(new_devices)} UNMAPPED devices:{Colors.RESET}") + print(f"{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}") + print("-" * 65) + + new_rules = generate_rules(new_devices, start_port=max_port + 1) + for i, dev in enumerate(new_devices): + port_num = max_port + 1 + i + symlink = f"esp_port_{port_num:02d}" + print(f"{dev['location']:<20} | {dev['device']:<15} | {symlink}") + + if not args.dry_run: + # Write new rules to temp file + with open(TEMP_RULES_FILE, 'w') as f: + f.write("\n# --- Added by gen_udev_rules.py ---\n") + f.write("\n".join(new_rules) + "\n") + + print(f"\n{Colors.CYAN}New rules saved to: {TEMP_RULES_FILE}{Colors.RESET}") + print(f"To apply, run:") + print(f" {Colors.GREEN}cat {TEMP_RULES_FILE} | sudo tee -a {args.rules_file}{Colors.RESET}") + print(f" {Colors.GREEN}sudo udevadm control --reload-rules && sudo udevadm trigger{Colors.RESET}") + else: + print(f"\n{Colors.YELLOW}DRY RUN: Would append {len(new_rules)} rules{Colors.RESET}") + + print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}") + return 0 if __name__ == '__main__': - # Requires 'pyudev'. Install with: sudo dnf install python3-pyudev (or pip install pyudev) - try: - import pyudev - generate_rules() - except ImportError: - print("Error: This script requires 'pyudev'.") - print("Install it via: pip install pyudev") + sys.exit(main()) diff --git a/leddiff.txt b/leddiff.txt new file mode 100644 index 0000000..7d0cf9f --- /dev/null +++ b/leddiff.txt @@ -0,0 +1,188 @@ +diff --git a/components/status_led/status_led.c b/components/status_led/status_led.c +index 5a34d7e..6dd6d09 100644 +--- a/components/status_led/status_led.c ++++ b/components/status_led/status_led.c +@@ -30,7 +30,6 @@ + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +- + #include "status_led.h" + #include "freertos/FreeRTOS.h" + #include "freertos/task.h" +@@ -38,87 +37,56 @@ + #include "led_strip.h" + #include "esp_log.h" + +-static const char *TAG = "status_led"; ++static const char *TAG = "STATUS_LED"; // Added TAG for logging + + static led_strip_handle_t s_led_strip = NULL; + static bool s_is_rgb = false; + static int s_gpio_pin = -1; + static volatile led_state_t s_current_state = LED_STATE_NO_CONFIG; + +-// Helper to set color safely + static void set_color(uint8_t r, uint8_t g, uint8_t b) { + if (s_is_rgb && s_led_strip) { + led_strip_set_pixel(s_led_strip, 0, r, g, b); + led_strip_refresh(s_led_strip); + } else if (!s_is_rgb && s_gpio_pin >= 0) { +- // Simple LED logic: If any color is requested, turn ON. +- // NOTE: If your LED is active-low (VCC->LED->Pin), invert this to !((r+g+b)>0) +- gpio_set_level(s_gpio_pin, (r + g + b) > 0 ? 1 : 0); ++ gpio_set_level(s_gpio_pin, (r+g+b) > 0); + } + } + + static void led_task(void *arg) { + int toggle = 0; +- +- // --- Startup Diagnostic Sequence --- +- // Cycle R -> G -> B to prove hardware is working +- ESP_LOGW(TAG, "Running LED Diagnostic Sequence on GPIO %d...", s_gpio_pin); +- set_color(50, 0, 0); // Red +- vTaskDelay(pdMS_TO_TICKS(300)); +- set_color(0, 50, 0); // Green +- vTaskDelay(pdMS_TO_TICKS(300)); +- set_color(0, 0, 50); // Blue +- vTaskDelay(pdMS_TO_TICKS(300)); +- set_color(0, 0, 0); // Off +- vTaskDelay(pdMS_TO_TICKS(100)); +- + while (1) { +- // Brightness set to 30-50 (out of 255) for visibility + switch (s_current_state) { +- case LED_STATE_NO_CONFIG: // Yellow (Solid RGB / Blink Simple) +- if (s_is_rgb) { +- set_color(40, 30, 0); +- vTaskDelay(pdMS_TO_TICKS(1000)); +- } else { +- set_color(1,1,1); vTaskDelay(pdMS_TO_TICKS(100)); +- set_color(0,0,0); vTaskDelay(pdMS_TO_TICKS(100)); +- } ++ case LED_STATE_NO_CONFIG: // Yellow ++ if (s_is_rgb) { set_color(25, 25, 0); vTaskDelay(pdMS_TO_TICKS(1000)); } ++ else { set_color(1,1,1); vTaskDelay(100); set_color(0,0,0); vTaskDelay(100); } + break; +- case LED_STATE_WAITING: // Blue Blink +- set_color(0, 0, toggle ? 50 : 0); +- toggle = !toggle; ++ // ... rest of cases identical to your code ... ++ case LED_STATE_WAITING: ++ set_color(0, 0, toggle ? 50 : 0); toggle = !toggle; + vTaskDelay(pdMS_TO_TICKS(500)); + break; +- case LED_STATE_CONNECTED: // Green Solid +- set_color(0, 30, 0); +- vTaskDelay(pdMS_TO_TICKS(1000)); ++ case LED_STATE_CONNECTED: ++ set_color(0, 25, 0); vTaskDelay(pdMS_TO_TICKS(1000)); + break; +- case LED_STATE_MONITORING: // Cyan Solid +- set_color(0, 30, 30); +- vTaskDelay(pdMS_TO_TICKS(1000)); ++ case LED_STATE_MONITORING: ++ set_color(0, 0, 50); vTaskDelay(pdMS_TO_TICKS(1000)); + break; +- case LED_STATE_TRANSMITTING: // Purple Fast Flash +- set_color(toggle ? 40 : 0, 0, toggle ? 40 : 0); +- toggle = !toggle; +- vTaskDelay(pdMS_TO_TICKS(100)); ++ case LED_STATE_TRANSMITTING: ++ set_color(toggle ? 50 : 0, 0, toggle ? 50 : 0); toggle = !toggle; ++ vTaskDelay(pdMS_TO_TICKS(50)); + break; +- case LED_STATE_TRANSMITTING_SLOW: // Purple Slow Pulse +- set_color(toggle ? 40 : 0, 0, toggle ? 40 : 0); +- toggle = !toggle; +- vTaskDelay(pdMS_TO_TICKS(500)); ++ case LED_STATE_TRANSMITTING_SLOW: ++ set_color(toggle ? 50 : 0, 0, toggle ? 50 : 0); toggle = !toggle; ++ vTaskDelay(pdMS_TO_TICKS(250)); + break; +- case LED_STATE_STALLED: // Red/Purple Solid +- set_color(50, 0, 20); +- vTaskDelay(pdMS_TO_TICKS(1000)); ++ case LED_STATE_STALLED: ++ set_color(50, 0, 50); vTaskDelay(pdMS_TO_TICKS(1000)); + break; +- case LED_STATE_FAILED: // Red Blink +- set_color(toggle ? 50 : 0, 0, 0); +- toggle = !toggle; ++ case LED_STATE_FAILED: ++ set_color(toggle ? 50 : 0, 0, 0); toggle = !toggle; + vTaskDelay(pdMS_TO_TICKS(200)); + break; +- default: +- vTaskDelay(pdMS_TO_TICKS(100)); +- break; + } + } + } +@@ -127,41 +95,27 @@ void status_led_init(int gpio_pin, bool is_rgb_strip) { + s_gpio_pin = gpio_pin; + s_is_rgb = is_rgb_strip; + +- ESP_LOGI(TAG, "Initializing Status LED: GPIO=%d, Type=%s", +- gpio_pin, is_rgb_strip ? "RGB Strip (WS2812)" : "Simple GPIO"); ++ // --- DIAGNOSTIC LOG --- ++ ESP_LOGW(TAG, "Initializing LED on GPIO %d (RGB: %d)", gpio_pin, is_rgb_strip); + + if (s_is_rgb) { +- led_strip_config_t s_cfg = { +- .strip_gpio_num = gpio_pin, +- .max_leds = 1, +- .led_pixel_format = LED_PIXEL_FORMAT_GRB, +- .led_model = LED_MODEL_WS2812, +- .flags.invert_out = false, +- }; +- led_strip_rmt_config_t r_cfg = { +- .resolution_hz = 10 * 1000 * 1000, +- .flags.with_dma = false, +- }; ++ led_strip_config_t s_cfg = { .strip_gpio_num = gpio_pin, .max_leds = 1 }; ++ led_strip_rmt_config_t r_cfg = { .resolution_hz = 10 * 1000 * 1000 }; + + esp_err_t ret = led_strip_new_rmt_device(&s_cfg, &r_cfg, &s_led_strip); + if (ret != ESP_OK) { +- ESP_LOGE(TAG, "Failed to create RMT LED strip: %s", esp_err_to_name(ret)); +- return; ++ ESP_LOGE(TAG, "RMT Device Init Failed: %s", esp_err_to_name(ret)); ++ } else { ++ ESP_LOGI(TAG, "RMT Device Init Success"); ++ led_strip_clear(s_led_strip); + } +- led_strip_clear(s_led_strip); + } else { + gpio_reset_pin(gpio_pin); + gpio_set_direction(gpio_pin, GPIO_MODE_OUTPUT); +- gpio_set_level(gpio_pin, 0); + } +- + xTaskCreate(led_task, "led_task", 2048, NULL, 5, NULL); + } + +-void status_led_set_state(led_state_t state) { +- s_current_state = state; +-} +- +-led_state_t status_led_get_state(void) { +- return s_current_state; +-} ++// ... Setters/Getters ... ++void status_led_set_state(led_state_t state) { s_current_state = state; } ++led_state_t status_led_get_state(void) { return s_current_state; } +diff --git a/main/board_config.h b/main/board_config.h +index 6e4aa28..05d7726 100644 +--- a/main/board_config.h ++++ b/main/board_config.h +@@ -42,7 +42,7 @@ + // ============================================================================ + // ESP32-C5 (DevKitC-1) 3.3V VCC Pin 1 GND PIN 15 + // ============================================================================ +- #define RGB_LED_GPIO 8 // Common addressable LED pin for C5 ++ #define RGB_LED_GPIO 27 // Common addressable LED pin for C5 + #define HAS_RGB_LED 1 + #define GPS_TX_PIN GPIO_NUM_24 + #define GPS_RX_PIN GPIO_NUM_23 diff --git a/main/idf_component.yml b/main/idf_component.yml new file mode 100644 index 0000000..7e9a4c3 --- /dev/null +++ b/main/idf_component.yml @@ -0,0 +1,17 @@ +## IDF Component Manager Manifest File +dependencies: + ## Required IDF version + idf: + version: '>=4.1.0' + # # Put list of dependencies here + # # For components maintained by Espressif: + # component: "~1.0.0" + # # For 3rd party components: + # username/component: ">=1.0.0,<2.0.0" + # username2/component2: + # version: "~1.0.0" + # # For transient dependencies `public` flag can be set. + # # `public` flag doesn't have an effect dependencies of the `main` component. + # # All dependencies of `main` are public by default. + # public: true + espressif/led_strip: ^2.5.3 diff --git a/map_usb_to_ip.py b/map_usb_to_ip.py deleted file mode 100755 index 48f23b8..0000000 --- a/map_usb_to_ip.py +++ /dev/null @@ -1,244 +0,0 @@ -#!/usr/bin/env python3 -""" -Map ESP32 USB ports to IP addresses -Creates and manages mapping between /dev/ttyUSB* and assigned IPs -""" - -import serial.tools.list_ports -import argparse -import json -import glob -import re -from pathlib import Path - -class USBIPMapper: - def __init__(self, start_ip="192.168.1.51", config_file="usb_ip_map.json"): - self.start_ip = start_ip - self.config_file = config_file - self.mapping = {} - - def get_ip_for_index(self, index): - """Calculate IP address for a given index""" - ip_parts = self.start_ip.split('.') - base_ip = int(ip_parts[3]) - ip_parts[3] = str(base_ip + index) - return '.'.join(ip_parts) - - def extract_usb_number(self, port): - """Extract number from /dev/ttyUSBX""" - match = re.search(r'ttyUSB(\d+)', port) - if match: - return int(match.group(1)) - return None - - def detect_devices(self): - """Detect all ESP32 USB devices and create mapping""" - devices = sorted(glob.glob('/dev/ttyUSB*')) - - print(f"\n{'='*70}") - print(f"ESP32 USB to IP Address Mapping") - print(f"{'='*70}") - print(f"Start IP: {self.start_ip}") - print(f"Detected {len(devices)} USB device(s)\n") - - self.mapping = {} - - for idx, port in enumerate(devices): - usb_num = self.extract_usb_number(port) - ip = self.get_ip_for_index(idx) - - # Get device info - try: - ports = serial.tools.list_ports.comports() - device_info = next((p for p in ports if p.device == port), None) - if device_info: - serial_num = device_info.serial_number or "Unknown" - description = device_info.description or "Unknown" - else: - serial_num = "Unknown" - description = "Unknown" - except: - serial_num = "Unknown" - description = "Unknown" - - self.mapping[port] = { - 'index': idx, - 'usb_number': usb_num, - 'ip': ip, - 'serial': serial_num, - 'description': description - } - - print(f"[{idx:2d}] {port:14s} → {ip:15s} (USB{usb_num}, SN: {serial_num})") - - print(f"\n{'='*70}") - print(f"Total: {len(devices)} devices mapped") - print(f"IP Range: {self.mapping[devices[0]]['ip']} - {self.mapping[devices[-1]]['ip']}" if devices else "") - print(f"{'='*70}\n") - - return self.mapping - - def save_mapping(self): - """Save mapping to JSON file""" - with open(self.config_file, 'w') as f: - json.dump(self.mapping, f, indent=2) - print(f"✓ Mapping saved to {self.config_file}") - - def load_mapping(self): - """Load mapping from JSON file""" - try: - with open(self.config_file, 'r') as f: - self.mapping = json.load(f) - print(f"✓ Mapping loaded from {self.config_file}") - return self.mapping - except FileNotFoundError: - print(f"✗ No saved mapping found at {self.config_file}") - return {} - - def get_ip(self, port): - """Get IP address for a specific USB port""" - if port in self.mapping: - return self.mapping[port]['ip'] - return None - - def get_port(self, ip): - """Get USB port for a specific IP address""" - for port, info in self.mapping.items(): - if info['ip'] == ip: - return port - return None - - def print_mapping(self): - """Print current mapping""" - if not self.mapping: - print("No mapping loaded. Run with --detect first.") - return - - print(f"\n{'='*70}") - print(f"Current USB to IP Mapping") - print(f"{'='*70}") - for port, info in sorted(self.mapping.items(), key=lambda x: x[1]['index']): - print(f"[{info['index']:2d}] {port:14s} → {info['ip']:15s} (USB{info['usb_number']})") - print(f"{'='*70}\n") - - def export_bash_script(self, filename="usb_ip_vars.sh"): - """Export mapping as bash variables""" - with open(filename, 'w') as f: - f.write("#!/bin/bash\n") - f.write("# USB to IP mapping - Auto-generated\n\n") - - # Create associative array - f.write("declare -A USB_TO_IP\n") - for port, info in self.mapping.items(): - f.write(f"USB_TO_IP[{port}]=\"{info['ip']}\"\n") - - f.write("\n# Create reverse mapping\n") - f.write("declare -A IP_TO_USB\n") - for port, info in self.mapping.items(): - f.write(f"IP_TO_USB[{info['ip']}]=\"{port}\"\n") - - f.write("\n# Helper functions\n") - f.write("get_ip_for_usb() { echo \"${USB_TO_IP[$1]}\"; }\n") - f.write("get_usb_for_ip() { echo \"${IP_TO_USB[$1]}\"; }\n") - - print(f"✓ Bash script exported to {filename}") - print(f" Usage: source {filename} && get_ip_for_usb /dev/ttyUSB0") - -def main(): - parser = argparse.ArgumentParser( - description='Map ESP32 USB ports to IP addresses', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Detect devices and create mapping - %(prog)s --detect - - # Detect and save to file - %(prog)s --detect --save - - # Load saved mapping and display - %(prog)s --load --print - - # Get IP for specific USB port - %(prog)s --load --port /dev/ttyUSB5 - - # Get USB port for specific IP - %(prog)s --load --ip 192.168.1.55 - - # Export as bash script - %(prog)s --load --export - - # Use custom IP range - %(prog)s --detect --start-ip 10.0.0.100 - """ - ) - - parser.add_argument('--detect', action='store_true', - help='Detect USB devices and create mapping') - parser.add_argument('--save', action='store_true', - help='Save mapping to file') - parser.add_argument('--load', action='store_true', - help='Load mapping from file') - parser.add_argument('--print', action='store_true', - help='Print current mapping') - parser.add_argument('--start-ip', default='192.168.1.51', - help='Starting IP address (default: 192.168.1.51)') - parser.add_argument('--config', default='usb_ip_map.json', - help='Config file path (default: usb_ip_map.json)') - parser.add_argument('--port', metavar='PORT', - help='Get IP for specific USB port (e.g., /dev/ttyUSB5)') - parser.add_argument('--ip', metavar='IP', - help='Get USB port for specific IP address') - parser.add_argument('--export', action='store_true', - help='Export mapping as bash script') - - args = parser.parse_args() - - mapper = USBIPMapper(start_ip=args.start_ip, config_file=args.config) - - # Detect devices - if args.detect: - mapper.detect_devices() - if args.save: - mapper.save_mapping() - - # Load mapping - if args.load: - mapper.load_mapping() - - # Print mapping - if args.print: - mapper.print_mapping() - - # Query specific port - if args.port: - if not mapper.mapping: - mapper.load_mapping() - ip = mapper.get_ip(args.port) - if ip: - print(f"{args.port} → {ip}") - else: - print(f"Port {args.port} not found in mapping") - - # Query specific IP - if args.ip: - if not mapper.mapping: - mapper.load_mapping() - port = mapper.get_port(args.ip) - if port: - print(f"{args.ip} → {port}") - else: - print(f"IP {args.ip} not found in mapping") - - # Export bash script - if args.export: - if not mapper.mapping: - mapper.load_mapping() - mapper.export_bash_script() - - # Default: detect and print - if not any([args.detect, args.load, args.print, args.port, args.ip, args.export]): - mapper.detect_devices() - -if __name__ == '__main__': - main() diff --git a/mass_deploy.py b/mass_deploy.py deleted file mode 100755 index 9cf3f87..0000000 --- a/mass_deploy.py +++ /dev/null @@ -1,319 +0,0 @@ -#!/usr/bin/env python3 -""" -ESP32 Mass Deployment Tool (Fixed for Parallel Flashing & Path Issues) -Uses esptool.py from the build directory to resolve relative paths correctly. -""" - -import os -import sys -import subprocess -import glob -import time -import argparse -import serial -from concurrent.futures import ThreadPoolExecutor, as_completed -from pathlib import Path - -class Colors: - RED = '\033[0;31m' - GREEN = '\033[0;32m' - YELLOW = '\033[1;33m' - BLUE = '\033[0;34m' - NC = '\033[0m' - -class DeviceDeployer: - def __init__(self, project_dir, ssid, password, start_ip="192.168.1.51", - netmask="255.255.255.0", gateway="192.168.1.1", - baud_rate=460800, max_retries=2, verify_ping=True, - num_devices=None, verbose=False, parallel=True): - - self.project_dir = Path(project_dir).resolve() # Absolute path is safer - self.build_dir = self.project_dir / 'build' - - self.ssid = ssid - self.password = password - self.start_ip = start_ip - self.netmask = netmask - self.gateway = gateway - self.baud_rate = baud_rate - self.max_retries = max_retries - self.verify_ping = verify_ping - self.num_devices = num_devices - self.verbose = verbose - self.parallel = parallel - - self.config_mode = (self.ssid is not None and self.password is not None) - - if self.start_ip: - ip_parts = start_ip.split('.') - self.ip_base = '.'.join(ip_parts[:3]) - self.ip_start = int(ip_parts[3]) - else: - self.ip_base = "192.168.1" - self.ip_start = 51 - - self.devices = [] - self.results = {} - self.log_dir = Path('/tmp') - - def print_banner(self): - print() - print(f"{Colors.BLUE}{'='*70}") - print("ESP32 Mass Deployment Tool") - print(f"{'='*70}{Colors.NC}") - print(f"Project: {self.project_dir}") - print(f"Build Dir: {self.build_dir}") - if self.config_mode: - print(f"Mode: {Colors.YELLOW}FLASH + CONFIGURE{Colors.NC}") - print(f"SSID: {self.ssid}") - print(f"Password: {'*' * len(self.password)}") - print(f"Start IP: {self.start_ip}") - else: - print(f"Mode: {Colors.GREEN}FLASH ONLY (Preserve NVS){Colors.NC}") - - print(f"Flash Baud: {self.baud_rate}") - print(f"Parallel: {self.parallel}") - if self.num_devices: - print(f"Max Devices: {self.num_devices}") - print(f"{Colors.BLUE}{'='*70}{Colors.NC}") - - def build_firmware(self): - print() - print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}") - try: - subprocess.run( - ['idf.py', 'build'], - cwd=self.project_dir, - check=True, - capture_output=not self.verbose - ) - - flash_args_path = self.build_dir / 'flash_args' - if not flash_args_path.exists(): - print(f"{Colors.RED}Error: build/flash_args not found.{Colors.NC}") - return False - - print(f"{Colors.GREEN}✓ Build complete{Colors.NC}") - return True - except subprocess.CalledProcessError as e: - print(f"{Colors.RED}✗ Build failed!{Colors.NC}") - if self.verbose: print(e.stderr.decode() if e.stderr else "") - return False - - def detect_devices(self): - print() - print(f"{Colors.YELLOW}[2/4] Detecting ESP32 devices...{Colors.NC}") - self.devices = sorted(glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*')) - - if not self.devices: - print(f"{Colors.RED}ERROR: No devices found!{Colors.NC}") - return False - - if self.num_devices and len(self.devices) > self.num_devices: - print(f"Limiting to first {self.num_devices} devices") - self.devices = self.devices[:self.num_devices] - - print(f"{Colors.GREEN}Found {len(self.devices)} device(s):{Colors.NC}") - for i, device in enumerate(self.devices): - if self.config_mode: - print(f" [{i:2d}] {device:14s} → {self.get_ip_for_index(i)}") - else: - print(f" [{i:2d}] {device:14s} → (Existing IP)") - return True - - def get_ip_for_index(self, index): - return f"{self.ip_base}.{self.ip_start + index}" - - def flash_and_configure(self, index, device): - target_ip = self.get_ip_for_index(index) if self.config_mode else "Existing IP" - log_file = self.log_dir / f"esp32_deploy_{index}.log" - log_lines = [] - flash_args_file = 'flash_args' # Relative to build_dir - - def log(msg): - log_lines.append(msg) - if self.verbose or not self.parallel: - print(f"[{index}] {msg}") - - for attempt in range(1, self.max_retries + 1): - log(f"=== Device {index}: {device} (Attempt {attempt}/{self.max_retries}) ===") - - # --- FLASHING --- - log("Flashing via esptool...") - try: - cmd = [ - 'esptool.py', - '-p', device, - '-b', str(self.baud_rate), - '--before', 'default_reset', - '--after', 'hard_reset', - 'write_flash', - f"@{flash_args_file}" - ] - - # CRITICAL FIX: Run from build_dir so relative paths in flash_args are valid - result = subprocess.run( - cmd, - cwd=self.build_dir, - check=True, - capture_output=True, - timeout=300 - ) - log("✓ Flash successful") - except subprocess.CalledProcessError as e: - log(f"✗ Flash failed: {e.stderr.decode() if e.stderr else 'Unknown error'}") - if attempt == self.max_retries: - with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) - return {'index': index, 'device': device, 'ip': target_ip, 'status': 'FAILED', 'log': log_lines} - time.sleep(2) - continue - except subprocess.TimeoutExpired: - log("✗ Flash timeout") - if attempt == self.max_retries: - with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) - return {'index': index, 'device': device, 'ip': target_ip, 'status': 'TIMEOUT', 'log': log_lines} - continue - - # --- CONFIGURATION --- - log("Waiting for boot (3s)...") - time.sleep(3) - - if self.config_mode: - log(f"Configuring WiFi ({target_ip})...") - try: - config = ( - f"CFG\n" - f"SSID:{self.ssid}\n" - f"PASS:{self.password}\n" - f"IP:{target_ip}\n" - f"MASK:{self.netmask}\n" - f"GW:{self.gateway}\n" - f"DHCP:0\n" - f"END\n" - ) - - with serial.Serial(device, 115200, timeout=2, write_timeout=2) as ser: - ser.reset_input_buffer() - ser.write(config.encode('utf-8')) - ser.flush() - - log("✓ Config sent") - - if self.verify_ping: - log("Waiting for network (6s)...") - time.sleep(6) - log(f"Pinging {target_ip}...") - try: - res = subprocess.run(['ping', '-c', '2', '-W', '3', target_ip], capture_output=True, timeout=10) - if res.returncode == 0: - log("✓ Ping successful") - with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) - return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines} - else: - log("✗ Ping failed") - except: - log("✗ Ping error") - else: - with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) - return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines} - - except Exception as e: - log(f"✗ Config error: {e}") - else: - log("Configuration skipped (Preserving NVS)") - with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) - return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines} - - time.sleep(2) - - with open(log_file, 'w') as f: f.write('\n'.join(log_lines)) - return {'index': index, 'device': device, 'ip': target_ip, 'status': 'FAILED', 'log': log_lines} - - def deploy_all_parallel(self): - print() - print(f"{Colors.YELLOW}[3/4] Flashing (parallel)...{Colors.NC}") - max_workers = min(10, len(self.devices)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = { - executor.submit(self.flash_and_configure, i, device): (i, device) - for i, device in enumerate(self.devices) - } - for future in as_completed(futures): - result = future.result() - self.results[result['index']] = result - self.print_device_status(result) - - def deploy_all_sequential(self): - print() - print(f"{Colors.YELLOW}[3/4] Flashing (sequential)...{Colors.NC}") - for i, device in enumerate(self.devices): - print(f"\n{Colors.BLUE}--- Device {i+1}/{len(self.devices)} ---{Colors.NC}") - result = self.flash_and_configure(i, device) - self.results[result['index']] = result - self.print_device_status(result) - - def print_device_status(self, result): - status_color = { - 'SUCCESS': Colors.GREEN, 'NO_PING': Colors.YELLOW, - 'FAILED': Colors.RED, 'TIMEOUT': Colors.RED - }.get(result['status'], Colors.RED) - print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s} → {result['ip']:15s} [{result['status']}]{Colors.NC}") - - def deploy_all(self): - if self.parallel: self.deploy_all_parallel() - else: self.deploy_all_sequential() - - def print_summary(self): - print() - print(f"{Colors.YELLOW}[4/4] Deployment Summary{Colors.NC}") - print(f"{Colors.BLUE}{'='*70}{Colors.NC}") - success = sum(1 for r in self.results.values() if r['status'] == 'SUCCESS') - failed = sum(1 for r in self.results.values() if r['status'] in ['FAILED', 'TIMEOUT']) - for i in range(len(self.devices)): - if i in self.results: - r = self.results[i] - icon = f"{Colors.GREEN}✓{Colors.NC}" if r['status'] == 'SUCCESS' else f"{Colors.RED}✗{Colors.NC}" - print(f"{icon} {r['device']:14s} → {r['ip']}") - print(f"{Colors.BLUE}{'='*70}{Colors.NC}") - print(f"Total: {len(self.devices)}") - print(f"Success: {success}") - print(f"Failed: {failed}") - return failed - -def main(): - parser = argparse.ArgumentParser(description='ESP32 Mass Deployment Tool') - parser.add_argument('-d', '--dir', default=os.getcwd(), help='ESP-IDF project dir') - parser.add_argument('-s', '--ssid', help='WiFi SSID') - parser.add_argument('-p', '--password', help='WiFi Password') - parser.add_argument('--start-ip', default='192.168.1.51', help='Starting IP') - parser.add_argument('-n', '--num-devices', type=int, default=30, help='Max devices') - parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP') - parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask') - parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud rate') - parser.add_argument('-r', '--retries', type=int, default=2, help='Retries') - parser.add_argument('--no-verify', action='store_true', help='Skip ping check') - parser.add_argument('--sequential', action='store_true', help='Run sequentially') - parser.add_argument('-v', '--verbose', action='store_true', help='Verbose') - args = parser.parse_args() - - if (args.ssid and not args.password) or (args.password and not args.ssid): - print(f"{Colors.RED}ERROR: Provide both SSID and Password for config, or neither for flash-only.{Colors.NC}") - sys.exit(1) - - deployer = DeviceDeployer( - project_dir=args.dir, ssid=args.ssid, password=args.password, - start_ip=args.start_ip, netmask=args.netmask, gateway=args.gateway, - baud_rate=args.baud, max_retries=args.retries, verify_ping=not args.no_verify, - num_devices=args.num_devices if args.num_devices > 0 else None, - verbose=args.verbose, parallel=not args.sequential - ) - - deployer.print_banner() - if not deployer.build_firmware(): sys.exit(1) - if not deployer.detect_devices(): sys.exit(1) - deployer.deploy_all() - failed_count = deployer.print_summary() - sys.exit(failed_count) - -if __name__ == '__main__': - main() diff --git a/new_rules.part b/new_rules.part new file mode 100644 index 0000000..85cccea --- /dev/null +++ b/new_rules.part @@ -0,0 +1,21 @@ + +# --- Added by update script --- +SUBSYSTEM=="tty", ATTRS{serial}=="04d3540e9223f011aefcbef12a319464", SYMLINK+="esp_port_01" +SUBSYSTEM=="tty", ATTRS{serial}=="0aa1c0a3a323f0118893c2f12a319464", SYMLINK+="esp_port_02" +SUBSYSTEM=="tty", ATTRS{serial}=="1ca7d2748a23f0118d9db8f12a319464", SYMLINK+="esp_port_03" +SUBSYSTEM=="tty", ATTRS{serial}=="1e8972fca871f011ae8af99e1045c30f", SYMLINK+="esp_port_04" +SUBSYSTEM=="tty", ATTRS{serial}=="263cd138a871f011bcecff9e1045c30f", SYMLINK+="esp_port_05" +SUBSYSTEM=="tty", ATTRS{serial}=="28e8c61a8523f011a56ac2f12a319464", SYMLINK+="esp_port_06" +SUBSYSTEM=="tty", ATTRS{serial}=="38717231a723f011a006c3f12a319464", SYMLINK+="esp_port_07" +SUBSYSTEM=="tty", ATTRS{serial}=="3e1afd689523f011b363baf12a319464", SYMLINK+="esp_port_08" +SUBSYSTEM=="tty", ATTRS{serial}=="4a6d2844a071f011bceaff9e1045c30f", SYMLINK+="esp_port_09" +SUBSYSTEM=="tty", ATTRS{serial}=="4e52608ba171f011af3ffb9e1045c30f", SYMLINK+="esp_port_10" +SUBSYSTEM=="tty", ATTRS{serial}=="4eaecac7a371f011be18fb9e1045c30f", SYMLINK+="esp_port_11" +SUBSYSTEM=="tty", ATTRS{serial}=="600faabf9a71f01195bbfb9e1045c30f", SYMLINK+="esp_port_12" +SUBSYSTEM=="tty", ATTRS{serial}=="640c56829723f01183c1bef12a319464", SYMLINK+="esp_port_13" +SUBSYSTEM=="tty", ATTRS{serial}=="904f28aede6ef011beac4d9b1045c30f", SYMLINK+="esp_port_14" +SUBSYSTEM=="tty", ATTRS{serial}=="A5069RR4", SYMLINK+="esp_port_15" +SUBSYSTEM=="tty", ATTRS{serial}=="a86592298f23f01199b0c2f12a319464", SYMLINK+="esp_port_16" +SUBSYSTEM=="tty", ATTRS{serial}=="ba2efc7fa071f0119aa1fb9e1045c30f", SYMLINK+="esp_port_17" +SUBSYSTEM=="tty", ATTRS{serial}=="fc641417fbf4ef11bd28a1a29ed47d52", SYMLINK+="esp_port_18" +SUBSYSTEM=="tty", ATTRS{serial}=="fedb86249723f01198ebc2f12a319464", SYMLINK+="esp_port_19" diff --git a/reconfig_simple.py b/reconfig_simple.py deleted file mode 100755 index 4e085ae..0000000 --- a/reconfig_simple.py +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/env python3 -""" -Simple ESP32 WiFi Reconfiguration Tool -Sends WiFi config to all connected ESP32 devices via serial -""" - -import serial -import time -import glob -import argparse -import sys - -def reconfig_devices(ssid, password, start_ip, gateway="192.168.1.1", - netmask="255.255.255.0", verbose=False): - """Reconfigure all connected devices""" - - devices = sorted(glob.glob('/dev/ttyUSB*')) - num_devices = len(devices) - - if num_devices == 0: - print("ERROR: No devices found!") - return 0 - - # Parse start IP - ip_parts = start_ip.split('.') - ip_base = '.'.join(ip_parts[:3]) - ip_start = int(ip_parts[3]) - - ok_devices = 0 - - print(f"Found {num_devices} devices") - print(f"SSID: {ssid}") - print(f"Password: {'*' * len(password)}") - print(f"IP Range: {ip_base}.{ip_start} - {ip_base}.{ip_start + num_devices - 1}") - print() - - for idx, dev in enumerate(devices): - ip = f"{ip_base}.{ip_start + idx}" - print(f"[{idx:2d}] Configuring {dev:14s} → {ip}", end='') - - try: - ser = serial.Serial(dev, 115200, timeout=1) - time.sleep(0.5) # Let serial port stabilize - - # Send configuration - ser.write(b"CFG\n") - time.sleep(0.1) - ser.write(f"SSID:{ssid}\n".encode()) - time.sleep(0.1) - ser.write(f"PASS:{password}\n".encode()) - time.sleep(0.1) - ser.write(f"IP:{ip}\n".encode()) - time.sleep(0.1) - ser.write(f"MASK:{netmask}\n".encode()) - time.sleep(0.1) - ser.write(f"GW:{gateway}\n".encode()) - time.sleep(0.1) - ser.write(b"DHCP:0\n") - time.sleep(0.1) - ser.write(b"END\n") - - # Wait for OK response - time.sleep(0.5) - response = ser.read(100).decode('utf-8', errors='ignore') - - if verbose and response.strip(): - print(f"\n Response: {response[:80]}") - - if 'OK' in response: - print(" ✓") - ok_devices += 1 - else: - print(" ⚠ (no OK)") - - ser.close() - - except Exception as e: - print(f" ✗ Error: {e}") - - time.sleep(0.5) - - print() - print(f"{'='*60}") - print(f"Success: {ok_devices}/{num_devices}") - print(f"Failed: {num_devices - ok_devices}/{num_devices}") - print(f"{'='*60}") - - return ok_devices - -def main(): - parser = argparse.ArgumentParser( - description='Reconfigure WiFi settings on all connected ESP32 devices', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Basic usage with defaults - %(prog)s - - # Custom IP range - %(prog)s --start-ip 192.168.1.100 - - # Custom WiFi credentials - %(prog)s -s MyNetwork -p mypassword - - # Different subnet - %(prog)s --start-ip 10.0.0.50 -g 10.0.0.1 - - # Verbose mode - %(prog)s -v - """ - ) - - parser.add_argument('-s', '--ssid', default='ClubHouse2G', - help='WiFi SSID (default: ClubHouse2G)') - parser.add_argument('-p', '--password', default='ez2remember', - help='WiFi password (default: ez2remember)') - parser.add_argument('--start-ip', default='192.168.1.51', - help='Starting IP address (default: 192.168.1.51)') - parser.add_argument('-g', '--gateway', default='192.168.1.1', - help='Gateway IP (default: 192.168.1.1)') - parser.add_argument('-m', '--netmask', default='255.255.255.0', - help='Network mask (default: 255.255.255.0)') - parser.add_argument('-v', '--verbose', action='store_true', - help='Show device responses') - parser.add_argument('-w', '--wait', type=int, default=30, - help='Seconds to wait for connections (default: 30)') - - args = parser.parse_args() - - # Reconfigure all devices - ok_count = reconfig_devices( - ssid=args.ssid, - password=args.password, - start_ip=args.start_ip, - gateway=args.gateway, - netmask=args.netmask, - verbose=args.verbose - ) - - # Wait for connections - if ok_count > 0: - print(f"\nWaiting {args.wait}s for WiFi connections...") - time.sleep(args.wait) - print("Done!") - print() - print("Test commands:") - - # Extract IP info - ip_parts = args.start_ip.split('.') - ip_base = '.'.join(ip_parts[:3]) - ip_start = int(ip_parts[3]) - num_devices = len(sorted(glob.glob('/dev/ttyUSB*'))) - - print(f" # Ping all devices") - print(f" for i in {{{ip_start}..{ip_start + num_devices - 1}}}; do ping -c 1 {ip_base}.$i & done; wait") - print() - print(f" # Check device status") - print(f" ./check_device_status.py --reset") - print() - print(f" # Test first device") - print(f" iperf -c {ip_base}.{ip_start}") - print() - - sys.exit(0 if ok_count > 0 else 1) - -if __name__ == '__main__': - main() diff --git a/reconfig_simple_nextip.py b/reconfig_simple_nextip.py deleted file mode 100755 index 35e40c5..0000000 --- a/reconfig_simple_nextip.py +++ /dev/null @@ -1,315 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import glob -import re -import serial -import time -import sys -from serial.tools import list_ports -import json -import os - -DEFAULT_PATTERN = "/dev/ttyUSB*" -MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json") - -YELLOW_TOKENS = [ - "NO WIFI CONFIG", "NO_WIFI_CONFIG", "NO CONFIG", "NO_CONFIG", - "YELLOW", "LED_STATE_NO_CONFIG" -] - -IP_REGEX = re.compile(r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?(\d{1,3}(?:\.\d{1,3}){3})', re.IGNORECASE) - -def eprint(*a, **kw): - print(*a, file=sys.stderr, **kw) - -def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3, probes=(b"STATUS\n", b"IP\n"), deadline=None): - ser.timeout = timeout - ser.write_timeout = timeout - - def now(): return time.time() - - def read_and_collect(sleep_s=0.05): - buf = b"" - # sleep but respect deadline - t_end = now() + sleep_s - while now() < t_end: - time.sleep(0.01) - try: - while True: - if deadline and now() >= deadline: break - chunk = ser.read(256) - if not chunk: - break - buf += chunk - except Exception: - pass - return buf.decode('utf-8', errors='ignore') - - text = "" - # initial settle - t_end = now() + settle - while now() < t_end: - time.sleep(0.01) - text += read_and_collect(0.0) - - # probes - for cmd in probes: - if deadline and now() >= deadline: break - try: - ser.write(cmd) - except Exception: - pass - text += read_and_collect(0.1) - - if verbose and text.strip(): - eprint("--- STATUS TEXT BEGIN ---") - eprint(text) - eprint("--- STATUS TEXT END ---") - - utext = text.upper() - return any(tok in utext for tok in YELLOW_TOKENS), text - - -def parse_ip_from_text(text): - for m in IP_REGEX.finditer(text or ""): - ip = m.group(1) - try: - octs = [int(x) for x in ip.split(".")] - if all(0 <= x <= 255 for x in octs): - return ip - except Exception: - pass - return None - - -def next_free_ip(used_last_octets, start_ip_octet, max_octet=254): - x = start_ip_octet - while x <= max_octet: - if x not in used_last_octets: - used_last_octets.add(x) - return x - x += 1 - raise RuntimeError("No free IPs left in the range") - - -def load_map(path): - if os.path.exists(path): - try: - with open(path, "r") as f: - return json.load(f) - except Exception: - return {} - return {} - - -def usb_serial_for_port(dev): - for p in list_ports.comports(): - if p.device == dev: - return p.serial_number or p.hwid or dev - return dev - - -def configure_device(ser, ssid, password, ip, dhcp, verbose=False): - def writeln(s): - if isinstance(s, str): - s = s.encode() - ser.write(s + b"\n") - time.sleep(0.05) - - time.sleep(0.15) - - writeln("CFG") - writeln(f"SSID:{ssid}") - writeln(f"PASS:{password}") - if dhcp: - writeln("DHCP:1") - else: - writeln(f"IP:{ip}") - writeln("MASK:255.255.255.0") - writeln("GW:192.168.1.1") - writeln("DHCP:0") - writeln("END") - - time.sleep(0.2) - resp = b"" - try: - while True: - chunk = ser.read(256) - if not chunk: - break - resp += chunk - except Exception: - pass - text = resp.decode('utf-8', errors='ignore') - if verbose and text.strip(): - eprint("--- CONFIG RESPONSE BEGIN ---") - eprint(text) - eprint("--- CONFIG RESPONSE END ---") - - ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper()) - return ok, text - - -def main(): - parser = argparse.ArgumentParser( - description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex." - ) - parser.add_argument("--ssid", default="ClubHouse2G", help="Wi‑Fi SSID") - parser.add_argument("--password", default="ez2remember", help="Wi‑Fi password") - parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})") - parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'") - parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate") - parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)") - parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)") - parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe") - parser.add_argument("--only-yellow", action="store_true", - help="Only program devices that appear to be in 'no Wi‑Fi config' (solid yellow) state") - parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP") - parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)") - parser.add_argument("--persist-map", action="store_true", - help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs") - parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)") - parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit") - parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr") - parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen") - args = parser.parse_args() - - if args.list: - print("Ports:") - for p in list_ports.comports(): - print(f" {p.device:>12} sn={p.serial_number} desc={p.description}") - return - - devices = sorted(glob.glob(args.pattern)) - if args.exclude: - devices = [d for d in devices if not re.search(args.exclude, d)] - - print(f"Found {len(devices)} devices matching {args.pattern}", flush=True) - if args.exclude: - print(f"Excluding devices matching /{args.exclude}/", flush=True) - - ip_map = load_map(MAP_FILE) if args.persist_map else {} - used_last_octets = set() - - prepass_info = {} - for i, dev in enumerate(devices): - print(f"[pre] {i+1}/{len(devices)} probing {dev} …", flush=True) - start_t = time.time() - already_ip = None - no_cfg = False - try: - ser = serial.Serial( - dev, - args.baud, - timeout=args.timeout, - write_timeout=args.timeout, - rtscts=False, - dsrdtr=False, - xonxoff=False, - ) - # gentle DTR/RTS toggle - try: - ser.dtr = False; ser.rts = False; time.sleep(0.02) - ser.dtr = True; ser.rts = True; time.sleep(0.02) - except Exception: - pass - - probes = (b"STATUS\n", b"IP\n") if not args.full_probes else (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n") - deadline = start_t + max(0.4, args.per_device_cap) - no_cfg, text = detect_no_config( - ser, verbose=args.verbose, settle=args.settle, - timeout=args.timeout, probes=probes, deadline=deadline - ) - already_ip = parse_ip_from_text(text) - ser.close() - except Exception as e: - eprint(f" [warn] {dev} probe error: {e}") - dur = time.time() - start_t - print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True) - - prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip} - if already_ip and not args.dhcp: - try: - last = int(already_ip.split(".")[-1]) - used_last_octets.add(last) - except Exception: - pass - - ok_devices = 0 - skipped = 0 - errors = 0 - - for idx, dev in enumerate(devices): - info = prepass_info.get(dev, {}) - already_ip = info.get("ip") - no_cfg = info.get("no_cfg", False) - usb_key = usb_serial_for_port(dev) - - if already_ip and not args.dhcp: - print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True) - skipped += 1 - if args.persist_map: - ip_map[usb_key] = already_ip - continue - - if args.only_yellow and not no_cfg: - print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True) - skipped += 1 - continue - - # pick target IP - if args.dhcp: - target_ip = None - mode = "DHCP" - else: - target_last_octet = None - if args.persist_map and usb_key in ip_map: - try: - prev_ip = ip_map[usb_key] - target_last_octet = int(prev_ip.split(".")[-1]) - if target_last_octet in used_last_octets: - target_last_octet = None - except Exception: - target_last_octet = None - - if target_last_octet is None: - target_last_octet = next_free_ip(used_last_octets, args.start_ip, 254) - target_ip = f"192.168.1.{target_last_octet}" - mode = f"Static {target_ip}" - - print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True) - if args.dry_run: - print(" (dry-run) Would send CFG/END", flush=True) - ok = True - else: - try: - ser = serial.Serial(dev, args.baud, timeout=args.timeout, write_timeout=args.timeout) - ok, resp = configure_device(ser, args.ssid, args.password, target_ip, args.dhcp, verbose=args.verbose) - ser.close() - except Exception as e: - print(f" ✗ Error opening/configuring: {e}", flush=True) - ok = False - - if ok: - print(" ✓ OK", flush=True) - ok_devices += 1 - if not args.dhcp and args.persist_map and target_ip: - ip_map[usb_key] = target_ip - else: - print(" ✗ Failed", flush=True) - errors += 1 - - time.sleep(0.05) - - if args.persist_map: - try: - with open(MAP_FILE, "w") as f: - json.dump(ip_map, f, indent=2, sort_keys=True) - print(f"Persisted mapping to {MAP_FILE}", flush=True) - except Exception as e: - print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True) - - print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True) - -if __name__ == "__main__": - main()