more on builds at scale

This commit is contained in:
Bob 2025-12-11 12:25:23 -08:00
parent 7aa45b9e25
commit d0597d048b
3 changed files with 328 additions and 1 deletions

235
esp32_reconfig.py Executable file
View File

@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""
ESP32 Reconfiguration Tool
Iterates through connected devices and updates their settings (SSID, IP, etc.)
without reflashing firmware. Runs sequentially for reliability.
"""
import sys
import os
import argparse
import ipaddress
import re
import time
import serial
from serial.tools import list_ports
# --- Configuration ---
BAUD_RATE = 115200
TIMEOUT = 0.5 # Serial read timeout
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
def detect_devices():
"""Returns a sorted list of ESP32 USB serial ports."""
candidates = list(list_ports.grep("CP210|FT232|USB Serial|10C4:EA60"))
ports = [p.device for p in candidates]
ports.sort(key=lambda x: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', x)])
return ports
def extract_device_number(device_path):
match = re.search(r'(\d+)$', device_path)
return int(match.group(1)) if match else 0
def configure_device(port, target_ip, args):
"""
Connects to a single device, resets it, and injects the config.
Returns True if verification succeeds.
"""
try:
ser = serial.Serial(port, BAUD_RATE, timeout=0.1)
except Exception as e:
print(f"[{port}] {Colors.RED}Connection Failed: {e}{Colors.RESET}")
return False
try:
# 1. Reset Device
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
ser.dtr = True
# 2. Wait for App to Settle (Handle GPS delay)
print(f"[{port}] Waiting for App (GPS timeout ~3s)...", end='', flush=True)
start_time = time.time()
# We wait until we see the "esp32>" prompt OR specific log lines
# The prompt is the safest indicator that the console is ready.
prompt_detected = False
buffer = ""
while time.time() - start_time < 10.0:
try:
# Read char by char to catch prompts that don't end in newline
chunk = ser.read(ser.in_waiting or 1).decode('utf-8', errors='ignore')
if chunk:
buffer += chunk
# Check for prompt or end of init
if "esp32>" in buffer or "Entering console loop" in buffer:
prompt_detected = True
break
# Keep buffer size manageable
if len(buffer) > 1000: buffer = buffer[-1000:]
except Exception:
pass
time.sleep(0.05)
if not prompt_detected:
print(f" {Colors.YELLOW}Timeout waiting for prompt (continuing anyway){Colors.RESET}")
else:
print(" OK")
# 3. Clear Buffers & Wakeup
ser.reset_input_buffer()
ser.write(b'\n') # Send an Enter to clear any partial commands
time.sleep(0.2)
# 4. Construct Config String (Using CRLF \r\n for safety)
csi_val = '1' if args.csi_enable else '0'
role_str = "SERVER" if args.iperf_server else "CLIENT"
iperf_enable_val = '0' if args.no_iperf else '1'
period_us = int(args.iperf_period * 1000000)
# Note: We send \r\n explicitly
config_lines = [
"CFG",
f"SSID:{args.ssid}",
f"PASS:{args.password}",
f"IP:{target_ip}",
f"MASK:{args.netmask}",
f"GW:{args.gateway}",
f"DHCP:0",
f"BAND:{args.band}",
f"BW:{args.bandwidth}",
f"POWERSAVE:{args.powersave}",
f"MODE:{args.mode}",
f"MON_CH:{args.monitor_channel}",
f"CSI:{csi_val}",
f"IPERF_PERIOD_US:{period_us}",
f"IPERF_ROLE:{role_str}",
f"IPERF_PROTO:{args.iperf_proto}",
f"IPERF_DEST_IP:{args.iperf_dest_ip}",
f"IPERF_PORT:{args.iperf_port}",
f"IPERF_BURST:{args.iperf_burst}",
f"IPERF_LEN:{args.iperf_len}",
f"IPERF_ENABLED:{iperf_enable_val}",
"END"
]
config_payload = "\r\n".join(config_lines) + "\r\n"
# 5. Send Config
print(f"[{port}] Sending Config ({target_ip})...", end='', flush=True)
ser.write(config_payload.encode('utf-8'))
ser.flush()
# 6. Verify
verify_start = time.time()
verified = False
ser.timeout = 0.5 # Increase timeout for line reading
while time.time() - verify_start < 8.0:
line = ser.readline().decode('utf-8', errors='ignore').strip()
if not line: continue
# Check for success indicators
if "Config saved" in line or "CSI enable state saved" in line:
verified = True
break
# Check for IP confirmation
if f"got ip:{target_ip}" in line:
verified = True
break
if verified:
print(f" {Colors.GREEN}SUCCESS{Colors.RESET}")
# Final Reset to apply settings cleanly
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
return True
else:
print(f" {Colors.RED}FAILED (Verify Timeout){Colors.RESET}")
return False
except Exception as e:
print(f"[{port}] Error: {e}")
return False
finally:
if ser.is_open:
ser.close()
def main():
parser = argparse.ArgumentParser(description='ESP32 Sequential Reconfiguration Tool')
parser.add_argument('--start-ip', required=True, help='Start IP (e.g., 192.168.1.51)')
parser.add_argument('-s', '--ssid', default='ClubHouse2G')
parser.add_argument('-P', '--password', default='ez2remember')
parser.add_argument('-g', '--gateway', default='192.168.1.1')
parser.add_argument('-m', '--netmask', default='255.255.255.0')
parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'])
parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'])
parser.add_argument('-ps', '--powersave', default='NONE')
parser.add_argument('--iperf-period', type=float, default=0.01)
parser.add_argument('--iperf-burst', type=int, default=1)
parser.add_argument('--iperf-len', type=int, default=1470)
parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'])
parser.add_argument('--iperf-dest-ip', default='192.168.1.50')
parser.add_argument('--iperf-port', type=int, default=5001)
parser.add_argument('--no-iperf', action='store_true')
parser.add_argument('--iperf-client', action='store_true')
parser.add_argument('--iperf-server', action='store_true')
parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'])
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
parser.add_argument('--csi', dest='csi_enable', action='store_true')
parser.add_argument('--retries', type=int, default=3, help="Retry attempts per device")
args = parser.parse_args()
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
print(f" ESP32 Sequential Reconfig Tool")
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
devices = detect_devices()
if not devices:
print(f"{Colors.RED}No devices found.{Colors.RESET}")
sys.exit(1)
print(f"Found {len(devices)} devices. Starting reconfiguration...\n")
start_ip = ipaddress.IPv4Address(args.start_ip)
for i, port in enumerate(devices):
offset = extract_device_number(port)
target_ip = str(start_ip + offset)
print(f"Device {i+1}/{len(devices)}: {Colors.CYAN}{port}{Colors.RESET} -> {Colors.YELLOW}{target_ip}{Colors.RESET}")
success = False
for attempt in range(1, args.retries + 1):
if attempt > 1:
print(f" Retry {attempt}/{args.retries}...")
if configure_device(port, target_ip, args):
success = True
break
time.sleep(1.0)
if not success:
print(f"{Colors.RED} [ERROR] Failed to configure {port} after {args.retries} attempts.{Colors.RESET}\n")
else:
print("")
print(f"{Colors.BLUE}Done.{Colors.RESET}")
if __name__ == '__main__':
main()

85
identiy_port.py Executable file
View File

@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
identify_port.py
Tool to identify a specific physical device's port name.
1. Lists all connected USB Serial devices.
2. Asks you to power down or unplug ONE device.
3. Tells you exactly which /dev/ttyUSBx port disappeared.
"""
import time
import sys
from serial.tools import list_ports
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
def get_current_ports():
"""
Returns a set of all detected USB/ACM serial ports.
We look for standard USB-to-Serial drivers.
"""
# Grab all ports that look like USB serial adapters
ports = list(list_ports.grep("USB|ACM|CP210|FT232"))
# Return a set of device paths (e.g. {'/dev/ttyUSB0', '/dev/ttyUSB1'})
return set(p.device for p in ports)
def main():
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
print(f" USB Port Identifier")
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
# 1. Initial Scan
print("Scanning for connected devices...", end='', flush=True)
initial_ports = get_current_ports()
print(f" Found {len(initial_ports)} devices.")
if not initial_ports:
print(f"{Colors.RED}No USB serial devices found!{Colors.RESET}")
sys.exit(1)
# Print current list nicely
sorted_ports = sorted(list(initial_ports))
print(f"\n{Colors.GREEN}Active Ports:{Colors.RESET}")
for p in sorted_ports:
print(f" - {p}")
# 2. Prompt User
print(f"\n{Colors.YELLOW}>>> ACTION REQUIRED: Power down (or unplug) the device you want to identify.{Colors.RESET}")
input(f"{Colors.YELLOW}>>> Press ENTER once the device is off...{Colors.RESET}")
# 3. Final Scan
print("Rescanning...", end='', flush=True)
# Give the OS a moment to deregister the device if it was just pulled
time.sleep(1.0)
final_ports = get_current_ports()
print(" Done.")
# 4. Calculate Difference
removed_ports = initial_ports - final_ports
added_ports = final_ports - initial_ports
print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}")
print(f" Result")
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
if len(removed_ports) == 1:
target = list(removed_ports)[0]
print(f"The device you powered down was: {Colors.RED}{target}{Colors.RESET}")
elif len(removed_ports) > 1:
print(f"{Colors.RED}Multiple ports disappeared!{Colors.RESET}")
for p in removed_ports:
print(f" - {p}")
elif len(added_ports) > 0:
print(f"{Colors.YELLOW}No ports removed, but new ones appeared?{Colors.RESET}")
for p in added_ports:
print(f" + {p}")
else:
print(f"{Colors.YELLOW}No changes detected. Did you power it down?{Colors.RESET}")
if __name__ == '__main__':
main()

View File

@ -69,7 +69,14 @@ static void event_handler(void* arg, esp_event_base_t event_base, int32_t event_
// --- Main ---------------------------------------------------------- // --- Main ----------------------------------------------------------
void app_main(void) { void app_main(void) {
// 1. System Init // 1. System Init
ESP_ERROR_CHECK(nvs_flash_init()); esp_err_t ret = nvs_flash_init();
if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) {
// NVS partition was truncated and needs to be erased
// Retry nvs_flash_init
ESP_ERROR_CHECK(nvs_flash_erase());
ret = nvs_flash_init();
}
ESP_ERROR_CHECK(ret);
ESP_ERROR_CHECK(esp_netif_init()); ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default()); ESP_ERROR_CHECK(esp_event_loop_create_default());