236 lines
8.2 KiB
Python
Executable File
236 lines
8.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ESP32 Reconfiguration Tool
|
|
Iterates through connected devices and updates their settings (SSID, IP, etc.)
|
|
without reflashing firmware. Runs sequentially for reliability.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import ipaddress
|
|
import re
|
|
import time
|
|
import serial
|
|
from serial.tools import list_ports
|
|
|
|
# --- Serial link settings ---
BAUD_RATE = 115_200  # Baud rate passed to serial.Serial when opening a device
TIMEOUT = 0.5  # Serial read timeout (pyserial expresses timeouts in seconds)
|
|
|
|
class Colors:
    """ANSI escape sequences used to colorize the tool's terminal output."""

    # Foreground colors
    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'

    # Switches the terminal back to its default attributes
    RESET = '\033[0m'
|
|
|
|
def detect_devices():
    """Return the device names of matching USB serial ports in natural order.

    Ports are selected by asking pyserial's ``list_ports.grep`` for entries
    matching ``CP210``, ``FT232``, ``USB Serial`` or ``10C4:EA60``
    (presumably the USB-UART bridges found on the ESP32 boards in use).

    Returns:
        A list of device name strings sorted so that embedded numbers compare
        numerically (e.g. ``ttyUSB2`` before ``ttyUSB10``).
    """

    def natural_key(name):
        # re.split with a capture group alternates text / digit runs; digit
        # runs become ints so they compare by value rather than lexically.
        return [int(tok) if tok.isdigit() else tok for tok in re.split(r'(\d+)', name)]

    matches = list_ports.grep("CP210|FT232|USB Serial|10C4:EA60")
    return sorted((info.device for info in matches), key=natural_key)
|
|
|
|
def extract_device_number(device_path):
    """Return the integer formed by the trailing digits of *device_path*.

    Args:
        device_path: Serial device name, e.g. ``/dev/ttyUSB3``.

    Returns:
        The trailing number as an int, or 0 when the name does not end in
        a digit.
    """
    trailing = re.search(r'(\d+)$', device_path)
    if trailing is None:
        return 0
    return int(trailing.group(1))
|
|
|
|
def _pulse_reset(ser, restore_dtr=True):
    """Pulse RTS on *ser* for 100 ms while DTR is held low.

    NOTE(review): presumably RTS/DTR drive the board's auto-reset circuit so
    this reboots the ESP32 into the application - confirm for the boards used.

    Args:
        ser: Open serial port object exposing ``dtr`` and ``rts`` attributes.
        restore_dtr: When True, DTR is set high again after the pulse.
    """
    ser.dtr = False
    ser.rts = True
    time.sleep(0.1)
    ser.rts = False
    if restore_dtr:
        ser.dtr = True


def _wait_for_prompt(ser, timeout=10.0):
    """Read boot output from *ser* until the console appears ready.

    Returns:
        ``(ready, last_error)``. ``ready`` is True once ``esp32>`` or
        ``Entering console loop`` has been seen; ``last_error`` is the most
        recent serial read error encountered, or None.
    """
    deadline = time.monotonic() + timeout
    tail = ""
    last_error = None
    while time.monotonic() < deadline:
        try:
            # Read whatever is pending (at least one byte) so a prompt that
            # is not terminated by a newline is still seen.
            chunk = ser.read(ser.in_waiting or 1).decode('utf-8', errors='ignore')
        except (serial.SerialException, OSError) as exc:
            # Best effort: keep polling, but remember the failure so the
            # caller can report it instead of it vanishing silently.
            last_error = exc
            chunk = ""
        if chunk:
            tail += chunk
            if "esp32>" in tail or "Entering console loop" in tail:
                return True, last_error
            # Only the most recent output matters; cap memory use.
            tail = tail[-1000:]
        time.sleep(0.05)
    return False, last_error


