#!/usr/bin/env python3
"""Configure ESP32-S3 devices over their USB serial consoles.

The script runs in two passes over every serial port matching a glob:

1. Pre-pass: open each port, send a few status probes and scrape the reply
   for a "no Wi-Fi config" marker and for an IPv4 address.
2. Config pass: for each device that still needs it, send a ``CFG`` ... ``END``
   block with the SSID, password and either DHCP or a static address.

Static addresses are handed out from ``192.168.1.<start-ip>`` upwards and can
optionally be persisted per USB serial number in ``MAP_FILE``.
"""
import argparse
import glob
import json
import os
import re
import sys
import time

try:
    import serial
    from serial.tools import list_ports
except ImportError:
    # pyserial is missing. Keep the pure helpers importable and let main()
    # report the problem with an actionable message instead of a traceback.
    serial = None
    list_ports = None

DEFAULT_PATTERN = "/dev/ttyUSB*"
MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json")

# Fixed network parameters sent with every static configuration.
NETWORK_PREFIX = "192.168.1"
NETMASK = "255.255.255.0"
GATEWAY = "192.168.1.1"

# Substrings (compared upper-cased) that mark a device without Wi-Fi config.
YELLOW_TOKENS = [
    "NO WIFI CONFIG",
    "NO_WIFI_CONFIG",
    "NO CONFIG",
    "NO_CONFIG",
    "YELLOW",
    "LED_STATE_NO_CONFIG",
]

IP_REGEX = re.compile(
    r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?(\d{1,3}(?:\.\d{1,3}){3})',
    re.IGNORECASE,
)


def eprint(*a, **kw):
    """Print to stderr; accepts the same arguments as ``print``."""
    print(*a, file=sys.stderr, **kw)


def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3,
                     probes=(b"STATUS\n", b"IP\n"), deadline=None):
    """Probe a device and report whether it looks unconfigured.

    Args:
        ser: Open serial-port object providing ``read``/``write`` and the
            ``timeout``/``write_timeout`` attributes (both are overwritten).
        verbose: Dump the collected text, and any I/O errors, to stderr.
        settle: Seconds to wait before the first read.
        timeout: Read/write timeout applied to ``ser``.
        probes: Byte strings written one after another; the reply to each is
            collected.
        deadline: Optional absolute ``time.time()`` value after which no
            further probing, sleeping or reading is started.

    Returns:
        ``(no_config, text)`` where ``no_config`` is True when any entry of
        ``YELLOW_TOKENS`` occurs in the upper-cased reply and ``text`` is
        everything read, decoded as UTF-8 with undecodable bytes dropped.
    """
    ser.timeout = timeout
    ser.write_timeout = timeout

    def expired():
        # A falsy deadline (None or 0) means "no deadline".
        return bool(deadline) and time.time() >= deadline

    def pause(seconds):
        # Sleep once, but never past the deadline.
        if deadline:
            seconds = min(seconds, deadline - time.time())
        if seconds > 0:
            time.sleep(seconds)

    def read_and_collect(sleep_s=0.05):
        pause(sleep_s)
        chunks = []
        try:
            while not expired():
                chunk = ser.read(256)
                if not chunk:
                    break
                chunks.append(chunk)
        except Exception as exc:
            # Best effort: keep whatever arrived before the port failed.
            if verbose:
                eprint(f"  [warn] read error: {exc}")
        return b"".join(chunks).decode('utf-8', errors='ignore')

    # Initial settle, then drain anything the device printed on its own.
    pause(settle)
    parts = [read_and_collect(0.0)]

    for cmd in probes:
        if expired():
            break
        try:
            ser.write(cmd)
        except Exception as exc:
            # A failed write still gets a read: the device may be talking.
            if verbose:
                eprint(f"  [warn] write error: {exc}")
        parts.append(read_and_collect(0.1))

    text = "".join(parts)
    if verbose and text.strip():
        eprint("--- STATUS TEXT BEGIN ---")
        eprint(text)
        eprint("--- STATUS TEXT END ---")

    utext = text.upper()
    return any(tok in utext for tok in YELLOW_TOKENS), text


def parse_ip_from_text(text):
    """Return the first dotted quad in ``text`` whose octets are all 0-255.

    Returns None when ``text`` is empty/None or holds no valid address.
    """
    for m in IP_REGEX.finditer(text or ""):
        ip = m.group(1)
        if all(0 <= int(octet) <= 255 for octet in ip.split(".")):
            return ip
    return None


