#!/usr/bin/env python3
import argparse
import contextlib
import glob
import json
import os
import re
import sys
import time

import serial
from serial.tools import list_ports

# Default glob used to discover candidate serial ports.
DEFAULT_PATTERN = "/dev/ttyUSB*"
# JSON file in the user's home directory holding the USB-serial -> IP mapping.
MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json")

# Substrings searched for in the upper-cased device output; any hit means the
# device is reporting that it has no Wi-Fi configuration.
YELLOW_TOKENS = [
    "NO WIFI CONFIG", "NO_WIFI_CONFIG", "NO CONFIG", "NO_CONFIG",
    "YELLOW", "LED_STATE_NO_CONFIG"
]

# Dotted-quad matcher; group 1 is the address itself.  The optional label
# prefix (IP:, STA_IP:, ADDR:, ADDRESS:) is kept so the overall match still
# includes it.  The lookarounds stop the pattern from carving an "address" out
# of a longer digit/dot run (e.g. "234.5.6.7" out of "1234.5.6.7", or the
# first four fields of "1.2.3.4.5").  re.ASCII restricts \d to 0-9.
IP_REGEX = re.compile(
    r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?'
    r'(?<![\d.])(\d{1,3}(?:\.\d{1,3}){3})(?!\d|\.\d)',
    re.IGNORECASE | re.ASCII,
)

def eprint(*a, **kw):
    """Print to standard error; accepts the same arguments as ``print``.

    ``sys.stderr`` is looked up on every call, so later redirection of the
    stream is honoured.
    """
    stream = sys.stderr
    print(*a, file=stream, **kw)

def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3, probes=(b"STATUS\n", b"IP\n"), deadline=None):
    """Probe a device and report whether it says it has no Wi-Fi config.

    After an initial settle delay, any pending output is read; then each
    command in *probes* is written and the output that follows is read.
    The collected output is upper-cased and searched for ``YELLOW_TOKENS``.

    Args:
        ser: Open serial port object; its ``timeout`` and ``write_timeout``
            are overwritten with *timeout*.
        verbose: When true, dump the collected text (and any I/O errors)
            to stderr.
        settle: Seconds to wait before the first read.
        timeout: Read/write timeout in seconds applied to *ser*.
        probes: Byte strings to send, in order.
        deadline: Absolute ``time.time()`` value after which no further
            sleeping, probing or reading is started.  A falsy value means
            no deadline.  A read already in progress may still overrun it
            by up to *timeout*.

    Returns:
        ``(no_config, text)``: whether any token was found, and the decoded
        output (undecodable bytes dropped).
    """
    ser.timeout = timeout
    ser.write_timeout = timeout

    def past_deadline():
        return bool(deadline) and time.time() >= deadline

    def pause(seconds):
        # Sleep for `seconds`, but never beyond the deadline.
        wake_at = time.time() + seconds
        if deadline:
            wake_at = min(wake_at, deadline)
        remaining = wake_at - time.time()
        if remaining > 0:
            time.sleep(remaining)

    # Raw bytes are accumulated and decoded once at the end, so a multi-byte
    # character split across two reads is not lost.
    received = bytearray()

    def drain():
        # Best-effort: a failing read ends this drain but not the probe.
        try:
            while not past_deadline():
                chunk = ser.read(256)
                if not chunk:
                    break
                received.extend(chunk)
        except Exception as exc:
            if verbose:
                eprint(f"  [warn] read failed: {exc}")

    # initial settle
    pause(settle)
    drain()

    # probes
    for cmd in probes:
        if past_deadline():
            break
        try:
            ser.write(cmd)
        except Exception as exc:
            # Keep going: the device may still emit unsolicited status text.
            if verbose:
                eprint(f"  [warn] write of {cmd!r} failed: {exc}")
        pause(0.1)
        drain()

    text = received.decode('utf-8', errors='ignore')

    if verbose and text.strip():
        eprint("--- STATUS TEXT BEGIN ---")
        eprint(text)
        eprint("--- STATUS TEXT END ---")

    utext = text.upper()
    return any(tok in utext for tok in YELLOW_TOKENS), text

