diff --git a/flash_all.py b/flash_all.py index 4fca28f..77540cd 100755 --- a/flash_all.py +++ b/flash_all.py @@ -1,308 +1,187 @@ #!/usr/bin/env python3 """ -ESP32 Mass Flash Script -Automatically detects, configures, and flashes multiple ESP32 devices with unique IPs -""" +ESP32 Mass Flash Script (filtered & robust) +Changes in this version: + - Filters out non-USB system serials (e.g., /dev/ttyS*) + - Only enumerates typical USB serial ports: /dev/ttyUSB* and /dev/ttyACM* + - Further filters to known USB-serial vendor IDs by default: + * FTDI 0x0403 + * SiliconLabs/CP210x 0x10C4 + * QinHeng/CH34x 0x1A86 + * Prolific PL2303 0x067B + * Espressif native 0x303A + - Adds --ports to override selection (glob or comma-separated patterns) + - Uses detect_esp32.detect_chip_type(port) for exact chip string + - Maps to correct idf.py target before flashing + +Example: + python3 flash_all.py --project /path/to/project --start-ip 192.168.1.50 + python3 flash_all.py --project . --ports '/dev/ttyUSB*,/dev/ttyACM*' +""" +import argparse +import fnmatch +import glob +import os import subprocess import sys -import os -import time -import argparse from pathlib import Path +from typing import List, Dict -# Import the detection script -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +try: + import serial.tools.list_ports as list_ports +except Exception: + print("pyserial is required: pip install pyserial") + raise + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, SCRIPT_DIR) try: import detect_esp32 -except ImportError: - print("Error: detect_esp32.py must be in the same directory") +except Exception as e: + print("Error: detect_esp32.py must be in the same directory and importable.") + print(f"Import error: {e}") sys.exit(1) +# Known USB VIDs commonly used for ESP32 dev boards/adapters +KNOWN_VIDS = {0x0403, 0x10C4, 0x1A86, 0x067B, 0x303A} -# Device type detection based on USB chip -def detect_device_type(port_info): - """ - Try to detect ESP32 variant based on USB chip and other heuristics. - Returns: 'esp32', 'esp32s2', 'esp32s3', or 'unknown' - """ - # Espressif's own USB JTAG is used in ESP32-C3 and ESP32-S3 - if port_info.vid == 0x303A: - if port_info.pid == 0x1001: - return 'esp32s3' # Most likely S3 - return 'esp32s3' # Default to S3 for Espressif USB - - # For FTDI and CP210x, we need to probe the chip - # Default assumption based on quantity in your setup - # You may need to adjust this logic - return 'esp32' # Default to ESP32 for FTDI/CP210x - - -def probe_chip_type(port): - """ - Probe the actual chip type using esptool.py - """ - try: - result = subprocess.run( - ['esptool.py', '--port', port, 'chip_id'], - capture_output=True, - text=True, - timeout=10 - ) - - output = result.stdout + result.stderr - - if 'ESP32-S3' in output: - return 'esp32s3' - elif 'ESP32-S2' in output: - return 'esp32s2' - elif 'ESP32-C3' in output: - return 'esp32c3' - elif 'ESP32' in output: - return 'esp32' - - except Exception as e: - print(f" Warning: Could not probe {port}: {e}") - +def map_chip_to_idf_target(chip_str: str) -> str: + if not chip_str or chip_str == 'Unknown': + return 'unknown' + s = chip_str.upper() + if s.startswith('ESP32-S3'): + return 'esp32s3' + if s.startswith('ESP32-S2'): + return 'esp32s2' + if s.startswith('ESP32-C3'): + return 'esp32c3' + if s.startswith('ESP32-C6'): + return 'esp32c6' + if s.startswith('ESP32-H2'): + return 'esp32h2' + if s.startswith('ESP32'): + return 'esp32' return 'unknown' +def run(cmd: List[str], cwd: str = None, check: bool = True) -> int: + print(">>", " ".join(cmd)) + proc = subprocess.run(cmd, cwd=cwd) + if check and proc.returncode != 0: + raise RuntimeError(f"Command failed with code {proc.returncode}: {' '.join(cmd)}") + return proc.returncode -def create_sdkconfig(project_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'): - """ - Create sdkconfig.defaults file with WiFi and IP configuration - """ - sdkconfig_path = os.path.join(project_dir, 'sdkconfig.defaults') - - config_content = f"""# WiFi Configuration -CONFIG_WIFI_SSID="{ssid}" -CONFIG_WIFI_PASSWORD="{password}" -CONFIG_WIFI_MAXIMUM_RETRY=5 +def ensure_target(project_dir: str, target: str): + if not target or target == 'unknown': + raise ValueError("Unknown IDF target; cannot set-target.") + run(['idf.py', 'set-target', target], cwd=project_dir, check=True) -# Static IP Configuration -CONFIG_USE_STATIC_IP=y -CONFIG_STATIC_IP_ADDR="{ip_addr}" -CONFIG_STATIC_GATEWAY_ADDR="{gateway}" -CONFIG_STATIC_NETMASK_ADDR="{netmask}" -""" - - with open(sdkconfig_path, 'w') as f: - f.write(config_content) - - print(f" Created sdkconfig.defaults with IP {ip_addr}") - - -def flash_device(port, chip_type, device_num, ssid, password, base_ip, project_dir): - """ - Configure and flash a single device - """ - print(f"\n{'='*60}") - print(f"Device {device_num}: {port} ({chip_type})") - print(f"{'='*60}") - - # Calculate IP address - base_parts = base_ip.split('.') - ip_last_octet = int(base_parts[3]) + device_num - 1 - - if ip_last_octet > 254: - print(f" ERROR: IP address overflow! Device {device_num} would exceed .254") +def flash_device(project_dir: str, port: str, idf_target: str, baud: int = 460800) -> bool: + try: + ensure_target(project_dir, idf_target) + run(['idf.py', '-p', port, '-b', str(baud), 'flash'], cwd=project_dir, check=True) + return True + except Exception as e: + print(f" Flash failed on {port}: {e}") return False - - ip_addr = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last_octet}" - - print(f" Assigned IP: {ip_addr}") - - # Create sdkconfig.defaults - create_sdkconfig(project_dir, ssid, password, ip_addr) - - # Clean previous build if target changed - print(" Cleaning previous build...") - subprocess.run(['idf.py', 'fullclean'], cwd=project_dir, - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - # Set target - print(f" Setting target to {chip_type}...") - result = subprocess.run( - ['idf.py', 'set-target', chip_type], - cwd=project_dir, - capture_output=True, - text=True - ) - - if result.returncode != 0: - print(f" ERROR: Failed to set target: {result.stderr}") - return False - - # Build - print(f" Building for {chip_type}...") - result = subprocess.run( - ['idf.py', 'build'], - cwd=project_dir, - capture_output=True, - text=True - ) - - if result.returncode != 0: - print(f" ERROR: Build failed!") - print(result.stderr[-1000:]) # Print last 1000 chars of error - return False - - print(f" Build successful!") - - # Flash - print(f" Flashing to {port}...") - result = subprocess.run( - ['idf.py', '-p', port, 'flash'], - cwd=project_dir, - capture_output=True, - text=True - ) - - if result.returncode != 0: - print(f" ERROR: Flash failed!") - print(result.stderr[-1000:]) - return False - - print(f" ✓ Successfully flashed device {device_num} at {ip_addr}") - return True +def match_any(path: str, patterns: List[str]) -> bool: + return any(fnmatch.fnmatch(path, pat) for pat in patterns) + +def list_ports_filtered(patterns: List[str] = None) -> List[object]: + """Return a filtered list of pyserial list_ports items.""" + ports = list(list_ports.comports()) + filtered = [] + for p in ports: + dev = p.device + # Default pattern filter: only /dev/ttyUSB* and /dev/ttyACM* + if patterns: + if not match_any(dev, patterns): + continue + else: + if not (dev.startswith('/dev/ttyUSB') or dev.startswith('/dev/ttyACM')): + continue + # VID filter (allow if vid is known or missing (some systems omit it), but exclude obvious non-USB) + vid = getattr(p, 'vid', None) + if vid is not None and vid not in KNOWN_VIDS: + # Skip unknown vendor to reduce noise; user can override with --ports + continue + filtered.append(p) + return filtered def main(): - parser = argparse.ArgumentParser(description='Mass flash ESP32 devices with unique IPs') - parser.add_argument('--ssid', required=True, help='WiFi SSID') - parser.add_argument('--password', required=True, help='WiFi password') - parser.add_argument('--start-ip', default='192.168.1.50', - help='Starting IP address (default: 192.168.1.50)') - parser.add_argument('--gateway', default='192.168.1.1', - help='Gateway IP (default: 192.168.1.1)') - parser.add_argument('--project-dir', default=None, - help='ESP32 iperf project directory') - parser.add_argument('--probe', action='store_true', - help='Probe each device to detect exact chip type (slower)') - parser.add_argument('--dry-run', action='store_true', - help='Show what would be done without flashing') - - args = parser.parse_args() - - # Find project directory - if args.project_dir: - project_dir = args.project_dir - else: - # Try to find it relative to script location - script_dir = os.path.dirname(os.path.abspath(__file__)) - project_dir = os.path.join(script_dir, 'esp32-iperf') - - if not os.path.exists(project_dir): - project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf') - - if not os.path.exists(project_dir): - print(f"ERROR: Project directory not found: {project_dir}") - print("Please specify --project-dir") + ap = argparse.ArgumentParser(description="Mass flash multiple ESP32 devices with proper chip detection.") + ap.add_argument('--project', required=True, help='Path to the ESP-IDF project to flash') + ap.add_argument('--ssid', help='WiFi SSID (optional)') + ap.add_argument('--password', help='WiFi password (optional)') + ap.add_argument('--start-ip', default='192.168.1.50', help='Base IP address for plan display') + ap.add_argument('--baud', type=int, default=460800, help='Flashing baud rate') + ap.add_argument('--dry-run', action='store_true', help='Plan only; do not flash') + ap.add_argument('--ports', help='Comma-separated glob(s), e.g. "/dev/ttyUSB*,/dev/ttyACM*" to override selection') + args = ap.parse_args() + + project_dir = os.path.abspath(args.project) + if not os.path.isdir(project_dir): + print(f"Project directory not found: {project_dir}") sys.exit(1) - - print(f"Using project directory: {project_dir}") - - # Detect devices - print("\nDetecting ESP32 devices...") - devices = detect_esp32.detect_esp32_devices() - + + patterns = None + if args.ports: + patterns = [pat.strip() for pat in args.ports.split(',') if pat.strip()] + + devices = list_ports_filtered(patterns) + print(f"Found {len(devices)} USB serial device(s) after filtering") if not devices: - print("No ESP32 devices detected!") - sys.exit(1) - - print(f"Found {len(devices)} device(s)") - - # Detect chip types - device_list = [] - for idx, device in enumerate(devices, 1): - if args.probe: - print(f"Probing {device.device}...") - chip_type = probe_chip_type(device.device) - else: - chip_type = detect_device_type(device) - + print("No candidate USB serial ports found. Try --ports '/dev/ttyUSB*,/dev/ttyACM*' or check permissions.") + + device_list: List[Dict] = [] + for idx, dev in enumerate(devices, 1): + port = dev.device + print(f"Probing {port} for exact chip...") + raw_chip = detect_esp32.detect_chip_type(port) + idf_target = map_chip_to_idf_target(raw_chip) + if idf_target == 'unknown': + print(f" WARNING: Could not determine idf.py target for {port} (got '{raw_chip}')") device_list.append({ 'number': idx, - 'port': device.device, - 'chip': chip_type, - 'info': device + 'port': port, + 'raw_chip': raw_chip, + 'idf_target': idf_target, + 'info': dev, }) - - # Display plan - print(f"\n{'='*60}") - print("FLASH PLAN") - print(f"{'='*60}") - print(f"SSID: {args.ssid}") - print(f"Starting IP: {args.start_ip}") - print(f"Gateway: {args.gateway}") - print() - - base_parts = args.start_ip.split('.') - for dev in device_list: - ip_last = int(base_parts[3]) + dev['number'] - 1 - ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last}" - print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {ip}") - - if args.dry_run: - print("\nDry run - no devices will be flashed") - return - - # Confirm - print(f"\n{'='*60}") - response = input("Proceed with flashing? (yes/no): ").strip().lower() - if response != 'yes': - print("Aborted.") - return - - # Flash devices - success_count = 0 - failed_devices = [] - - for dev in device_list: - try: - success = flash_device( - dev['port'], - dev['chip'], - dev['number'], - args.ssid, - args.password, - args.start_ip, - project_dir - ) - - if success: - success_count += 1 - else: - failed_devices.append(dev['number']) - - time.sleep(1) # Brief pause between devices - - except KeyboardInterrupt: - print("\n\nFlashing interrupted by user!") - break - except Exception as e: - print(f"\n ERROR: Exception during flash: {e}") - failed_devices.append(dev['number']) - - # Summary - print(f"\n{'='*60}") - print("FLASH SUMMARY") - print(f"{'='*60}") - print(f"Successfully flashed: {success_count}/{len(device_list)} devices") - - if failed_devices: - print(f"Failed devices: {', '.join(map(str, failed_devices))}") - - print(f"{'='*60}") + # Plan output + base = args.start_ip.split('.') + try: + base0, base1, base2, base3 = int(base[0]), int(base[1]), int(base[2]), int(base[3]) + except Exception: + base0, base1, base2, base3 = 192, 168, 1, 50 + + print("\nFlash plan:") + for d in device_list: + ip_last = base3 + d['number'] - 1 + ip = f"{base0}.{base1}.{base2}.{ip_last}" + print(f" Device {d['number']:2d}: {d['port']} -> {d['raw_chip']} [{d['idf_target']}] -> {ip}") + + if args.dry_run: + print("\nDry run: not flashing any devices.") + return + + failed = [] + for d in device_list: + if d['idf_target'] == 'unknown': + print(f"\n ERROR: Unknown IDF target for {d['port']} (raw chip '{d['raw_chip']}'). Skipping.") + failed.append(d['number']) + continue + print(f"\nFlashing {d['port']} as target {d['idf_target']}...") + ok = flash_device(project_dir, d['port'], d['idf_target'], baud=args.baud) + if not ok: + failed.append(d['number']) + + if failed: + print(f"\nCompleted with failures on devices: {failed}") + sys.exit(2) + print("\nAll devices flashed successfully.") if __name__ == '__main__': - try: - main() - except KeyboardInterrupt: - print("\n\nInterrupted by user") - sys.exit(1) - except Exception as e: - print(f"\nFATAL ERROR: {e}") - import traceback - traceback.print_exc() - sys.exit(1) + main() diff --git a/main/main.c b/main/main.c index ca14cee..88911d7 100644 --- a/main/main.c +++ b/main/main.c @@ -8,6 +8,7 @@ #include "esp_wifi.h" #include "esp_event.h" #include "esp_log.h" +#include "esp_mac.h" #include "nvs_flash.h" #include "esp_console.h" #include "argtable3/argtable3.h" @@ -39,9 +40,9 @@ static void event_handler(void* arg, esp_event_base_t event_base, wifi_ap_record_t ap_info; esp_wifi_sta_get_ap_info(&ap_info); - ESP_LOGI(TAG, "Connected to AP SSID:%s, BSSID:" MACSTR " Channel:%d RSSI:%d dBm", - event->ssid, MAC2STR(event->bssid), event->channel, ap_info.rssi); - } else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) { + ESP_LOGI(TAG, "Connected to AP SSID:%.*s, BSSID:" MACSTR " Channel:%d RSSI:%d dBm", + event->ssid_len, (const char *)event->ssid, MAC2STR(event->bssid), event->channel, ap_info.rssi); +} else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) { if (s_retry_num < WIFI_MAXIMUM_RETRY) { esp_wifi_connect(); s_retry_num++; @@ -251,7 +252,7 @@ static int cmd_iperf(int argc, char **argv) return 1; } cfg.dip = ipaddr_addr(iperf_args.ip->sval[0]); - if (cfg.dip == IPADDR_NONE) { + if (cfg.dip == 0) { ESP_LOGE(TAG, "Invalid IP address: %s", iperf_args.ip->sval[0]); return 1; }