#!/usr/bin/env python3
"""Mass-flash ESP32 boards with ESP-IDF and push Wi-Fi/IP settings over serial.

Workflow (see ``main``):

1. Enumerate candidate USB serial ports.
2. Probe each port with ``esptool.py chip_id`` to learn the chip family.
3. Print the plan (port, chip, IDF target, IP assignment).
4. Unless ``--dry-run`` is given: for every device run ``idf.py set-target``
   and ``idf.py flash``, then send a small line-based configuration block
   (``CFG`` ... ``END``) over the serial port and wait for ``OK``.

Exit codes: 1 = bad arguments / missing prerequisite, 2 = no ports found,
3 = at least one device failed to flash.
"""
import argparse
import fnmatch
import ipaddress
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional

try:
    import serial
    import serial.tools.list_ports as list_ports
except ImportError:
    # pyserial is checked lazily (see _require_pyserial) so that the pure
    # helpers stay importable and a missing package yields a clear message
    # instead of a traceback.
    serial = None
    list_ports = None

# USB vendor IDs of the serial bridges accepted by list_ports_filtered():
# FTDI, Silicon Labs (CP210x), QinHeng/WCH (CH34x), Prolific, Espressif.
KNOWN_VIDS = {0x0403, 0x10C4, 0x1A86, 0x067B, 0x303A}

# (chip name as reported by esptool, idf.py target).  Ordered most specific
# first: the bare "ESP32" entry is a prefix/substring of every other name and
# must therefore stay last.
_CHIP_TABLE = (
    ("ESP32-S3", "esp32s3"),
    ("ESP32-S2", "esp32s2"),
    ("ESP32-C3", "esp32c3"),
    ("ESP32-C6", "esp32c6"),
    ("ESP32-H2", "esp32h2"),
    ("ESP32", "esp32"),
)

_DEFAULT_PORT_PREFIXES = ("/dev/ttyUSB", "/dev/ttyACM")
_FALLBACK_START_IP = "192.168.1.50"


def _require_pyserial() -> None:
    """Raise ``RuntimeError`` when the pyserial package could not be imported."""
    if serial is None or list_ports is None:
        raise RuntimeError("pyserial is required: pip install pyserial")


def run(cmd, cwd=None, check=True):
    """Echo and execute *cmd* (an argv list), returning its exit status.

    Args:
        cmd: Command and arguments; executed without a shell.
        cwd: Working directory for the child process, or ``None``.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        The child's return code.

    Raises:
        RuntimeError: The command exited non-zero and *check* is true.
        OSError: The executable could not be started.
    """
    printable = " ".join(cmd)
    print(">>", printable)
    completed = subprocess.run(cmd, cwd=cwd)
    if check and completed.returncode != 0:
        raise RuntimeError(f"Command failed ({completed.returncode}): {printable}")
    return completed.returncode


def _parse_chip_name(output: str) -> str:
    """Return the first chip name from ``_CHIP_TABLE`` found in *output*.

    Matching is case-insensitive; ``"Unknown"`` is returned when no known
    name occurs in the text.
    """
    text = output.upper()
    for chip, _target in _CHIP_TABLE:
        if chip in text:
            return chip
    return "Unknown"


def detect_chip_type(port: str) -> str:
    """Probe *port* with ``esptool.py chip_id`` and return the chip name.

    Returns one of the names in ``_CHIP_TABLE`` or ``"Unknown"`` when
    esptool is missing, fails, times out (6 s) or reports an unrecognised
    chip.  Detection is best effort and never raises for those cases.
    """
    try:
        out = subprocess.check_output(
            ["esptool.py", "--port", port, "chip_id"],
            stderr=subprocess.STDOUT,
            text=True,
            timeout=6,
        )
    except (subprocess.SubprocessError, OSError, ValueError) as exc:
        # ValueError covers undecodable output when text=True.
        print(f" [{port}] chip detection failed: {exc}")
        return "Unknown"
    return _parse_chip_name(out)