def _build_config_payload(target_ip, args):
    """Return the CFG ... END block for one device as a CRLF-terminated string."""
    csi_val = '1' if args.csi_enable else '0'
    role_str = "SERVER" if args.iperf_server else "CLIENT"
    iperf_enable_val = '0' if args.no_iperf else '1'
    # The value sent as IPERF_PERIOD_US is args.iperf_period scaled by 1e6.
    period_us = int(args.iperf_period * 1000000)

    config_lines = [
        "CFG",
        f"SSID:{args.ssid}",
        f"PASS:{args.password}",
        f"IP:{target_ip}",
        f"MASK:{args.netmask}",
        f"GW:{args.gateway}",
        "DHCP:0",
        f"BAND:{args.band}",
        f"BW:{args.bandwidth}",
        f"POWERSAVE:{args.powersave}",
        f"MODE:{args.mode}",
        f"MON_CH:{args.monitor_channel}",
        f"CSI:{csi_val}",
        f"IPERF_PERIOD_US:{period_us}",
        f"IPERF_ROLE:{role_str}",
        f"IPERF_PROTO:{args.iperf_proto}",
        f"IPERF_DEST_IP:{args.iperf_dest_ip}",
        f"IPERF_PORT:{args.iperf_port}",
        f"IPERF_BURST:{args.iperf_burst}",
        f"IPERF_LEN:{args.iperf_len}",
        f"IPERF_ENABLED:{iperf_enable_val}",
        "END",
    ]
    # Every line, including the last, is terminated with an explicit CRLF.
    return "\r\n".join(config_lines) + "\r\n"


def _wait_for_confirmation(ser, target_ip, timeout=8.0):
    """Return True once *ser* emits a line confirming the new configuration."""
    # The lookahead stops 192.168.1.5 from matching "got ip:192.168.1.51".
    got_ip = re.compile(rf"got ip:{re.escape(target_ip)}(?!\d)")
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        line = ser.readline().decode('utf-8', errors='ignore').strip()
        if not line:
            continue
        if "Config saved" in line or "CSI enable state saved" in line:
            return True
        if got_ip.search(line):
            return True
    return False


def configure_device(port, target_ip, args):
    """Connect to one device, reset it, and send it a new configuration.

    Args:
        port: Serial device name handed to ``serial.Serial``.
        target_ip: Static IP address (string) written into the ``IP:`` line.
        args: Parsed command-line namespace; the attributes read are ssid,
            password, netmask, gateway, band, bandwidth, powersave, mode,
            monitor_channel, csi_enable, iperf_period, iperf_server, no_iperf,
            iperf_proto, iperf_dest_ip, iperf_port, iperf_burst and iperf_len.

    Returns:
        True if the device printed a confirmation line within the verify
        window; False on connection failure, any error, or verify timeout.
    """
    try:
        ser = serial.Serial(port, BAUD_RATE, timeout=0.1)
    except Exception as e:
        print(f"[{port}] {Colors.RED}Connection Failed: {e}{Colors.RESET}")
        return False

    try:
        # 1. Reset the device
        _pulse_reset(ser)

        # 2. Wait for the application console to come up
        print(f"[{port}] Waiting for App (GPS timeout ~3s)...", end='', flush=True)
        prompt_detected, read_error = _wait_for_prompt(ser)
        if prompt_detected:
            print(" OK")
        else:
            print(f" {Colors.YELLOW}Timeout waiting for prompt (continuing anyway){Colors.RESET}")
            if read_error is not None:
                print(f"[{port}] {Colors.YELLOW}Last serial read error: {read_error}{Colors.RESET}")

        # 3. Drop stale boot output and send Enter to clear any partial command
        ser.reset_input_buffer()
        ser.write(b'\n')
        time.sleep(0.2)

        # 4./5. Build and send the configuration block
        config_payload = _build_config_payload(target_ip, args)
        print(f"[{port}] Sending Config ({target_ip})...", end='', flush=True)
        ser.write(config_payload.encode('utf-8'))
        ser.flush()

        # 6. Verify; a longer read timeout suits line-oriented reads
        ser.timeout = TIMEOUT
        if not _wait_for_confirmation(ser, target_ip):
            print(f" {Colors.RED}FAILED (Verify Timeout){Colors.RESET}")
            return False

        print(f" {Colors.GREEN}SUCCESS{Colors.RESET}")
        # Final reset so the device restarts with the new settings; DTR is
        # left low here, exactly as before.
        _pulse_reset(ser, restore_dtr=False)
        return True

    except Exception as e:
        print(f"[{port}] Error: {e}")
        return False
    finally:
        if ser.is_open:
            ser.close()
