ESP32/reconfig_simple_nextip.py

316 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import glob
import re
import serial
import time
import sys
from serial.tools import list_ports
import json
import os
# Default glob used to discover candidate serial ports.
DEFAULT_PATTERN = "/dev/ttyUSB*"
# JSON file holding the USB-serial -> IP mapping (read/written with --persist-map).
MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json")
# Substrings that mark a device as having no WiFi configuration. They are
# compared against the upper-cased device output, so keep them upper-case.
YELLOW_TOKENS = [
    "NO WIFI CONFIG", "NO_WIFI_CONFIG", "NO CONFIG", "NO_CONFIG",
    "YELLOW", "LED_STATE_NO_CONFIG"
]
# Dotted-quad matcher; group(1) is the address, optionally preceded by an
# "IP"/"STA IP"/"ADDR"/"ADDRESS" label that only affects group(0).
# The lookbehind/lookahead keep the match from starting or ending in the
# middle of a longer number or dotted sequence (e.g. "1192.168.1.5" or a
# five-part version string), which would otherwise yield a bogus address.
# Octet range (0-255) is NOT checked here; callers validate it.
IP_REGEX = re.compile(
    r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?'
    r'(?<![\d.])(\d{1,3}(?:\.\d{1,3}){3})(?!\d|\.\d)',
    re.IGNORECASE,
)
def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as ``print``; ``file``
    is already supplied here, so passing it again raises ``TypeError``.
    """
    print(*args, file=sys.stderr, **kwargs)
def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3, probes=(b"STATUS\n", b"IP\n"), deadline=None):
    """Probe a device over serial and report whether it looks unconfigured.

    After an initial settle delay any pending output is read; each command in
    *probes* is then written and the reply collected. The device counts as
    unconfigured when the combined output, upper-cased, contains one of
    ``YELLOW_TOKENS``.

    Args:
        ser: Open serial port object providing ``read``/``write`` and
            settable ``timeout``/``write_timeout`` attributes.
        verbose: When true, echo the collected text (and I/O errors) to stderr.
        settle: Seconds to wait before the first read.
        timeout: Read/write timeout in seconds applied to *ser*.
        probes: Byte strings written to the device, in order.
        deadline: Absolute ``time.time()`` value after which no further
            sleep, probe or read is started, or ``None`` for no overall
            deadline. A read already in progress may still run for up to
            *timeout* seconds past it.

    Returns:
        ``(no_config, text)``: the detection flag and the decoded output.
    """
    ser.timeout = timeout
    ser.write_timeout = timeout
    # Without a caller deadline, a device that never stops talking would keep
    # a drain loop alive forever, so each drain then gets its own cap.
    drain_cap = 2.0

    def expired():
        return deadline is not None and time.time() >= deadline

    def pause(seconds):
        # Sleep in short slices so an expiring deadline cuts the wait short.
        wake_at = time.time() + seconds
        while not expired():
            remaining = wake_at - time.time()
            if remaining <= 0:
                break
            time.sleep(min(0.01, remaining))

    def drain(sink):
        # Read until the port goes quiet (a read times out empty), the
        # deadline passes, or the fallback cap is hit.
        stop_at = None if deadline is not None else time.time() + drain_cap
        try:
            while not expired():
                chunk = ser.read(256)
                if not chunk:
                    break
                sink.append(chunk)
                if stop_at is not None and time.time() >= stop_at:
                    break
        except Exception as exc:
            # Best-effort probe: keep whatever was read so far.
            if verbose:
                eprint(f"  [warn] read error during probe: {exc}")

    received = []
    pause(settle)
    drain(received)

    for cmd in probes:
        if expired():
            break
        try:
            ser.write(cmd)
        except Exception as exc:
            # A failed probe write is not fatal; the device may still have
            # produced unsolicited output worth reading.
            if verbose:
                eprint(f"  [warn] could not send probe {cmd!r}: {exc}")
        pause(0.1)
        drain(received)

    # Decode once at the end so a multi-byte character split across two
    # reads is not lost.
    text = b"".join(received).decode('utf-8', errors='ignore')

    if verbose and text.strip():
        eprint("--- STATUS TEXT BEGIN ---")
        eprint(text)
        eprint("--- STATUS TEXT END ---")

    utext = text.upper()
    return any(tok in utext for tok in YELLOW_TOKENS), text
def parse_ip_from_text(text):
    """Return the first usable IPv4 address found in *text*, or ``None``.

    Candidates come from ``IP_REGEX`` (group 1). A candidate is skipped when
    any octet exceeds 255 or when it is ``0.0.0.0``.

    Args:
        text: Device output to scan; ``None`` or an empty string yields ``None``.

    Returns:
        The dotted-quad string of the first acceptable match, or ``None``.
    """
    for match in IP_REGEX.finditer(text or ""):
        candidate = match.group(1)
        # The pattern only admits digit groups, so int() cannot fail here.
        octets = [int(part) for part in candidate.split(".")]
        if any(octet > 255 for octet in octets):
            continue
        if not any(octets):
            # 0.0.0.0 is the IPv4 "unspecified" address: a device printing it
            # has no address yet and must not be treated as configured.
            continue
        return candidate
    return None
def next_free_ip(used_last_octets, start_ip_octet, max_octet=254):
    """Claim and return the lowest unused host octet in the range.

    Scans ``start_ip_octet`` through ``max_octet`` (inclusive). The chosen
    value is added to *used_last_octets* in place before being returned.

    Raises:
        RuntimeError: every octet in the range is already taken.
    """
    for candidate in range(start_ip_octet, max_octet + 1):
        if candidate in used_last_octets:
            continue
        used_last_octets.add(candidate)
        return candidate
    raise RuntimeError("No free IPs left in the range")
def load_map(path):
    """Load the persisted USB-serial -> IP mapping from *path*.

    Args:
        path: Location of the JSON mapping file.

    Returns:
        The decoded mapping as a ``dict``. An empty dict is returned when the
        file is missing or unreadable, is not valid JSON, or does not hold a
        JSON object at the top level.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON
        # (JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
        return {}
    # Callers index the result by key; anything but an object is unusable.
    return data if isinstance(data, dict) else {}