def next_free_ip(used_last_octets, start_ip_octet, max_octet=254):
    """Reserve and return the lowest unused octet in the allowed range.

    The chosen octet is added to ``used_last_octets`` (mutated in place).

    Raises:
        RuntimeError: if every octet from ``start_ip_octet`` to ``max_octet``
            is already used.
    """
    x = start_ip_octet
    while x <= max_octet:
        if x not in used_last_octets:
            used_last_octets.add(x)
            return x
        x += 1
    raise RuntimeError("No free IPs left in the range")


def choose_last_octet(usb_key, ip_map, used_last_octets, start_ip_octet,
                      max_octet=254):
    """Pick and reserve the host octet for one device.

    The octet of the address recorded for ``usb_key`` in ``ip_map`` is reused
    when it is a valid, still-unused host octet; otherwise the next free one
    is taken via ``next_free_ip``. In both cases the returned octet is added
    to ``used_last_octets`` so that no later device can receive it as well.

    Raises:
        RuntimeError: if a fresh octet is needed and none is left.
    """
    prev_ip = ip_map.get(usb_key)
    if prev_ip is not None:
        try:
            octet = int(str(prev_ip).split(".")[-1])
        except ValueError:
            octet = None
        if (octet is not None and 1 <= octet <= max_octet
                and octet not in used_last_octets):
            used_last_octets.add(octet)
            return octet
    return next_free_ip(used_last_octets, start_ip_octet, max_octet)


