more scripts

This commit is contained in:
Bob 2025-11-12 15:24:16 -08:00
parent 289efa97a6
commit 42d0ce9a7b
6 changed files with 908 additions and 23 deletions

198
config_device.py Executable file
View File

@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
ESP32 WiFi Configuration Tool - With verbose mode
"""
import serial
import time
import sys
import argparse
def log_verbose(message, verbose=False):
"""Print message only if verbose is enabled"""
if verbose:
print(f"[VERBOSE] {message}")
def config_device(port, ip, ssid="ClubHouse2G", password="ez2remember",
gateway="192.168.1.1", netmask="255.255.255.0", verbose=False):
"""Configure ESP32 device via serial"""
print(f"\n{'='*70}")
print(f"ESP32 WiFi Configuration")
print(f"{'='*70}")
print(f"Port: {port}")
print(f"SSID: {ssid}")
print(f"Password: {'*' * len(password)}")
print(f"IP: {ip}")
print(f"Gateway: {gateway}")
print(f"Netmask: {netmask}")
print(f"Verbose: {verbose}")
print(f"{'='*70}\n")
try:
# Open serial connection
log_verbose(f"Opening serial port {port} at 115200 baud...", verbose)
ser = serial.Serial(port, 115200, timeout=0.5, write_timeout=0.5)
log_verbose(f"Serial port opened successfully", verbose)
log_verbose(f"Port settings: {ser}", verbose)
time.sleep(0.2)
# Check if there's any data waiting
if ser.in_waiting:
log_verbose(f"{ser.in_waiting} bytes waiting in buffer", verbose)
existing = ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
log_verbose(f"Existing data: {existing[:100]}", verbose)
# Build config message
config_lines = [
"CFG",
f"SSID:{ssid}",
f"PASS:{password}",
f"IP:{ip}",
f"MASK:{netmask}",
f"GW:{gateway}",
"DHCP:0",
"END"
]
config = '\n'.join(config_lines) + '\n'
log_verbose(f"Config message size: {len(config)} bytes", verbose)
if verbose:
print("[VERBOSE] Config message:")
for line in config_lines:
display_line = line if not line.startswith("PASS:") else "PASS:********"
print(f"[VERBOSE] {display_line}")
# Send config
print("Sending configuration...")
start_time = time.time()
bytes_written = ser.write(config.encode('utf-8'))
ser.flush()
send_time = time.time() - start_time
log_verbose(f"Wrote {bytes_written} bytes in {send_time:.3f}s", verbose)
print("Sent! Waiting for response...")
time.sleep(2)
# Read response
if ser.in_waiting:
response_size = ser.in_waiting
log_verbose(f"Response available: {response_size} bytes", verbose)
response = ser.read(response_size).decode('utf-8', errors='ignore')
if verbose:
print("[VERBOSE] Raw response:")
for line in response.split('\n')[:20]: # Show first 20 lines
if line.strip():
print(f"[VERBOSE] {line}")
# Check for key indicators
if "OK" in response:
print("✓ Device acknowledged configuration (OK)")
if "got ip:" in response.lower():
print("✓ Device connected to WiFi!")
# Extract IP from response
import re
ip_match = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', response, re.IGNORECASE)
if ip_match:
print(f" Assigned IP: {ip_match.group(1)}")
if "connected" in response.lower():
print("✓ WiFi connection established")
if "failed" in response.lower() or "disconnect" in response.lower():
print("✗ WiFi connection may have failed")
if verbose:
print("[VERBOSE] Check response above for error details")
else:
log_verbose("No immediate response from device", verbose)
print("⚠ No response (device may still be processing)")
# Get final port stats
if verbose:
log_verbose(f"Input buffer: {ser.in_waiting} bytes", verbose)
log_verbose(f"Output buffer empty: {ser.out_waiting == 0}", verbose)
ser.close()
log_verbose("Serial port closed", verbose)
print(f"\nConfiguration sent to {port}")
print(f"Expected IP: {ip}")
print(f"Test with: ping {ip}")
print(f" iperf -c {ip}")
return True
except serial.SerialException as e:
print(f"\n✗ Serial error: {e}")
log_verbose(f"Serial exception details: {type(e).__name__}", verbose)
print(" Is another program using this port? (idf.py monitor, screen, etc.)")
return False
except KeyboardInterrupt:
print("\n\nConfiguration cancelled by user")
if 'ser' in locals() and ser.is_open:
ser.close()
log_verbose("Serial port closed after interrupt", verbose)
return False
except Exception as e:
print(f"\n✗ Error: {e}")
if verbose:
import traceback
print("\n[VERBOSE] Full traceback:")
traceback.print_exc()
return False
def main():
parser = argparse.ArgumentParser(
description='Configure ESP32 WiFi via serial',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic configuration
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.51
# With verbose output
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.51 -v
# Custom WiFi credentials
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.52 -s MyWiFi -P mypass -v
# Custom gateway
%(prog)s -p /dev/ttyUSB0 -i 10.0.0.100 -g 10.0.0.1
"""
)
parser.add_argument('-p', '--port', required=True,
help='Serial port (e.g., /dev/ttyUSB0)')
parser.add_argument('-i', '--ip', required=True,
help='Static IP address')
parser.add_argument('-s', '--ssid', default='ClubHouse2G',
help='WiFi SSID (default: ClubHouse2G)')
parser.add_argument('-P', '--password', default='ez2remember',
help='WiFi password (default: ez2remember)')
parser.add_argument('-g', '--gateway', default='192.168.1.1',
help='Gateway IP (default: 192.168.1.1)')
parser.add_argument('-m', '--netmask', default='255.255.255.0',
help='Netmask (default: 255.255.255.0)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output (show detailed debug info)')
args = parser.parse_args()
success = config_device(
port=args.port,
ip=args.ip,
ssid=args.ssid,
password=args.password,
gateway=args.gateway,
netmask=args.netmask,
verbose=args.verbose
)
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()

