From 42d0ce9a7bfea7bbe0a12bacd4a2faa29c5366c6 Mon Sep 17 00:00:00 2001 From: Bob Date: Wed, 12 Nov 2025 15:24:16 -0800 Subject: [PATCH] more scripts --- config_device.py | 198 ++++++++++++++++++++++++ main/CMakeLists.txt | 12 +- main/main.c | 145 +++++++++++++++++- map_usb_to_ip.py | 244 +++++++++++++++++++++++++++++ reconfig_simple.py | 17 +- reconfig_simple_nextip.py | 315 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 908 insertions(+), 23 deletions(-) create mode 100755 config_device.py create mode 100755 map_usb_to_ip.py create mode 100755 reconfig_simple_nextip.py diff --git a/config_device.py b/config_device.py new file mode 100755 index 0000000..61097bf --- /dev/null +++ b/config_device.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +""" +ESP32 WiFi Configuration Tool - With verbose mode +""" + +import serial +import time +import sys +import argparse + +def log_verbose(message, verbose=False): + """Print message only if verbose is enabled""" + if verbose: + print(f"[VERBOSE] {message}") + +def config_device(port, ip, ssid="ClubHouse2G", password="ez2remember", + gateway="192.168.1.1", netmask="255.255.255.0", verbose=False): + """Configure ESP32 device via serial""" + + print(f"\n{'='*70}") + print(f"ESP32 WiFi Configuration") + print(f"{'='*70}") + print(f"Port: {port}") + print(f"SSID: {ssid}") + print(f"Password: {'*' * len(password)}") + print(f"IP: {ip}") + print(f"Gateway: {gateway}") + print(f"Netmask: {netmask}") + print(f"Verbose: {verbose}") + print(f"{'='*70}\n") + + try: + # Open serial connection + log_verbose(f"Opening serial port {port} at 115200 baud...", verbose) + ser = serial.Serial(port, 115200, timeout=0.5, write_timeout=0.5) + log_verbose(f"Serial port opened successfully", verbose) + log_verbose(f"Port settings: {ser}", verbose) + + time.sleep(0.2) + + # Check if there's any data waiting + if ser.in_waiting: + log_verbose(f"{ser.in_waiting} bytes waiting in buffer", verbose) + existing = ser.read(ser.in_waiting).decode('utf-8', errors='ignore') + log_verbose(f"Existing data: {existing[:100]}", verbose) + + # Build config message + config_lines = [ + "CFG", + f"SSID:{ssid}", + f"PASS:{password}", + f"IP:{ip}", + f"MASK:{netmask}", + f"GW:{gateway}", + "DHCP:0", + "END" + ] + + config = '\n'.join(config_lines) + '\n' + + log_verbose(f"Config message size: {len(config)} bytes", verbose) + if verbose: + print("[VERBOSE] Config message:") + for line in config_lines: + display_line = line if not line.startswith("PASS:") else "PASS:********" + print(f"[VERBOSE] {display_line}") + + # Send config + print("Sending configuration...") + start_time = time.time() + + bytes_written = ser.write(config.encode('utf-8')) + ser.flush() + + send_time = time.time() - start_time + log_verbose(f"Wrote {bytes_written} bytes in {send_time:.3f}s", verbose) + + print("Sent! Waiting for response...") + time.sleep(2) + + # Read response + if ser.in_waiting: + response_size = ser.in_waiting + log_verbose(f"Response available: {response_size} bytes", verbose) + + response = ser.read(response_size).decode('utf-8', errors='ignore') + + if verbose: + print("[VERBOSE] Raw response:") + for line in response.split('\n')[:20]: # Show first 20 lines + if line.strip(): + print(f"[VERBOSE] {line}") + + # Check for key indicators + if "OK" in response: + print("✓ Device acknowledged configuration (OK)") + if "got ip:" in response.lower(): + print("✓ Device connected to WiFi!") + # Extract IP from response + import re + ip_match = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', response, re.IGNORECASE) + if ip_match: + print(f" Assigned IP: {ip_match.group(1)}") + if "connected" in response.lower(): + print("✓ WiFi connection established") + if "failed" in response.lower() or "disconnect" in response.lower(): + print("✗ WiFi connection may have failed") + if verbose: + print("[VERBOSE] Check response above for error details") + else: + log_verbose("No immediate response from device", verbose) + print("⚠ No response (device may still be processing)") + + # Get final port stats + if verbose: + log_verbose(f"Input buffer: {ser.in_waiting} bytes", verbose) + log_verbose(f"Output buffer empty: {ser.out_waiting == 0}", verbose) + + ser.close() + log_verbose("Serial port closed", verbose) + + print(f"\nConfiguration sent to {port}") + print(f"Expected IP: {ip}") + print(f"Test with: ping {ip}") + print(f" iperf -c {ip}") + + return True + + except serial.SerialException as e: + print(f"\n✗ Serial error: {e}") + log_verbose(f"Serial exception details: {type(e).__name__}", verbose) + print(" Is another program using this port? (idf.py monitor, screen, etc.)") + return False + except KeyboardInterrupt: + print("\n\nConfiguration cancelled by user") + if 'ser' in locals() and ser.is_open: + ser.close() + log_verbose("Serial port closed after interrupt", verbose) + return False + except Exception as e: + print(f"\n✗ Error: {e}") + if verbose: + import traceback + print("\n[VERBOSE] Full traceback:") + traceback.print_exc() + return False + +def main(): + parser = argparse.ArgumentParser( + description='Configure ESP32 WiFi via serial', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Basic configuration + %(prog)s -p /dev/ttyUSB0 -i 192.168.1.51 + + # With verbose output + %(prog)s -p /dev/ttyUSB0 -i 192.168.1.51 -v + + # Custom WiFi credentials + %(prog)s -p /dev/ttyUSB0 -i 192.168.1.52 -s MyWiFi -P mypass -v + + # Custom gateway + %(prog)s -p /dev/ttyUSB0 -i 10.0.0.100 -g 10.0.0.1 + """ + ) + + parser.add_argument('-p', '--port', required=True, + help='Serial port (e.g., /dev/ttyUSB0)') + parser.add_argument('-i', '--ip', required=True, + help='Static IP address') + parser.add_argument('-s', '--ssid', default='ClubHouse2G', + help='WiFi SSID (default: ClubHouse2G)') + parser.add_argument('-P', '--password', default='ez2remember', + help='WiFi password (default: ez2remember)') + parser.add_argument('-g', '--gateway', default='192.168.1.1', + help='Gateway IP (default: 192.168.1.1)') + parser.add_argument('-m', '--netmask', default='255.255.255.0', + help='Netmask (default: 255.255.255.0)') + parser.add_argument('-v', '--verbose', action='store_true', + help='Enable verbose output (show detailed debug info)') + + args = parser.parse_args() + + success = config_device( + port=args.port, + ip=args.ip, + ssid=args.ssid, + password=args.password, + gateway=args.gateway, + netmask=args.netmask, + verbose=args.verbose + ) + + sys.exit(0 if success else 1) + +if __name__ == '__main__': + main() diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index 5d48ff4..ac46b82 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -1,15 +1,5 @@ idf_component_register( SRCS "main.c" INCLUDE_DIRS "." - REQUIRES - esp_wifi - nvs_flash - esp_netif - console - lwip - driver - esp_driver_uart - vfs - iperf - wifi_cfg + PRIV_REQUIRES esp_driver_gpio ) diff --git a/main/main.c b/main/main.c index 6ec26aa..518ca00 100644 --- a/main/main.c +++ b/main/main.c @@ -9,28 +9,139 @@ #include "esp_system.h" #include "esp_event.h" #include "esp_log.h" +#include "esp_wifi.h" #include "nvs_flash.h" #include "esp_netif.h" #include "lwip/inet.h" +#include "led_strip.h" + #include "iperf.h" #include "wifi_cfg.h" static const char *TAG = "main"; +// WS2812 RGB LED - try both GPIO 38 and 48 +#define RGB_LED_GPIO 48 // If this doesn't work, try 38 + +static led_strip_handle_t led_strip; + +// WiFi connection status +static bool wifi_connected = false; +static bool has_config = false; + +// LED states +typedef enum { + LED_STATE_NO_CONFIG, // Yellow - no WiFi config in NVS + LED_STATE_WAITING, // Blue slow blink - trying to connect + LED_STATE_CONNECTED, // Blue solid - connected successfully + LED_STATE_FAILED // Red fast blink - connection failed +} led_state_t; + +static led_state_t current_led_state = LED_STATE_NO_CONFIG; + +static void rgb_led_init(void) +{ + led_strip_config_t strip_config = { + .strip_gpio_num = RGB_LED_GPIO, + .max_leds = 1, + }; + + led_strip_rmt_config_t rmt_config = { + .resolution_hz = 10 * 1000 * 1000, // 10MHz + }; + + ESP_ERROR_CHECK(led_strip_new_rmt_device(&strip_config, &rmt_config, &led_strip)); + led_strip_clear(led_strip); + + ESP_LOGI(TAG, "WS2812 RGB LED initialized on GPIO %d", RGB_LED_GPIO); +} + +static void set_led_color(uint8_t r, uint8_t g, uint8_t b) +{ + led_strip_set_pixel(led_strip, 0, r, g, b); + led_strip_refresh(led_strip); +} + +static void led_task(void *arg) +{ + int blink_state = 0; + + while(1) { + switch(current_led_state) { + case LED_STATE_NO_CONFIG: + // Yellow solid - no WiFi config + set_led_color(255, 255, 0); // Red + Green = Yellow + vTaskDelay(pdMS_TO_TICKS(1000)); + break; + + case LED_STATE_WAITING: + // Blue slow blink - waiting for WiFi + if (blink_state) { + set_led_color(0, 0, 255); // Blue + } else { + set_led_color(0, 0, 0); // Off + } + blink_state = !blink_state; + vTaskDelay(pdMS_TO_TICKS(1000)); + break; + + case LED_STATE_CONNECTED: + // Blue solid - connected successfully + set_led_color(0, 0, 255); // Blue + vTaskDelay(pdMS_TO_TICKS(1000)); + break; + + case LED_STATE_FAILED: + // Red fast blink - connection failed + if (blink_state) { + set_led_color(255, 0, 0); // Red + } else { + set_led_color(0, 0, 0); // Off + } + blink_state = !blink_state; + vTaskDelay(pdMS_TO_TICKS(200)); + break; + } + } +} + static void event_handler(void* arg, esp_event_base_t event_base, int32_t event_id, void* event_data) { - if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { + if (event_base == WIFI_EVENT) { + switch (event_id) { + case WIFI_EVENT_STA_START: + ESP_LOGI(TAG, "WiFi started, attempting connection..."); + if (has_config) { + current_led_state = LED_STATE_WAITING; + } + break; + + case WIFI_EVENT_STA_DISCONNECTED: + wifi_event_sta_disconnected_t* event = (wifi_event_sta_disconnected_t*) event_data; + ESP_LOGW(TAG, "WiFi disconnected, reason: %d", event->reason); + + if (!wifi_connected && has_config) { + current_led_state = LED_STATE_FAILED; + ESP_LOGE(TAG, "WiFi connection FAILED - RED LED blinking"); + } + break; + } + } else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { ip_event_got_ip_t* event = (ip_event_got_ip_t*) event_data; ESP_LOGI(TAG, "got ip:" IPSTR " gw:" IPSTR " netmask:" IPSTR, IP2STR(&event->ip_info.ip), IP2STR(&event->ip_info.gw), IP2STR(&event->ip_info.netmask)); - // Auto-start iperf server after getting IP - vTaskDelay(pdMS_TO_TICKS(1000)); // Wait 1 second for stability + wifi_connected = true; + current_led_state = LED_STATE_CONNECTED; + ESP_LOGI(TAG, "WiFi CONNECTED - BLUE LED solid"); + + // Auto-start iperf server + vTaskDelay(pdMS_TO_TICKS(1000)); iperf_cfg_t cfg; memset(&cfg, 0, sizeof(cfg)); @@ -48,13 +159,37 @@ void app_main(void) ESP_ERROR_CHECK(esp_netif_init()); ESP_ERROR_CHECK(esp_event_loop_create_default()); + // Initialize RGB LED + rgb_led_init(); + + // Start LED task + xTaskCreate(led_task, "led_task", 4096, NULL, 5, NULL); + + // Register WiFi events + ESP_ERROR_CHECK(esp_event_handler_instance_register( + WIFI_EVENT, ESP_EVENT_ANY_ID, &event_handler, NULL, NULL)); ESP_ERROR_CHECK(esp_event_handler_instance_register( IP_EVENT, IP_EVENT_STA_GOT_IP, &event_handler, NULL, NULL)); + // Initialize WiFi config wifi_cfg_init(); - wifi_cfg_apply_from_nvs(); - ESP_LOGI(TAG, "System init complete. iperf server will start after WiFi connects."); + // Try to load config from NVS + if (wifi_cfg_apply_from_nvs()) { + has_config = true; + current_led_state = LED_STATE_WAITING; + ESP_LOGI(TAG, "WiFi config loaded from NVS"); + } else { + has_config = false; + current_led_state = LED_STATE_NO_CONFIG; + ESP_LOGI(TAG, "No WiFi config - YELLOW LED"); + } + + ESP_LOGI(TAG, "LED Status:"); + ESP_LOGI(TAG, " YELLOW solid = NO CONFIG (send CFG/END)"); + ESP_LOGI(TAG, " BLUE slow blink = Connecting"); + ESP_LOGI(TAG, " BLUE solid = Connected ✓"); + ESP_LOGI(TAG, " RED fast blink = Failed ✗"); while(1) { vTaskDelay(pdMS_TO_TICKS(1000)); diff --git a/map_usb_to_ip.py b/map_usb_to_ip.py new file mode 100755 index 0000000..48f23b8 --- /dev/null +++ b/map_usb_to_ip.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +""" +Map ESP32 USB ports to IP addresses +Creates and manages mapping between /dev/ttyUSB* and assigned IPs +""" + +import serial.tools.list_ports +import argparse +import json +import glob +import re +from pathlib import Path + +class USBIPMapper: + def __init__(self, start_ip="192.168.1.51", config_file="usb_ip_map.json"): + self.start_ip = start_ip + self.config_file = config_file + self.mapping = {} + + def get_ip_for_index(self, index): + """Calculate IP address for a given index""" + ip_parts = self.start_ip.split('.') + base_ip = int(ip_parts[3]) + ip_parts[3] = str(base_ip + index) + return '.'.join(ip_parts) + + def extract_usb_number(self, port): + """Extract number from /dev/ttyUSBX""" + match = re.search(r'ttyUSB(\d+)', port) + if match: + return int(match.group(1)) + return None + + def detect_devices(self): + """Detect all ESP32 USB devices and create mapping""" + devices = sorted(glob.glob('/dev/ttyUSB*')) + + print(f"\n{'='*70}") + print(f"ESP32 USB to IP Address Mapping") + print(f"{'='*70}") + print(f"Start IP: {self.start_ip}") + print(f"Detected {len(devices)} USB device(s)\n") + + self.mapping = {} + + for idx, port in enumerate(devices): + usb_num = self.extract_usb_number(port) + ip = self.get_ip_for_index(idx) + + # Get device info + try: + ports = serial.tools.list_ports.comports() + device_info = next((p for p in ports if p.device == port), None) + if device_info: + serial_num = device_info.serial_number or "Unknown" + description = device_info.description or "Unknown" + else: + serial_num = "Unknown" + description = "Unknown" + except: + serial_num = "Unknown" + description = "Unknown" + + self.mapping[port] = { + 'index': idx, + 'usb_number': usb_num, + 'ip': ip, + 'serial': serial_num, + 'description': description + } + + print(f"[{idx:2d}] {port:14s} → {ip:15s} (USB{usb_num}, SN: {serial_num})") + + print(f"\n{'='*70}") + print(f"Total: {len(devices)} devices mapped") + print(f"IP Range: {self.mapping[devices[0]]['ip']} - {self.mapping[devices[-1]]['ip']}" if devices else "") + print(f"{'='*70}\n") + + return self.mapping + + def save_mapping(self): + """Save mapping to JSON file""" + with open(self.config_file, 'w') as f: + json.dump(self.mapping, f, indent=2) + print(f"✓ Mapping saved to {self.config_file}") + + def load_mapping(self): + """Load mapping from JSON file""" + try: + with open(self.config_file, 'r') as f: + self.mapping = json.load(f) + print(f"✓ Mapping loaded from {self.config_file}") + return self.mapping + except FileNotFoundError: + print(f"✗ No saved mapping found at {self.config_file}") + return {} + + def get_ip(self, port): + """Get IP address for a specific USB port""" + if port in self.mapping: + return self.mapping[port]['ip'] + return None + + def get_port(self, ip): + """Get USB port for a specific IP address""" + for port, info in self.mapping.items(): + if info['ip'] == ip: + return port + return None + + def print_mapping(self): + """Print current mapping""" + if not self.mapping: + print("No mapping loaded. Run with --detect first.") + return + + print(f"\n{'='*70}") + print(f"Current USB to IP Mapping") + print(f"{'='*70}") + for port, info in sorted(self.mapping.items(), key=lambda x: x[1]['index']): + print(f"[{info['index']:2d}] {port:14s} → {info['ip']:15s} (USB{info['usb_number']})") + print(f"{'='*70}\n") + + def export_bash_script(self, filename="usb_ip_vars.sh"): + """Export mapping as bash variables""" + with open(filename, 'w') as f: + f.write("#!/bin/bash\n") + f.write("# USB to IP mapping - Auto-generated\n\n") + + # Create associative array + f.write("declare -A USB_TO_IP\n") + for port, info in self.mapping.items(): + f.write(f"USB_TO_IP[{port}]=\"{info['ip']}\"\n") + + f.write("\n# Create reverse mapping\n") + f.write("declare -A IP_TO_USB\n") + for port, info in self.mapping.items(): + f.write(f"IP_TO_USB[{info['ip']}]=\"{port}\"\n") + + f.write("\n# Helper functions\n") + f.write("get_ip_for_usb() { echo \"${USB_TO_IP[$1]}\"; }\n") + f.write("get_usb_for_ip() { echo \"${IP_TO_USB[$1]}\"; }\n") + + print(f"✓ Bash script exported to {filename}") + print(f" Usage: source {filename} && get_ip_for_usb /dev/ttyUSB0") + +def main(): + parser = argparse.ArgumentParser( + description='Map ESP32 USB ports to IP addresses', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Detect devices and create mapping + %(prog)s --detect + + # Detect and save to file + %(prog)s --detect --save + + # Load saved mapping and display + %(prog)s --load --print + + # Get IP for specific USB port + %(prog)s --load --port /dev/ttyUSB5 + + # Get USB port for specific IP + %(prog)s --load --ip 192.168.1.55 + + # Export as bash script + %(prog)s --load --export + + # Use custom IP range + %(prog)s --detect --start-ip 10.0.0.100 + """ + ) + + parser.add_argument('--detect', action='store_true', + help='Detect USB devices and create mapping') + parser.add_argument('--save', action='store_true', + help='Save mapping to file') + parser.add_argument('--load', action='store_true', + help='Load mapping from file') + parser.add_argument('--print', action='store_true', + help='Print current mapping') + parser.add_argument('--start-ip', default='192.168.1.51', + help='Starting IP address (default: 192.168.1.51)') + parser.add_argument('--config', default='usb_ip_map.json', + help='Config file path (default: usb_ip_map.json)') + parser.add_argument('--port', metavar='PORT', + help='Get IP for specific USB port (e.g., /dev/ttyUSB5)') + parser.add_argument('--ip', metavar='IP', + help='Get USB port for specific IP address') + parser.add_argument('--export', action='store_true', + help='Export mapping as bash script') + + args = parser.parse_args() + + mapper = USBIPMapper(start_ip=args.start_ip, config_file=args.config) + + # Detect devices + if args.detect: + mapper.detect_devices() + if args.save: + mapper.save_mapping() + + # Load mapping + if args.load: + mapper.load_mapping() + + # Print mapping + if args.print: + mapper.print_mapping() + + # Query specific port + if args.port: + if not mapper.mapping: + mapper.load_mapping() + ip = mapper.get_ip(args.port) + if ip: + print(f"{args.port} → {ip}") + else: + print(f"Port {args.port} not found in mapping") + + # Query specific IP + if args.ip: + if not mapper.mapping: + mapper.load_mapping() + port = mapper.get_port(args.ip) + if port: + print(f"{args.ip} → {port}") + else: + print(f"IP {args.ip} not found in mapping") + + # Export bash script + if args.export: + if not mapper.mapping: + mapper.load_mapping() + mapper.export_bash_script() + + # Default: detect and print + if not any([args.detect, args.load, args.print, args.port, args.ip, args.export]): + mapper.detect_devices() + +if __name__ == '__main__': + main() diff --git a/reconfig_simple.py b/reconfig_simple.py index febe0f0..658328a 100755 --- a/reconfig_simple.py +++ b/reconfig_simple.py @@ -8,16 +8,18 @@ PASSWORD = "ez2remember" START_IP = 51 devices = sorted(glob.glob('/dev/ttyUSB*')) -print(f"Found {len(devices)} devices\n") +num_devices = len(devices) +ok_devices = 0 +print(f"Found {num_devices} devices\n") for idx, dev in enumerate(devices): ip = f"192.168.1.{START_IP + idx}" print(f"[{idx}] Configuring {dev} → {ip}") - + try: ser = serial.Serial(dev, 115200, timeout=1) time.sleep(0.5) # Let serial port stabilize - + # Send configuration ser.write(b"CFG\n") time.sleep(0.1) @@ -34,19 +36,20 @@ for idx, dev in enumerate(devices): ser.write(b"DHCP:0\n") time.sleep(0.1) ser.write(b"END\n") - + # Wait for OK response time.sleep(0.5) response = ser.read(100).decode('utf-8', errors='ignore') if 'OK' in response: print(f" ✓ Got OK response") - + ok_devices += 1 ser.close() except Exception as e: print(f" ✗ Error: {e}") - + time.sleep(0.5) +print(f"\nOk={ok_devices} Not ok={num_devices - ok_devices}") print("\nWaiting 30s for connections...") time.sleep(30) -print("Done! Test with: NUM_DEVICES=31 ./test_devices.sh") +print(f"Done! Test with: NUM_DEVICES={num_devices} ./test_devices.sh") diff --git a/reconfig_simple_nextip.py b/reconfig_simple_nextip.py new file mode 100755 index 0000000..35e40c5 --- /dev/null +++ b/reconfig_simple_nextip.py @@ -0,0 +1,315 @@ +#!/usr/bin/env python3 +import argparse +import glob +import re +import serial +import time +import sys +from serial.tools import list_ports +import json +import os + +DEFAULT_PATTERN = "/dev/ttyUSB*" +MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json") + +YELLOW_TOKENS = [ + "NO WIFI CONFIG", "NO_WIFI_CONFIG", "NO CONFIG", "NO_CONFIG", + "YELLOW", "LED_STATE_NO_CONFIG" +] + +IP_REGEX = re.compile(r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?(\d{1,3}(?:\.\d{1,3}){3})', re.IGNORECASE) + +def eprint(*a, **kw): + print(*a, file=sys.stderr, **kw) + +def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3, probes=(b"STATUS\n", b"IP\n"), deadline=None): + ser.timeout = timeout + ser.write_timeout = timeout + + def now(): return time.time() + + def read_and_collect(sleep_s=0.05): + buf = b"" + # sleep but respect deadline + t_end = now() + sleep_s + while now() < t_end: + time.sleep(0.01) + try: + while True: + if deadline and now() >= deadline: break + chunk = ser.read(256) + if not chunk: + break + buf += chunk + except Exception: + pass + return buf.decode('utf-8', errors='ignore') + + text = "" + # initial settle + t_end = now() + settle + while now() < t_end: + time.sleep(0.01) + text += read_and_collect(0.0) + + # probes + for cmd in probes: + if deadline and now() >= deadline: break + try: + ser.write(cmd) + except Exception: + pass + text += read_and_collect(0.1) + + if verbose and text.strip(): + eprint("--- STATUS TEXT BEGIN ---") + eprint(text) + eprint("--- STATUS TEXT END ---") + + utext = text.upper() + return any(tok in utext for tok in YELLOW_TOKENS), text + + +def parse_ip_from_text(text): + for m in IP_REGEX.finditer(text or ""): + ip = m.group(1) + try: + octs = [int(x) for x in ip.split(".")] + if all(0 <= x <= 255 for x in octs): + return ip + except Exception: + pass + return None + + +def next_free_ip(used_last_octets, start_ip_octet, max_octet=254): + x = start_ip_octet + while x <= max_octet: + if x not in used_last_octets: + used_last_octets.add(x) + return x + x += 1 + raise RuntimeError("No free IPs left in the range") + + +def load_map(path): + if os.path.exists(path): + try: + with open(path, "r") as f: + return json.load(f) + except Exception: + return {} + return {} + + +def usb_serial_for_port(dev): + for p in list_ports.comports(): + if p.device == dev: + return p.serial_number or p.hwid or dev + return dev + + +def configure_device(ser, ssid, password, ip, dhcp, verbose=False): + def writeln(s): + if isinstance(s, str): + s = s.encode() + ser.write(s + b"\n") + time.sleep(0.05) + + time.sleep(0.15) + + writeln("CFG") + writeln(f"SSID:{ssid}") + writeln(f"PASS:{password}") + if dhcp: + writeln("DHCP:1") + else: + writeln(f"IP:{ip}") + writeln("MASK:255.255.255.0") + writeln("GW:192.168.1.1") + writeln("DHCP:0") + writeln("END") + + time.sleep(0.2) + resp = b"" + try: + while True: + chunk = ser.read(256) + if not chunk: + break + resp += chunk + except Exception: + pass + text = resp.decode('utf-8', errors='ignore') + if verbose and text.strip(): + eprint("--- CONFIG RESPONSE BEGIN ---") + eprint(text) + eprint("--- CONFIG RESPONSE END ---") + + ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper()) + return ok, text + + +def main(): + parser = argparse.ArgumentParser( + description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex." + ) + parser.add_argument("--ssid", default="ClubHouse2G", help="Wi‑Fi SSID") + parser.add_argument("--password", default="ez2remember", help="Wi‑Fi password") + parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})") + parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'") + parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate") + parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)") + parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)") + parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe") + parser.add_argument("--only-yellow", action="store_true", + help="Only program devices that appear to be in 'no Wi‑Fi config' (solid yellow) state") + parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP") + parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)") + parser.add_argument("--persist-map", action="store_true", + help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs") + parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)") + parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit") + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr") + parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen") + args = parser.parse_args() + + if args.list: + print("Ports:") + for p in list_ports.comports(): + print(f" {p.device:>12} sn={p.serial_number} desc={p.description}") + return + + devices = sorted(glob.glob(args.pattern)) + if args.exclude: + devices = [d for d in devices if not re.search(args.exclude, d)] + + print(f"Found {len(devices)} devices matching {args.pattern}", flush=True) + if args.exclude: + print(f"Excluding devices matching /{args.exclude}/", flush=True) + + ip_map = load_map(MAP_FILE) if args.persist_map else {} + used_last_octets = set() + + prepass_info = {} + for i, dev in enumerate(devices): + print(f"[pre] {i+1}/{len(devices)} probing {dev} …", flush=True) + start_t = time.time() + already_ip = None + no_cfg = False + try: + ser = serial.Serial( + dev, + args.baud, + timeout=args.timeout, + write_timeout=args.timeout, + rtscts=False, + dsrdtr=False, + xonxoff=False, + ) + # gentle DTR/RTS toggle + try: + ser.dtr = False; ser.rts = False; time.sleep(0.02) + ser.dtr = True; ser.rts = True; time.sleep(0.02) + except Exception: + pass + + probes = (b"STATUS\n", b"IP\n") if not args.full_probes else (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n") + deadline = start_t + max(0.4, args.per_device_cap) + no_cfg, text = detect_no_config( + ser, verbose=args.verbose, settle=args.settle, + timeout=args.timeout, probes=probes, deadline=deadline + ) + already_ip = parse_ip_from_text(text) + ser.close() + except Exception as e: + eprint(f" [warn] {dev} probe error: {e}") + dur = time.time() - start_t + print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True) + + prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip} + if already_ip and not args.dhcp: + try: + last = int(already_ip.split(".")[-1]) + used_last_octets.add(last) + except Exception: + pass + + ok_devices = 0 + skipped = 0 + errors = 0 + + for idx, dev in enumerate(devices): + info = prepass_info.get(dev, {}) + already_ip = info.get("ip") + no_cfg = info.get("no_cfg", False) + usb_key = usb_serial_for_port(dev) + + if already_ip and not args.dhcp: + print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True) + skipped += 1 + if args.persist_map: + ip_map[usb_key] = already_ip + continue + + if args.only_yellow and not no_cfg: + print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True) + skipped += 1 + continue + + # pick target IP + if args.dhcp: + target_ip = None + mode = "DHCP" + else: + target_last_octet = None + if args.persist_map and usb_key in ip_map: + try: + prev_ip = ip_map[usb_key] + target_last_octet = int(prev_ip.split(".")[-1]) + if target_last_octet in used_last_octets: + target_last_octet = None + except Exception: + target_last_octet = None + + if target_last_octet is None: + target_last_octet = next_free_ip(used_last_octets, args.start_ip, 254) + target_ip = f"192.168.1.{target_last_octet}" + mode = f"Static {target_ip}" + + print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True) + if args.dry_run: + print(" (dry-run) Would send CFG/END", flush=True) + ok = True + else: + try: + ser = serial.Serial(dev, args.baud, timeout=args.timeout, write_timeout=args.timeout) + ok, resp = configure_device(ser, args.ssid, args.password, target_ip, args.dhcp, verbose=args.verbose) + ser.close() + except Exception as e: + print(f" ✗ Error opening/configuring: {e}", flush=True) + ok = False + + if ok: + print(" ✓ OK", flush=True) + ok_devices += 1 + if not args.dhcp and args.persist_map and target_ip: + ip_map[usb_key] = target_ip + else: + print(" ✗ Failed", flush=True) + errors += 1 + + time.sleep(0.05) + + if args.persist_map: + try: + with open(MAP_FILE, "w") as f: + json.dump(ip_map, f, indent=2, sort_keys=True) + print(f"Persisted mapping to {MAP_FILE}", flush=True) + except Exception as e: + print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True) + + print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True) + +if __name__ == "__main__": + main()