|
|
|
|
def main():
    """Parse the command line and reconfigure every detected device in turn.

    Each port's target address is ``--start-ip`` plus the trailing number of
    the port name (see ``extract_device_number``). Every device is attempted
    up to ``--retries`` times.

    Exit status: 2 for invalid arguments (via argparse), 1 when no device is
    found or at least one device could not be configured, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description='ESP32 Sequential Reconfiguration Tool')

    parser.add_argument('--start-ip', required=True, help='Start IP (e.g., 192.168.1.51)')
    parser.add_argument('-s', '--ssid', default='ClubHouse2G')
    parser.add_argument('-P', '--password', default='ez2remember')
    parser.add_argument('-g', '--gateway', default='192.168.1.1')
    parser.add_argument('-m', '--netmask', default='255.255.255.0')
    parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'])
    parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'])
    parser.add_argument('-ps', '--powersave', default='NONE')
    parser.add_argument('--iperf-period', type=float, default=0.01)
    parser.add_argument('--iperf-burst', type=int, default=1)
    parser.add_argument('--iperf-len', type=int, default=1470)
    parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'])
    parser.add_argument('--iperf-dest-ip', default='192.168.1.50')
    parser.add_argument('--iperf-port', type=int, default=5001)
    parser.add_argument('--no-iperf', action='store_true')
    # NOTE: --iperf-client is accepted but never read; the role sent to the
    # device is CLIENT unless --iperf-server is given.
    parser.add_argument('--iperf-client', action='store_true')
    parser.add_argument('--iperf-server', action='store_true')
    parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'])
    parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
    parser.add_argument('--csi', dest='csi_enable', action='store_true')
    parser.add_argument('--retries', type=int, default=3, help="Retry attempts per device")

    args = parser.parse_args()

    # Validate before touching any hardware so bad input yields a usage error
    # instead of a traceback halfway through the run.
    try:
        start_ip = ipaddress.IPv4Address(args.start_ip)
    except ValueError as exc:  # AddressValueError is a ValueError subclass
        parser.error(f"invalid --start-ip: {exc}")
    if args.retries < 1:
        parser.error("--retries must be at least 1")

    print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
    print(" ESP32 Sequential Reconfig Tool")
    print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")

    devices = detect_devices()
    if not devices:
        print(f"{Colors.RED}No devices found.{Colors.RESET}")
        sys.exit(1)

    print(f"Found {len(devices)} devices. Starting reconfiguration...\n")

    # Ports with the same trailing number (e.g. ttyUSB0 and ttyACM0, or names
    # without digits) map to the same address; make that visible.
    offsets = [extract_device_number(port) for port in devices]
    if len(set(offsets)) != len(offsets):
        print(f"{Colors.YELLOW}Warning: several ports share the same port number "
              f"and will be assigned the same IP address.{Colors.RESET}\n")

    failed = []
    for i, (port, offset) in enumerate(zip(devices, offsets)):
        try:
            target_ip = str(start_ip + offset)
        except ValueError:
            # Adding the offset overflowed the IPv4 address space.
            print(f"{Colors.RED} [ERROR] {port}: start IP + {offset} is outside the IPv4 range.{Colors.RESET}\n")
            failed.append(port)
            continue

        print(f"Device {i+1}/{len(devices)}: {Colors.CYAN}{port}{Colors.RESET} -> {Colors.YELLOW}{target_ip}{Colors.RESET}")

        success = False
        for attempt in range(1, args.retries + 1):
            if attempt > 1:
                print(f" Retry {attempt}/{args.retries}...")

            if configure_device(port, target_ip, args):
                success = True
                break
            # Give the device and the USB bridge a moment before retrying.
            time.sleep(1.0)

        if not success:
            print(f"{Colors.RED} [ERROR] Failed to configure {port} after {args.retries} attempts.{Colors.RESET}\n")
            failed.append(port)
        else:
            print("")

    print(f"{Colors.BLUE}Done.{Colors.RESET}")

    # Signal partial failure to calling scripts, consistent with the
    # "no devices" path above.
    if failed:
        sys.exit(1)
|
|
|
|
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|