309 lines
9.0 KiB
Python
Executable File
309 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ESP32 Mass Flash Script
|
|
Automatically detects, configures, and flashes multiple ESP32 devices with unique IPs
|
|
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
import time
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
# Import the detection script
|
|
# Put this script's own directory first on the module search path so the
# sibling module below is importable regardless of the current working
# directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
    import detect_esp32
except ImportError:
    # main() relies on detect_esp32 to enumerate devices, so there is nothing
    # useful to do without it: report and stop with a non-zero exit status.
    print("Error: detect_esp32.py must be in the same directory")
    sys.exit(1)
|
|
|
|
|
|
# Device type detection based on USB chip
|
|
def detect_device_type(port_info):
    """
    Guess the ESP32 variant from the USB vendor ID of a serial port.

    This is a heuristic only: nothing is sent to the chip. Use
    probe_chip_type() when the exact variant matters.

    Args:
        port_info: Object describing the serial port. Only its ``vid``
            attribute (USB vendor ID) is read; a missing or ``None`` vendor
            ID is treated as "not an Espressif USB interface".

    Returns:
        'esp32s3' when the vendor ID is 0x303A, otherwise 'esp32'.
    """
    espressif_vid = 0x303A

    # Espressif's own USB JTAG is used in ESP32-C3 and ESP32-S3. The product
    # ID is not consulted because every Espressif ID maps to the same guess.
    # NOTE: an ESP32-C3 board is therefore reported as 'esp32s3' here; run
    # with --probe to tell the two apart.
    if getattr(port_info, 'vid', None) == espressif_vid:
        return 'esp32s3'

    # FTDI / CP210x (and anything else) are external USB-UART bridges that
    # say nothing about the chip behind them. Default to the classic ESP32;
    # adjust this if your fleet is mostly something else.
    return 'esp32'
|
|
|
|
|
|
# Chip-name tags looked for in esptool's output, in match order. A tag that
# contains another tag (ESP32-C61 vs ESP32-C6) must come first so the longer
# name wins. The returned target name is the tag lower-cased without the dash.
_KNOWN_CHIP_VARIANTS = (
    'ESP32-S3',
    'ESP32-S2',
    'ESP32-C61',
    'ESP32-C6',
    'ESP32-C5',
    'ESP32-C3',
    'ESP32-C2',
    'ESP32-H2',
    'ESP32-P4',
)


def _chip_type_from_output(output):
    """Map esptool's combined output text to a target name, or 'unknown'."""
    for tag in _KNOWN_CHIP_VARIANTS:
        if tag in output:
            return tag.lower().replace('-', '')

    # Only after every named variant has been ruled out is a bare "ESP32"
    # taken to mean the original chip; otherwise e.g. "ESP32-C6" would be
    # misreported as 'esp32' and the wrong target would be built.
    if 'ESP32' in output:
        return 'esp32'
    return 'unknown'