def parse_ip_from_text(text):
    """Return the first usable IPv4 address found in *text*, or ``None``.

    Candidates come from group 1 of ``IP_REGEX`` (four dot-separated groups
    of one to three digits).  A candidate is rejected when any octet exceeds
    255, or when it is ``0.0.0.0``, which denotes "no address assigned" and
    must not be treated as an existing configuration.

    Args:
        text: Device output to scan; ``None`` or empty yields ``None``.
    """
    for m in IP_REGEX.finditer(text or ""):
        candidate = m.group(1)
        # The pattern only captures digits, so int() cannot fail here.
        octets = [int(part) for part in candidate.split(".")]
        if any(octet > 255 for octet in octets):
            continue
        if not any(octets):
            continue  # 0.0.0.0: interface has no address yet
        return candidate
    return None

def next_free_ip(used_last_octets, start_ip_octet, max_octet=254):
    """Claim and return the lowest unused host octet at or above the start.

    The chosen octet is added to *used_last_octets* (mutated in place) so a
    subsequent call will not hand it out again.

    Raises:
        RuntimeError: if every octet from *start_ip_octet* through
            *max_octet* is already taken (or the range is empty).
    """
    candidate = start_ip_octet
    # Skip past every octet that is already claimed.
    while candidate <= max_octet and candidate in used_last_octets:
        candidate += 1
    if candidate > max_octet:
        raise RuntimeError("No free IPs left in the range")
    used_last_octets.add(candidate)
    return candidate