View File

@ -1,15 +1,5 @@
idf_component_register(
SRCS "main.c"
INCLUDE_DIRS "."
REQUIRES
esp_wifi
nvs_flash
esp_netif
console
lwip
driver
esp_driver_uart
vfs
iperf
wifi_cfg
PRIV_REQUIRES esp_driver_gpio
)

View File

@ -9,28 +9,139 @@
#include "esp_system.h"
#include "esp_event.h"
#include "esp_log.h"
#include "esp_wifi.h"
#include "nvs_flash.h"
#include "esp_netif.h"
#include "lwip/inet.h"
#include "led_strip.h"
#include "iperf.h"
#include "wifi_cfg.h"
static const char *TAG = "main";
// WS2812 RGB LED - try both GPIO 38 and 48
#define RGB_LED_GPIO 48 // If this doesn't work, try 38
static led_strip_handle_t led_strip;
// WiFi connection status
static bool wifi_connected = false;
static bool has_config = false;
// LED states
typedef enum {
LED_STATE_NO_CONFIG, // Yellow - no WiFi config in NVS
LED_STATE_WAITING, // Blue slow blink - trying to connect
LED_STATE_CONNECTED, // Blue solid - connected successfully
LED_STATE_FAILED // Red fast blink - connection failed
} led_state_t;
static led_state_t current_led_state = LED_STATE_NO_CONFIG;
static void rgb_led_init(void)
{
led_strip_config_t strip_config = {
.strip_gpio_num = RGB_LED_GPIO,
.max_leds = 1,
};
led_strip_rmt_config_t rmt_config = {
.resolution_hz = 10 * 1000 * 1000, // 10MHz
};
ESP_ERROR_CHECK(led_strip_new_rmt_device(&strip_config, &rmt_config, &led_strip));
led_strip_clear(led_strip);
ESP_LOGI(TAG, "WS2812 RGB LED initialized on GPIO %d", RGB_LED_GPIO);
}
static void set_led_color(uint8_t r, uint8_t g, uint8_t b)
{
led_strip_set_pixel(led_strip, 0, r, g, b);
led_strip_refresh(led_strip);
}
static void led_task(void *arg)
{
int blink_state = 0;
while(1) {
switch(current_led_state) {
case LED_STATE_NO_CONFIG:
// Yellow solid - no WiFi config
set_led_color(255, 255, 0); // Red + Green = Yellow
vTaskDelay(pdMS_TO_TICKS(1000));
break;
case LED_STATE_WAITING:
// Blue slow blink - waiting for WiFi
if (blink_state) {
set_led_color(0, 0, 255); // Blue
} else {
set_led_color(0, 0, 0); // Off
}
blink_state = !blink_state;
vTaskDelay(pdMS_TO_TICKS(1000));
break;
case LED_STATE_CONNECTED:
// Blue solid - connected successfully
set_led_color(0, 0, 255); // Blue
vTaskDelay(pdMS_TO_TICKS(1000));
break;
case LED_STATE_FAILED:
// Red fast blink - connection failed
if (blink_state) {
set_led_color(255, 0, 0); // Red
} else {
set_led_color(0, 0, 0); // Off
}
blink_state = !blink_state;
vTaskDelay(pdMS_TO_TICKS(200));
break;
}
}
}
static void event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
if (event_base == WIFI_EVENT) {
switch (event_id) {
case WIFI_EVENT_STA_START:
ESP_LOGI(TAG, "WiFi started, attempting connection...");
if (has_config) {
current_led_state = LED_STATE_WAITING;
}
break;
case WIFI_EVENT_STA_DISCONNECTED:
wifi_event_sta_disconnected_t* event = (wifi_event_sta_disconnected_t*) event_data;
ESP_LOGW(TAG, "WiFi disconnected, reason: %d", event->reason);
if (!wifi_connected && has_config) {
current_led_state = LED_STATE_FAILED;
ESP_LOGE(TAG, "WiFi connection FAILED - RED LED blinking");
}
break;
}
} else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
ip_event_got_ip_t* event = (ip_event_got_ip_t*) event_data;
ESP_LOGI(TAG, "got ip:" IPSTR " gw:" IPSTR " netmask:" IPSTR,
IP2STR(&event->ip_info.ip),
IP2STR(&event->ip_info.gw),
IP2STR(&event->ip_info.netmask));
// Auto-start iperf server after getting IP
vTaskDelay(pdMS_TO_TICKS(1000)); // Wait 1 second for stability
wifi_connected = true;
current_led_state = LED_STATE_CONNECTED;
ESP_LOGI(TAG, "WiFi CONNECTED - BLUE LED solid");
// Auto-start iperf server
vTaskDelay(pdMS_TO_TICKS(1000));
iperf_cfg_t cfg;
memset(&cfg, 0, sizeof(cfg));
@ -48,13 +159,37 @@ void app_main(void)
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
// Initialize RGB LED
rgb_led_init();
// Start LED task
xTaskCreate(led_task, "led_task", 4096, NULL, 5, NULL);
// Register WiFi events
ESP_ERROR_CHECK(esp_event_handler_instance_register(
WIFI_EVENT, ESP_EVENT_ANY_ID, &event_handler, NULL, NULL));
ESP_ERROR_CHECK(esp_event_handler_instance_register(
IP_EVENT, IP_EVENT_STA_GOT_IP, &event_handler, NULL, NULL));
// Initialize WiFi config
wifi_cfg_init();
wifi_cfg_apply_from_nvs();
ESP_LOGI(TAG, "System init complete. iperf server will start after WiFi connects.");
// Try to load config from NVS
if (wifi_cfg_apply_from_nvs()) {
has_config = true;
current_led_state = LED_STATE_WAITING;
ESP_LOGI(TAG, "WiFi config loaded from NVS");
} else {
has_config = false;
current_led_state = LED_STATE_NO_CONFIG;
ESP_LOGI(TAG, "No WiFi config - YELLOW LED");
}
ESP_LOGI(TAG, "LED Status:");
ESP_LOGI(TAG, " YELLOW solid = NO CONFIG (send CFG/END)");
ESP_LOGI(TAG, " BLUE slow blink = Connecting");
ESP_LOGI(TAG, " BLUE solid = Connected ✓");
ESP_LOGI(TAG, " RED fast blink = Failed ✗");
while(1) {
vTaskDelay(pdMS_TO_TICKS(1000));

244
map_usb_to_ip.py Executable file
View File

@ -0,0 +1,244 @@
#!/usr/bin/env python3
"""
Map ESP32 USB ports to IP addresses
Creates and manages mapping between /dev/ttyUSB* and assigned IPs
"""
import serial.tools.list_ports
import argparse
import json
import glob
import re
from pathlib import Path
class USBIPMapper:
def __init__(self, start_ip="192.168.1.51", config_file="usb_ip_map.json"):
self.start_ip = start_ip
self.config_file = config_file
self.mapping = {}
def get_ip_for_index(self, index):
"""Calculate IP address for a given index"""
ip_parts = self.start_ip.split('.')
base_ip = int(ip_parts[3])
ip_parts[3] = str(base_ip + index)
return '.'.join(ip_parts)
def extract_usb_number(self, port):
"""Extract number from /dev/ttyUSBX"""
match = re.search(r'ttyUSB(\d+)', port)
if match:
return int(match.group(1))
return None
def detect_devices(self):
"""Detect all ESP32 USB devices and create mapping"""
devices = sorted(glob.glob('/dev/ttyUSB*'))
print(f"\n{'='*70}")
print(f"ESP32 USB to IP Address Mapping")
print(f"{'='*70}")
print(f"Start IP: {self.start_ip}")
print(f"Detected {len(devices)} USB device(s)\n")
self.mapping = {}
for idx, port in enumerate(devices):
usb_num = self.extract_usb_number(port)
ip = self.get_ip_for_index(idx)
# Get device info
try:
ports = serial.tools.list_ports.comports()
device_info = next((p for p in ports if p.device == port), None)
if device_info:
serial_num = device_info.serial_number or "Unknown"
description = device_info.description or "Unknown"
else:
serial_num = "Unknown"
description = "Unknown"
except:
serial_num = "Unknown"
description = "Unknown"
self.mapping[port] = {
'index': idx,
'usb_number': usb_num,
'ip': ip,
'serial': serial_num,
'description': description
}
print(f"[{idx:2d}] {port:14s}{ip:15s} (USB{usb_num}, SN: {serial_num})")
print(f"\n{'='*70}")
print(f"Total: {len(devices)} devices mapped")
print(f"IP Range: {self.mapping[devices[0]]['ip']} - {self.mapping[devices[-1]]['ip']}" if devices else "")
print(f"{'='*70}\n")
return self.mapping
def save_mapping(self):
"""Save mapping to JSON file"""
with open(self.config_file, 'w') as f:
json.dump(self.mapping, f, indent=2)
print(f"✓ Mapping saved to {self.config_file}")
def load_mapping(self):
"""Load mapping from JSON file"""
try:
with open(self.config_file, 'r') as f:
self.mapping = json.load(f)
print(f"✓ Mapping loaded from {self.config_file}")
return self.mapping
except FileNotFoundError:
print(f"✗ No saved mapping found at {self.config_file}")
return {}
def get_ip(self, port):
"""Get IP address for a specific USB port"""
if port in self.mapping:
return self.mapping[port]['ip']
return None
def get_port(self, ip):
"""Get USB port for a specific IP address"""
for port, info in self.mapping.items():
if info['ip'] == ip:
return port
return None
def print_mapping(self):
"""Print current mapping"""
if not self.mapping:
print("No mapping loaded. Run with --detect first.")
return
print(f"\n{'='*70}")
print(f"Current USB to IP Mapping")
print(f"{'='*70}")
for port, info in sorted(self.mapping.items(), key=lambda x: x[1]['index']):
print(f"[{info['index']:2d}] {port:14s}{info['ip']:15s} (USB{info['usb_number']})")
print(f"{'='*70}\n")
def export_bash_script(self, filename="usb_ip_vars.sh"):
"""Export mapping as bash variables"""
with open(filename, 'w') as f:
f.write("#!/bin/bash\n")
f.write("# USB to IP mapping - Auto-generated\n\n")
# Create associative array
f.write("declare -A USB_TO_IP\n")
for port, info in self.mapping.items():
f.write(f"USB_TO_IP[{port}]=\"{info['ip']}\"\n")
f.write("\n# Create reverse mapping\n")
f.write("declare -A IP_TO_USB\n")
for port, info in self.mapping.items():
f.write(f"IP_TO_USB[{info['ip']}]=\"{port}\"\n")
f.write("\n# Helper functions\n")
f.write("get_ip_for_usb() { echo \"${USB_TO_IP[$1]}\"; }\n")
f.write("get_usb_for_ip() { echo \"${IP_TO_USB[$1]}\"; }\n")
print(f"✓ Bash script exported to {filename}")
print(f" Usage: source {filename} && get_ip_for_usb /dev/ttyUSB0")
def main():
parser = argparse.ArgumentParser(
description='Map ESP32 USB ports to IP addresses',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Detect devices and create mapping
%(prog)s --detect
# Detect and save to file
%(prog)s --detect --save
# Load saved mapping and display
%(prog)s --load --print
# Get IP for specific USB port
%(prog)s --load --port /dev/ttyUSB5
# Get USB port for specific IP
%(prog)s --load --ip 192.168.1.55
# Export as bash script
%(prog)s --load --export
# Use custom IP range
%(prog)s --detect --start-ip 10.0.0.100
"""
)
parser.add_argument('--detect', action='store_true',
help='Detect USB devices and create mapping')
parser.add_argument('--save', action='store_true',
help='Save mapping to file')
parser.add_argument('--load', action='store_true',
help='Load mapping from file')
parser.add_argument('--print', action='store_true',
help='Print current mapping')
parser.add_argument('--start-ip', default='192.168.1.51',
help='Starting IP address (default: 192.168.1.51)')
parser.add_argument('--config', default='usb_ip_map.json',
help='Config file path (default: usb_ip_map.json)')
parser.add_argument('--port', metavar='PORT',
help='Get IP for specific USB port (e.g., /dev/ttyUSB5)')
parser.add_argument('--ip', metavar='IP',
help='Get USB port for specific IP address')
parser.add_argument('--export', action='store_true',
help='Export mapping as bash script')
args = parser.parse_args()
mapper = USBIPMapper(start_ip=args.start_ip, config_file=args.config)
# Detect devices
if args.detect:
mapper.detect_devices()
if args.save:
mapper.save_mapping()
# Load mapping
if args.load:
mapper.load_mapping()
# Print mapping
if args.print:
mapper.print_mapping()
# Query specific port
if args.port:
if not mapper.mapping:
mapper.load_mapping()
ip = mapper.get_ip(args.port)
if ip:
print(f"{args.port}{ip}")
else:
print(f"Port {args.port} not found in mapping")
# Query specific IP
if args.ip:
if not mapper.mapping:
mapper.load_mapping()
port = mapper.get_port(args.ip)
if port:
print(f"{args.ip}{port}")
else:
print(f"IP {args.ip} not found in mapping")
# Export bash script
if args.export:
if not mapper.mapping:
mapper.load_mapping()
mapper.export_bash_script()
# Default: detect and print
if not any([args.detect, args.load, args.print, args.port, args.ip, args.export]):
mapper.detect_devices()
if __name__ == '__main__':
main()

View File

@ -8,7 +8,9 @@ PASSWORD = "ez2remember"
START_IP = 51
devices = sorted(glob.glob('/dev/ttyUSB*'))
print(f"Found {len(devices)} devices\n")
num_devices = len(devices)
ok_devices = 0
print(f"Found {num_devices} devices\n")
for idx, dev in enumerate(devices):
ip = f"192.168.1.{START_IP + idx}"
@ -40,13 +42,14 @@ for idx, dev in enumerate(devices):
response = ser.read(100).decode('utf-8', errors='ignore')
if 'OK' in response:
print(f" ✓ Got OK response")
ok_devices += 1
ser.close()
except Exception as e:
print(f" ✗ Error: {e}")
time.sleep(0.5)
print(f"\nOk={ok_devices} Not ok={num_devices - ok_devices}")
print("\nWaiting 30s for connections...")
time.sleep(30)
print("Done! Test with: NUM_DEVICES=31 ./test_devices.sh")
print(f"Done! Test with: NUM_DEVICES={num_devices} ./test_devices.sh")

315
reconfig_simple_nextip.py Executable file
View File

@ -0,0 +1,315 @@
#!/usr/bin/env python3
import argparse
import glob
import re
import serial
import time
import sys
from serial.tools import list_ports
import json
import os
DEFAULT_PATTERN = "/dev/ttyUSB*"
MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json")
YELLOW_TOKENS = [
"NO WIFI CONFIG", "NO_WIFI_CONFIG", "NO CONFIG", "NO_CONFIG",
"YELLOW", "LED_STATE_NO_CONFIG"
]
IP_REGEX = re.compile(r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?(\d{1,3}(?:\.\d{1,3}){3})', re.IGNORECASE)
def eprint(*a, **kw):
print(*a, file=sys.stderr, **kw)
def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3, probes=(b"STATUS\n", b"IP\n"), deadline=None):
ser.timeout = timeout
ser.write_timeout = timeout
def now(): return time.time()
def read_and_collect(sleep_s=0.05):
buf = b""
# sleep but respect deadline
t_end = now() + sleep_s
while now() < t_end:
time.sleep(0.01)
try:
while True:
if deadline and now() >= deadline: break
chunk = ser.read(256)
if not chunk:
break
buf += chunk
except Exception:
pass
return buf.decode('utf-8', errors='ignore')
text = ""
# initial settle
t_end = now() + settle
while now() < t_end:
time.sleep(0.01)
text += read_and_collect(0.0)
# probes
for cmd in probes:
if deadline and now() >= deadline: break
try:
ser.write(cmd)
except Exception:
pass
text += read_and_collect(0.1)
if verbose and text.strip():
eprint("--- STATUS TEXT BEGIN ---")
eprint(text)
eprint("--- STATUS TEXT END ---")
utext = text.upper()
return any(tok in utext for tok in YELLOW_TOKENS), text
def parse_ip_from_text(text):
for m in IP_REGEX.finditer(text or ""):
ip = m.group(1)
try:
octs = [int(x) for x in ip.split(".")]
if all(0 <= x <= 255 for x in octs):
return ip
except Exception:
pass
return None
def next_free_ip(used_last_octets, start_ip_octet, max_octet=254):
x = start_ip_octet
while x <= max_octet:
if x not in used_last_octets:
used_last_octets.add(x)
return x
x += 1
raise RuntimeError("No free IPs left in the range")
def load_map(path):
if os.path.exists(path):
try:
with open(path, "r") as f:
return json.load(f)
except Exception:
return {}
return {}
def usb_serial_for_port(dev):
for p in list_ports.comports():
if p.device == dev:
return p.serial_number or p.hwid or dev
return dev
def configure_device(ser, ssid, password, ip, dhcp, verbose=False):
def writeln(s):
if isinstance(s, str):
s = s.encode()
ser.write(s + b"\n")
time.sleep(0.05)
time.sleep(0.15)
writeln("CFG")
writeln(f"SSID:{ssid}")
writeln(f"PASS:{password}")
if dhcp:
writeln("DHCP:1")
else:
writeln(f"IP:{ip}")
writeln("MASK:255.255.255.0")
writeln("GW:192.168.1.1")
writeln("DHCP:0")
writeln("END")
time.sleep(0.2)
resp = b""
try:
while True:
chunk = ser.read(256)
if not chunk:
break
resp += chunk
except Exception:
pass
text = resp.decode('utf-8', errors='ignore')
if verbose and text.strip():
eprint("--- CONFIG RESPONSE BEGIN ---")
eprint(text)
eprint("--- CONFIG RESPONSE END ---")
ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper())
return ok, text
def main():
parser = argparse.ArgumentParser(
description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex."
)
parser.add_argument("--ssid", default="ClubHouse2G", help="WiFi SSID")
parser.add_argument("--password", default="ez2remember", help="WiFi password")
parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})")
parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'")
parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate")
parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)")
parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)")
parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe")
parser.add_argument("--only-yellow", action="store_true",
help="Only program devices that appear to be in 'no WiFi config' (solid yellow) state")
parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP")
parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)")
parser.add_argument("--persist-map", action="store_true",
help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs")
parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)")
parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr")
parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen")
args = parser.parse_args()
if args.list:
print("Ports:")
for p in list_ports.comports():
print(f" {p.device:>12} sn={p.serial_number} desc={p.description}")
return
devices = sorted(glob.glob(args.pattern))
if args.exclude:
devices = [d for d in devices if not re.search(args.exclude, d)]
print(f"Found {len(devices)} devices matching {args.pattern}", flush=True)
if args.exclude:
print(f"Excluding devices matching /{args.exclude}/", flush=True)
ip_map = load_map(MAP_FILE) if args.persist_map else {}
used_last_octets = set()
prepass_info = {}
for i, dev in enumerate(devices):
print(f"[pre] {i+1}/{len(devices)} probing {dev}", flush=True)
start_t = time.time()
already_ip = None
no_cfg = False
try:
ser = serial.Serial(
dev,
args.baud,
timeout=args.timeout,
write_timeout=args.timeout,
rtscts=False,
dsrdtr=False,
xonxoff=False,
)
# gentle DTR/RTS toggle
try:
ser.dtr = False; ser.rts = False; time.sleep(0.02)
ser.dtr = True; ser.rts = True; time.sleep(0.02)
except Exception:
pass
probes = (b"STATUS\n", b"IP\n") if not args.full_probes else (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n")
deadline = start_t + max(0.4, args.per_device_cap)
no_cfg, text = detect_no_config(
ser, verbose=args.verbose, settle=args.settle,
timeout=args.timeout, probes=probes, deadline=deadline
)
already_ip = parse_ip_from_text(text)
ser.close()
except Exception as e:
eprint(f" [warn] {dev} probe error: {e}")
dur = time.time() - start_t
print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True)
prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip}
if already_ip and not args.dhcp:
try:
last = int(already_ip.split(".")[-1])
used_last_octets.add(last)
except Exception:
pass
ok_devices = 0
skipped = 0
errors = 0
for idx, dev in enumerate(devices):
info = prepass_info.get(dev, {})
already_ip = info.get("ip")
no_cfg = info.get("no_cfg", False)
usb_key = usb_serial_for_port(dev)
if already_ip and not args.dhcp:
print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True)
skipped += 1
if args.persist_map:
ip_map[usb_key] = already_ip
continue
if args.only_yellow and not no_cfg:
print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True)
skipped += 1
continue
# pick target IP
if args.dhcp:
target_ip = None
mode = "DHCP"
else:
target_last_octet = None
if args.persist_map and usb_key in ip_map:
try:
prev_ip = ip_map[usb_key]
target_last_octet = int(prev_ip.split(".")[-1])
if target_last_octet in used_last_octets:
target_last_octet = None
except Exception:
target_last_octet = None
if target_last_octet is None:
target_last_octet = next_free_ip(used_last_octets, args.start_ip, 254)
target_ip = f"192.168.1.{target_last_octet}"
mode = f"Static {target_ip}"
print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True)
if args.dry_run:
print(" (dry-run) Would send CFG/END", flush=True)
ok = True
else:
try:
ser = serial.Serial(dev, args.baud, timeout=args.timeout, write_timeout=args.timeout)
ok, resp = configure_device(ser, args.ssid, args.password, target_ip, args.dhcp, verbose=args.verbose)
ser.close()
except Exception as e:
print(f" ✗ Error opening/configuring: {e}", flush=True)
ok = False
if ok:
print(" ✓ OK", flush=True)
ok_devices += 1
if not args.dhcp and args.persist_map and target_ip:
ip_map[usb_key] = target_ip
else:
print(" ✗ Failed", flush=True)
errors += 1
time.sleep(0.05)
if args.persist_map:
try:
with open(MAP_FILE, "w") as f:
json.dump(ip_map, f, indent=2, sort_keys=True)
print(f"Persisted mapping to {MAP_FILE}", flush=True)
except Exception as e:
print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True)
print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True)
if __name__ == "__main__":
main()