def map_chip_to_idf_target(chip: str) -> str:
    """Translate a chip name (e.g. ``"ESP32-S3"``) to its ``idf.py`` target.

    The comparison is a case-insensitive prefix match; ``"unknown"`` is
    returned for anything that does not start with a known chip name.
    """
    name = chip.upper()
    for prefix, target in _CHIP_TABLE:
        if name.startswith(prefix):
            return target
    return "unknown"


def list_ports_filtered(patterns: Optional[List[str]] = None):
    """Return the serial ports that look like flashable devices.

    Args:
        patterns: Optional ``fnmatch`` globs.  When given (and non-empty) a
            port is kept only if its device path matches one of them;
            otherwise only ``/dev/ttyUSB*`` and ``/dev/ttyACM*`` are kept.

    Returns:
        A list of pyserial port-info objects.  Ports that report a USB
        vendor ID not in ``KNOWN_VIDS`` are dropped; ports without a vendor
        ID are kept.

    Raises:
        RuntimeError: pyserial is not installed.
    """
    _require_pyserial()
    selected = []
    for info in list_ports.comports():
        device = info.device
        if patterns:
            if not any(fnmatch.fnmatch(device, pat) for pat in patterns):
                continue
        elif not device.startswith(_DEFAULT_PORT_PREFIXES):
            continue
        vid = getattr(info, "vid", None)
        if vid is not None and vid not in KNOWN_VIDS:
            continue
        selected.append(info)
    return selected


def ensure_target(project_dir: str, target: str):
    """Run ``idf.py set-target <target>`` inside *project_dir*.

    Raises:
        ValueError: *target* is empty or ``"unknown"``.
        RuntimeError: ``idf.py`` exited with a non-zero status.
    """
    if not target or target == "unknown":
        raise ValueError("Unknown IDF target; cannot set-target")
    # NOTE(review): this runs before every flash, even when consecutive
    # devices share a target -- confirm whether that forces a reconfigure
    # per device and whether it should be skipped when unchanged.
    run(["idf.py", "set-target", target], cwd=project_dir, check=True)


def flash_device(project_dir: str, port: str, idf_target: str, baud: int = 460800) -> bool:
    """Set the IDF target and flash the project onto the device at *port*.

    Returns:
        ``True`` on success, ``False`` (after printing the reason) when
        setting the target or flashing failed.
    """
    try:
        ensure_target(project_dir, idf_target)
        run(["idf.py", "-p", port, "-b", str(baud), "flash"], cwd=project_dir, check=True)
    except (RuntimeError, ValueError, OSError, subprocess.SubprocessError) as e:
        print(f" Flash failed on {port}: {e}")
        return False
    return True


def toggle_reset(ser):
    """Pulse the DTR/RTS modem lines of an open serial port (best effort).

    Sequence: DTR low + RTS high, 50 ms pause, DTR high + RTS low, 50 ms
    pause.  Failures are reported but never raised because some adapters do
    not expose these lines.
    """
    # NOTE(review): this resembles the sequence esptool uses to enter the ROM
    # bootloader; confirm on the target boards that the application (not
    # download mode) is running afterwards.
    try:
        ser.dtr = False
        ser.rts = True
        time.sleep(0.05)
        ser.dtr = True
        ser.rts = False
        time.sleep(0.05)
    except (OSError, ValueError) as exc:
        print(f" reset toggle skipped: {exc}")


