more on flash_all

This commit is contained in:
Bob 2025-11-10 08:50:14 -08:00
parent 1a05445931
commit db6d6b20c4
2 changed files with 164 additions and 284 deletions

View File

@ -1,308 +1,187 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
ESP32 Mass Flash Script ESP32 Mass Flash Script (filtered & robust)
Automatically detects, configures, and flashes multiple ESP32 devices with unique IPs
"""
Changes in this version:
- Filters out non-USB system serials (e.g., /dev/ttyS*)
- Only enumerates typical USB serial ports: /dev/ttyUSB* and /dev/ttyACM*
- Further filters to known USB-serial vendor IDs by default:
* FTDI 0x0403
* SiliconLabs/CP210x 0x10C4
* QinHeng/CH34x 0x1A86
* Prolific PL2303 0x067B
* Espressif native 0x303A
- Adds --ports to override selection (glob or comma-separated patterns)
- Uses detect_esp32.detect_chip_type(port) for exact chip string
- Maps to correct idf.py target before flashing
Example:
python3 flash_all.py --project /path/to/project --start-ip 192.168.1.50
python3 flash_all.py --project . --ports '/dev/ttyUSB*,/dev/ttyACM*'
"""
import argparse
import fnmatch
import glob
import os
import subprocess import subprocess
import sys import sys
import os
import time
import argparse
from pathlib import Path from pathlib import Path
from typing import List, Dict
# Import the detection script try:
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import serial.tools.list_ports as list_ports
except Exception:
print("pyserial is required: pip install pyserial")
raise
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
try: try:
import detect_esp32 import detect_esp32
except ImportError: except Exception as e:
print("Error: detect_esp32.py must be in the same directory") print("Error: detect_esp32.py must be in the same directory and importable.")
print(f"Import error: {e}")
sys.exit(1) sys.exit(1)
# Known USB VIDs commonly used for ESP32 dev boards/adapters
KNOWN_VIDS = {0x0403, 0x10C4, 0x1A86, 0x067B, 0x303A}
# Device type detection based on USB chip def map_chip_to_idf_target(chip_str: str) -> str:
def detect_device_type(port_info): if not chip_str or chip_str == 'Unknown':
""" return 'unknown'
Try to detect ESP32 variant based on USB chip and other heuristics. s = chip_str.upper()
Returns: 'esp32', 'esp32s2', 'esp32s3', or 'unknown' if s.startswith('ESP32-S3'):
""" return 'esp32s3'
# Espressif's own USB JTAG is used in ESP32-C3 and ESP32-S3 if s.startswith('ESP32-S2'):
if port_info.vid == 0x303A: return 'esp32s2'
if port_info.pid == 0x1001: if s.startswith('ESP32-C3'):
return 'esp32s3' # Most likely S3 return 'esp32c3'
return 'esp32s3' # Default to S3 for Espressif USB if s.startswith('ESP32-C6'):
return 'esp32c6'
# For FTDI and CP210x, we need to probe the chip if s.startswith('ESP32-H2'):
# Default assumption based on quantity in your setup return 'esp32h2'
# You may need to adjust this logic if s.startswith('ESP32'):
return 'esp32' # Default to ESP32 for FTDI/CP210x return 'esp32'
def probe_chip_type(port):
"""
Probe the actual chip type using esptool.py
"""
try:
result = subprocess.run(
['esptool.py', '--port', port, 'chip_id'],
capture_output=True,
text=True,
timeout=10
)
output = result.stdout + result.stderr
if 'ESP32-S3' in output:
return 'esp32s3'
elif 'ESP32-S2' in output:
return 'esp32s2'
elif 'ESP32-C3' in output:
return 'esp32c3'
elif 'ESP32' in output:
return 'esp32'
except Exception as e:
print(f" Warning: Could not probe {port}: {e}")
return 'unknown' return 'unknown'
def run(cmd: List[str], cwd: str = None, check: bool = True) -> int:
print(">>", " ".join(cmd))
proc = subprocess.run(cmd, cwd=cwd)
if check and proc.returncode != 0:
raise RuntimeError(f"Command failed with code {proc.returncode}: {' '.join(cmd)}")
return proc.returncode
def create_sdkconfig(project_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'): def ensure_target(project_dir: str, target: str):
""" if not target or target == 'unknown':
Create sdkconfig.defaults file with WiFi and IP configuration raise ValueError("Unknown IDF target; cannot set-target.")
""" run(['idf.py', 'set-target', target], cwd=project_dir, check=True)
sdkconfig_path = os.path.join(project_dir, 'sdkconfig.defaults')
config_content = f"""# WiFi Configuration def flash_device(project_dir: str, port: str, idf_target: str, baud: int = 460800) -> bool:
CONFIG_WIFI_SSID="{ssid}" try:
CONFIG_WIFI_PASSWORD="{password}" ensure_target(project_dir, idf_target)
CONFIG_WIFI_MAXIMUM_RETRY=5 run(['idf.py', '-p', port, '-b', str(baud), 'flash'], cwd=project_dir, check=True)
return True
# Static IP Configuration except Exception as e:
CONFIG_USE_STATIC_IP=y print(f" Flash failed on {port}: {e}")
CONFIG_STATIC_IP_ADDR="{ip_addr}"
CONFIG_STATIC_GATEWAY_ADDR="{gateway}"
CONFIG_STATIC_NETMASK_ADDR="{netmask}"
"""
with open(sdkconfig_path, 'w') as f:
f.write(config_content)
print(f" Created sdkconfig.defaults with IP {ip_addr}")
def flash_device(port, chip_type, device_num, ssid, password, base_ip, project_dir):
"""
Configure and flash a single device
"""
print(f"\n{'='*60}")
print(f"Device {device_num}: {port} ({chip_type})")
print(f"{'='*60}")
# Calculate IP address
base_parts = base_ip.split('.')
ip_last_octet = int(base_parts[3]) + device_num - 1
if ip_last_octet > 254:
print(f" ERROR: IP address overflow! Device {device_num} would exceed .254")
return False return False
ip_addr = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last_octet}" def match_any(path: str, patterns: List[str]) -> bool:
return any(fnmatch.fnmatch(path, pat) for pat in patterns)
print(f" Assigned IP: {ip_addr}")
# Create sdkconfig.defaults
create_sdkconfig(project_dir, ssid, password, ip_addr)
# Clean previous build if target changed
print(" Cleaning previous build...")
subprocess.run(['idf.py', 'fullclean'], cwd=project_dir,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Set target
print(f" Setting target to {chip_type}...")
result = subprocess.run(
['idf.py', 'set-target', chip_type],
cwd=project_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
print(f" ERROR: Failed to set target: {result.stderr}")
return False
# Build
print(f" Building for {chip_type}...")
result = subprocess.run(
['idf.py', 'build'],
cwd=project_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
print(f" ERROR: Build failed!")
print(result.stderr[-1000:]) # Print last 1000 chars of error
return False
print(f" Build successful!")
# Flash
print(f" Flashing to {port}...")
result = subprocess.run(
['idf.py', '-p', port, 'flash'],
cwd=project_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
print(f" ERROR: Flash failed!")
print(result.stderr[-1000:])
return False
print(f" ✓ Successfully flashed device {device_num} at {ip_addr}")
return True
def list_ports_filtered(patterns: List[str] = None) -> List[object]:
"""Return a filtered list of pyserial list_ports items."""
ports = list(list_ports.comports())
filtered = []
for p in ports:
dev = p.device
# Default pattern filter: only /dev/ttyUSB* and /dev/ttyACM*
if patterns:
if not match_any(dev, patterns):
continue
else:
if not (dev.startswith('/dev/ttyUSB') or dev.startswith('/dev/ttyACM')):
continue
# VID filter (allow if vid is known or missing (some systems omit it), but exclude obvious non-USB)
vid = getattr(p, 'vid', None)
if vid is not None and vid not in KNOWN_VIDS:
# Skip unknown vendor to reduce noise; user can override with --ports
continue
filtered.append(p)
return filtered
def main(): def main():
parser = argparse.ArgumentParser(description='Mass flash ESP32 devices with unique IPs') ap = argparse.ArgumentParser(description="Mass flash multiple ESP32 devices with proper chip detection.")
parser.add_argument('--ssid', required=True, help='WiFi SSID') ap.add_argument('--project', required=True, help='Path to the ESP-IDF project to flash')
parser.add_argument('--password', required=True, help='WiFi password') ap.add_argument('--ssid', help='WiFi SSID (optional)')
parser.add_argument('--start-ip', default='192.168.1.50', ap.add_argument('--password', help='WiFi password (optional)')
help='Starting IP address (default: 192.168.1.50)') ap.add_argument('--start-ip', default='192.168.1.50', help='Base IP address for plan display')
parser.add_argument('--gateway', default='192.168.1.1', ap.add_argument('--baud', type=int, default=460800, help='Flashing baud rate')
help='Gateway IP (default: 192.168.1.1)') ap.add_argument('--dry-run', action='store_true', help='Plan only; do not flash')
parser.add_argument('--project-dir', default=None, ap.add_argument('--ports', help='Comma-separated glob(s), e.g. "/dev/ttyUSB*,/dev/ttyACM*" to override selection')
help='ESP32 iperf project directory') args = ap.parse_args()
parser.add_argument('--probe', action='store_true',
help='Probe each device to detect exact chip type (slower)')
parser.add_argument('--dry-run', action='store_true',
help='Show what would be done without flashing')
args = parser.parse_args() project_dir = os.path.abspath(args.project)
if not os.path.isdir(project_dir):
# Find project directory print(f"Project directory not found: {project_dir}")
if args.project_dir:
project_dir = args.project_dir
else:
# Try to find it relative to script location
script_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.join(script_dir, 'esp32-iperf')
if not os.path.exists(project_dir):
project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf')
if not os.path.exists(project_dir):
print(f"ERROR: Project directory not found: {project_dir}")
print("Please specify --project-dir")
sys.exit(1) sys.exit(1)
print(f"Using project directory: {project_dir}") patterns = None
if args.ports:
# Detect devices patterns = [pat.strip() for pat in args.ports.split(',') if pat.strip()]
print("\nDetecting ESP32 devices...")
devices = detect_esp32.detect_esp32_devices()
devices = list_ports_filtered(patterns)
print(f"Found {len(devices)} USB serial device(s) after filtering")
if not devices: if not devices:
print("No ESP32 devices detected!") print("No candidate USB serial ports found. Try --ports '/dev/ttyUSB*,/dev/ttyACM*' or check permissions.")
sys.exit(1)
print(f"Found {len(devices)} device(s)")
# Detect chip types
device_list = []
for idx, device in enumerate(devices, 1):
if args.probe:
print(f"Probing {device.device}...")
chip_type = probe_chip_type(device.device)
else:
chip_type = detect_device_type(device)
device_list: List[Dict] = []
for idx, dev in enumerate(devices, 1):
port = dev.device
print(f"Probing {port} for exact chip...")
raw_chip = detect_esp32.detect_chip_type(port)
idf_target = map_chip_to_idf_target(raw_chip)
if idf_target == 'unknown':
print(f" WARNING: Could not determine idf.py target for {port} (got '{raw_chip}')")
device_list.append({ device_list.append({
'number': idx, 'number': idx,
'port': device.device, 'port': port,
'chip': chip_type, 'raw_chip': raw_chip,
'info': device 'idf_target': idf_target,
'info': dev,
}) })
# Display plan # Plan output
print(f"\n{'='*60}") base = args.start_ip.split('.')
print("FLASH PLAN") try:
print(f"{'='*60}") base0, base1, base2, base3 = int(base[0]), int(base[1]), int(base[2]), int(base[3])
print(f"SSID: {args.ssid}") except Exception:
print(f"Starting IP: {args.start_ip}") base0, base1, base2, base3 = 192, 168, 1, 50
print(f"Gateway: {args.gateway}")
print()
base_parts = args.start_ip.split('.') print("\nFlash plan:")
for dev in device_list: for d in device_list:
ip_last = int(base_parts[3]) + dev['number'] - 1 ip_last = base3 + d['number'] - 1
ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last}" ip = f"{base0}.{base1}.{base2}.{ip_last}"
print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {ip}") print(f" Device {d['number']:2d}: {d['port']} -> {d['raw_chip']} [{d['idf_target']}] -> {ip}")
if args.dry_run: if args.dry_run:
print("\nDry run - no devices will be flashed") print("\nDry run: not flashing any devices.")
return return
# Confirm failed = []
print(f"\n{'='*60}") for d in device_list:
response = input("Proceed with flashing? (yes/no): ").strip().lower() if d['idf_target'] == 'unknown':
if response != 'yes': print(f"\n ERROR: Unknown IDF target for {d['port']} (raw chip '{d['raw_chip']}'). Skipping.")
print("Aborted.") failed.append(d['number'])
return continue
print(f"\nFlashing {d['port']} as target {d['idf_target']}...")
# Flash devices ok = flash_device(project_dir, d['port'], d['idf_target'], baud=args.baud)
success_count = 0 if not ok:
failed_devices = [] failed.append(d['number'])
for dev in device_list:
try:
success = flash_device(
dev['port'],
dev['chip'],
dev['number'],
args.ssid,
args.password,
args.start_ip,
project_dir
)
if success:
success_count += 1
else:
failed_devices.append(dev['number'])
time.sleep(1) # Brief pause between devices
except KeyboardInterrupt:
print("\n\nFlashing interrupted by user!")
break
except Exception as e:
print(f"\n ERROR: Exception during flash: {e}")
failed_devices.append(dev['number'])
# Summary
print(f"\n{'='*60}")
print("FLASH SUMMARY")
print(f"{'='*60}")
print(f"Successfully flashed: {success_count}/{len(device_list)} devices")
if failed_devices:
print(f"Failed devices: {', '.join(map(str, failed_devices))}")
print(f"{'='*60}")
if failed:
print(f"\nCompleted with failures on devices: {failed}")
sys.exit(2)
print("\nAll devices flashed successfully.")
if __name__ == '__main__': if __name__ == '__main__':
try: main()
main()
except KeyboardInterrupt:
print("\n\nInterrupted by user")
sys.exit(1)
except Exception as e:
print(f"\nFATAL ERROR: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@ -8,6 +8,7 @@
#include "esp_wifi.h" #include "esp_wifi.h"
#include "esp_event.h" #include "esp_event.h"
#include "esp_log.h" #include "esp_log.h"
#include "esp_mac.h"
#include "nvs_flash.h" #include "nvs_flash.h"
#include "esp_console.h" #include "esp_console.h"
#include "argtable3/argtable3.h" #include "argtable3/argtable3.h"
@ -39,9 +40,9 @@ static void event_handler(void* arg, esp_event_base_t event_base,
wifi_ap_record_t ap_info; wifi_ap_record_t ap_info;
esp_wifi_sta_get_ap_info(&ap_info); esp_wifi_sta_get_ap_info(&ap_info);
ESP_LOGI(TAG, "Connected to AP SSID:%s, BSSID:" MACSTR " Channel:%d RSSI:%d dBm", ESP_LOGI(TAG, "Connected to AP SSID:%.*s, BSSID:" MACSTR " Channel:%d RSSI:%d dBm",
event->ssid, MAC2STR(event->bssid), event->channel, ap_info.rssi); event->ssid_len, (const char *)event->ssid, MAC2STR(event->bssid), event->channel, ap_info.rssi);
} else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) { } else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) {
if (s_retry_num < WIFI_MAXIMUM_RETRY) { if (s_retry_num < WIFI_MAXIMUM_RETRY) {
esp_wifi_connect(); esp_wifi_connect();
s_retry_num++; s_retry_num++;
@ -251,7 +252,7 @@ static int cmd_iperf(int argc, char **argv)
return 1; return 1;
} }
cfg.dip = ipaddr_addr(iperf_args.ip->sval[0]); cfg.dip = ipaddr_addr(iperf_args.ip->sval[0]);
if (cfg.dip == IPADDR_NONE) { if (cfg.dip == 0) {
ESP_LOGE(TAG, "Invalid IP address: %s", iperf_args.ip->sval[0]); ESP_LOGE(TAG, "Invalid IP address: %s", iperf_args.ip->sval[0]);
return 1; return 1;
} }