def load_map(path):
    """Load the persisted mapping from the JSON file at *path*.

    Always returns a dict, because callers index and assign into the result:

    * a missing file yields ``{}`` silently (normal on first run);
    * an unreadable or malformed file yields ``{}`` with a warning on stderr;
    * valid JSON whose top level is not an object (e.g. a list) also yields
      ``{}`` with a warning, instead of being returned as-is.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: ignoring unreadable mapping file {path}: {exc}", file=sys.stderr)
        return {}
    if not isinstance(data, dict):
        print(f"Warning: ignoring mapping file {path}: top level is not a JSON object", file=sys.stderr)
        return {}
    return data

def usb_serial_for_port(dev):
    """Return an identifier for the port whose device path is *dev*.

    Looks *dev* up among ``list_ports.comports()`` and returns the first
    truthy value of its serial number, its hardware-id string, or *dev*
    itself.  If the port is not listed, *dev* is returned unchanged.
    """
    match = next((port for port in list_ports.comports() if port.device == dev), None)
    if match is None:
        return dev
    return match.serial_number or match.hwid or dev

def configure_device(ser, ssid, password, ip, dhcp, verbose=False,
                     netmask="255.255.255.0", gateway="192.168.1.1",
                     max_read_s=2.0):
    """Send a CFG ... END configuration block and read the device's reply.

    Lines sent, each terminated by ``\\n`` and followed by a 50 ms pause:
    ``CFG``, ``SSID:<ssid>``, ``PASS:<password>``, then either ``DHCP:1`` or
    ``IP:<ip>`` / ``MASK:<netmask>`` / ``GW:<gateway>`` / ``DHCP:0``, and
    finally ``END``.

    Args:
        ser: Open serial port object; its configured read timeout decides
            when the reply is considered finished.
        ssid: Wi-Fi network name.
        password: Wi-Fi password.
        ip: Static address to assign; ignored when *dhcp* is true.
        dhcp: Configure DHCP instead of a static address.
        verbose: When true, dump the reply to stderr.
        netmask: Netmask sent in static mode.
        gateway: Gateway sent in static mode.
        max_read_s: Upper bound in seconds on reading the reply, so a device
            that never stops emitting output cannot stall the caller.
            ``None`` disables the bound.

    Returns:
        ``(ok, text)``: *ok* is true when the reply contains ``OK`` or
        ``Saved`` (case-sensitive) or ``DONE`` (any case); *text* is the
        decoded reply.

    Raises:
        ValueError: static mode was requested without an IP address.
        Exception: whatever ``ser.write`` raises is propagated to the caller.
    """
    if not dhcp and not ip:
        # Previously this would have sent the literal line "IP:None".
        raise ValueError("static configuration requires an IP address")

    def send_line(line):
        payload = line.encode() if isinstance(line, str) else line
        ser.write(payload + b"\n")
        time.sleep(0.05)

    lines = ["CFG", f"SSID:{ssid}", f"PASS:{password}"]
    if dhcp:
        lines.append("DHCP:1")
    else:
        lines += [f"IP:{ip}", f"MASK:{netmask}", f"GW:{gateway}", "DHCP:0"]
    lines.append("END")

    time.sleep(0.15)
    for line in lines:
        send_line(line)

    time.sleep(0.2)
    stop_at = None if max_read_s is None else time.monotonic() + max_read_s
    chunks = []
    # Best-effort read: an error while reading ends collection, and whatever
    # arrived so far is still evaluated.
    try:
        while stop_at is None or time.monotonic() < stop_at:
            chunk = ser.read(256)
            if not chunk:
                break
            chunks.append(chunk)
    except Exception:
        pass
    text = b"".join(chunks).decode('utf-8', errors='ignore')
    if verbose and text.strip():
        eprint("--- CONFIG RESPONSE BEGIN ---")
        eprint(text)
        eprint("--- CONFIG RESPONSE END ---")

    # NOTE(review): these are plain substring checks, so "OK" also matches
    # inside longer words; confirm against the firmware's actual replies.
    ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper())
    return ok, text

def main():
    """Command-line entry point: probe every matching port, then configure.

    Pass 1 probes each port and records whether the device reports "no
    config" and which IP (if any) it already shows; in static mode the last
    octet of every reported IP is reserved.  Pass 2 skips devices that
    already have an IP (static mode) or are not in the no-config state
    (``--only-yellow``), picks a target address and sends the configuration.
    With ``--persist-map`` the USB-serial -> IP mapping is loaded before and
    saved after the run; a ``--dry-run`` never writes that file.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    if args.list:
        print("Ports:")
        for p in list_ports.comports():
            print(f" {p.device:>12} sn={p.serial_number} desc={p.description}")
        return

    # Reject bad input up front with a usage error instead of a traceback.
    if not args.dhcp and not 1 <= args.start_ip <= 254:
        parser.error("--start-ip must be a host octet between 1 and 254")
    exclude_re = None
    if args.exclude:
        try:
            exclude_re = re.compile(args.exclude)
        except re.error as exc:
            parser.error(f"--exclude is not a valid regex: {exc}")

    devices = sorted(glob.glob(args.pattern))
    if exclude_re is not None:
        devices = [d for d in devices if not exclude_re.search(d)]

    print(f"Found {len(devices)} devices matching {args.pattern}", flush=True)
    if args.exclude:
        print(f"Excluding devices matching /{args.exclude}/", flush=True)

    ip_map = load_map(MAP_FILE) if args.persist_map else {}
    if not isinstance(ip_map, dict):
        # The code below indexes and assigns by key; anything else is unusable.
        ip_map = {}
    used_last_octets = set()

    prepass_info = {}
    for i, dev in enumerate(devices):
        print(f"[pre] {i+1}/{len(devices)} probing {dev} …", flush=True)
        no_cfg, already_ip = _probe_port(dev, args)
        prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip}
        if already_ip and not args.dhcp:
            try:
                used_last_octets.add(int(already_ip.split(".")[-1]))
            except ValueError:
                pass

    ok_devices = 0
    skipped = 0
    errors = 0

    for idx, dev in enumerate(devices):
        info = prepass_info.get(dev, {})
        already_ip = info.get("ip")
        no_cfg = info.get("no_cfg", False)
        usb_key = usb_serial_for_port(dev)

        if already_ip and not args.dhcp:
            print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True)
            skipped += 1
            if args.persist_map:
                ip_map[usb_key] = already_ip
            continue

        if args.only_yellow and not no_cfg:
            print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True)
            skipped += 1
            continue

        # pick target IP
        if args.dhcp:
            target_ip = None
            mode = "DHCP"
        else:
            try:
                target_last_octet = _pick_last_octet(usb_key, ip_map, used_last_octets, args)
            except RuntimeError as e:
                # Address range exhausted: report it and keep going so the
                # summary and the mapping are still produced.
                print(f"[cfg] {idx+1}/{len(devices)} {dev}: {e}", flush=True)
                print(" ✗ Failed", flush=True)
                errors += 1
                continue
            target_ip = f"192.168.1.{target_last_octet}"
            mode = f"Static {target_ip}"

        print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True)
        if args.dry_run:
            print(" (dry-run) Would send CFG/END", flush=True)
            ok = True
        else:
            try:
                # The context manager closes the port even if configuring fails.
                with serial.Serial(dev, args.baud, timeout=args.timeout, write_timeout=args.timeout) as ser:
                    ok, _resp = configure_device(ser, args.ssid, args.password, target_ip, args.dhcp, verbose=args.verbose)
            except Exception as e:
                print(f" ✗ Error opening/configuring: {e}", flush=True)
                ok = False

        if ok:
            print(" ✓ OK", flush=True)
            ok_devices += 1
            # A dry run assigned nothing, so there is nothing to remember.
            if not args.dhcp and args.persist_map and target_ip and not args.dry_run:
                ip_map[usb_key] = target_ip
        else:
            print(" ✗ Failed", flush=True)
            errors += 1

        time.sleep(0.05)

    if args.persist_map and args.dry_run:
        print(f"(dry-run) Mapping not written to {MAP_FILE}", flush=True)
    elif args.persist_map:
        try:
            with open(MAP_FILE, "w") as f:
                json.dump(ip_map, f, indent=2, sort_keys=True)
            print(f"Persisted mapping to {MAP_FILE}", flush=True)
        except Exception as e:
            print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True)

    print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True)