def usb_serial_for_port(dev):
    """Return a stable identifier for the serial device path *dev*.

    Looks *dev* up among the ports reported by ``list_ports.comports()`` and
    prefers its serial number, then its hardware id. Falls back to *dev*
    itself when the port is not listed or both fields are empty.
    """
    port = next((p for p in list_ports.comports() if p.device == dev), None)
    if port is None:
        return dev
    return port.serial_number or port.hwid or dev
def configure_device(ser, ssid, password, ip, dhcp, verbose=False,
                     mask="255.255.255.0", gateway="192.168.1.1", max_wait=2.0):
    """Send a WiFi configuration block to a device and read its reply.

    The block is written line by line: ``CFG``, ``SSID:``, ``PASS:``, then
    either ``DHCP:1`` or ``IP:``/``MASK:``/``GW:``/``DHCP:0``, and ``END``.

    Args:
        ser: Open serial port object providing ``read`` and ``write``.
        ssid: WiFi network name.
        password: WiFi password.
        ip: Static IPv4 address; ignored when *dhcp* is true.
        dhcp: Configure DHCP instead of a static address.
        verbose: When true, echo the device response to stderr.
        mask: Netmask sent with a static configuration.
        gateway: Gateway sent with a static configuration.
        max_wait: Upper bound in seconds on reading the response, so a
            device that keeps printing cannot stall the caller.

    Returns:
        ``(ok, text)``: ``ok`` is true when the response contains ``OK``,
        ``Saved`` or (case-insensitively) ``DONE``; ``text`` is the decoded
        response.

    Raises:
        ValueError: static mode was requested without an IP address.
    """
    if not dhcp and not ip:
        # Otherwise the literal text "IP:None" would be sent to the device.
        raise ValueError("a static IP address is required when dhcp is False")

    def writeln(line):
        if isinstance(line, str):
            line = line.encode()
        ser.write(line + b"\n")
        # Short gap between lines, as in the original pacing.
        time.sleep(0.05)

    lines = ["CFG", f"SSID:{ssid}", f"PASS:{password}"]
    if dhcp:
        lines.append("DHCP:1")
    else:
        lines += [f"IP:{ip}", f"MASK:{mask}", f"GW:{gateway}", "DHCP:0"]
    lines.append("END")

    time.sleep(0.15)
    for line in lines:
        writeln(line)
    time.sleep(0.2)

    # Read until the port goes quiet or max_wait elapses; always read once.
    chunks = []
    give_up_at = time.monotonic() + max_wait
    try:
        while True:
            chunk = ser.read(256)
            if not chunk:
                break
            chunks.append(chunk)
            if time.monotonic() >= give_up_at:
                break
    except Exception as exc:
        # Best-effort: judge success on whatever was received before the error.
        eprint(f"  [warn] read error while waiting for config response: {exc}")

    text = b"".join(chunks).decode('utf-8', errors='ignore')
    if verbose and text.strip():
        eprint("--- CONFIG RESPONSE BEGIN ---")
        eprint(text)
        eprint("--- CONFIG RESPONSE END ---")

    # NOTE(review): output buffered before CFG was sent is also part of
    # `text`, so a stale "OK" could count as success - confirm against the
    # firmware whether the input buffer should be cleared first.
    ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper())
    return ok, text