def send_wifi_config(
    port: str,
    ssid: str,
    password: str,
    ip: Optional[str],
    mask: Optional[str],
    gw: Optional[str],
    dhcp: bool,
    baud: int = 115200,
    retries: int = 3,
) -> bool:
    """Send the Wi-Fi/IP configuration block to the device and await ``OK``.

    The payload is a sequence of newline-terminated lines: ``CFG``, then
    ``SSID:``, ``PASS:``, ``IP:``, ``MASK:``, ``GW:`` (each only when the
    value is non-empty), ``DHCP:0|1`` and finally ``END``.

    Args:
        port: Serial device path.
        ssid, password: Wi-Fi credentials.
        ip, mask, gw: Static addressing values; falsy values are omitted.
        dhcp: Whether the device should use DHCP.
        baud: Serial baud rate for the exchange.
        retries: Number of attempts before giving up.

    Returns:
        ``True`` once the bytes ``OK`` appear in the device's reply within
        3 s of sending; ``False`` after all attempts failed.

    Raises:
        RuntimeError: pyserial is not installed.
    """
    _require_pyserial()

    fields = [
        "CFG\n",
        f"SSID:{ssid}\n" if ssid else "",
        f"PASS:{password}\n" if password else "",
        f"IP:{ip}\n" if ip else "",
        f"MASK:{mask}\n" if mask else "",
        f"GW:{gw}\n" if gw else "",
        f"DHCP:{1 if dhcp else 0}\n",
        "END\n",
    ]
    payload = "".join(fields).encode("utf-8")

    for attempt in range(1, retries + 1):
        try:
            print(f" [{port}] opening serial @ {baud} (attempt {attempt}/{retries})")
            with serial.Serial(port, baudrate=baud, timeout=2) as ser:
                time.sleep(0.3)
                toggle_reset(ser)
                # Give the firmware time to boot, then discard its boot log.
                time.sleep(0.8)
                ser.reset_input_buffer()
                ser.reset_output_buffer()

                ser.write(payload)
                ser.flush()

                deadline = time.monotonic() + 3.0
                buf = b""
                while time.monotonic() < deadline:
                    # Read whatever is buffered (at least one byte) so a short
                    # reply is seen immediately instead of after the timeout.
                    chunk = ser.read(max(1, ser.in_waiting))
                    if not chunk:
                        continue
                    buf += chunk
                    # NOTE(review): any "OK" substring counts as success --
                    # confirm the firmware cannot emit it in unrelated output.
                    if b"OK" in buf:
                        print(f" [{port}] config applied: {buf.decode(errors='ignore').strip()}")
                        return True
                print(f" [{port}] no OK from device, got: {buf.decode(errors='ignore').strip()}")
        except (OSError, ValueError) as e:
            # pyserial's SerialException derives from OSError.
            print(f" [{port}] serial error: {e}")
        if attempt < retries:
            time.sleep(0.6)  # back off before the next attempt
    return False