def _build_arg_parser():
    """Return the argument parser defining every command-line option."""
    parser = argparse.ArgumentParser(
        description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex."
    )
    # NOTE(review): default Wi-Fi credentials are embedded in the source;
    # consider requiring them on the command line instead.
    parser.add_argument("--ssid", default="ClubHouse2G", help="Wi‑Fi SSID")
    parser.add_argument("--password", default="ez2remember", help="Wi‑Fi password")
    parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})")
    parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'")
    parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate")
    parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)")
    parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)")
    parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe")
    parser.add_argument("--only-yellow", action="store_true",
                        help="Only program devices that appear to be in 'no Wi‑Fi config' (solid yellow) state")
    parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP")
    parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)")
    parser.add_argument("--persist-map", action="store_true",
                        help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs")
    parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)")
    parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr")
    parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen")
    return parser


def _probe_port(dev, args):
    """Open *dev*, run the status probes and print the one-line result.

    Returns ``(no_cfg, ip)``; on any error a warning goes to stderr and the
    defaults ``(False, None)`` are returned.  The port is closed on every
    path.
    """
    start_t = time.time()
    no_cfg = False
    already_ip = None
    if args.full_probes:
        probes = (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n")
    else:
        probes = (b"STATUS\n", b"IP\n")
    try:
        with serial.Serial(
            dev,
            args.baud,
            timeout=args.timeout,
            write_timeout=args.timeout,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            # gentle DTR/RTS toggle; failure to drive the lines is ignored
            with contextlib.suppress(Exception):
                ser.dtr = False
                ser.rts = False
                time.sleep(0.02)
                ser.dtr = True
                ser.rts = True
                time.sleep(0.02)

            # The cap counts from before the port was opened; minimum 0.4 s.
            deadline = start_t + max(0.4, args.per_device_cap)
            no_cfg, text = detect_no_config(
                ser, verbose=args.verbose, settle=args.settle,
                timeout=args.timeout, probes=probes, deadline=deadline
            )
            already_ip = parse_ip_from_text(text)
    except Exception as e:
        eprint(f" [warn] {dev} probe error: {e}")
    dur = time.time() - start_t
    print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True)
    return no_cfg, already_ip


def _pick_last_octet(usb_key, ip_map, used_last_octets, args):
    """Choose and reserve the host octet for the device keyed by *usb_key*.

    The octet remembered in *ip_map* is reused when it parses, lies in
    1..254 and is still free; it is then added to *used_last_octets* so no
    later device can receive the same address.  Otherwise the next free
    octet from ``--start-ip`` upward is taken via ``next_free_ip``, which
    raises ``RuntimeError`` when none is left.
    """
    if args.persist_map and usb_key in ip_map:
        try:
            remembered = int(ip_map[usb_key].split(".")[-1])
        except (AttributeError, TypeError, ValueError):
            remembered = None
        if remembered is not None and 1 <= remembered <= 254 and remembered not in used_last_octets:
            used_last_octets.add(remembered)
            return remembered
    return next_free_ip(used_last_octets, args.start_ip, 254)


if __name__ == "__main__":
    main()