def load_map(path):
    """Load the persisted key -> IP mapping from ``path``.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return {}
    # Callers assign by key, so anything but an object is unusable.
    return data if isinstance(data, dict) else {}


def usb_serial_for_port(dev):
    """Return a stable key for ``dev``.

    Uses the port's serial number, falling back to its hwid and finally to
    the device path itself (also used when the port is not listed).
    """
    for p in list_ports.comports():
        if p.device == dev:
            return p.serial_number or p.hwid or dev
    return dev


def configure_device(ser, ssid, password, ip, dhcp, verbose=False):
    """Send a ``CFG`` ... ``END`` block to the device and read its reply.

    Args:
        ser: Open serial-port object.
        ssid: Wi-Fi SSID to send.
        password: Wi-Fi password to send.
        ip: Static address to send; ignored when ``dhcp`` is true.
        dhcp: Send ``DHCP:1`` instead of the static IP/MASK/GW lines.
        verbose: Dump the device's reply to stderr.

    Returns:
        ``(ok, text)`` where ``ok`` is True when the reply contains ``OK``,
        ``Saved`` or (case-insensitively) ``DONE``.
    """
    def writeln(s):
        if isinstance(s, str):
            s = s.encode()
        ser.write(s + b"\n")
        time.sleep(0.05)  # short gap between consecutive lines

    time.sleep(0.15)
    writeln("CFG")
    writeln(f"SSID:{ssid}")
    writeln(f"PASS:{password}")
    if dhcp:
        writeln("DHCP:1")
    else:
        writeln(f"IP:{ip}")
        writeln(f"MASK:{NETMASK}")
        writeln(f"GW:{GATEWAY}")
        writeln("DHCP:0")
    writeln("END")
    time.sleep(0.2)

    chunks = []
    try:
        while True:
            chunk = ser.read(256)
            if not chunk:
                break
            chunks.append(chunk)
    except Exception as exc:
        # Best effort: judge success on whatever was received so far.
        if verbose:
            eprint(f"  [warn] read error: {exc}")

    text = b"".join(chunks).decode('utf-8', errors='ignore')
    if verbose and text.strip():
        eprint("--- CONFIG RESPONSE BEGIN ---")
        eprint(text)
        eprint("--- CONFIG RESPONSE END ---")
    ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper())
    return ok, text


def main():
    """Command-line entry point: probe all matching ports, then configure."""
    parser = argparse.ArgumentParser(
        description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex."
    )
    parser.add_argument("--ssid", default="ClubHouse2G", help="Wi‑Fi SSID")
    parser.add_argument("--password", default="ez2remember", help="Wi‑Fi password")
    parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})")
    parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'")
    parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate")
    parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)")
    parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)")
    parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe")
    parser.add_argument("--only-yellow", action="store_true", help="Only program devices that appear to be in 'no Wi‑Fi config' (solid yellow) state")
    parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP")
    parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)")
    parser.add_argument("--persist-map", action="store_true", help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs")
    parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)")
    parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr")
    parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen")
    args = parser.parse_args()

    if serial is None:
        parser.error("pyserial is required (pip install pyserial)")

    if args.list:
        print("Ports:")
        for p in list_ports.comports():
            print(f" {p.device:>12} sn={p.serial_number} desc={p.description}")
        return

    if not args.dhcp and not 1 <= args.start_ip <= 254:
        parser.error("--start-ip must be between 1 and 254")

    exclude_re = None
    if args.exclude:
        try:
            exclude_re = re.compile(args.exclude)
        except re.error as e:
            parser.error(f"invalid --exclude regex: {e}")

    devices = sorted(glob.glob(args.pattern))
    if exclude_re is not None:
        devices = [d for d in devices if not exclude_re.search(d)]

    print(f"Found {len(devices)} devices matching {args.pattern}", flush=True)
    if args.exclude:
        print(f"Excluding devices matching /{args.exclude}/", flush=True)

    ip_map = load_map(MAP_FILE) if args.persist_map else {}
    used_last_octets = set()
    prepass_info = {}

    if args.full_probes:
        probes = (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n")
    else:
        probes = (b"STATUS\n", b"IP\n")

    # Pass 1: find out what every device currently reports.
    for i, dev in enumerate(devices):
        print(f"[pre] {i+1}/{len(devices)} probing {dev} …", flush=True)
        start_t = time.time()
        already_ip = None
        no_cfg = False
        try:
            # The context manager closes the port even when probing raises.
            with serial.Serial(
                dev,
                args.baud,
                timeout=args.timeout,
                write_timeout=args.timeout,
                rtscts=False,
                dsrdtr=False,
                xonxoff=False,
            ) as ser:
                # Gentle DTR/RTS toggle; failure here is not fatal.
                try:
                    ser.dtr = False
                    ser.rts = False
                    time.sleep(0.02)
                    ser.dtr = True
                    ser.rts = True
                    time.sleep(0.02)
                except Exception:
                    pass
                deadline = start_t + max(0.4, args.per_device_cap)
                no_cfg, text = detect_no_config(
                    ser,
                    verbose=args.verbose,
                    settle=args.settle,
                    timeout=args.timeout,
                    probes=probes,
                    deadline=deadline,
                )
                already_ip = parse_ip_from_text(text)
        except Exception as e:
            eprint(f" [warn] {dev} probe error: {e}")
        dur = time.time() - start_t
        print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True)
        prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip}
        if already_ip and not args.dhcp:
            # Keep addresses already in use out of the allocation pool.
            used_last_octets.add(int(already_ip.split(".")[-1]))

    ok_devices = 0
    skipped = 0
    errors = 0

    # Pass 2: configure whatever still needs it.
    for idx, dev in enumerate(devices):
        info = prepass_info.get(dev, {})
        already_ip = info.get("ip")
        no_cfg = info.get("no_cfg", False)
        usb_key = usb_serial_for_port(dev)

        if already_ip and not args.dhcp:
            print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True)
            skipped += 1
            if args.persist_map:
                ip_map[usb_key] = already_ip
            continue

        if args.only_yellow and not no_cfg:
            print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True)
            skipped += 1
            continue

        # Pick the target address.
        if args.dhcp:
            target_ip = None
            mode = "DHCP"
        else:
            try:
                # ip_map is empty unless --persist-map was given.
                target_last_octet = choose_last_octet(
                    usb_key, ip_map, used_last_octets, args.start_ip, 254
                )
            except RuntimeError as e:
                # Pool exhausted: record the failure but still finish the
                # run so the summary and the map are written.
                print(f"[cfg] {idx+1}/{len(devices)} {dev}: ✗ {e}", flush=True)
                errors += 1
                continue
            target_ip = f"{NETWORK_PREFIX}.{target_last_octet}"
            mode = f"Static {target_ip}"

        print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True)

        if args.dry_run:
            print(" (dry-run) Would send CFG/END", flush=True)
            ok = True
        else:
            try:
                with serial.Serial(dev, args.baud, timeout=args.timeout,
                                   write_timeout=args.timeout) as ser:
                    ok, _ = configure_device(
                        ser, args.ssid, args.password, target_ip, args.dhcp,
                        verbose=args.verbose,
                    )
            except Exception as e:
                print(f" ✗ Error opening/configuring: {e}", flush=True)
                ok = False

        if ok:
            print(" ✓ OK", flush=True)
            ok_devices += 1
            if not args.dhcp and args.persist_map and target_ip:
                ip_map[usb_key] = target_ip
        else:
            print(" ✗ Failed", flush=True)
            errors += 1

        time.sleep(0.05)

    if args.persist_map:
        try:
            with open(MAP_FILE, "w", encoding="utf-8") as f:
                json.dump(ip_map, f, indent=2, sort_keys=True)
            print(f"Persisted mapping to {MAP_FILE}", flush=True)
        except Exception as e:
            print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True)

    print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True)


if __name__ == "__main__":
    main()