def probe_chip_type(port):
    """
    Probe the actual chip type using esptool.py

    Runs ``esptool.py --port <port> chip_id`` with a 10 second timeout and
    searches its stdout and stderr for a known chip name.

    Args:
        port: Serial port device name passed to esptool's ``--port``.

    Returns:
        A target name such as 'esp32', 'esp32s2', 'esp32s3' or 'esp32c3'
        (see _KNOWN_CHIP_VARIANTS for the full list), or 'unknown' when
        esptool could not be run, timed out, or reported no recognised chip.
    """
    try:
        result = subprocess.run(
            ['esptool.py', '--port', port, 'chip_id'],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except Exception as e:
        # Probing is best-effort: a missing esptool, a timeout or a busy port
        # must not abort the whole run, so warn and report 'unknown'.
        print(f" Warning: Could not probe {port}: {e}")
        return 'unknown'

    return _chip_type_from_output(result.stdout + result.stderr)
|
|
|
|
|
|
def create_sdkconfig(project_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'):
    """
    Create sdkconfig.defaults file with WiFi and IP configuration

    The file is written to ``<project_dir>/sdkconfig.defaults``, replacing
    any existing file of that name.

    Args:
        project_dir: Directory in which the file is created.
        ssid: WiFi network name.
        password: WiFi password.
        ip_addr: Static IP address for the device.
        gateway: Gateway address (default '192.168.1.1').
        netmask: Network mask (default '255.255.255.0').

    Raises:
        ValueError: If any value contains a line break, which would let it
            spill into additional configuration lines.
        OSError: If the file cannot be written.
    """
    def quoted(value):
        # Values are emitted as double-quoted strings, so backslashes and
        # double quotes inside them must be escaped or the line is corrupted
        # (WiFi passwords routinely contain such characters).
        text = str(value)
        if '\n' in text or '\r' in text:
            raise ValueError(f"configuration value must not contain line breaks: {text!r}")
        escaped = text.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    sdkconfig_path = os.path.join(project_dir, 'sdkconfig.defaults')

    lines = [
        '# WiFi Configuration',
        f'CONFIG_WIFI_SSID={quoted(ssid)}',
        f'CONFIG_WIFI_PASSWORD={quoted(password)}',
        'CONFIG_WIFI_MAXIMUM_RETRY=5',
        '',
        '# Static IP Configuration',
        'CONFIG_USE_STATIC_IP=y',
        f'CONFIG_STATIC_IP_ADDR={quoted(ip_addr)}',
        f'CONFIG_STATIC_GATEWAY_ADDR={quoted(gateway)}',
        f'CONFIG_STATIC_NETMASK_ADDR={quoted(netmask)}',
    ]

    # Explicit encoding so a non-ASCII SSID is written the same way on every
    # platform instead of depending on the locale default.
    with open(sdkconfig_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines) + '\n')

    print(f" Created sdkconfig.defaults with IP {ip_addr}")
|
|
|
|
|
|
def _device_ip(base_ip, device_num):
    """Return the IP for device *device_num* (1-based) counting up from
    *base_ip*, or None if the last octet would exceed 254.

    Raises ValueError when *base_ip* is not a dotted-quad IPv4 address.
    """
    parts = base_ip.split('.')
    try:
        octets = [int(part) for part in parts]
    except ValueError:
        octets = []
    if len(parts) != 4 or len(octets) != 4 or any(not 0 <= o <= 255 for o in octets):
        raise ValueError(f"invalid IPv4 address: {base_ip!r}")

    last_octet = octets[3] + device_num - 1
    if last_octet > 254:
        return None
    return f"{octets[0]}.{octets[1]}.{octets[2]}.{last_octet}"


def _run_idf(idf_args, project_dir):
    """Run ``idf.py`` with *idf_args* inside *project_dir*, capturing its output as text."""
    return subprocess.run(
        ['idf.py', *idf_args],
        cwd=project_dir,
        capture_output=True,
        text=True,
    )


def _output_tail(result, limit=1000):
    """Return the last *limit* characters of a finished command's stdout + stderr."""
    return ((result.stdout or '') + (result.stderr or ''))[-limit:]


def flash_device(port, chip_type, device_num, ssid, password, base_ip, project_dir,
                 gateway='192.168.1.1', netmask='255.255.255.0'):
    """
    Configure and flash a single device

    Writes sdkconfig.defaults for the device's IP, then runs
    ``idf.py fullclean``, ``set-target``, ``build`` and ``flash`` in
    *project_dir*.

    Args:
        port: Serial port passed to ``idf.py -p``.
        chip_type: IDF target name (e.g. 'esp32'); 'unknown' is refused.
        device_num: 1-based device index; device 1 gets *base_ip* itself.
        ssid: WiFi SSID written to the configuration.
        password: WiFi password written to the configuration.
        base_ip: IPv4 address assigned to device 1.
        project_dir: Project directory to build and flash from.
        gateway: Gateway address written to the configuration.
        netmask: Network mask written to the configuration.

    Returns:
        True when the device was flashed, False on any reported failure.
    """
    print(f"\n{'='*60}")
    print(f"Device {device_num}: {port} ({chip_type})")
    print(f"{'='*60}")

    # Calculate IP address
    try:
        ip_addr = _device_ip(base_ip, device_num)
    except ValueError as exc:
        print(f" ERROR: {exc}")
        return False

    if ip_addr is None:
        print(f" ERROR: IP address overflow! Device {device_num} would exceed .254")
        return False

    print(f" Assigned IP: {ip_addr}")

    # Refuse before touching the project: 'unknown' is not a valid target, so
    # cleaning the build tree only to have set-target fail would be wasted.
    if chip_type == 'unknown':
        print(" ERROR: Chip type could not be determined; skipping this device")
        return False

    # Create sdkconfig.defaults
    create_sdkconfig(project_dir, ssid, password, ip_addr, gateway, netmask)

    # Clean previous build if target changed (best effort: result is ignored)
    print(" Cleaning previous build...")
    subprocess.run(['idf.py', 'fullclean'], cwd=project_dir,
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # Set target
    print(f" Setting target to {chip_type}...")
    result = _run_idf(['set-target', chip_type], project_dir)
    if result.returncode != 0:
        print(f" ERROR: Failed to set target: {result.stderr}")
        return False

    # Build
    print(f" Building for {chip_type}...")
    result = _run_idf(['build'], project_dir)
    if result.returncode != 0:
        print(" ERROR: Build failed!")
        # Both streams are captured, so show the tail of both rather than
        # stderr alone, which may not contain the actual error.
        print(_output_tail(result))
        return False

    print(" Build successful!")

    # Flash
    print(f" Flashing to {port}...")
    result = _run_idf(['-p', port, 'flash'], project_dir)
    if result.returncode != 0:
        print(" ERROR: Flash failed!")
        print(_output_tail(result))
        return False

    print(f" ✓ Successfully flashed device {device_num} at {ip_addr}")
    return True


def main():
    """
    Command-line entry point: detect devices, show the plan, confirm, flash.

    Devices are numbered from 1 in detection order and receive consecutive
    IP addresses starting at --start-ip. With --dry-run only the plan is
    printed. Exits with status 1 when the project directory or devices
    cannot be found, and with argparse's usage error for a bad --start-ip.
    """
    parser = argparse.ArgumentParser(description='Mass flash ESP32 devices with unique IPs')
    parser.add_argument('--ssid', required=True, help='WiFi SSID')
    parser.add_argument('--password', required=True, help='WiFi password')
    parser.add_argument('--start-ip', default='192.168.1.50',
                        help='Starting IP address (default: 192.168.1.50)')
    parser.add_argument('--gateway', default='192.168.1.1',
                        help='Gateway IP (default: 192.168.1.1)')
    parser.add_argument('--project-dir', default=None,
                        help='ESP32 iperf project directory')
    parser.add_argument('--probe', action='store_true',
                        help='Probe each device to detect exact chip type (slower)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without flashing')

    args = parser.parse_args()

    # Reject a malformed start address up front instead of failing later,
    # half-way through printing the plan.
    try:
        _device_ip(args.start_ip, 1)
    except ValueError as exc:
        parser.error(f"--start-ip: {exc}")

    # Find project directory
    if args.project_dir:
        project_dir = args.project_dir
    else:
        # Try to find it relative to script location, then in ~/Code/esp32
        script_dir = os.path.dirname(os.path.abspath(__file__))
        project_dir = os.path.join(script_dir, 'esp32-iperf')

        if not os.path.exists(project_dir):
            project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf')

    if not os.path.exists(project_dir):
        print(f"ERROR: Project directory not found: {project_dir}")
        print("Please specify --project-dir")
        sys.exit(1)

    print(f"Using project directory: {project_dir}")

    # Detect devices
    print("\nDetecting ESP32 devices...")
    devices = detect_esp32.detect_esp32_devices()

    if not devices:
        print("No ESP32 devices detected!")
        sys.exit(1)

    print(f"Found {len(devices)} device(s)")

    # Detect chip types
    device_list = []
    for idx, device in enumerate(devices, 1):
        if args.probe:
            print(f"Probing {device.device}...")
            chip_type = probe_chip_type(device.device)
        else:
            chip_type = detect_device_type(device)

        device_list.append({
            'number': idx,
            'port': device.device,
            'chip': chip_type,
            'info': device,
        })

    # Display plan
    print(f"\n{'='*60}")
    print("FLASH PLAN")
    print(f"{'='*60}")
    print(f"SSID: {args.ssid}")
    print(f"Starting IP: {args.start_ip}")
    print(f"Gateway: {args.gateway}")
    print()

    for dev in device_list:
        # Same computation flash_device() uses, so the plan cannot disagree
        # with what is actually flashed; overflow is shown, not hidden.
        ip = _device_ip(args.start_ip, dev['number']) or 'OUT OF RANGE'
        print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {ip}")

    if args.dry_run:
        print("\nDry run - no devices will be flashed")
        return

    # Confirm
    print(f"\n{'='*60}")
    response = input("Proceed with flashing? (yes/no): ").strip().lower()
    if response != 'yes':
        print("Aborted.")
        return

    # Flash devices
    success_count = 0
    failed_devices = []

    for dev in device_list:
        try:
            success = flash_device(
                dev['port'],
                dev['chip'],
                dev['number'],
                args.ssid,
                args.password,
                args.start_ip,
                project_dir,
                gateway=args.gateway,  # honour --gateway instead of the default
            )

            if success:
                success_count += 1
            else:
                failed_devices.append(dev['number'])

            time.sleep(1)  # Brief pause between devices

        except KeyboardInterrupt:
            # Stop flashing but still fall through to the summary below.
            print("\n\nFlashing interrupted by user!")
            break
        except Exception as e:
            # One bad device must not stop the remaining ones.
            print(f"\n ERROR: Exception during flash: {e}")
            failed_devices.append(dev['number'])

    # Summary
    print(f"\n{'='*60}")
    print("FLASH SUMMARY")
    print(f"{'='*60}")
    print(f"Successfully flashed: {success_count}/{len(device_list)} devices")

    if failed_devices:
        print(f"Failed devices: {', '.join(map(str, failed_devices))}")

    print(f"{'='*60}")
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run main() and turn Ctrl-C or any unexpected error
    # into a short message plus exit status 1. A SystemExit raised inside
    # main() is not intercepted and keeps its own status.
    exit_status = 0
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        exit_status = 1
    except Exception as exc:
        print(f"\nFATAL ERROR: {exc}")
        # Imported lazily: only needed on this failure path.
        import traceback
        traceback.print_exc()
        exit_status = 1

    if exit_status:
        sys.exit(exit_status)
|