def _build_arg_parser():
    """Build the command-line parser for the tool."""
    parser = argparse.ArgumentParser(
        description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex."
    )
    parser.add_argument("--ssid", default="ClubHouse2G", help="WiFi SSID")
    # NOTE(review): a real-looking default password is committed in source;
    # consider reading it from the environment instead.
    parser.add_argument("--password", default="ez2remember", help="WiFi password")
    parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})")
    parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'")
    parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate")
    parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)")
    parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)")
    parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe")
    parser.add_argument("--only-yellow", action="store_true",
                        help="Only program devices that appear to be in 'no WiFi config' (solid yellow) state")
    parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP")
    parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)")
    parser.add_argument("--persist-map", action="store_true",
                        help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs")
    parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)")
    parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr")
    parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen")
    return parser


def _probe_device(dev, args, start_t):
    """Open *dev*, probe it, and return ``(no_cfg, already_ip)``.

    Any error is reported on stderr; values gathered before the error are
    kept, otherwise ``(False, None)`` is returned.
    """
    no_cfg = False
    already_ip = None
    if args.full_probes:
        probes = (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n")
    else:
        probes = (b"STATUS\n", b"IP\n")
    try:
        # The context manager closes the port even when probing raises.
        with serial.Serial(
            dev,
            args.baud,
            timeout=args.timeout,
            write_timeout=args.timeout,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            # Gentle DTR/RTS toggle; not every adapter supports it, so a
            # failure here is deliberately ignored.
            try:
                ser.dtr = False
                ser.rts = False
                time.sleep(0.02)
                ser.dtr = True
                ser.rts = True
                time.sleep(0.02)
            except Exception:
                pass
            deadline = start_t + max(0.4, args.per_device_cap)
            no_cfg, text = detect_no_config(
                ser, verbose=args.verbose, settle=args.settle,
                timeout=args.timeout, probes=probes, deadline=deadline
            )
            already_ip = parse_ip_from_text(text)
    except Exception as e:
        eprint(f" [warn] {dev} probe error: {e}")
    return no_cfg, already_ip


def _pick_static_octet(usb_key, ip_map, used_last_octets, start_octet, use_map):
    """Choose the host octet for a device and mark it as used.

    When *use_map* is true and *usb_key* has a remembered address whose last
    octet is valid (1-254) and still free, that octet is reused; otherwise
    the next free octet from *start_octet* upward is taken. Either way the
    result is added to *used_last_octets*, so no two devices share it.

    Raises:
        RuntimeError: no free octet is left (from ``next_free_ip``).
    """
    if use_map and usb_key in ip_map:
        try:
            remembered = int(ip_map[usb_key].split(".")[-1])
        except (AttributeError, TypeError, ValueError):
            remembered = None
        if (
            remembered is not None
            and 1 <= remembered <= 254
            and remembered not in used_last_octets
        ):
            # Reserve it: without this a later device could be handed the
            # very same address by next_free_ip.
            used_last_octets.add(remembered)
            return remembered
    return next_free_ip(used_last_octets, start_octet, 254)


def main():
    """Probe the matching serial ports, then configure those that need it.

    Pass 1 probes every port to learn which devices already report an IP and
    which look unconfigured. Pass 2 skips devices that already have an IP
    (static mode) or are not unconfigured (``--only-yellow``) and sends the
    WiFi configuration to the rest. With ``--persist-map`` the resulting
    USB-serial -> IP mapping is saved to ``MAP_FILE`` (not in ``--dry-run``).
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    if args.list:
        print("Ports:")
        for p in list_ports.comports():
            print(f" {p.device:>12} sn={p.serial_number} desc={p.description}")
        return

    if not args.dhcp and not 1 <= args.start_ip <= 254:
        parser.error("--start-ip must be between 1 and 254")

    # Compile the exclude pattern once and fail early on a bad regex.
    exclude_re = None
    if args.exclude:
        try:
            exclude_re = re.compile(args.exclude)
        except re.error as e:
            parser.error(f"invalid --exclude regex: {e}")

    devices = sorted(glob.glob(args.pattern))
    if exclude_re is not None:
        devices = [d for d in devices if not exclude_re.search(d)]

    print(f"Found {len(devices)} devices matching {args.pattern}", flush=True)
    if args.exclude:
        print(f"Excluding devices matching /{args.exclude}/", flush=True)

    ip_map = load_map(MAP_FILE) if args.persist_map else {}
    if not isinstance(ip_map, dict):
        # The mapping is indexed by key below; ignore any other JSON shape.
        ip_map = {}

    used_last_octets = set()
    prepass_info = {}

    # Pass 1: probe every device.
    for i, dev in enumerate(devices):
        print(f"[pre] {i+1}/{len(devices)} probing {dev}", flush=True)
        start_t = time.time()
        no_cfg, already_ip = _probe_device(dev, args, start_t)
        dur = time.time() - start_t
        print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True)
        prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip}
        if already_ip and not args.dhcp:
            try:
                used_last_octets.add(int(already_ip.split(".")[-1]))
            except ValueError:
                pass

    ok_devices = 0
    skipped = 0
    errors = 0

    # Pass 2: configure the devices that need it.
    for idx, dev in enumerate(devices):
        info = prepass_info.get(dev, {})
        already_ip = info.get("ip")
        no_cfg = info.get("no_cfg", False)
        usb_key = usb_serial_for_port(dev)

        if already_ip and not args.dhcp:
            print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True)
            skipped += 1
            if args.persist_map:
                ip_map[usb_key] = already_ip
            continue

        if args.only_yellow and not no_cfg:
            print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True)
            skipped += 1
            continue

        # Pick the target IP.
        if args.dhcp:
            target_ip = None
            mode = "DHCP"
        else:
            try:
                target_last_octet = _pick_static_octet(
                    usb_key, ip_map, used_last_octets, args.start_ip, args.persist_map
                )
            except RuntimeError as e:
                # Range exhausted: report it and carry on so the summary
                # and the mapping are still produced.
                print(f"[cfg] {idx+1}/{len(devices)} {dev}: ✗ {e}", flush=True)
                errors += 1
                continue
            target_ip = f"192.168.1.{target_last_octet}"
            mode = f"Static {target_ip}"

        print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True)

        if args.dry_run:
            print(" (dry-run) Would send CFG/END", flush=True)
            ok = True
        else:
            try:
                with serial.Serial(dev, args.baud, timeout=args.timeout, write_timeout=args.timeout) as ser:
                    ok, _resp = configure_device(
                        ser, args.ssid, args.password, target_ip, args.dhcp, verbose=args.verbose
                    )
            except Exception as e:
                print(f" ✗ Error opening/configuring: {e}", flush=True)
                ok = False

        if ok:
            print(" ✓ OK", flush=True)
            ok_devices += 1
            if not args.dhcp and args.persist_map and target_ip:
                ip_map[usb_key] = target_ip
        else:
            print(" ✗ Failed", flush=True)
            errors += 1
        time.sleep(0.05)

    if args.persist_map:
        if args.dry_run:
            # A dry run must not leave state behind.
            print(f"(dry-run) Not writing mapping to {MAP_FILE}", flush=True)
        else:
            try:
                with open(MAP_FILE, "w", encoding="utf-8") as f:
                    json.dump(ip_map, f, indent=2, sort_keys=True)
                print(f"Persisted mapping to {MAP_FILE}", flush=True)
            except (OSError, TypeError, ValueError) as e:
                print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True)

    print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C during a long port scan: exit quietly with the conventional
        # "terminated by SIGINT" status instead of dumping a traceback.
        sys.exit(130)