def _allocate_static_ips(start_ip: str, mask: str, count: int) -> List[str]:
    """Return *count* consecutive IPv4 addresses beginning at *start_ip*.

    An unparsable *start_ip* falls back to ``_FALLBACK_START_IP`` with a
    warning.  When *mask* is a valid netmask, every address must be a usable
    host address of the subnet formed by the start address and the mask.

    Raises:
        ValueError: The range runs past the IPv4 space or leaves the subnet.
    """
    try:
        first = ipaddress.IPv4Address(start_ip.strip())
    except ValueError:
        print(f"WARN: invalid --start-ip '{start_ip}', using {_FALLBACK_START_IP}")
        first = ipaddress.IPv4Address(_FALLBACK_START_IP)

    try:
        network = ipaddress.IPv4Network(f"{first}/{mask}", strict=False)
    except ValueError:
        network = None  # mask is passed through unchecked, as before

    addresses = []
    for offset in range(count):
        try:
            addr = first + offset
        except ValueError as exc:
            raise ValueError(f"static IP range starting at {first} exceeds the IPv4 space") from exc
        if network is not None:
            # /31 and /32 have no dedicated network/broadcast addresses.
            reserved = network.prefixlen < 31 and addr in (
                network.network_address,
                network.broadcast_address,
            )
            if addr not in network or reserved:
                raise ValueError(
                    f"static IP {addr} (device {offset + 1} of {count}) is not a usable host in {network}"
                )
        addresses.append(str(addr))
    return addresses


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser."""
    ap = argparse.ArgumentParser(description="Mass flash ESP32 devices and push Wi‑Fi/IP config over serial.")
    ap.add_argument("--project", required=True, help="Path to ESP‑IDF project")
    ap.add_argument("--ssid", required=True, help="Wi‑Fi SSID")
    ap.add_argument("--password", required=True, help="Wi‑Fi password")
    ap.add_argument("--start-ip", default="192.168.1.50", help="Base IP (only used if --dhcp=0)")
    ap.add_argument("--mask", default="255.255.255.0", help="Netmask for static IP")
    ap.add_argument("--gw", default="192.168.1.1", help="Gateway for static IP")
    ap.add_argument("--dhcp", type=int, choices=[0, 1], default=1, help="1=use DHCP, 0=set static IPs")
    ap.add_argument("--baud", type=int, default=460800, help="Flashing baud rate")
    ap.add_argument("--cfg-baud", type=int, default=115200, help="Serial baud for config exchange")
    ap.add_argument("--ports", help="Comma-separated globs to override port selection, e.g. '/dev/ttyUSB*,/dev/ttyACM*'")
    ap.add_argument("--dry-run", action="store_true", help="Plan only; do not flash or configure")
    return ap


def _build_plan(devices, static_ips: Optional[List[str]]) -> List[dict]:
    """Probe every device and return one plan entry (dict) per port."""
    plan = []
    for idx, dev in enumerate(devices, 1):
        port = dev.device
        chip = detect_chip_type(port)
        plan.append(
            dict(
                idx=idx,
                port=port,
                chip=chip,
                target=map_chip_to_idf_target(chip),
                ip=static_ips[idx - 1] if static_ips else None,
            )
        )
    return plan


def main():
    """Command-line entry point; see the module docstring for the workflow."""
    args = _build_arg_parser().parse_args()

    if serial is None or list_ports is None:
        print("pyserial is required: pip install pyserial")
        sys.exit(1)

    project_dir = os.path.abspath(args.project)
    if not os.path.isdir(project_dir):
        print(f"Project directory not found: {project_dir}")
        sys.exit(1)

    patterns = None
    if args.ports:
        # Drop empty entries produced by stray commas.
        patterns = [p.strip() for p in args.ports.split(",") if p.strip()] or None

    devices = list_ports_filtered(patterns)
    if not devices:
        print("No candidate USB serial ports found.")
        sys.exit(2)
    print(f"Found {len(devices)} devices.")

    static_ips = None
    if args.dhcp == 0:
        # Validate the whole address range before touching any device.
        try:
            static_ips = _allocate_static_ips(args.start_ip, args.mask, len(devices))
        except ValueError as exc:
            print(f"ERROR: {exc}")
            sys.exit(1)

    plan = _build_plan(devices, static_ips)

    print("\nPlan:")
    for d in plan:
        print(f" {d['idx']:2d} {d['port']} {d['chip']} -> target {d['target']} IP:{d['ip'] or 'DHCP'}")

    if args.dry_run:
        print("\nDry run only.")
        return

    failed = []
    for d in plan:
        if d['target'] == 'unknown':
            print(f"\nERROR: Unknown IDF target for {d['port']} (chip '{d['chip']}'). Skipping.")
            failed.append(d['idx'])
            continue

        print(f"\nFlashing {d['port']} as {d['target']}...")
        if not flash_device(project_dir, d['port'], d['target'], baud=args.baud):
            failed.append(d['idx'])
            continue

        print(f"Configuring Wi‑Fi on {d['port']}...")
        ok = send_wifi_config(
            d['port'],
            args.ssid,
            args.password,
            d['ip'],
            args.mask if d['ip'] else None,
            args.gw if d['ip'] else None,
            dhcp=(args.dhcp == 1),
            baud=args.cfg_baud,
        )
        if not ok:
            # A missing acknowledgement is reported but does not count as a
            # flashing failure.
            print(f" WARN: config not acknowledged on {d['port']}")

    if failed:
        print(f"\nCompleted with flashing failures on: {failed}")
        sys.exit(3)
    print("\nAll done.")


if __name__ == "__main__":
    main()