Compare commits

..

6 Commits

Author SHA1 Message Date
Robert McMahon a303b7171a Enhance error handling and logging in WiFi connection process
- Added error checks in wifi_do_connect to handle potential failures during connection attempts.
- Improved logging to provide clearer insights into connection status and errors for better debugging.
2025-12-27 19:35:25 -08:00
Robert McMahon 1eddb8e84f Refactor file headers and clean up comments in multiple source files
- Updated file headers in iperf.c to include trip-time support in the brief description.
- Removed unnecessary comment blocks in cmd_ip.c, cmd_nvs.c, app_console.c, and wifi_cfg.c to streamline the codebase.
2025-12-27 19:34:21 -08:00
Robert McMahon 56ea987f75 Improve WiFi configuration handling by ensuring proper string termination
- Updated strncpy calls in wifi_cfg_apply_from_nvs and wifi_do_connect to prevent buffer overflows by reserving space for null termination.
- Added explicit null termination for SSID and password fields in the wifi_config_t structure.
2025-12-27 18:00:19 -08:00
Robert McMahon 128596bd67 Clean up repository and improve console initialization
- Remove Emacs backup files (cmd_ip.c~, cmd_wifi.c~)
- Add new_rules.part to .gitignore (temp file used by gen_udev_rules.py)
- Update version to 2.1.0-CONSOLE-DEBUG for debugging
- Add debug logging around console REPL initialization
- Improve error handling for console initialization failures
- Remove unreachable code after esp_console_start_repl()
2025-12-27 17:56:46 -08:00
Robert McMahon d4cd861b80 Add error handling for UART and GPIO configurations in gps_sync_init
- Implemented error checks for uart_driver_install, uart_set_pin, gpio_config, gpio_install_isr_service, and gpio_isr_handler_add to ensure robust initialization of GPS synchronization components.
- Enhanced logging to provide detailed error messages for easier debugging.
2025-12-27 17:07:45 -08:00
Robert McMahon feb0d4d142 Remove deprecated Python scripts and clean up repository
- Removed 13 deprecated deployment/configuration scripts superseded by esp32_deploy.py:
  * flash_all.py, flash_all_parallel.py, flash_all_serial_config.py
  * flash_and_config.py, mass_deploy.py, async_mass_deploy.py
  * async_batch_config.py, batch_config.py, config_device.py
  * esp32_reconfig.py, reconfig_simple.py, reconfig_simple_nextip.py
  * map_usb_to_ip.py

- Updated .gitignore to exclude:
  * Emacs backup files (#filename#)
  * firmware/ directory and flash_args_* build artifacts

- Repository now contains only active scripts:
  * esp32_deploy.py (main unified deployment tool)
  * detect_esp32.py, control_iperf.py, parse_csi.py
  * add_license.py, gen_udev_rules.py, identiy_port.py
  * async_find_failed.py (diagnostic utility)
2025-12-27 16:42:09 -08:00
28 changed files with 690 additions and 3397 deletions

7
.gitignore vendored
View File

@ -9,6 +9,11 @@ sdkconfig.old
*.bin *.bin
*.elf *.elf
*.map *.map
firmware/
flash_args_*
# Temporary files
new_rules.part
# IDE # IDE
.vscode/ .vscode/
@ -16,6 +21,8 @@ sdkconfig.old
*.swp *.swp
*.swo *.swo
*~ *~
# Emacs backup files
\#*\#
# Dependencies # Dependencies
dependencies/ dependencies/

View File

@ -1,267 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Async Batch Configuration Tool
The definitive parallel configuration tool.
Configures 30+ ESP32 devices concurrently using non-blocking I/O.
Features:
- Concurrent execution (configure 30 devices in <20 seconds)
- Robust Regex-based state detection
- Supports verifying both Station Mode (IP check) and Monitor Mode
- Context-aware logging
- CSI enable/disable control
"""
import asyncio
import serial_asyncio
import sys
import os
import argparse
import ipaddress
import re
import time
import logging
# Ensure detection script is available
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
except ImportError:
print("Error: 'detect_esp32.py' not found.")
sys.exit(1)
# --- Logging Setup ---
class DeviceLoggerAdapter(logging.LoggerAdapter):
"""Prefixes log messages with the device port name"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger("BatchConfig")
class Esp32Configurator:
"""
Manages the lifecycle of configuring a single ESP32 device via Async Serial.
"""
def __init__(self, port, target_ip, args):
self.port = port
self.target_ip = target_ip
self.args = args
self.log = DeviceLoggerAdapter(logger, {'connid': port})
# --- Regex Patterns ---
# Success indicators
self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
self.regex_monitor_success = re.compile(r'Monitor mode active', re.IGNORECASE)
self.regex_csi_saved = re.compile(r'CSI enable state saved', re.IGNORECASE)
# Prompts indicating device is booting/ready
self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE)
# Error indicators
self.regex_error = re.compile(r'Error:|Failed|Disconnect', re.IGNORECASE)
async def run(self):
"""Main execution workflow for this device"""
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e:
self.log.error(f"Failed to open port: {e}")
return False
try:
# 1. Hardware Reset (DTR/RTS)
self.log.info("Resetting...")
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
await asyncio.sleep(0.1)
writer.transport.serial.dtr = True
# 2. Wait for Boot
# We assume the device is ready when we see logs or a prompt
if not await self._wait_for_boot(reader):
self.log.warning("Boot prompt missed, attempting config anyway...")
# 3. Send Configuration
await self._send_config(writer)
# 4. Verify Success
return await self._verify_configuration(reader)
except Exception as e:
self.log.error(f"Exception: {e}")
return False
finally:
writer.close()
await writer.wait_closed()
async def _wait_for_boot(self, reader):
"""Reads stream until a known 'ready' prompt appears"""
self.log.info("Waiting for boot...")
timeout = time.time() + 5 # 5 second boot timeout
while time.time() < timeout:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.5)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
if self.regex_ready.search(line):
return True
except asyncio.TimeoutError:
continue
return False
async def _send_config(self, writer):
"""Builds and transmits the configuration command"""
csi_val = '1' if self.args.csi_enable else '0'
self.log.info(f"Sending config for IP {self.target_ip} (CSI:{csi_val})...")
# Construct command block
config_str = (
f"CFG\n"
f"SSID:{self.args.ssid}\n"
f"PASS:{self.args.password}\n"
f"IP:{self.target_ip}\n"
f"MASK:{self.args.netmask}\n"
f"GW:{self.args.gateway}\n"
f"DHCP:0\n"
f"BAND:{self.args.band}\n"
f"BW:{self.args.bandwidth}\n"
f"POWERSAVE:{self.args.powersave}\n"
f"MODE:{self.args.mode}\n"
f"MON_CH:{self.args.monitor_channel}\n"
f"CSI:{csi_val}\n"
f"END\n"
)
writer.write(config_str.encode('utf-8'))
await writer.drain()
async def _verify_configuration(self, reader):
"""Monitors output for confirmation of Success"""
self.log.info("Verifying configuration...")
timeout = time.time() + 15 # 15s verification timeout
csi_saved = False
while time.time() < timeout:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
# Check for CSI save confirmation
if self.regex_csi_saved.search(line):
csi_saved = True
# Check for Station Mode Success (IP Address)
m_ip = self.regex_got_ip.search(line)
if m_ip:
got_ip = m_ip.group(1)
if got_ip == self.target_ip:
csi_status = "CSI saved" if csi_saved else ""
self.log.info(f"SUCCESS: Assigned {got_ip} {csi_status}")
return True
else:
self.log.warning(f"MISMATCH: Wanted {self.target_ip}, got {got_ip}")
# Check for Monitor Mode Success
if self.regex_monitor_success.search(line):
self.log.info("SUCCESS: Monitor Mode Active")
return True
# Check for errors
if self.regex_error.search(line):
self.log.warning(f"Device Reported Error: {line}")
except asyncio.TimeoutError:
continue
self.log.error("Timeout: Device did not confirm configuration.")
return False
async def main_async():
parser = argparse.ArgumentParser(
description='Async ESP32 Batch Config with CSI Control',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Configure 20 iperf baseline devices (NO CSI)
%(prog)s --start-ip 192.168.1.81
# Configure devices WITH CSI enabled
%(prog)s --start-ip 192.168.1.111 --csi
# Configure for monitor mode on channel 36
%(prog)s --start-ip 192.168.1.90 -M MONITOR -mc 36
# 5GHz with 40MHz bandwidth
%(prog)s --start-ip 192.168.1.81 -b 5G -B HT40
"""
)
# Arguments
parser.add_argument('--start-ip', required=True, help='Starting Static IP')
parser.add_argument('-s', '--ssid', default='ClubHouse2G', help='WiFi SSID')
parser.add_argument('-P', '--password', default='ez2remember', help='WiFi password')
parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP')
parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')
parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G'])
parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'])
parser.add_argument('-ps', '--powersave', default='NONE')
parser.add_argument('-M', '--mode', default='STA')
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
parser.add_argument('--csi', dest='csi_enable', action='store_true',
help='Enable CSI capture (default: disabled)')
args = parser.parse_args()
# 1. Detect
print("Step 1: Detecting Devices...")
devices = detect_esp32.detect_esp32_devices()
if not devices:
print("No devices found.")
return
# Sort naturally (ttyUSB2 before ttyUSB10)
def natural_keys(d):
return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]
devices.sort(key=natural_keys)
try:
start_ip_obj = ipaddress.IPv4Address(args.start_ip)
except:
print(f"Invalid IP: {args.start_ip}")
return
# 2. Configure Concurrently
csi_status = "ENABLED" if args.csi_enable else "DISABLED"
print(f"Step 2: Configuring {len(devices)} devices concurrently (CSI: {csi_status})...")
tasks = []
for i, dev in enumerate(devices):
current_ip = str(start_ip_obj + i)
configurator = Esp32Configurator(dev.device, current_ip, args)
tasks.append(configurator.run())
# Run everything at once
results = await asyncio.gather(*tasks)
# 3. Report
success_count = results.count(True)
print("\n" + "="*40)
print(f"Total Devices: {len(devices)}")
print(f"Success: {success_count}")
print(f"Failed: {len(devices) - success_count}")
print(f"CSI Setting: {csi_status}")
print("="*40)
if __name__ == '__main__':
try:
if os.name == 'nt':
asyncio.set_event_loop(asyncio.ProactorEventLoop())
asyncio.run(main_async())
except KeyboardInterrupt:
print("\nAborted by user.")

View File

@ -1,280 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Async Mass Deployment Tool
Combines parallel flashing (via esptool) with async configuration.
"""
import asyncio
import serial_asyncio
import sys
import os
import argparse
import ipaddress
import re
import time
import logging
from pathlib import Path
# Ensure detection script is available
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
except ImportError:
print("Error: 'detect_esp32.py' not found.")
sys.exit(1)
# --- Configuration ---
MAX_CONCURRENT_FLASH = 8
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
class DeviceLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger("Deploy")
class DeployWorker:
def __init__(self, port, target_ip, args, build_dir, flash_sem):
self.port = port
self.target_ip = target_ip
self.args = args
self.build_dir = build_dir
self.flash_sem = flash_sem
self.log = DeviceLoggerAdapter(logger, {'connid': port})
# Regex Patterns
self.regex_ready = re.compile(r'Initialization complete|GPS synced|No WiFi config found', re.IGNORECASE)
# Match output from 'mode_status' command
self.regex_status_connected = re.compile(r'WiFi connected: Yes', re.IGNORECASE)
self.regex_status_ip = re.compile(r'Got IP: (\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
async def run(self):
try:
# 1. Flash Phase
async with self.flash_sem:
if self.args.erase:
if not await self._erase_flash(): return False
if not await self._flash_firmware(): return False
# 2. Config Phase
await asyncio.sleep(1.0) # Wait for port to stabilize after flash reset
if self.args.ssid and self.args.password:
if not await self._configure_device(): return False
else:
self.log.info(f"{Colors.GREEN}Flash Complete (NVS Preserved){Colors.RESET}")
return True
except Exception as e:
self.log.error(f"Worker Exception: {e}")
return False
async def _erase_flash(self):
self.log.info("Erasing flash...")
cmd = ['esptool.py', '-p', self.port, '-b', '115200', 'erase_flash']
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
self.log.info("Erase successful.")
return True
self.log.error(f"Erase failed: {stderr.decode()}")
return False
async def _flash_firmware(self):
self.log.info("Flashing firmware...")
cmd = ['esptool.py', '-p', self.port, '-b', str(self.args.baud),
'--before', 'default_reset', '--after', 'hard_reset',
'write_flash', '@flash_args']
proc = await asyncio.create_subprocess_exec(
*cmd, cwd=self.build_dir,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
except asyncio.TimeoutError:
proc.kill()
self.log.error("Flash timed out.")
return False
if proc.returncode == 0:
self.log.info("Flash successful.")
return True
self.log.error(f"Flash failed: {stderr.decode()}")
return False
async def _configure_device(self):
self.log.info("Connecting to console...")
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e:
self.log.error(f"Serial open failed: {e}")
return False
try:
# A. Boot Wait
self.log.info("Waiting for boot...")
booted = False
end_time = time.time() + 10
while time.time() < end_time:
try:
line_b = await asyncio.wait_for(reader.readline(), timeout=0.5)
line = line_b.decode('utf-8', errors='ignore').strip()
if self.regex_ready.search(line):
booted = True
break
except asyncio.TimeoutError:
continue
if not booted:
self.log.warning("Boot prompt missed, attempting config anyway...")
# B. Send Config
self.log.info(f"Sending config for {self.target_ip}...")
config_str = (f"CFG\nSSID:{self.args.ssid}\nPASS:{self.args.password}\n"
f"IP:{self.target_ip}\nMASK:{self.args.netmask}\nGW:{self.args.gateway}\n"
f"DHCP:0\nEND\n")
writer.write(config_str.encode('utf-8'))
await writer.drain()
# C. Active Polling Verification
self.log.info("Verifying configuration (Polling)...")
start_verify = time.time()
while time.time() < start_verify + 30:
# 1. Clear buffer
try:
while True:
await asyncio.wait_for(reader.read(1024), timeout=0.01)
except asyncio.TimeoutError:
pass
# 2. Send status request
writer.write(b"\nmode_status\n")
await writer.drain()
# 3. Read response for ~2 seconds
poll_end = time.time() + 2.0
while time.time() < poll_end:
try:
line_b = await asyncio.wait_for(reader.readline(), timeout=0.5)
line = line_b.decode('utf-8', errors='ignore').strip()
# Check for success indicators in status output
if self.regex_status_connected.search(line):
self.log.info(f"{Colors.GREEN}SUCCESS: Connected{Colors.RESET}")
return True
# Also catch passive "Got IP" logs if they appear
m = self.regex_status_ip.search(line)
if m:
if m.group(1) == self.target_ip:
self.log.info(f"{Colors.GREEN}SUCCESS: IP Confirmed ({m.group(1)}){Colors.RESET}")
return True
except asyncio.TimeoutError:
break
# Wait a bit before next poll
await asyncio.sleep(1.0)
self.log.error("Timeout: Device did not confirm connection.")
return False
except Exception as e:
self.log.error(f"Config error: {e}")
return False
finally:
writer.close()
await writer.wait_closed()
def parse_args():
parser = argparse.ArgumentParser(description='Async ESP32 Mass Deployment')
parser.add_argument('-d', '--dir', default=os.getcwd(), help='Project dir')
parser.add_argument('-s', '--ssid', help='WiFi SSID')
parser.add_argument('-P', '--password', help='WiFi Password')
parser.add_argument('--start-ip', default='192.168.1.51', help='Start IP')
parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud')
parser.add_argument('--erase', action='store_true', help='Full erase first')
parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway')
parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')
return parser.parse_args()
async def run_deployment(args):
project_dir = Path(args.dir).resolve()
build_dir = project_dir / 'build'
# 1. Build Firmware
print(f"{Colors.YELLOW}[1/3] Building Firmware...{Colors.RESET}")
proc = await asyncio.create_subprocess_exec(
'idf.py', 'build',
cwd=project_dir,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
_, stderr = await proc.communicate()
if proc.returncode != 0:
print(f"{Colors.RED}Build Failed:\n{stderr.decode()}{Colors.RESET}")
return
if not (build_dir / 'flash_args').exists():
print(f"{Colors.RED}Error: build/flash_args missing.{Colors.RESET}")
return
print(f"{Colors.GREEN}Build Complete.{Colors.RESET}")
# 2. Detect Devices
print(f"{Colors.YELLOW}[2/3] Scanning Devices...{Colors.RESET}")
devices = detect_esp32.detect_esp32_devices()
if not devices:
print(f"{Colors.RED}No devices found.{Colors.RESET}")
return
def natural_keys(d):
return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', d.device)]
devices.sort(key=natural_keys)
# 3. Deploy
print(f"{Colors.YELLOW}[3/3] Deploying to {len(devices)} devices...{Colors.RESET}")
try:
start_ip_obj = ipaddress.IPv4Address(args.start_ip)
except:
print("Invalid Start IP")
return
flash_sem = asyncio.Semaphore(MAX_CONCURRENT_FLASH)
tasks = []
for i, dev in enumerate(devices):
target_ip = str(start_ip_obj + i)
worker = DeployWorker(dev.device, target_ip, args, build_dir, flash_sem)
tasks.append(worker.run())
results = await asyncio.gather(*tasks)
# 4. Summary
success = results.count(True)
print(f"\n{Colors.BLUE}{'='*40}{Colors.RESET}")
print(f"Total: {len(devices)}")
print(f"Success: {Colors.GREEN}{success}{Colors.RESET}")
print(f"Failed: {Colors.RED}{len(devices) - success}{Colors.RESET}")
print(f"{Colors.BLUE}{'='*40}{Colors.RESET}")
def main():
args = parse_args()
if os.name == 'nt':
asyncio.set_event_loop(asyncio.ProactorEventLoop())
try:
asyncio.run(run_deployment(args))
except KeyboardInterrupt:
print("\nCancelled.")
if __name__ == '__main__':
main()

View File

@ -1,158 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Batch Configuration Tool
Detects all connected ESP32s and configures them with sequential Static IPs.
Requires: detect_esp32.py and config_device.py in the same directory.
"""
import sys
import os
import argparse
import time
import ipaddress
import re
# Ensure we can import the other scripts
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
import config_device
except ImportError as e:
print(f"Error: Could not import required modules ({e}).")
print("Make sure 'detect_esp32.py' and 'config_device.py' are in the same folder.")
sys.exit(1)
def natural_sort_key(device_obj):
"""
Sorts ports naturally (ttyUSB2 comes before ttyUSB10)
"""
s = device_obj.device
# Split string into a list of integers and non-integers
return [int(text) if text.isdigit() else text.lower()
for text in re.split('([0-9]+)', s)]
def main():
parser = argparse.ArgumentParser(
description='Batch Config: Detects all ESP32s and configures sequential IPs',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# Arguments matching config_device.py options
parser.add_argument('--start-ip', required=True,
help='Starting Static IP (e.g., 192.168.1.101). Will increment for each device.')
parser.add_argument('-s', '--ssid', default='ClubHouse2G',
help='WiFi SSID')
parser.add_argument('-P', '--password', default='ez2remember',
help='WiFi password')
parser.add_argument('-g', '--gateway', default='192.168.1.1',
help='Gateway IP')
parser.add_argument('-m', '--netmask', default='255.255.255.0',
help='Netmask')
parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G'],
help='WiFi band')
parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'],
help='Channel bandwidth')
parser.add_argument('-ps', '--powersave', default='NONE',
choices=['NONE', 'MIN', 'MIN_MODEM', 'MAX', 'MAX_MODEM'],
help='Power save mode')
parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'],
help='Operating mode')
parser.add_argument('-mc', '--monitor-channel', type=int, default=36,
help='Monitor mode channel')
parser.add_argument('-r', '--no-reboot', action='store_true',
help='Do NOT reboot devices after configuration')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
args = parser.parse_args()
# 1. Detect Devices
print(f"{'='*60}")
print("Step 1: Detecting ESP32 Devices...")
print(f"{'='*60}")
devices = detect_esp32.detect_esp32_devices()
if not devices:
print("No ESP32 devices found! Check USB connections.")
sys.exit(1)
# Sort devices naturally so IPs are assigned in order (USB0, USB1, USB2...)
devices.sort(key=natural_sort_key)
print(f"Found {len(devices)} devices:")
for d in devices:
print(f" - {d.device} ({d.description})")
print()
# 2. Parse Starting IP
try:
start_ip_obj = ipaddress.IPv4Address(args.start_ip)
except ipaddress.AddressValueError:
print(f"Error: Invalid IP address format: {args.start_ip}")
sys.exit(1)
# 3. Configure Each Device
print(f"{'='*60}")
print("Step 2: Configuring Devices Sequentially")
print(f"{'='*60}")
success_count = 0
fail_count = 0
failed_devices = []
for index, device in enumerate(devices):
# Calculate current IP
current_ip = str(start_ip_obj + index)
port = device.device
print(f"\n[{index+1}/{len(devices)}] Configuring {port} with IP {current_ip}...")
# Call the config function from your existing script
result = config_device.config_device(
port=port,
ip=current_ip,
ssid=args.ssid,
password=args.password,
gateway=args.gateway,
netmask=args.netmask,
band=args.band,
bandwidth=args.bandwidth,
powersave=args.powersave,
mode=args.mode,
monitor_channel=args.monitor_channel,
reboot=not args.no_reboot,
verbose=args.verbose
)
if result:
print(f"✓ Success: {port} -> {current_ip}")
success_count += 1
else:
print(f"✗ Failed: {port}")
fail_count += 1
failed_devices.append(port)
# Small delay to prevent USB power spikes if multiple devices reboot simultaneously
if not args.no_reboot and index < len(devices) - 1:
time.sleep(1.0)
# 4. Summary
print(f"\n{'='*60}")
print("Batch Configuration Complete")
print(f"{'='*60}")
print(f"Total Devices: {len(devices)}")
print(f"Successful: {success_count}")
print(f"Failed: {fail_count}")
if failed_devices:
print("\nFailed Ports:")
for p in failed_devices:
print(f" - {p}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nBatch process interrupted by user.")

View File

@ -30,12 +30,6 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
/*
* app_console.c
*
* Copyright (c) 2025 Umber Networks & Robert McMahon
* All rights reserved.
*/
#include "app_console.h" #include "app_console.h"
#include "esp_console.h" #include "esp_console.h"

View File

@ -30,12 +30,6 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
/*
* cmd_ip.c
*
* Copyright (c) 2025 Umber Networks & Robert McMahon
* All rights reserved.
*/
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>

View File

@ -30,12 +30,6 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
/*
* cmd_nvs.c
*
* Copyright (c) 2025 Umber Networks & Robert McMahon
* All rights reserved.
*/
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>

View File

@ -120,8 +120,10 @@ static int wifi_do_connect(int argc, char **argv) {
// Apply // Apply
wifi_config_t wifi_config = {0}; wifi_config_t wifi_config = {0};
strncpy((char *)wifi_config.sta.ssid, ssid, sizeof(wifi_config.sta.ssid)); strncpy((char *)wifi_config.sta.ssid, ssid, sizeof(wifi_config.sta.ssid) - 1);
strncpy((char *)wifi_config.sta.password, pass, sizeof(wifi_config.sta.password)); wifi_config.sta.ssid[sizeof(wifi_config.sta.ssid) - 1] = '\0';
strncpy((char *)wifi_config.sta.password, pass, sizeof(wifi_config.sta.password) - 1);
wifi_config.sta.password[sizeof(wifi_config.sta.password) - 1] = '\0';
ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config)); ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config));
ESP_ERROR_CHECK(esp_wifi_connect()); ESP_ERROR_CHECK(esp_wifi_connect());

View File

@ -38,6 +38,7 @@
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
#include "freertos/task.h" #include "freertos/task.h"
#include "esp_log.h" #include "esp_log.h"
#include "esp_err.h"
#include "esp_timer.h" #include "esp_timer.h"
#include "driver/uart.h" #include "driver/uart.h"
#include "driver/gpio.h" #include "driver/gpio.h"
@ -212,19 +213,41 @@ void gps_sync_init(const gps_sync_config_t *cfg, bool force_enable) {
.flow_ctrl = UART_HW_FLOWCTRL_DISABLE, .flow_ctrl = UART_HW_FLOWCTRL_DISABLE,
.source_clk = UART_SCLK_DEFAULT, .source_clk = UART_SCLK_DEFAULT,
}; };
uart_driver_install(s_cfg.uart_port, GPS_BUF_SIZE * 2, 0, 0, NULL, 0); esp_err_t err = uart_driver_install(s_cfg.uart_port, GPS_BUF_SIZE * 2, 0, 0, NULL, 0);
if (err != ESP_OK && err != ESP_ERR_INVALID_STATE) {
ESP_LOGE(TAG, "Failed to install UART driver: %s", esp_err_to_name(err));
return;
}
uart_param_config(s_cfg.uart_port, &uart_config); uart_param_config(s_cfg.uart_port, &uart_config);
uart_set_pin(s_cfg.uart_port, s_cfg.tx_pin, s_cfg.rx_pin, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE); err = uart_set_pin(s_cfg.uart_port, s_cfg.tx_pin, s_cfg.rx_pin, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE);
if (err != ESP_OK) {
ESP_LOGE(TAG, "Failed to set UART pins: %s", esp_err_to_name(err));
return;
}
gpio_config_t io_conf = {}; gpio_config_t io_conf = {};
io_conf.intr_type = GPIO_INTR_POSEDGE; io_conf.intr_type = GPIO_INTR_POSEDGE;
io_conf.pin_bit_mask = (1ULL << s_cfg.pps_pin); io_conf.pin_bit_mask = (1ULL << s_cfg.pps_pin);
io_conf.mode = GPIO_MODE_INPUT; io_conf.mode = GPIO_MODE_INPUT;
io_conf.pull_up_en = 1; io_conf.pull_up_en = 1;
gpio_config(&io_conf); err = gpio_config(&io_conf);
if (err != ESP_OK) {
ESP_LOGE(TAG, "Failed to configure PPS GPIO %d: %s", s_cfg.pps_pin, esp_err_to_name(err));
return;
}
gpio_install_isr_service(0); // Install ISR service (ignore error if already installed)
gpio_isr_handler_add(s_cfg.pps_pin, pps_gpio_isr_handler, NULL); err = gpio_install_isr_service(0);
if (err != ESP_OK && err != ESP_ERR_INVALID_STATE) {
ESP_LOGE(TAG, "Failed to install GPIO ISR service: %s", esp_err_to_name(err));
return;
}
err = gpio_isr_handler_add(s_cfg.pps_pin, pps_gpio_isr_handler, NULL);
if (err != ESP_OK) {
ESP_LOGE(TAG, "Failed to add PPS GPIO ISR handler: %s", esp_err_to_name(err));
return;
}
xTaskCreate(gps_task, "gps_task", 4096, NULL, 5, NULL); xTaskCreate(gps_task, "gps_task", 4096, NULL, 5, NULL);

View File

@ -30,9 +30,10 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
/** /**
* @file iperf.c * @file iperf.c
* @brief ESP32 iPerf Traffic Generator (UDP Client Only) * @brief ESP32 iPerf Traffic Generator (UDP Client Only) with Trip-Time Support
* *
* This module implements a lightweight UDP traffic generator compatible with iPerf2. * This module implements a lightweight UDP traffic generator compatible with iPerf2.
* It features: * It features:
@ -41,13 +42,6 @@
* - Non-Volatile Storage (NVS) for persistent configuration. * - Non-Volatile Storage (NVS) for persistent configuration.
* - Detailed error tracking (ENOMEM vs Route errors). * - Detailed error tracking (ENOMEM vs Route errors).
* - GPS Timestamp integration for status reporting. * - GPS Timestamp integration for status reporting.
* @brief ESP32 iPerf Traffic Generator (UDP Client Only) with Trip-Time Support
*/
/*
* iperf.c
*
* Copyright (c) 2025 Umber Networks & Robert McMahon
* All rights reserved.
*/ */
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>

View File

@ -30,12 +30,6 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
/*
* wifi_cfg.c
*
* Copyright (c) 2025 Umber Networks & Robert McMahon
* All rights reserved.
*/
#include "wifi_cfg.h" #include "wifi_cfg.h"
#include <string.h> #include <string.h>
@ -78,9 +72,11 @@ bool wifi_cfg_apply_from_nvs(void) {
} }
wifi_config_t wifi_config = {0}; wifi_config_t wifi_config = {0};
strncpy((char *)wifi_config.sta.ssid, ssid, sizeof(wifi_config.sta.ssid)); strncpy((char *)wifi_config.sta.ssid, ssid, sizeof(wifi_config.sta.ssid) - 1);
wifi_config.sta.ssid[sizeof(wifi_config.sta.ssid) - 1] = '\0';
if (pass) { if (pass) {
strncpy((char *)wifi_config.sta.password, pass, sizeof(wifi_config.sta.password)); strncpy((char *)wifi_config.sta.password, pass, sizeof(wifi_config.sta.password) - 1);
wifi_config.sta.password[sizeof(wifi_config.sta.password) - 1] = '\0';
} }
ESP_LOGI(TAG, "Applying WiFi Config: SSID=%s", ssid); ESP_LOGI(TAG, "Applying WiFi Config: SSID=%s", ssid);

View File

@ -1,389 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 WiFi Configuration Tool - Static IP with auto-disable DHCP and CSI control
"""
import serial
import time
import sys
import argparse
def log_verbose(message, verbose=False):
"""Print message only if verbose is enabled"""
if verbose:
print(f"[VERBOSE] {message}")
def config_device(port, ip, ssid="ClubHouse2G", password="ez2remember",
gateway="192.168.1.1", netmask="255.255.255.0",
band="2.4G", bandwidth="HT20", powersave="NONE",
mode="STA", monitor_channel=36, csi_enable=False,
reboot=True, verbose=False):
"""Configure ESP32 device via serial with static IP and CSI control"""
print(f"\n{'='*70}")
print(f"ESP32 WiFi Configuration (Static IP + Mode + CSI)")
print(f"{'='*70}")
print(f"Port: {port}")
print(f"SSID: {ssid}")
print(f"Password: {'*' * len(password)}")
print(f"IP: {ip} (DHCP disabled)")
print(f"Gateway: {gateway}")
print(f"Netmask: {netmask}")
print(f"Mode: {mode}")
if mode == "MONITOR":
print(f"Mon Ch: {monitor_channel}")
print(f"Band: {band}")
print(f"Bandwidth: {bandwidth}")
print(f"PowerSave: {powersave}")
print(f"CSI: {'ENABLED' if csi_enable else 'DISABLED'}")
print(f"Reboot: {'Yes' if reboot else 'No'}")
print(f"Verbose: {verbose}")
print(f"{'='*70}\n")
try:
# Open serial connection
log_verbose(f"Opening serial port {port} at 115200 baud...", verbose)
ser = serial.Serial(port, 115200, timeout=0.5, write_timeout=0.5)
log_verbose(f"Serial port opened successfully", verbose)
log_verbose(f"Port settings: {ser}", verbose)
time.sleep(0.2)
# Check if there's any data waiting
if ser.in_waiting:
log_verbose(f"{ser.in_waiting} bytes waiting in buffer", verbose)
existing = ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
log_verbose(f"Existing data: {existing[:100]}", verbose)
# Build config message
# DHCP is always disabled (0) when IP address is provided
config_lines = [
"CFG",
f"SSID:{ssid}",
f"PASS:{password}",
f"IP:{ip}",
f"MASK:{netmask}",
f"GW:{gateway}",
"DHCP:0", # Always disabled for static IP
f"BAND:{band}",
f"BW:{bandwidth}",
f"POWERSAVE:{powersave}",
f"MODE:{mode}",
f"MON_CH:{monitor_channel}",
f"CSI:{'1' if csi_enable else '0'}",
"END"
]
config = '\n'.join(config_lines) + '\n'
log_verbose(f"Config message size: {len(config)} bytes", verbose)
if verbose:
print("[VERBOSE] Config message:")
for line in config_lines:
display_line = line if not line.startswith("PASS:") else "PASS:********"
print(f"[VERBOSE] {display_line}")
# Send config
print("Sending configuration...")
print("\nConfiguration being sent:")
for line in config_lines:
display_line = line if not line.startswith("PASS:") else "PASS:********"
print(f" {display_line}")
print()
start_time = time.time()
bytes_written = ser.write(config.encode('utf-8'))
ser.flush()
send_time = time.time() - start_time
log_verbose(f"Wrote {bytes_written} bytes in {send_time:.3f}s", verbose)
print(f"Sent {bytes_written} bytes")
print("Waiting for response...")
time.sleep(3)
# Read response
if ser.in_waiting:
response_size = ser.in_waiting
print(f"\n✓ Response received: {response_size} bytes")
response = ser.read(response_size).decode('utf-8', errors='ignore')
print("\nDevice response:")
print("-" * 70)
for line in response.split('\n')[:30]:
if line.strip():
print(f" {line}")
print("-" * 70)
# Check for key indicators
success_indicators = []
warning_indicators = []
if "OK" in response:
success_indicators.append("✓ Configuration acknowledged (OK)")
if "Config saved" in response or "saved to NVS" in response:
success_indicators.append("✓ Config saved to NVS")
if "CSI enable state saved" in response:
csi_state = "ENABLED" if csi_enable else "DISABLED"
success_indicators.append(f"✓ CSI {csi_state} saved to NVS")
if "got ip:" in response.lower():
success_indicators.append("✓ Device connected to WiFi!")
import re
ip_match = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', response, re.IGNORECASE)
if ip_match:
received_ip = ip_match.group(1)
success_indicators.append(f" Assigned IP: {received_ip}")
if received_ip != ip:
warning_indicators.append(f"⚠ Warning: Device got {received_ip} instead of configured {ip}")
warning_indicators.append(" This might indicate DHCP is still enabled")
if "connected" in response.lower():
success_indicators.append("✓ WiFi connection established")
if "failed" in response.lower() or "disconnect" in response.lower():
warning_indicators.append("⚠ WiFi connection may have failed")
if "error" in response.lower():
warning_indicators.append("⚠ Error detected in response")
if success_indicators:
print("\nStatus indicators:")
for indicator in success_indicators:
print(f" {indicator}")
if warning_indicators:
print("\nWarnings:")
for warning in warning_indicators:
print(f" {warning}")
else:
print("\n⚠ No response from device")
print(" This could mean:")
print(" - Device is not running config handler")
print(" - Wrong serial port")
print(" - Baud rate mismatch")
# Reboot device if requested
if reboot:
print("\n" + "="*70)
print("Rebooting device...")
print("="*70)
log_verbose("Performing hardware reset via DTR/RTS", verbose)
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
time.sleep(0.1)
ser.dtr = True
print("✓ Reset signal sent - waiting for boot...")
time.sleep(3)
if ser.in_waiting:
boot_msg = ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
print("\nBoot messages:")
print("-" * 70)
for line in boot_msg.split('\n')[:40]:
if line.strip():
print(f" {line}")
print("-" * 70)
# Check boot status
boot_success = []
boot_warnings = []
if "WiFi config loaded from NVS" in boot_msg:
boot_success.append("✓ Config successfully loaded from NVS")
elif "No WiFi config" in boot_msg or "YELLOW LED" in boot_msg:
boot_warnings.append("✗ NO CONFIG found in NVS")
boot_warnings.append(" Device does not see saved config")
# Check CSI status
if "CSI Capture: ENABLED" in boot_msg:
boot_success.append("✓ CSI capture is ENABLED")
elif "CSI Capture: DISABLED" in boot_msg:
if csi_enable:
boot_warnings.append("⚠ CSI is DISABLED but was configured as ENABLED")
else:
boot_success.append("✓ CSI capture is DISABLED (as configured)")
# Check if device got the correct static IP
import re
ip_match = re.search(r'got ip:(\d+\.\d+\.\d+\.\d+)', boot_msg, re.IGNORECASE)
if ip_match:
received_ip = ip_match.group(1)
if received_ip == ip:
boot_success.append(f"✓ Device got correct static IP: {ip}")
else:
boot_warnings.append(f"⚠ Device got {received_ip} instead of {ip}")
boot_warnings.append(" DHCP may still be enabled or IP conflict exists")
if "WiFi CONNECTED" in boot_msg:
boot_success.append("✓ WiFi connection confirmed")
if boot_success:
print("\nBoot Status - SUCCESS:")
for msg in boot_success:
print(f" {msg}")
if boot_warnings:
print("\nBoot Status - ISSUES:")
for msg in boot_warnings:
print(f" {msg}")
else:
print("\n⚠ No boot messages received")
print(" Device may still be booting...")
if verbose:
log_verbose(f"Input buffer: {ser.in_waiting} bytes", verbose)
log_verbose(f"Output buffer empty: {ser.out_waiting == 0}", verbose)
ser.close()
log_verbose("Serial port closed", verbose)
print(f"\n{'='*70}")
print("Configuration Summary")
print(f"{'='*70}")
print(f"Port: {port}")
print(f"Static IP: {ip}")
print(f"SSID: {ssid}")
print(f"Mode: {mode}")
print(f"Band: {band}")
print(f"Bandwidth: {bandwidth}")
print(f"PowerSave: {powersave}")
print(f"CSI: {'ENABLED' if csi_enable else 'DISABLED'}")
print(f"DHCP: Disabled (static IP mode)")
print(f"{'='*70}")
print("\nNext steps:")
print(f" 1. Test connection:")
print(f" ping {ip}")
print(f" iperf -c {ip}")
print(f"\n 2. Verify device has correct IP:")
print(f" idf.py -p {port} monitor")
print(f" Look for: 'got ip:{ip}'")
if csi_enable:
print(f"\n 3. Verify CSI is capturing:")
print(f" Look for: 'CSI Capture: ENABLED'")
print(f" 'Captured X CSI packets'")
return True
except serial.SerialException as e:
print(f"\n✗ Serial error: {e}")
log_verbose(f"Serial exception details: {type(e).__name__}", verbose)
print(" Is another program using this port?")
return False
except KeyboardInterrupt:
print("\n\nConfiguration cancelled by user")
if 'ser' in locals() and ser.is_open:
ser.close()
log_verbose("Serial port closed after interrupt", verbose)
return False
except Exception as e:
print(f"\n✗ Error: {e}")
if verbose:
import traceback
print("\n[VERBOSE] Full traceback:")
traceback.print_exc()
return False
def main():
parser = argparse.ArgumentParser(
description='Configure ESP32 WiFi with static IP (DHCP automatically disabled) and CSI control',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Configure device #1 for STA mode with CSI DISABLED (baseline testing)
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA
# Configure device #1 for STA mode with CSI ENABLED
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA --csi
# Configure device #25 for MONITOR mode (collapse detection, CSI not needed)
%(prog)s -p /dev/ttyUSB1 -i 192.168.1.90 -M MONITOR -mc 36
# STA mode with CSI for iperf + CSI correlation testing
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA --csi -ps NONE
# Monitor mode on 2.4GHz channel 6
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.91 -M MONITOR -mc 6 -b 2.4G
# STA mode on 5GHz with 40MHz bandwidth and CSI
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.81 -M STA -b 5G -B HT40 --csi
# With verbose output
%(prog)s -p /dev/ttyUSB0 -i 192.168.1.51 -v
Note:
- Mode and CSI enable state are saved to NVS
- Device will auto-start in configured mode on boot
- CSI defaults to DISABLED unless --csi flag is used
- DHCP is always disabled when using this script (static IP mode)
"""
)
parser.add_argument('-p', '--port', required=True,
help='Serial port (e.g., /dev/ttyUSB0)')
parser.add_argument('-i', '--ip', required=True,
help='Static IP address (DHCP will be disabled)')
parser.add_argument('-s', '--ssid', default='ClubHouse2G',
help='WiFi SSID (default: ClubHouse2G)')
parser.add_argument('-P', '--password', default='ez2remember',
help='WiFi password (default: ez2remember)')
parser.add_argument('-g', '--gateway', default='192.168.1.1',
help='Gateway IP (default: 192.168.1.1)')
parser.add_argument('-m', '--netmask', default='255.255.255.0',
help='Netmask (default: 255.255.255.0)')
parser.add_argument('-b', '--band', default='2.4G', choices=['2.4G', '5G'],
help='WiFi band: 2.4G or 5G (default: 2.4G)')
parser.add_argument('-B', '--bandwidth', default='HT20',
choices=['HT20', 'HT40', 'VHT80'],
help='Channel bandwidth: HT20 (20MHz), HT40 (40MHz), VHT80 (80MHz, 5GHz only) (default: HT20)')
parser.add_argument('-ps', '--powersave', default='NONE',
choices=['NONE', 'MIN', 'MIN_MODEM', 'MAX', 'MAX_MODEM'],
help='Power save mode: NONE (no PS, best for CSI), MIN/MIN_MODEM, MAX/MAX_MODEM (default: NONE)')
parser.add_argument('-M', '--mode', default='STA',
choices=['STA', 'MONITOR'],
help='Operating mode: STA (connect to AP, CSI+iperf) or MONITOR (promiscuous, collapse detection) (default: STA)')
parser.add_argument('-mc', '--monitor-channel', type=int, default=36,
help='Monitor mode channel (1-11 for 2.4GHz, 36-165 for 5GHz) (default: 36)')
parser.add_argument('--csi', action='store_true',
help='Enable CSI capture (default: disabled). Use for devices that need CSI data collection.')
parser.add_argument('-r', '--no-reboot', action='store_true',
help='Do NOT reboot device after configuration')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
args = parser.parse_args()
# Validate bandwidth selection
if args.bandwidth == 'VHT80' and args.band == '2.4G':
print("\n✗ Error: VHT80 (80MHz) is only supported on 5GHz band")
print(" Either use -b 5G or choose HT20/HT40 bandwidth")
sys.exit(1)
success = config_device(
port=args.port,
ip=args.ip,
ssid=args.ssid,
password=args.password,
gateway=args.gateway,
netmask=args.netmask,
band=args.band,
bandwidth=args.bandwidth,
powersave=args.powersave,
mode=args.mode,
monitor_channel=args.monitor_channel,
csi_enable=args.csi,
reboot=not args.no_reboot,
verbose=args.verbose
)
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()

21
dependencies.lock Normal file
View File

@ -0,0 +1,21 @@
dependencies:
espressif/led_strip:
component_hash: 28c6509a727ef74925b372ed404772aeedf11cce10b78c3f69b3c66799095e2d
dependencies:
- name: idf
require: private
version: '>=4.4'
source:
registry_url: https://components.espressif.com/
type: service
version: 2.5.5
idf:
source:
type: idf
version: 6.0.0
direct_dependencies:
- espressif/led_strip
- idf
manifest_hash: cfead66889b7175cc6aa9a766fd00dc94649d6800986f3fcc4645dc58723ed39
target: esp32c5
version: 2.0.0

View File

@ -135,7 +135,8 @@ class UnifiedDeployWorker:
# --- Semaphore Released Here --- # --- Semaphore Released Here ---
await asyncio.sleep(2.0) # Brief pause after flash to allow device to start booting (hardware timing)
await asyncio.sleep(1.0)
if not self.args.flash_only: if not self.args.flash_only:
if self.args.ssid and self.args.password: if self.args.ssid and self.args.password:
@ -171,10 +172,10 @@ class UnifiedDeployWorker:
except Exception as e: except Exception as e:
return False return False
try: try:
# Reset DTR/RTS logic # Reset DTR/RTS logic (hardware reset timing - must use sleep)
writer.transport.serial.dtr = False writer.transport.serial.dtr = False
writer.transport.serial.rts = True writer.transport.serial.rts = True
await asyncio.sleep(0.1) await asyncio.sleep(0.1) # Hardware timing requirement, not event-based
writer.transport.serial.rts = False writer.transport.serial.rts = False
writer.transport.serial.dtr = False writer.transport.serial.dtr = False
@ -184,36 +185,100 @@ class UnifiedDeployWorker:
return False return False
# 2. Send Configuration via CLI # 2. Send Configuration via CLI
# Command: wifi_config -s "SSID" -p "PASS" -i "IP" # Use separate commands: wifi connect and ip set
# Note: The Shell will auto-reboot after this command. # First, connect to WiFi (saves credentials to NVS)
cmd = f'wifi_config -s "{self.args.ssid}" -p "{self.args.password}" -i "{self.target_ip}"' # Password is optional, only include if provided
if not self.args.iperf_client and not self.args.iperf_server: if self.args.password:
# If just connecting, maybe we want DHCP? wifi_cmd = f'wifi connect "{self.args.ssid}" "{self.args.password}"'
# But if target_ip is set, we force static. else:
pass wifi_cmd = f'wifi connect "{self.args.ssid}"'
self.log.info(f"Sending: {wifi_cmd}")
self.log.info(f"Sending: {cmd}") writer.write(f"{wifi_cmd}\n".encode())
writer.write(f"{cmd}\n".encode())
await writer.drain() await writer.drain()
# 3. Wait for the reboot and new prompt # Wait for "Connecting to..." message (command acknowledged)
# The device prints "Rebooting..." then restarts. match_idx, output = await self._wait_for_pattern(reader, ["Connecting to"], timeout=2.0)
self.log.info("Waiting for reboot...") if match_idx is None:
await asyncio.sleep(3.0) # Give it time to actually reset self.log.warning("WiFi connect command response not detected, continuing...")
if not await self._wait_for_prompt(reader, writer, timeout=20): # Second, set static IP address (if IP is provided)
self.log.error("Device did not return to prompt after reboot.") if self.target_ip:
ip_cmd = f'ip set {self.target_ip} {self.args.netmask} {self.args.gateway}'
self.log.info(f"Sending: {ip_cmd}")
writer.write(f"{ip_cmd}\n".encode())
await writer.drain()
# Wait for IP set confirmation
match_idx, output = await self._wait_for_pattern(
reader,
["Static IP set", "Saved. Will apply on next init", "Invalid IP format"],
timeout=3.0
)
if match_idx is None:
self.log.warning("IP set command response not detected, continuing...")
elif match_idx == 2: # "Invalid IP format"
self.log.error("IP configuration failed: Invalid IP format")
return False return False
self.log.info(f"{Colors.GREEN}Reboot complete. Shell Ready.{Colors.RESET}") # 3. Wait for prompt to ensure command completed (may take a moment for WiFi to start connecting)
# WiFi connection is async, so we just verify the prompt returns
if not await self._wait_for_prompt(reader, writer, timeout=5.0):
self.log.warning("Prompt check failed after configuration, but continuing...")
# 4. (Optional) Start iperf if requested self.log.info(f"{Colors.GREEN}Configuration complete.{Colors.RESET}")
# The new firmware does not auto-start iperf on boot unless commanded.
# 4. (Optional) Configure and start iperf if requested
if not self.args.no_iperf: if not self.args.no_iperf:
self.log.info("Starting iperf listener...") # Configure iperf parameters if specified
iperf_params = []
if self.args.iperf_dest_ip:
iperf_params.append(f'--client {self.args.iperf_dest_ip}')
if self.args.iperf_port:
iperf_params.append(f'--port {self.args.iperf_port}')
if self.args.iperf_len:
iperf_params.append(f'--len {self.args.iperf_len}')
if self.args.iperf_burst:
iperf_params.append(f'--burst {self.args.iperf_burst}')
# Note: iperf-period not directly supported, would need PPS calculation
if iperf_params:
iperf_set_cmd = f"iperf set {' '.join(iperf_params)}\n"
self.log.info(f"Configuring iperf: {iperf_set_cmd.strip()}")
writer.write(iperf_set_cmd.encode())
await writer.drain()
# Wait for iperf set confirmation
match_idx, output = await self._wait_for_pattern(
reader,
["Configuration updated", "No changes specified"],
timeout=2.0
)
if match_idx is None:
self.log.warning("iperf set response not detected, continuing...")
# Save configuration to NVS
self.log.info("Saving iperf config to NVS...")
writer.write(b"iperf save\n")
await writer.drain()
# Wait for iperf save confirmation
match_idx, output = await self._wait_for_pattern(
reader,
["Configuration saved to NVS", "No changes to save", "Error saving"],
timeout=2.0
)
if match_idx is None:
self.log.warning("iperf save response not detected, continuing...")
elif match_idx == 2: # "Error saving"
self.log.error("iperf save failed, but continuing...")
# Start iperf (no specific output expected, just wait for prompt)
self.log.info("Starting iperf...")
writer.write(b"iperf start\n") writer.write(b"iperf start\n")
await writer.drain() await writer.drain()
await asyncio.sleep(0.5)
# Wait a brief moment for command to be processed, then check for prompt
await self._wait_for_prompt(reader, writer, timeout=2.0)
return True return True
@ -246,6 +311,48 @@ class UnifiedDeployWorker:
break break
return False return False
async def _wait_for_pattern(self, reader, patterns, timeout=5.0, consume=True):
"""
Wait for one of the given patterns to appear in serial output.
patterns: list of regex patterns or simple strings to search for
timeout: maximum time to wait
consume: if True, read and discard data; if False, only peek
Returns: tuple (matched_pattern_index, buffer) or (None, buffer) if timeout
"""
end_time = time.time() + timeout
buffer = ""
# Compile patterns
compiled_patterns = []
for p in patterns:
if isinstance(p, str):
compiled_patterns.append(re.compile(re.escape(p), re.IGNORECASE))
else:
compiled_patterns.append(p)
while time.time() < end_time:
try:
chunk = await asyncio.wait_for(reader.read(1024), timeout=0.1)
if chunk:
buffer += chunk.decode('utf-8', errors='ignore')
# Keep buffer size manageable (last 4KB)
if len(buffer) > 4096:
buffer = buffer[-4096:]
# Check each pattern
for i, pattern in enumerate(compiled_patterns):
if pattern.search(buffer):
if not consume:
# Put data back (not easily possible with async, so we'll consume)
pass
return (i, buffer)
except asyncio.TimeoutError:
continue
except Exception:
break
return (None, buffer)
# [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware AS IS] # [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware AS IS]
async def _query_version(self): async def _query_version(self):
try: try:
@ -254,7 +361,8 @@ class UnifiedDeployWorker:
writer.transport.serial.rts = False writer.transport.serial.rts = False
writer.write(b'\n') writer.write(b'\n')
await writer.drain() await writer.drain()
await asyncio.sleep(0.1) # Brief pause for prompt, then send command
await asyncio.sleep(0.1) # Hardware timing for prompt to appear
writer.write(b'version\n') writer.write(b'version\n')
await writer.drain() await writer.drain()

View File

@ -1,235 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Reconfiguration Tool
Iterates through connected devices and updates their settings (SSID, IP, etc.)
without reflashing firmware. Runs sequentially for reliability.
"""
import sys
import os
import argparse
import ipaddress
import re
import time
import serial
from serial.tools import list_ports
# --- Configuration ---
BAUD_RATE = 115200
TIMEOUT = 0.5 # Serial read timeout
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
def detect_devices():
"""Returns a sorted list of ESP32 USB serial ports."""
candidates = list(list_ports.grep("CP210|FT232|USB Serial|10C4:EA60"))
ports = [p.device for p in candidates]
ports.sort(key=lambda x: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', x)])
return ports
def extract_device_number(device_path):
match = re.search(r'(\d+)$', device_path)
return int(match.group(1)) if match else 0
def configure_device(port, target_ip, args):
"""
Connects to a single device, resets it, and injects the config.
Returns True if verification succeeds.
"""
try:
ser = serial.Serial(port, BAUD_RATE, timeout=0.1)
except Exception as e:
print(f"[{port}] {Colors.RED}Connection Failed: {e}{Colors.RESET}")
return False
try:
# 1. Reset Device
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
ser.dtr = True
# 2. Wait for App to Settle (Handle GPS delay)
print(f"[{port}] Waiting for App (GPS timeout ~3s)...", end='', flush=True)
start_time = time.time()
# We wait until we see the "esp32>" prompt OR specific log lines
# The prompt is the safest indicator that the console is ready.
prompt_detected = False
buffer = ""
while time.time() - start_time < 10.0:
try:
# Read char by char to catch prompts that don't end in newline
chunk = ser.read(ser.in_waiting or 1).decode('utf-8', errors='ignore')
if chunk:
buffer += chunk
# Check for prompt or end of init
if "esp32>" in buffer or "Entering console loop" in buffer:
prompt_detected = True
break
# Keep buffer size manageable
if len(buffer) > 1000: buffer = buffer[-1000:]
except Exception:
pass
time.sleep(0.05)
if not prompt_detected:
print(f" {Colors.YELLOW}Timeout waiting for prompt (continuing anyway){Colors.RESET}")
else:
print(" OK")
# 3. Clear Buffers & Wakeup
ser.reset_input_buffer()
ser.write(b'\n') # Send an Enter to clear any partial commands
time.sleep(0.2)
# 4. Construct Config String (Using CRLF \r\n for safety)
csi_val = '1' if args.csi_enable else '0'
role_str = "SERVER" if args.iperf_server else "CLIENT"
iperf_enable_val = '0' if args.no_iperf else '1'
period_us = int(args.iperf_period * 1000000)
# Note: We send \r\n explicitly
config_lines = [
"CFG",
f"SSID:{args.ssid}",
f"PASS:{args.password}",
f"IP:{target_ip}",
f"MASK:{args.netmask}",
f"GW:{args.gateway}",
f"DHCP:0",
f"BAND:{args.band}",
f"BW:{args.bandwidth}",
f"POWERSAVE:{args.powersave}",
f"MODE:{args.mode}",
f"MON_CH:{args.monitor_channel}",
f"CSI:{csi_val}",
f"IPERF_PERIOD_US:{period_us}",
f"IPERF_ROLE:{role_str}",
f"IPERF_PROTO:{args.iperf_proto}",
f"IPERF_DEST_IP:{args.iperf_dest_ip}",
f"IPERF_PORT:{args.iperf_port}",
f"IPERF_BURST:{args.iperf_burst}",
f"IPERF_LEN:{args.iperf_len}",
f"IPERF_ENABLED:{iperf_enable_val}",
"END"
]
config_payload = "\r\n".join(config_lines) + "\r\n"
# 5. Send Config
print(f"[{port}] Sending Config ({target_ip})...", end='', flush=True)
ser.write(config_payload.encode('utf-8'))
ser.flush()
# 6. Verify
verify_start = time.time()
verified = False
ser.timeout = 0.5 # Increase timeout for line reading
while time.time() - verify_start < 8.0:
line = ser.readline().decode('utf-8', errors='ignore').strip()
if not line: continue
# Check for success indicators
if "Config saved" in line or "CSI enable state saved" in line:
verified = True
break
# Check for IP confirmation
if f"got ip:{target_ip}" in line:
verified = True
break
if verified:
print(f" {Colors.GREEN}SUCCESS{Colors.RESET}")
# Final Reset to apply settings cleanly
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
return True
else:
print(f" {Colors.RED}FAILED (Verify Timeout){Colors.RESET}")
return False
except Exception as e:
print(f"[{port}] Error: {e}")
return False
finally:
if ser.is_open:
ser.close()
def main():
parser = argparse.ArgumentParser(description='ESP32 Sequential Reconfiguration Tool')
parser.add_argument('--start-ip', required=True, help='Start IP (e.g., 192.168.1.51)')
parser.add_argument('-s', '--ssid', default='ClubHouse2G')
parser.add_argument('-P', '--password', default='ez2remember')
parser.add_argument('-g', '--gateway', default='192.168.1.1')
parser.add_argument('-m', '--netmask', default='255.255.255.0')
parser.add_argument('--band', default='2.4G', choices=['2.4G', '5G'])
parser.add_argument('-B', '--bandwidth', default='HT20', choices=['HT20', 'HT40', 'VHT80'])
parser.add_argument('-ps', '--powersave', default='NONE')
parser.add_argument('--iperf-period', type=float, default=0.01)
parser.add_argument('--iperf-burst', type=int, default=1)
parser.add_argument('--iperf-len', type=int, default=1470)
parser.add_argument('--iperf-proto', default='UDP', choices=['UDP', 'TCP'])
parser.add_argument('--iperf-dest-ip', default='192.168.1.50')
parser.add_argument('--iperf-port', type=int, default=5001)
parser.add_argument('--no-iperf', action='store_true')
parser.add_argument('--iperf-client', action='store_true')
parser.add_argument('--iperf-server', action='store_true')
parser.add_argument('-M', '--mode', default='STA', choices=['STA', 'MONITOR'])
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
parser.add_argument('--csi', dest='csi_enable', action='store_true')
parser.add_argument('--retries', type=int, default=3, help="Retry attempts per device")
args = parser.parse_args()
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
print(f" ESP32 Sequential Reconfig Tool")
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
devices = detect_devices()
if not devices:
print(f"{Colors.RED}No devices found.{Colors.RESET}")
sys.exit(1)
print(f"Found {len(devices)} devices. Starting reconfiguration...\n")
start_ip = ipaddress.IPv4Address(args.start_ip)
for i, port in enumerate(devices):
offset = extract_device_number(port)
target_ip = str(start_ip + offset)
print(f"Device {i+1}/{len(devices)}: {Colors.CYAN}{port}{Colors.RESET} -> {Colors.YELLOW}{target_ip}{Colors.RESET}")
success = False
for attempt in range(1, args.retries + 1):
if attempt > 1:
print(f" Retry {attempt}/{args.retries}...")
if configure_device(port, target_ip, args):
success = True
break
time.sleep(1.0)
if not success:
print(f"{Colors.RED} [ERROR] Failed to configure {port} after {args.retries} attempts.{Colors.RESET}\n")
else:
print("")
print(f"{Colors.BLUE}Done.{Colors.RESET}")
if __name__ == '__main__':
main()

View File

@ -1,187 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Mass Flash Script (filtered & robust)
Changes in this version:
- Filters out non-USB system serials (e.g., /dev/ttyS*)
- Only enumerates typical USB serial ports: /dev/ttyUSB* and /dev/ttyACM*
- Further filters to known USB-serial vendor IDs by default:
* FTDI 0x0403
* SiliconLabs/CP210x 0x10C4
* QinHeng/CH34x 0x1A86
* Prolific PL2303 0x067B
* Espressif native 0x303A
- Adds --ports to override selection (glob or comma-separated patterns)
- Uses detect_esp32.detect_chip_type(port) for exact chip string
- Maps to correct idf.py target before flashing
Example:
python3 flash_all.py --project /path/to/project --start-ip 192.168.1.50
python3 flash_all.py --project . --ports '/dev/ttyUSB*,/dev/ttyACM*'
"""
import argparse
import fnmatch
import glob
import os
import subprocess
import sys
from pathlib import Path
from typing import List, Dict
try:
import serial.tools.list_ports as list_ports
except Exception:
print("pyserial is required: pip install pyserial")
raise
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
try:
import detect_esp32
except Exception as e:
print("Error: detect_esp32.py must be in the same directory and importable.")
print(f"Import error: {e}")
sys.exit(1)
# Known USB VIDs commonly used for ESP32 dev boards/adapters
KNOWN_VIDS = {0x0403, 0x10C4, 0x1A86, 0x067B, 0x303A}
def map_chip_to_idf_target(chip_str: str) -> str:
if not chip_str or chip_str == 'Unknown':
return 'unknown'
s = chip_str.upper()
if s.startswith('ESP32-S3'):
return 'esp32s3'
if s.startswith('ESP32-S2'):
return 'esp32s2'
if s.startswith('ESP32-C3'):
return 'esp32c3'
if s.startswith('ESP32-C6'):
return 'esp32c6'
if s.startswith('ESP32-H2'):
return 'esp32h2'
if s.startswith('ESP32'):
return 'esp32'
return 'unknown'
def run(cmd: List[str], cwd: str = None, check: bool = True) -> int:
print(">>", " ".join(cmd))
proc = subprocess.run(cmd, cwd=cwd)
if check and proc.returncode != 0:
raise RuntimeError(f"Command failed with code {proc.returncode}: {' '.join(cmd)}")
return proc.returncode
def ensure_target(project_dir: str, target: str):
if not target or target == 'unknown':
raise ValueError("Unknown IDF target; cannot set-target.")
run(['idf.py', 'set-target', target], cwd=project_dir, check=True)
def flash_device(project_dir: str, port: str, idf_target: str, baud: int = 460800) -> bool:
try:
ensure_target(project_dir, idf_target)
run(['idf.py', '-p', port, '-b', str(baud), 'flash'], cwd=project_dir, check=True)
return True
except Exception as e:
print(f" Flash failed on {port}: {e}")
return False
def match_any(path: str, patterns: List[str]) -> bool:
return any(fnmatch.fnmatch(path, pat) for pat in patterns)
def list_ports_filtered(patterns: List[str] = None) -> List[object]:
"""Return a filtered list of pyserial list_ports items."""
ports = list(list_ports.comports())
filtered = []
for p in ports:
dev = p.device
# Default pattern filter: only /dev/ttyUSB* and /dev/ttyACM*
if patterns:
if not match_any(dev, patterns):
continue
else:
if not (dev.startswith('/dev/ttyUSB') or dev.startswith('/dev/ttyACM')):
continue
# VID filter (allow if vid is known or missing (some systems omit it), but exclude obvious non-USB)
vid = getattr(p, 'vid', None)
if vid is not None and vid not in KNOWN_VIDS:
# Skip unknown vendor to reduce noise; user can override with --ports
continue
filtered.append(p)
return filtered
def main():
ap = argparse.ArgumentParser(description="Mass flash multiple ESP32 devices with proper chip detection.")
ap.add_argument('--project', required=True, help='Path to the ESP-IDF project to flash')
ap.add_argument('--ssid', help='WiFi SSID (optional)')
ap.add_argument('--password', help='WiFi password (optional)')
ap.add_argument('--start-ip', default='192.168.1.50', help='Base IP address for plan display')
ap.add_argument('--baud', type=int, default=460800, help='Flashing baud rate')
ap.add_argument('--dry-run', action='store_true', help='Plan only; do not flash')
ap.add_argument('--ports', help='Comma-separated glob(s), e.g. "/dev/ttyUSB*,/dev/ttyACM*" to override selection')
args = ap.parse_args()
project_dir = os.path.abspath(args.project)
if not os.path.isdir(project_dir):
print(f"Project directory not found: {project_dir}")
sys.exit(1)
patterns = None
if args.ports:
patterns = [pat.strip() for pat in args.ports.split(',') if pat.strip()]
devices = list_ports_filtered(patterns)
print(f"Found {len(devices)} USB serial device(s) after filtering")
if not devices:
print("No candidate USB serial ports found. Try --ports '/dev/ttyUSB*,/dev/ttyACM*' or check permissions.")
device_list: List[Dict] = []
for idx, dev in enumerate(devices, 1):
port = dev.device
print(f"Probing {port} for exact chip...")
raw_chip = detect_esp32.detect_chip_type(port)
idf_target = map_chip_to_idf_target(raw_chip)
if idf_target == 'unknown':
print(f" WARNING: Could not determine idf.py target for {port} (got '{raw_chip}')")
device_list.append({
'number': idx,
'port': port,
'raw_chip': raw_chip,
'idf_target': idf_target,
'info': dev,
})
# Plan output
base = args.start_ip.split('.')
try:
base0, base1, base2, base3 = int(base[0]), int(base[1]), int(base[2]), int(base[3])
except Exception:
base0, base1, base2, base3 = 192, 168, 1, 50
print("\nFlash plan:")
for d in device_list:
ip_last = base3 + d['number'] - 1
ip = f"{base0}.{base1}.{base2}.{ip_last}"
print(f" Device {d['number']:2d}: {d['port']} -> {d['raw_chip']} [{d['idf_target']}] -> {ip}")
if args.dry_run:
print("\nDry run: not flashing any devices.")
return
failed = []
for d in device_list:
if d['idf_target'] == 'unknown':
print(f"\n ERROR: Unknown IDF target for {d['port']} (raw chip '{d['raw_chip']}'). Skipping.")
failed.append(d['number'])
continue
print(f"\nFlashing {d['port']} as target {d['idf_target']}...")
ok = flash_device(project_dir, d['port'], d['idf_target'], baud=args.baud)
if not ok:
failed.append(d['number'])
if failed:
print(f"\nCompleted with failures on devices: {failed}")
sys.exit(2)
print("\nAll devices flashed successfully.")
if __name__ == '__main__':
main()

View File

@ -1,432 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Parallel Mass Flash Script
Build and flash multiple ESP32 devices concurrently for much faster deployment
"""
import subprocess
import sys
import os
import time
import argparse
import shutil
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
# Import the detection script
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
import detect_esp32
except ImportError:
print("Error: detect_esp32.py must be in the same directory")
sys.exit(1)
def detect_device_type(port_info):
"""Detect ESP32 variant based on USB chip"""
if port_info.vid == 0x303A:
return 'esp32s3'
return 'esp32'
def probe_chip_type(port):
"""Probe the actual chip type using esptool.py"""
try:
result = subprocess.run(
['esptool.py', '--port', port, 'chip_id'],
capture_output=True,
text=True,
timeout=10
)
output = result.stdout + result.stderr
if 'ESP32-S3' in output:
return 'esp32s3'
elif 'ESP32-S2' in output:
return 'esp32s2'
elif 'ESP32-C3' in output:
return 'esp32c3'
elif 'ESP32' in output:
return 'esp32'
except Exception as e:
print(f" Warning: Could not probe {port}: {e}")
return 'esp32'
def create_sdkconfig(build_dir, ssid, password, ip_addr, gateway='192.168.1.1', netmask='255.255.255.0'):
"""Create sdkconfig.defaults file with WiFi and IP configuration"""
sdkconfig_path = os.path.join(build_dir, 'sdkconfig.defaults')
config_content = f"""# WiFi Configuration
CONFIG_WIFI_SSID="{ssid}"
CONFIG_WIFI_PASSWORD="{password}"
CONFIG_WIFI_MAXIMUM_RETRY=5
# Static IP Configuration
CONFIG_USE_STATIC_IP=y
CONFIG_STATIC_IP_ADDR="{ip_addr}"
CONFIG_STATIC_GATEWAY_ADDR="{gateway}"
CONFIG_STATIC_NETMASK_ADDR="{netmask}"
"""
with open(sdkconfig_path, 'w') as f:
f.write(config_content)
def build_firmware(device_info, project_dir, build_dir, ssid, password):
"""Build firmware for a single device with unique configuration"""
dev_num = device_info['number']
chip_type = device_info['chip']
ip_addr = device_info['ip']
port = device_info['port']
print(f"[Device {dev_num}] [{port}] Chip: {chip_type.upper()} | Building with IP {ip_addr}")
try:
# Create build directory
os.makedirs(build_dir, exist_ok=True)
# Copy project files to build directory
for item in ['main', 'CMakeLists.txt']:
src = os.path.join(project_dir, item)
dst = os.path.join(build_dir, item)
if os.path.isdir(src):
if os.path.exists(dst):
shutil.rmtree(dst)
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
# Create sdkconfig.defaults
create_sdkconfig(build_dir, ssid, password, ip_addr)
# Set target
result = subprocess.run(
['idf.py', 'set-target', chip_type],
cwd=build_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
return {
'success': False,
'device': dev_num,
'error': f"Set target failed: {result.stderr[:200]}"
}
# Build
result = subprocess.run(
['idf.py', 'build'],
cwd=build_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
return {
'success': False,
'device': dev_num,
'error': f"Build failed: {result.stderr[-500:]}"
}
print(f"[Device {dev_num}] ✓ Build complete ({chip_type.upper()})")
return {
'success': True,
'device': dev_num,
'build_dir': build_dir
}
except Exception as e:
return {
'success': False,
'device': dev_num,
'error': str(e)
}
def flash_device(device_info, build_dir):
"""Flash a single device"""
dev_num = device_info['number']
port = device_info['port']
ip_addr = device_info['ip']
chip_type = device_info['chip']
print(f"[Device {dev_num}] [{port}] {chip_type.upper()} | Flashing -> {ip_addr}")
try:
result = subprocess.run(
['idf.py', '-p', port, 'flash'],
cwd=build_dir,
capture_output=True,
text=True,
timeout=120
)
if result.returncode != 0:
return {
'success': False,
'device': dev_num,
'port': port,
'error': f"Flash failed: {result.stderr[-500:]}"
}
print(f"[Device {dev_num}] ✓ Flash complete ({chip_type.upper()}) at {ip_addr}")
return {
'success': True,
'device': dev_num,
'port': port,
'ip': ip_addr
}
except subprocess.TimeoutExpired:
return {
'success': False,
'device': dev_num,
'port': port,
'error': "Flash timeout"
}
except Exception as e:
return {
'success': False,
'device': dev_num,
'port': port,
'error': str(e)
}
def build_and_flash(device_info, project_dir, work_dir, ssid, password):
"""Combined build and flash for a single device"""
dev_num = device_info['number']
build_dir = os.path.join(work_dir, f'build_device_{dev_num}')
# Build
build_result = build_firmware(device_info, project_dir, build_dir, ssid, password)
if not build_result['success']:
return build_result
# Flash
flash_result = flash_device(device_info, build_dir)
# Clean up build directory to save space
try:
shutil.rmtree(build_dir)
except:
pass
return flash_result
def main():
parser = argparse.ArgumentParser(description='Parallel mass flash ESP32 devices')
parser.add_argument('--ssid', required=True, help='WiFi SSID')
parser.add_argument('--password', required=True, help='WiFi password')
parser.add_argument('--start-ip', default='192.168.1.50',
help='Starting IP address (default: 192.168.1.50)')
parser.add_argument('--gateway', default='192.168.1.1',
help='Gateway IP (default: 192.168.1.1)')
parser.add_argument('--project-dir', default=None,
help='ESP32 iperf project directory')
parser.add_argument('--probe', action='store_true',
help='Probe each device to detect exact chip type (slower)')
parser.add_argument('--dry-run', action='store_true',
help='Show what would be done without building/flashing')
parser.add_argument('--build-parallel', type=int, default=None,
help='Number of parallel builds (default: CPU cores)')
parser.add_argument('--flash-parallel', type=int, default=8,
help='Number of parallel flash operations (default: 8)')
parser.add_argument('--strategy', choices=['build-then-flash', 'build-and-flash'],
default='build-and-flash',
help='Deployment strategy (default: build-and-flash)')
args = parser.parse_args()
# Determine parallelism
if args.build_parallel is None:
args.build_parallel = max(1, cpu_count() - 1)
# Find project directory
if args.project_dir:
project_dir = args.project_dir
else:
script_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = script_dir
if not os.path.exists(os.path.join(project_dir, 'main')):
project_dir = os.path.join(os.path.expanduser('~/Code/esp32'), 'esp32-iperf')
if not os.path.exists(project_dir):
print(f"ERROR: Project directory not found: {project_dir}")
sys.exit(1)
# Create work directory for builds
work_dir = os.path.join(project_dir, '.builds')
os.makedirs(work_dir, exist_ok=True)
print(f"Using project directory: {project_dir}")
print(f"Work directory: {work_dir}")
# Detect devices
print("\nDetecting ESP32 devices...")
devices = detect_esp32.detect_esp32_devices()
if not devices:
print("No ESP32 devices detected!")
sys.exit(1)
print(f"Found {len(devices)} device(s)")
# Prepare device list with IPs
base_parts = args.start_ip.split('.')
device_list = []
for idx, device in enumerate(devices, 1):
if args.probe:
print(f"Probing {device.device}...")
chip_type = probe_chip_type(device.device)
else:
chip_type = detect_device_type(device)
ip_last = int(base_parts[3]) + idx - 1
ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{ip_last}"
device_list.append({
'number': idx,
'port': device.device,
'chip': chip_type,
'ip': ip,
'info': device
})
# Display plan
print(f"\n{'='*70}")
print("PARALLEL FLASH PLAN")
print(f"{'='*70}")
print(f"SSID: {args.ssid}")
print(f"Strategy: {args.strategy}")
print(f"Build parallelism: {args.build_parallel}")
print(f"Flash parallelism: {args.flash_parallel}")
print()
for dev in device_list:
print(f"Device {dev['number']:2d}: {dev['port']} -> {dev['chip']:8s} -> {dev['ip']}")
if args.dry_run:
print("\nDry run - no devices will be built or flashed")
return
# Confirm
print(f"\n{'='*70}")
response = input("Proceed with parallel flashing? (yes/no): ").strip().lower()
if response != 'yes':
print("Aborted.")
return
print(f"\n{'='*70}")
print("STARTING PARALLEL DEPLOYMENT")
print(f"{'='*70}\n")
start_time = time.time()
if args.strategy == 'build-then-flash':
# Strategy 1: Build all, then flash all
print(f"Phase 1: Building {len(device_list)} configurations with {args.build_parallel} parallel builds...")
build_results = []
with ProcessPoolExecutor(max_workers=args.build_parallel) as executor:
futures = {}
for dev in device_list:
build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}')
future = executor.submit(
build_firmware, dev, project_dir, build_dir, args.ssid, args.password
)
futures[future] = dev
for future in as_completed(futures):
result = future.result()
build_results.append(result)
if not result['success']:
print(f"[Device {result['device']}] ✗ Build failed: {result['error']}")
# Flash phase
successful_builds = [r for r in build_results if r['success']]
print(f"\nPhase 2: Flashing {len(successful_builds)} devices with {args.flash_parallel} parallel operations...")
flash_results = []
with ThreadPoolExecutor(max_workers=args.flash_parallel) as executor:
futures = {}
for result in successful_builds:
dev = device_list[result['device'] - 1]
build_dir = os.path.join(work_dir, f'build_device_{dev["number"]}')
future = executor.submit(flash_device, dev, build_dir)
futures[future] = dev
for future in as_completed(futures):
result = future.result()
flash_results.append(result)
if not result['success']:
print(f"[Device {result['device']}] ✗ Flash failed: {result['error']}")
# Cleanup
print("\nCleaning up build directories...")
try:
shutil.rmtree(work_dir)
except:
pass
final_results = flash_results
else:
# Strategy 2: Build and flash together (limited parallelism)
print(f"Building and flashing with {args.build_parallel} parallel operations...")
final_results = []
with ProcessPoolExecutor(max_workers=args.build_parallel) as executor:
futures = {}
for dev in device_list:
future = executor.submit(
build_and_flash, dev, project_dir, work_dir, args.ssid, args.password
)
futures[future] = dev
for future in as_completed(futures):
result = future.result()
final_results.append(result)
if not result['success']:
print(f"[Device {result['device']}] ✗ Failed: {result['error']}")
# Summary
elapsed_time = time.time() - start_time
success_count = sum(1 for r in final_results if r['success'])
failed_devices = [r['device'] for r in final_results if not r['success']]
print(f"\n{'='*70}")
print("DEPLOYMENT SUMMARY")
print(f"{'='*70}")
print(f"Successfully deployed: {success_count}/{len(device_list)} devices")
print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)")
print(f"Average time per device: {elapsed_time/len(device_list):.1f} seconds")
if failed_devices:
print(f"\nFailed devices: {', '.join(map(str, failed_devices))}")
print(f"{'='*70}")
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("\n\nInterrupted by user")
sys.exit(1)
except Exception as e:
print(f"\nFATAL ERROR: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@ -1,72 +0,0 @@
#!/usr/bin/env python3
import os
import sys
import argparse
import serial
import time
from pathlib import Path
def send_cfg(port, ssid, password, dhcp, start_ip, mask, gw):
print(f"Connecting to {port}...")
with serial.Serial(port, baudrate=115200, timeout=2) as ser:
time.sleep(0.5)
ser.reset_input_buffer()
ser.write(b"CFG\n")
ser.write(f"SSID:{ssid}\n".encode())
ser.write(f"PASS:{password}\n".encode())
if dhcp == 0:
ser.write(f"IP:{start_ip}\n".encode())
ser.write(f"MASK:{mask}\n".encode())
ser.write(f"GW:{gw}\n".encode())
ser.write(f"DHCP:{dhcp}\n".encode())
ser.write(b"END\n")
time.sleep(0.3)
print("\nDevice response:")
while ser.in_waiting:
sys.stdout.write(ser.read(ser.in_waiting).decode(errors='ignore'))
sys.stdout.flush()
time.sleep(0.1)
print("\n✅ Configuration sent successfully.")
def main():
parser = argparse.ArgumentParser(description="Configure ESP32 Wi-Fi settings over serial")
parser.add_argument("--project", help="ESP-IDF project path (defaults to current working directory)")
parser.add_argument("--ssid", required=True)
parser.add_argument("--password", required=True)
parser.add_argument("--start-ip", help="Static IP address")
parser.add_argument("--mask", default="255.255.255.0")
parser.add_argument("--gw", help="Gateway address")
parser.add_argument("--dhcp", type=int, choices=[0,1], default=1)
parser.add_argument("--baud", type=int, default=460800)
parser.add_argument("--cfg-baud", type=int, default=115200)
parser.add_argument("--ports", nargs='+', help="Serial port(s), e.g., /dev/ttyUSB0 /dev/ttyUSB1")
parser.add_argument("--port", help="Single serial port (shorthand for --ports PORT)")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
# Default to current working directory
project_path = args.project or os.getcwd()
print(f"Using project directory: {project_path}")
# Resolve ports
ports = []
if args.port:
ports.append(args.port)
elif args.ports:
ports = args.ports
else:
print("❌ No serial port specified. Use --port or --ports.")
sys.exit(1)
# Apply configuration
for port in ports:
if args.dry_run:
print(f"[DRY RUN] Would send Wi-Fi config to {port}")
else:
send_cfg(port, args.ssid, args.password, args.dhcp, args.start_ip, args.mask, args.gw)
if __name__ == "__main__":
main()

View File

@ -1,198 +0,0 @@
#!/usr/bin/env python3
"""
Flash uniform firmware to all ESP32 devices, then reconfigure via serial
This is much faster than building unique firmwares for each device
"""
import os
import sys
import subprocess
import argparse
import glob
from pathlib import Path
import time
def detect_devices():
"""Detect all ESP32 devices"""
devices = sorted(glob.glob('/dev/ttyUSB*'))
return devices
def build_firmware(project_dir):
"""Build the firmware once"""
print("=" * 60)
print("Building firmware (one time)...")
print("=" * 60)
result = subprocess.run(
['idf.py', 'build'],
cwd=project_dir,
capture_output=False
)
if result.returncode != 0:
print("✗ Build failed!")
return False
print("✓ Build complete")
return True
def flash_device(port, project_dir):
"""Flash a single device"""
try:
result = subprocess.run(
['idf.py', '-p', port, 'flash'],
cwd=project_dir,
capture_output=True,
text=True,
timeout=120
)
return result.returncode == 0
except Exception as e:
return False
def flash_all_devices(devices, project_dir):
"""Flash all devices with the same firmware"""
print(f"\nFlashing {len(devices)} devices...")
success_count = 0
for idx, dev in enumerate(devices, 1):
print(f"[{idx:2d}/{len(devices)}] Flashing {dev}...", end='', flush=True)
if flash_device(dev, project_dir):
print("")
success_count += 1
else:
print("")
print(f"\nFlashed: {success_count}/{len(devices)}")
return success_count
def reconfigure_devices(ssid, password, start_ip, gateway="192.168.1.1"):
"""Reconfigure devices using the reconfig script"""
script_path = os.path.join(os.path.dirname(__file__), 'reconfig_simple.py')
if not os.path.exists(script_path):
print(f"Error: {script_path} not found!")
return False
print("\n" + "=" * 60)
print("Reconfiguring WiFi settings via serial...")
print("=" * 60)
cmd = [
'python3', script_path,
'-s', ssid,
'-p', password,
'--start-ip', start_ip,
'-g', gateway
]
result = subprocess.run(cmd)
return result.returncode == 0
def main():
parser = argparse.ArgumentParser(
description='Flash and configure all ESP32 devices',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
This script:
1. Builds firmware ONCE
2. Flashes the SAME firmware to all devices (fast!)
3. Reconfigures each device via serial with unique IP
Much faster than building 32 different firmwares!
Examples:
# Basic usage
%(prog)s --ssid MyWiFi --password mypass
# Custom IP range
%(prog)s --ssid MyWiFi --password mypass --start-ip 192.168.1.100
# Build only (no flash)
%(prog)s --build-only
# Reconfigure only (no flash)
%(prog)s --reconfig-only --ssid MyWiFi --password mypass
"""
)
parser.add_argument('--ssid', help='WiFi SSID')
parser.add_argument('--password', help='WiFi password')
parser.add_argument('--start-ip', default='192.168.1.50',
help='Starting IP address (default: 192.168.1.50)')
parser.add_argument('--gateway', default='192.168.1.1',
help='Gateway IP (default: 192.168.1.1)')
parser.add_argument('--project-dir', default=None,
help='ESP32 project directory (default: current dir)')
parser.add_argument('--build-only', action='store_true',
help='Only build, do not flash')
parser.add_argument('--reconfig-only', action='store_true',
help='Only reconfigure, do not build/flash')
parser.add_argument('--skip-build', action='store_true',
help='Skip build, use existing firmware')
args = parser.parse_args()
# Determine project directory
if args.project_dir:
project_dir = args.project_dir
else:
project_dir = os.path.dirname(os.path.abspath(__file__))
if not os.path.exists(os.path.join(project_dir, 'CMakeLists.txt')):
print(f"Error: Not an ESP-IDF project directory: {project_dir}")
return 1
# Detect devices
devices = detect_devices()
if not devices and not args.build_only:
print("Error: No devices found!")
return 1
print(f"Found {len(devices)} device(s)")
# Reconfigure only mode
if args.reconfig_only:
if not args.ssid or not args.password:
print("Error: --ssid and --password required for reconfigure mode")
return 1
return 0 if reconfigure_devices(args.ssid, args.password, args.start_ip, args.gateway) else 1
# Build firmware
if not args.skip_build:
if not build_firmware(project_dir):
return 1
if args.build_only:
print("\n Build complete. Use --skip-build to flash later.")
return 0
# Flash all devices
flash_count = flash_all_devices(devices, project_dir)
if flash_count == 0:
print("✗ No devices flashed successfully")
return 1
# Reconfigure if credentials provided
if args.ssid and args.password:
print("\nWaiting for devices to boot...")
time.sleep(5)
if not reconfigure_devices(args.ssid, args.password, args.start_ip, args.gateway):
print("✗ Reconfiguration failed")
return 1
else:
print("\n" + "=" * 60)
print("Flashing complete!")
print("=" * 60)
print("\nTo configure WiFi settings, run:")
print(f" python3 reconfig_simple.py -s YourSSID -p YourPassword --start-ip {args.start_ip}")
print("\n✓ Done!")
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -1,68 +1,249 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
Unified udev rules generator for ESP32 devices.
Combines functionality from gen_udev_rules.py and append_new_rules.py.
Features:
- Generate rules from scratch (full scan)
- Append new rules to existing file (incremental update)
- Uses ID_PATH for stable device identification
- Sorts devices by physical topology for consistent ordering
"""
import os import os
import pyudev import re
import sys
import subprocess
from pathlib import Path
from serial.tools import list_ports
def generate_rules(): class Colors:
context = pyudev.Context() GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
# Find all TTY devices driven by usb-serial drivers (CP210x, FTDI, etc.) # Default paths
DEFAULT_RULES_FILE = "/etc/udev/rules.d/99-esp32-stable.rules"
TEMP_RULES_FILE = "new_rules.part"
def get_existing_rules(rules_path):
"""
Parse existing rules file to extract:
- Set of ID_PATH values already mapped
- Maximum port number used
Returns: (known_paths, max_port)
"""
known_paths = set()
max_port = 0
if not os.path.exists(rules_path):
return known_paths, 0
regex_path = re.compile(r'ENV\{ID_PATH\}=="([^"]+)"')
regex_port = re.compile(r'esp_port_(\d+)')
try:
with open(rules_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
path_match = regex_path.search(line)
port_match = regex_port.search(line)
if path_match:
known_paths.add(path_match.group(1))
if port_match:
port_num = int(port_match.group(1))
if port_num > max_port:
max_port = port_num
except Exception as e:
print(f"{Colors.RED}Error reading rules file: {e}{Colors.RESET}")
return known_paths, 0
return known_paths, max_port
def scan_usb_devices():
"""
Scan all USB serial devices and return list of (device, id_path, location) tuples.
Uses udevadm to get stable ID_PATH identifiers.
"""
devices = [] devices = []
for device in context.list_devices(subsystem='tty'):
if 'ID_VENDOR_ID' in device.properties and 'ID_MODEL_ID' in device.properties:
# We filter for USB serial devices only
parent = device.find_parent('usb', 'usb_interface')
if parent:
devices.append(device)
# Sort by their physical path so they are ordered by hub port # Get all USB serial devices
# The 'DEVPATH' usually looks like .../usb1/1-2/1-2.3/1-2.3.4... usb_devices = list(list_ports.grep("USB|ACM|CP210|FT232"))
devices.sort(key=lambda x: x.properties.get('DEVPATH', ''))
print(f"# Detected {len(devices)} devices. Generating rules...\n") if not usb_devices:
return devices
# Sort by physical location for consistent ordering
usb_devices.sort(key=lambda x: x.location if x.location else x.device)
for dev in usb_devices:
try:
# Get ID_PATH via udevadm (most stable identifier)
cmd = ['udevadm', 'info', '--name', dev.device, '--query=property']
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=2)
if proc.returncode != 0:
continue
props = dict(line.split('=', 1) for line in proc.stdout.splitlines() if '=' in line)
id_path = props.get('ID_PATH', '')
if not id_path:
continue
devices.append({
'device': dev.device,
'id_path': id_path,
'location': dev.location or '',
'serial': props.get('ID_SERIAL_SHORT', '')
})
except Exception as e:
print(f"{Colors.YELLOW}Warning: Could not inspect {dev.device}: {e}{Colors.RESET}")
continue
return devices
def generate_rules(devices, start_port=1):
"""
Generate udev rules for the given devices.
Returns list of rule strings.
"""
rules = [] rules = []
hub_counter = 1 port_num = start_port
port_counter = 1
last_parent_path = None
for dev in devices: for dev in devices:
# Get the unique physical path identifier (KERNELS) symlink = f"esp_port_{port_num}"
# We need the parent USB interface kernel name (e.g., '1-1.2:1.0') rule = f'SUBSYSTEM=="tty", ENV{{ID_PATH}}=="{dev["id_path"]}", SYMLINK+="{symlink}"'
parent = dev.find_parent('usb', 'usb_interface')
if not parent: continue
kernels_path = parent.device_path.split('/')[-1]
# Simple heuristic to detect a new hub (big jump in path length or numbering)
# You might need to manually tweak the hub numbers in the file later.
# Generate the rule
# KERNELS matches the physical port.
# SYMLINK creates the static alias.
rule = f'SUBSYSTEM=="tty", KERNELS=="{kernels_path}", SYMLINK+="esp_port_{len(rules)+1:02d}"'
rules.append(rule) rules.append(rule)
print(f"Mapped {dev.device_node} ({kernels_path}) -> /dev/esp_port_{len(rules):02d}") port_num += 1
# Write to file return rules
with open("99-esp32-static.rules", "w") as f:
f.write("# Persistent USB Serial mapping for ESP32 Hubs\n")
f.write("# Generated automatically. Do not edit unless topology changes.\n\n")
for r in rules:
f.write(r + "\n")
print(f"\n{'-'*60}") def main():
print(f"SUCCESS: Generated {len(rules)} rules in '99-esp32-static.rules'.") import argparse
print(f"To install:\n 1. Review the file to ensure order is correct.")
print(f" 2. sudo cp 99-esp32-static.rules /etc/udev/rules.d/") parser = argparse.ArgumentParser(
print(f" 3. sudo udevadm control --reload-rules") description='Generate or update udev rules for ESP32 devices',
print(f" 4. sudo udevadm trigger") formatter_class=argparse.RawDescriptionHelpFormatter,
print(f"{'-'*60}") epilog="""
Examples:
# Generate rules from scratch (overwrites existing)
%(prog)s --full
# Append only new devices to existing rules
%(prog)s --append
# Dry run (show what would be generated)
%(prog)s --append --dry-run
"""
)
parser.add_argument('--full', action='store_true',
help='Generate complete rules file from scratch')
parser.add_argument('--append', action='store_true',
help='Append only new devices to existing rules')
parser.add_argument('--rules-file', default=DEFAULT_RULES_FILE,
help=f'Path to udev rules file (default: {DEFAULT_RULES_FILE})')
parser.add_argument('--dry-run', action='store_true',
help='Show what would be done without writing files')
parser.add_argument('--output', help='Write to custom file instead of rules file')
args = parser.parse_args()
# Default to append mode if neither specified
if not args.full and not args.append:
args.append = True
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
print(f" ESP32 Udev Rules Generator")
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}\n")
# Scan connected devices
print(f"{Colors.CYAN}Scanning USB topology...{Colors.RESET}")
all_devices = scan_usb_devices()
if not all_devices:
print(f"{Colors.RED}No USB serial devices found.{Colors.RESET}")
return 1
print(f"Found {len(all_devices)} USB serial devices\n")
if args.full:
# Generate complete rules file
print(f"{Colors.CYAN}Generating complete rules file...{Colors.RESET}")
rules = generate_rules(all_devices, start_port=1)
output_file = args.output or args.rules_file
print(f"\n{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}")
print("-" * 65)
for i, dev in enumerate(all_devices):
port_num = i + 1
symlink = f"esp_port_{port_num}"
print(f"{dev['location']:<20} | {dev['device']:<15} | {symlink}")
if not args.dry_run:
rules_content = "# Auto-generated by gen_udev_rules.py\n"
rules_content += "# Stable port mapping based on USB physical topology (ID_PATH)\n\n"
rules_content += "\n".join(rules) + "\n"
if output_file == args.rules_file and os.geteuid() != 0:
# Write to temp file for user to copy
temp_file = "99-esp32-stable.rules"
with open(temp_file, 'w') as f:
f.write(rules_content)
print(f"\n{Colors.YELLOW}Rules written to: {temp_file}{Colors.RESET}")
print(f"To install, run:")
print(f" {Colors.GREEN}sudo cp {temp_file} {args.rules_file}{Colors.RESET}")
else:
with open(output_file, 'w') as f:
f.write(rules_content)
print(f"\n{Colors.GREEN}Rules written to: {output_file}{Colors.RESET}")
else:
print(f"\n{Colors.YELLOW}DRY RUN: Would generate {len(rules)} rules{Colors.RESET}")
else: # append mode
# Read existing rules
known_paths, max_port = get_existing_rules(args.rules_file)
print(f"Existing rules: {len(known_paths)} devices mapped (Max Port: {max_port})")
# Find new devices
new_devices = [d for d in all_devices if d['id_path'] not in known_paths]
if not new_devices:
print(f"\n{Colors.GREEN}All connected devices are already mapped. No changes needed.{Colors.RESET}")
return 0
print(f"\n{Colors.CYAN}Found {len(new_devices)} UNMAPPED devices:{Colors.RESET}")
print(f"{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}")
print("-" * 65)
new_rules = generate_rules(new_devices, start_port=max_port + 1)
for i, dev in enumerate(new_devices):
port_num = max_port + 1 + i
symlink = f"esp_port_{port_num:02d}"
print(f"{dev['location']:<20} | {dev['device']:<15} | {symlink}")
if not args.dry_run:
# Write new rules to temp file
with open(TEMP_RULES_FILE, 'w') as f:
f.write("\n# --- Added by gen_udev_rules.py ---\n")
f.write("\n".join(new_rules) + "\n")
print(f"\n{Colors.CYAN}New rules saved to: {TEMP_RULES_FILE}{Colors.RESET}")
print(f"To apply, run:")
print(f" {Colors.GREEN}cat {TEMP_RULES_FILE} | sudo tee -a {args.rules_file}{Colors.RESET}")
print(f" {Colors.GREEN}sudo udevadm control --reload-rules && sudo udevadm trigger{Colors.RESET}")
else:
print(f"\n{Colors.YELLOW}DRY RUN: Would append {len(new_rules)} rules{Colors.RESET}")
print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}")
return 0
if __name__ == '__main__': if __name__ == '__main__':
# Requires 'pyudev'. Install with: sudo dnf install python3-pyudev (or pip install pyudev) sys.exit(main())
try:
import pyudev
generate_rules()
except ImportError:
print("Error: This script requires 'pyudev'.")
print("Install it via: pip install pyudev")

188
leddiff.txt Normal file
View File

@ -0,0 +1,188 @@
diff --git a/components/status_led/status_led.c b/components/status_led/status_led.c
index 5a34d7e..6dd6d09 100644
--- a/components/status_led/status_led.c
+++ b/components/status_led/status_led.c
@@ -30,7 +30,6 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
-
#include "status_led.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
@@ -38,87 +37,56 @@
#include "led_strip.h"
#include "esp_log.h"
-static const char *TAG = "status_led";
+static const char *TAG = "STATUS_LED"; // Added TAG for logging
static led_strip_handle_t s_led_strip = NULL;
static bool s_is_rgb = false;
static int s_gpio_pin = -1;
static volatile led_state_t s_current_state = LED_STATE_NO_CONFIG;
-// Helper to set color safely
static void set_color(uint8_t r, uint8_t g, uint8_t b) {
if (s_is_rgb && s_led_strip) {
led_strip_set_pixel(s_led_strip, 0, r, g, b);
led_strip_refresh(s_led_strip);
} else if (!s_is_rgb && s_gpio_pin >= 0) {
- // Simple LED logic: If any color is requested, turn ON.
- // NOTE: If your LED is active-low (VCC->LED->Pin), invert this to !((r+g+b)>0)
- gpio_set_level(s_gpio_pin, (r + g + b) > 0 ? 1 : 0);
+ gpio_set_level(s_gpio_pin, (r+g+b) > 0);
}
}
static void led_task(void *arg) {
int toggle = 0;
-
- // --- Startup Diagnostic Sequence ---
- // Cycle R -> G -> B to prove hardware is working
- ESP_LOGW(TAG, "Running LED Diagnostic Sequence on GPIO %d...", s_gpio_pin);
- set_color(50, 0, 0); // Red
- vTaskDelay(pdMS_TO_TICKS(300));
- set_color(0, 50, 0); // Green
- vTaskDelay(pdMS_TO_TICKS(300));
- set_color(0, 0, 50); // Blue
- vTaskDelay(pdMS_TO_TICKS(300));
- set_color(0, 0, 0); // Off
- vTaskDelay(pdMS_TO_TICKS(100));
-
while (1) {
- // Brightness set to 30-50 (out of 255) for visibility
switch (s_current_state) {
- case LED_STATE_NO_CONFIG: // Yellow (Solid RGB / Blink Simple)
- if (s_is_rgb) {
- set_color(40, 30, 0);
- vTaskDelay(pdMS_TO_TICKS(1000));
- } else {
- set_color(1,1,1); vTaskDelay(pdMS_TO_TICKS(100));
- set_color(0,0,0); vTaskDelay(pdMS_TO_TICKS(100));
- }
+ case LED_STATE_NO_CONFIG: // Yellow
+ if (s_is_rgb) { set_color(25, 25, 0); vTaskDelay(pdMS_TO_TICKS(1000)); }
+ else { set_color(1,1,1); vTaskDelay(100); set_color(0,0,0); vTaskDelay(100); }
break;
- case LED_STATE_WAITING: // Blue Blink
- set_color(0, 0, toggle ? 50 : 0);
- toggle = !toggle;
+ // ... rest of cases identical to your code ...
+ case LED_STATE_WAITING:
+ set_color(0, 0, toggle ? 50 : 0); toggle = !toggle;
vTaskDelay(pdMS_TO_TICKS(500));
break;
- case LED_STATE_CONNECTED: // Green Solid
- set_color(0, 30, 0);
- vTaskDelay(pdMS_TO_TICKS(1000));
+ case LED_STATE_CONNECTED:
+ set_color(0, 25, 0); vTaskDelay(pdMS_TO_TICKS(1000));
break;
- case LED_STATE_MONITORING: // Cyan Solid
- set_color(0, 30, 30);
- vTaskDelay(pdMS_TO_TICKS(1000));
+ case LED_STATE_MONITORING:
+ set_color(0, 0, 50); vTaskDelay(pdMS_TO_TICKS(1000));
break;
- case LED_STATE_TRANSMITTING: // Purple Fast Flash
- set_color(toggle ? 40 : 0, 0, toggle ? 40 : 0);
- toggle = !toggle;
- vTaskDelay(pdMS_TO_TICKS(100));
+ case LED_STATE_TRANSMITTING:
+ set_color(toggle ? 50 : 0, 0, toggle ? 50 : 0); toggle = !toggle;
+ vTaskDelay(pdMS_TO_TICKS(50));
break;
- case LED_STATE_TRANSMITTING_SLOW: // Purple Slow Pulse
- set_color(toggle ? 40 : 0, 0, toggle ? 40 : 0);
- toggle = !toggle;
- vTaskDelay(pdMS_TO_TICKS(500));
+ case LED_STATE_TRANSMITTING_SLOW:
+ set_color(toggle ? 50 : 0, 0, toggle ? 50 : 0); toggle = !toggle;
+ vTaskDelay(pdMS_TO_TICKS(250));
break;
- case LED_STATE_STALLED: // Red/Purple Solid
- set_color(50, 0, 20);
- vTaskDelay(pdMS_TO_TICKS(1000));
+ case LED_STATE_STALLED:
+ set_color(50, 0, 50); vTaskDelay(pdMS_TO_TICKS(1000));
break;
- case LED_STATE_FAILED: // Red Blink
- set_color(toggle ? 50 : 0, 0, 0);
- toggle = !toggle;
+ case LED_STATE_FAILED:
+ set_color(toggle ? 50 : 0, 0, 0); toggle = !toggle;
vTaskDelay(pdMS_TO_TICKS(200));
break;
- default:
- vTaskDelay(pdMS_TO_TICKS(100));
- break;
}
}
}
@@ -127,41 +95,27 @@ void status_led_init(int gpio_pin, bool is_rgb_strip) {
s_gpio_pin = gpio_pin;
s_is_rgb = is_rgb_strip;
- ESP_LOGI(TAG, "Initializing Status LED: GPIO=%d, Type=%s",
- gpio_pin, is_rgb_strip ? "RGB Strip (WS2812)" : "Simple GPIO");
+ // --- DIAGNOSTIC LOG ---
+ ESP_LOGW(TAG, "Initializing LED on GPIO %d (RGB: %d)", gpio_pin, is_rgb_strip);
if (s_is_rgb) {
- led_strip_config_t s_cfg = {
- .strip_gpio_num = gpio_pin,
- .max_leds = 1,
- .led_pixel_format = LED_PIXEL_FORMAT_GRB,
- .led_model = LED_MODEL_WS2812,
- .flags.invert_out = false,
- };
- led_strip_rmt_config_t r_cfg = {
- .resolution_hz = 10 * 1000 * 1000,
- .flags.with_dma = false,
- };
+ led_strip_config_t s_cfg = { .strip_gpio_num = gpio_pin, .max_leds = 1 };
+ led_strip_rmt_config_t r_cfg = { .resolution_hz = 10 * 1000 * 1000 };
esp_err_t ret = led_strip_new_rmt_device(&s_cfg, &r_cfg, &s_led_strip);
if (ret != ESP_OK) {
- ESP_LOGE(TAG, "Failed to create RMT LED strip: %s", esp_err_to_name(ret));
- return;
+ ESP_LOGE(TAG, "RMT Device Init Failed: %s", esp_err_to_name(ret));
+ } else {
+ ESP_LOGI(TAG, "RMT Device Init Success");
+ led_strip_clear(s_led_strip);
}
- led_strip_clear(s_led_strip);
} else {
gpio_reset_pin(gpio_pin);
gpio_set_direction(gpio_pin, GPIO_MODE_OUTPUT);
- gpio_set_level(gpio_pin, 0);
}
-
xTaskCreate(led_task, "led_task", 2048, NULL, 5, NULL);
}
-void status_led_set_state(led_state_t state) {
- s_current_state = state;
-}
-
-led_state_t status_led_get_state(void) {
- return s_current_state;
-}
+// ... Setters/Getters ...
+void status_led_set_state(led_state_t state) { s_current_state = state; }
+led_state_t status_led_get_state(void) { return s_current_state; }
diff --git a/main/board_config.h b/main/board_config.h
index 6e4aa28..05d7726 100644
--- a/main/board_config.h
+++ b/main/board_config.h
@@ -42,7 +42,7 @@
// ============================================================================
// ESP32-C5 (DevKitC-1) 3.3V VCC Pin 1 GND PIN 15
// ============================================================================
- #define RGB_LED_GPIO 8 // Common addressable LED pin for C5
+ #define RGB_LED_GPIO 27 // Common addressable LED pin for C5
#define HAS_RGB_LED 1
#define GPS_TX_PIN GPIO_NUM_24
#define GPS_RX_PIN GPIO_NUM_23

17
main/idf_component.yml Normal file
View File

@ -0,0 +1,17 @@
## IDF Component Manager Manifest File
dependencies:
## Required IDF version
idf:
version: '>=4.1.0'
# # Put list of dependencies here
# # For components maintained by Espressif:
# component: "~1.0.0"
# # For 3rd party components:
# username/component: ">=1.0.0,<2.0.0"
# username2/component2:
# version: "~1.0.0"
# # For transient dependencies `public` flag can be set.
# # `public` flag doesn't have an effect dependencies of the `main` component.
# # All dependencies of `main` are public by default.
# public: true
espressif/led_strip: ^2.5.3

View File

@ -39,6 +39,7 @@
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
#include "freertos/task.h" #include "freertos/task.h"
#include "esp_system.h" #include "esp_system.h"
#include "esp_err.h"
#include "esp_log.h" #include "esp_log.h"
#include "esp_console.h" #include "esp_console.h"
#include "esp_vfs_dev.h" #include "esp_vfs_dev.h"
@ -62,7 +63,7 @@
#include "csi_manager.h" #include "csi_manager.h"
#endif #endif
#define APP_VERSION "2.0.0-SHELL" #define APP_VERSION "2.1.0-CONSOLE-DEBUG"
static const char *TAG = "MAIN"; static const char *TAG = "MAIN";
@ -173,7 +174,7 @@ void app_main(void) {
ESP_LOGW(TAG, "GPS initialization skipped (Disabled in NVS)"); ESP_LOGW(TAG, "GPS initialization skipped (Disabled in NVS)");
} }
// 4. Hardware Init // Hardware Init
status_led_init(RGB_LED_GPIO, HAS_RGB_LED); status_led_init(RGB_LED_GPIO, HAS_RGB_LED);
status_led_set_state(LED_STATE_FAILED); // Force Red Blink status_led_set_state(LED_STATE_FAILED); // Force Red Blink
#ifdef CONFIG_ESP_WIFI_CSI_ENABLED #ifdef CONFIG_ESP_WIFI_CSI_ENABLED
@ -186,27 +187,42 @@ void app_main(void) {
iperf_param_init(); iperf_param_init();
// 6. Initialize Console (REPL) // 6. Initialize Console (REPL)
ESP_LOGI(TAG, "Initializing console REPL...");
esp_console_repl_t *repl = NULL; esp_console_repl_t *repl = NULL;
esp_console_repl_config_t repl_config = ESP_CONSOLE_REPL_CONFIG_DEFAULT(); esp_console_repl_config_t repl_config = ESP_CONSOLE_REPL_CONFIG_DEFAULT();
repl_config.prompt = s_cli_prompt; repl_config.prompt = s_cli_prompt;
repl_config.max_cmdline_length = 1024; repl_config.max_cmdline_length = 1024;
esp_console_dev_uart_config_t hw_config = ESP_CONSOLE_DEV_UART_CONFIG_DEFAULT(); esp_console_dev_uart_config_t hw_config = ESP_CONSOLE_DEV_UART_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_console_new_repl_uart(&hw_config, &repl_config, &repl)); esp_err_t repl_init_err = esp_console_new_repl_uart(&hw_config, &repl_config, &repl);
if (repl_init_err != ESP_OK) {
ESP_LOGE(TAG, "Failed to create console REPL: %s", esp_err_to_name(repl_init_err));
esp_restart();
}
ESP_LOGI(TAG, "Console REPL object created successfully");
// 7. Register Commands // 7. Register Commands
ESP_LOGI(TAG, "Registering console commands...");
register_system_common(); register_system_common();
app_console_register_commands(); app_console_register_commands();
ESP_LOGI(TAG, "Console commands registered");
// 8. Initial Prompt State Check // 8. Initial Prompt State Check
app_console_update_prompt(); app_console_update_prompt();
// 9. Start Shell // 9. Start Shell
ESP_LOGI(TAG, "Starting console REPL...");
printf("\n ==================================================\n"); printf("\n ==================================================\n");
printf(" | ESP32 iPerf Shell - Ready |\n"); printf(" | ESP32 iPerf Shell - Ready |\n");
printf(" | Type 'help' for commands |\n"); printf(" | Type 'help' for commands |\n");
printf(" ==================================================\n"); printf(" ==================================================\n");
fflush(stdout);
ESP_ERROR_CHECK(esp_console_start_repl(repl)); esp_err_t repl_err = esp_console_start_repl(repl);
if (repl_err != ESP_OK) {
ESP_LOGE(TAG, "Failed to start console REPL: %s", esp_err_to_name(repl_err));
esp_restart();
}
// Note: esp_console_start_repl() blocks and never returns on success
// so code below would never execute
} }

View File

@ -1,244 +0,0 @@
#!/usr/bin/env python3
"""
Map ESP32 USB ports to IP addresses
Creates and manages mapping between /dev/ttyUSB* and assigned IPs
"""
import serial.tools.list_ports
import argparse
import json
import glob
import re
from pathlib import Path
class USBIPMapper:
def __init__(self, start_ip="192.168.1.51", config_file="usb_ip_map.json"):
self.start_ip = start_ip
self.config_file = config_file
self.mapping = {}
def get_ip_for_index(self, index):
"""Calculate IP address for a given index"""
ip_parts = self.start_ip.split('.')
base_ip = int(ip_parts[3])
ip_parts[3] = str(base_ip + index)
return '.'.join(ip_parts)
def extract_usb_number(self, port):
"""Extract number from /dev/ttyUSBX"""
match = re.search(r'ttyUSB(\d+)', port)
if match:
return int(match.group(1))
return None
def detect_devices(self):
"""Detect all ESP32 USB devices and create mapping"""
devices = sorted(glob.glob('/dev/ttyUSB*'))
print(f"\n{'='*70}")
print(f"ESP32 USB to IP Address Mapping")
print(f"{'='*70}")
print(f"Start IP: {self.start_ip}")
print(f"Detected {len(devices)} USB device(s)\n")
self.mapping = {}
for idx, port in enumerate(devices):
usb_num = self.extract_usb_number(port)
ip = self.get_ip_for_index(idx)
# Get device info
try:
ports = serial.tools.list_ports.comports()
device_info = next((p for p in ports if p.device == port), None)
if device_info:
serial_num = device_info.serial_number or "Unknown"
description = device_info.description or "Unknown"
else:
serial_num = "Unknown"
description = "Unknown"
except:
serial_num = "Unknown"
description = "Unknown"
self.mapping[port] = {
'index': idx,
'usb_number': usb_num,
'ip': ip,
'serial': serial_num,
'description': description
}
print(f"[{idx:2d}] {port:14s}{ip:15s} (USB{usb_num}, SN: {serial_num})")
print(f"\n{'='*70}")
print(f"Total: {len(devices)} devices mapped")
print(f"IP Range: {self.mapping[devices[0]]['ip']} - {self.mapping[devices[-1]]['ip']}" if devices else "")
print(f"{'='*70}\n")
return self.mapping
def save_mapping(self):
"""Save mapping to JSON file"""
with open(self.config_file, 'w') as f:
json.dump(self.mapping, f, indent=2)
print(f"✓ Mapping saved to {self.config_file}")
def load_mapping(self):
"""Load mapping from JSON file"""
try:
with open(self.config_file, 'r') as f:
self.mapping = json.load(f)
print(f"✓ Mapping loaded from {self.config_file}")
return self.mapping
except FileNotFoundError:
print(f"✗ No saved mapping found at {self.config_file}")
return {}
def get_ip(self, port):
"""Get IP address for a specific USB port"""
if port in self.mapping:
return self.mapping[port]['ip']
return None
def get_port(self, ip):
"""Get USB port for a specific IP address"""
for port, info in self.mapping.items():
if info['ip'] == ip:
return port
return None
def print_mapping(self):
"""Print current mapping"""
if not self.mapping:
print("No mapping loaded. Run with --detect first.")
return
print(f"\n{'='*70}")
print(f"Current USB to IP Mapping")
print(f"{'='*70}")
for port, info in sorted(self.mapping.items(), key=lambda x: x[1]['index']):
print(f"[{info['index']:2d}] {port:14s}{info['ip']:15s} (USB{info['usb_number']})")
print(f"{'='*70}\n")
def export_bash_script(self, filename="usb_ip_vars.sh"):
"""Export mapping as bash variables"""
with open(filename, 'w') as f:
f.write("#!/bin/bash\n")
f.write("# USB to IP mapping - Auto-generated\n\n")
# Create associative array
f.write("declare -A USB_TO_IP\n")
for port, info in self.mapping.items():
f.write(f"USB_TO_IP[{port}]=\"{info['ip']}\"\n")
f.write("\n# Create reverse mapping\n")
f.write("declare -A IP_TO_USB\n")
for port, info in self.mapping.items():
f.write(f"IP_TO_USB[{info['ip']}]=\"{port}\"\n")
f.write("\n# Helper functions\n")
f.write("get_ip_for_usb() { echo \"${USB_TO_IP[$1]}\"; }\n")
f.write("get_usb_for_ip() { echo \"${IP_TO_USB[$1]}\"; }\n")
print(f"✓ Bash script exported to {filename}")
print(f" Usage: source {filename} && get_ip_for_usb /dev/ttyUSB0")
def main():
parser = argparse.ArgumentParser(
description='Map ESP32 USB ports to IP addresses',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Detect devices and create mapping
%(prog)s --detect
# Detect and save to file
%(prog)s --detect --save
# Load saved mapping and display
%(prog)s --load --print
# Get IP for specific USB port
%(prog)s --load --port /dev/ttyUSB5
# Get USB port for specific IP
%(prog)s --load --ip 192.168.1.55
# Export as bash script
%(prog)s --load --export
# Use custom IP range
%(prog)s --detect --start-ip 10.0.0.100
"""
)
parser.add_argument('--detect', action='store_true',
help='Detect USB devices and create mapping')
parser.add_argument('--save', action='store_true',
help='Save mapping to file')
parser.add_argument('--load', action='store_true',
help='Load mapping from file')
parser.add_argument('--print', action='store_true',
help='Print current mapping')
parser.add_argument('--start-ip', default='192.168.1.51',
help='Starting IP address (default: 192.168.1.51)')
parser.add_argument('--config', default='usb_ip_map.json',
help='Config file path (default: usb_ip_map.json)')
parser.add_argument('--port', metavar='PORT',
help='Get IP for specific USB port (e.g., /dev/ttyUSB5)')
parser.add_argument('--ip', metavar='IP',
help='Get USB port for specific IP address')
parser.add_argument('--export', action='store_true',
help='Export mapping as bash script')
args = parser.parse_args()
mapper = USBIPMapper(start_ip=args.start_ip, config_file=args.config)
# Detect devices
if args.detect:
mapper.detect_devices()
if args.save:
mapper.save_mapping()
# Load mapping
if args.load:
mapper.load_mapping()
# Print mapping
if args.print:
mapper.print_mapping()
# Query specific port
if args.port:
if not mapper.mapping:
mapper.load_mapping()
ip = mapper.get_ip(args.port)
if ip:
print(f"{args.port}{ip}")
else:
print(f"Port {args.port} not found in mapping")
# Query specific IP
if args.ip:
if not mapper.mapping:
mapper.load_mapping()
port = mapper.get_port(args.ip)
if port:
print(f"{args.ip}{port}")
else:
print(f"IP {args.ip} not found in mapping")
# Export bash script
if args.export:
if not mapper.mapping:
mapper.load_mapping()
mapper.export_bash_script()
# Default: detect and print
if not any([args.detect, args.load, args.print, args.port, args.ip, args.export]):
mapper.detect_devices()
if __name__ == '__main__':
main()

View File

@ -1,319 +0,0 @@
#!/usr/bin/env python3
"""
ESP32 Mass Deployment Tool (Fixed for Parallel Flashing & Path Issues)
Uses esptool.py from the build directory to resolve relative paths correctly.
"""
import os
import sys
import subprocess
import glob
import time
import argparse
import serial
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
class Colors:
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
NC = '\033[0m'
class DeviceDeployer:
def __init__(self, project_dir, ssid, password, start_ip="192.168.1.51",
netmask="255.255.255.0", gateway="192.168.1.1",
baud_rate=460800, max_retries=2, verify_ping=True,
num_devices=None, verbose=False, parallel=True):
self.project_dir = Path(project_dir).resolve() # Absolute path is safer
self.build_dir = self.project_dir / 'build'
self.ssid = ssid
self.password = password
self.start_ip = start_ip
self.netmask = netmask
self.gateway = gateway
self.baud_rate = baud_rate
self.max_retries = max_retries
self.verify_ping = verify_ping
self.num_devices = num_devices
self.verbose = verbose
self.parallel = parallel
self.config_mode = (self.ssid is not None and self.password is not None)
if self.start_ip:
ip_parts = start_ip.split('.')
self.ip_base = '.'.join(ip_parts[:3])
self.ip_start = int(ip_parts[3])
else:
self.ip_base = "192.168.1"
self.ip_start = 51
self.devices = []
self.results = {}
self.log_dir = Path('/tmp')
def print_banner(self):
print()
print(f"{Colors.BLUE}{'='*70}")
print("ESP32 Mass Deployment Tool")
print(f"{'='*70}{Colors.NC}")
print(f"Project: {self.project_dir}")
print(f"Build Dir: {self.build_dir}")
if self.config_mode:
print(f"Mode: {Colors.YELLOW}FLASH + CONFIGURE{Colors.NC}")
print(f"SSID: {self.ssid}")
print(f"Password: {'*' * len(self.password)}")
print(f"Start IP: {self.start_ip}")
else:
print(f"Mode: {Colors.GREEN}FLASH ONLY (Preserve NVS){Colors.NC}")
print(f"Flash Baud: {self.baud_rate}")
print(f"Parallel: {self.parallel}")
if self.num_devices:
print(f"Max Devices: {self.num_devices}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
def build_firmware(self):
print()
print(f"{Colors.YELLOW}[1/4] Building firmware...{Colors.NC}")
try:
subprocess.run(
['idf.py', 'build'],
cwd=self.project_dir,
check=True,
capture_output=not self.verbose
)
flash_args_path = self.build_dir / 'flash_args'
if not flash_args_path.exists():
print(f"{Colors.RED}Error: build/flash_args not found.{Colors.NC}")
return False
print(f"{Colors.GREEN}✓ Build complete{Colors.NC}")
return True
except subprocess.CalledProcessError as e:
print(f"{Colors.RED}✗ Build failed!{Colors.NC}")
if self.verbose: print(e.stderr.decode() if e.stderr else "")
return False
def detect_devices(self):
print()
print(f"{Colors.YELLOW}[2/4] Detecting ESP32 devices...{Colors.NC}")
self.devices = sorted(glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*'))
if not self.devices:
print(f"{Colors.RED}ERROR: No devices found!{Colors.NC}")
return False
if self.num_devices and len(self.devices) > self.num_devices:
print(f"Limiting to first {self.num_devices} devices")
self.devices = self.devices[:self.num_devices]
print(f"{Colors.GREEN}Found {len(self.devices)} device(s):{Colors.NC}")
for i, device in enumerate(self.devices):
if self.config_mode:
print(f" [{i:2d}] {device:14s}{self.get_ip_for_index(i)}")
else:
print(f" [{i:2d}] {device:14s} → (Existing IP)")
return True
def get_ip_for_index(self, index):
return f"{self.ip_base}.{self.ip_start + index}"
def flash_and_configure(self, index, device):
target_ip = self.get_ip_for_index(index) if self.config_mode else "Existing IP"
log_file = self.log_dir / f"esp32_deploy_{index}.log"
log_lines = []
flash_args_file = 'flash_args' # Relative to build_dir
def log(msg):
log_lines.append(msg)
if self.verbose or not self.parallel:
print(f"[{index}] {msg}")
for attempt in range(1, self.max_retries + 1):
log(f"=== Device {index}: {device} (Attempt {attempt}/{self.max_retries}) ===")
# --- FLASHING ---
log("Flashing via esptool...")
try:
cmd = [
'esptool.py',
'-p', device,
'-b', str(self.baud_rate),
'--before', 'default_reset',
'--after', 'hard_reset',
'write_flash',
f"@{flash_args_file}"
]
# CRITICAL FIX: Run from build_dir so relative paths in flash_args are valid
result = subprocess.run(
cmd,
cwd=self.build_dir,
check=True,
capture_output=True,
timeout=300
)
log("✓ Flash successful")
except subprocess.CalledProcessError as e:
log(f"✗ Flash failed: {e.stderr.decode() if e.stderr else 'Unknown error'}")
if attempt == self.max_retries:
with open(log_file, 'w') as f: f.write('\n'.join(log_lines))
return {'index': index, 'device': device, 'ip': target_ip, 'status': 'FAILED', 'log': log_lines}
time.sleep(2)
continue
except subprocess.TimeoutExpired:
log("✗ Flash timeout")
if attempt == self.max_retries:
with open(log_file, 'w') as f: f.write('\n'.join(log_lines))
return {'index': index, 'device': device, 'ip': target_ip, 'status': 'TIMEOUT', 'log': log_lines}
continue
# --- CONFIGURATION ---
log("Waiting for boot (3s)...")
time.sleep(3)
if self.config_mode:
log(f"Configuring WiFi ({target_ip})...")
try:
config = (
f"CFG\n"
f"SSID:{self.ssid}\n"
f"PASS:{self.password}\n"
f"IP:{target_ip}\n"
f"MASK:{self.netmask}\n"
f"GW:{self.gateway}\n"
f"DHCP:0\n"
f"END\n"
)
with serial.Serial(device, 115200, timeout=2, write_timeout=2) as ser:
ser.reset_input_buffer()
ser.write(config.encode('utf-8'))
ser.flush()
log("✓ Config sent")
if self.verify_ping:
log("Waiting for network (6s)...")
time.sleep(6)
log(f"Pinging {target_ip}...")
try:
res = subprocess.run(['ping', '-c', '2', '-W', '3', target_ip], capture_output=True, timeout=10)
if res.returncode == 0:
log("✓ Ping successful")
with open(log_file, 'w') as f: f.write('\n'.join(log_lines))
return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines}
else:
log("✗ Ping failed")
except:
log("✗ Ping error")
else:
with open(log_file, 'w') as f: f.write('\n'.join(log_lines))
return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines}
except Exception as e:
log(f"✗ Config error: {e}")
else:
log("Configuration skipped (Preserving NVS)")
with open(log_file, 'w') as f: f.write('\n'.join(log_lines))
return {'index': index, 'device': device, 'ip': target_ip, 'status': 'SUCCESS', 'log': log_lines}
time.sleep(2)
with open(log_file, 'w') as f: f.write('\n'.join(log_lines))
return {'index': index, 'device': device, 'ip': target_ip, 'status': 'FAILED', 'log': log_lines}
def deploy_all_parallel(self):
print()
print(f"{Colors.YELLOW}[3/4] Flashing (parallel)...{Colors.NC}")
max_workers = min(10, len(self.devices))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self.flash_and_configure, i, device): (i, device)
for i, device in enumerate(self.devices)
}
for future in as_completed(futures):
result = future.result()
self.results[result['index']] = result
self.print_device_status(result)
def deploy_all_sequential(self):
print()
print(f"{Colors.YELLOW}[3/4] Flashing (sequential)...{Colors.NC}")
for i, device in enumerate(self.devices):
print(f"\n{Colors.BLUE}--- Device {i+1}/{len(self.devices)} ---{Colors.NC}")
result = self.flash_and_configure(i, device)
self.results[result['index']] = result
self.print_device_status(result)
def print_device_status(self, result):
status_color = {
'SUCCESS': Colors.GREEN, 'NO_PING': Colors.YELLOW,
'FAILED': Colors.RED, 'TIMEOUT': Colors.RED
}.get(result['status'], Colors.RED)
print(f"{status_color}[Device {result['index']:2d}] {result['device']:14s}{result['ip']:15s} [{result['status']}]{Colors.NC}")
def deploy_all(self):
if self.parallel: self.deploy_all_parallel()
else: self.deploy_all_sequential()
def print_summary(self):
print()
print(f"{Colors.YELLOW}[4/4] Deployment Summary{Colors.NC}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
success = sum(1 for r in self.results.values() if r['status'] == 'SUCCESS')
failed = sum(1 for r in self.results.values() if r['status'] in ['FAILED', 'TIMEOUT'])
for i in range(len(self.devices)):
if i in self.results:
r = self.results[i]
icon = f"{Colors.GREEN}{Colors.NC}" if r['status'] == 'SUCCESS' else f"{Colors.RED}{Colors.NC}"
print(f"{icon} {r['device']:14s}{r['ip']}")
print(f"{Colors.BLUE}{'='*70}{Colors.NC}")
print(f"Total: {len(self.devices)}")
print(f"Success: {success}")
print(f"Failed: {failed}")
return failed
def main():
parser = argparse.ArgumentParser(description='ESP32 Mass Deployment Tool')
parser.add_argument('-d', '--dir', default=os.getcwd(), help='ESP-IDF project dir')
parser.add_argument('-s', '--ssid', help='WiFi SSID')
parser.add_argument('-p', '--password', help='WiFi Password')
parser.add_argument('--start-ip', default='192.168.1.51', help='Starting IP')
parser.add_argument('-n', '--num-devices', type=int, default=30, help='Max devices')
parser.add_argument('-g', '--gateway', default='192.168.1.1', help='Gateway IP')
parser.add_argument('-m', '--netmask', default='255.255.255.0', help='Netmask')
parser.add_argument('-b', '--baud', type=int, default=460800, help='Flash baud rate')
parser.add_argument('-r', '--retries', type=int, default=2, help='Retries')
parser.add_argument('--no-verify', action='store_true', help='Skip ping check')
parser.add_argument('--sequential', action='store_true', help='Run sequentially')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose')
args = parser.parse_args()
if (args.ssid and not args.password) or (args.password and not args.ssid):
print(f"{Colors.RED}ERROR: Provide both SSID and Password for config, or neither for flash-only.{Colors.NC}")
sys.exit(1)
deployer = DeviceDeployer(
project_dir=args.dir, ssid=args.ssid, password=args.password,
start_ip=args.start_ip, netmask=args.netmask, gateway=args.gateway,
baud_rate=args.baud, max_retries=args.retries, verify_ping=not args.no_verify,
num_devices=args.num_devices if args.num_devices > 0 else None,
verbose=args.verbose, parallel=not args.sequential
)
deployer.print_banner()
if not deployer.build_firmware(): sys.exit(1)
if not deployer.detect_devices(): sys.exit(1)
deployer.deploy_all()
failed_count = deployer.print_summary()
sys.exit(failed_count)
if __name__ == '__main__':
main()

21
new_rules.part Normal file
View File

@ -0,0 +1,21 @@
# --- Added by update script ---
SUBSYSTEM=="tty", ATTRS{serial}=="04d3540e9223f011aefcbef12a319464", SYMLINK+="esp_port_01"
SUBSYSTEM=="tty", ATTRS{serial}=="0aa1c0a3a323f0118893c2f12a319464", SYMLINK+="esp_port_02"
SUBSYSTEM=="tty", ATTRS{serial}=="1ca7d2748a23f0118d9db8f12a319464", SYMLINK+="esp_port_03"
SUBSYSTEM=="tty", ATTRS{serial}=="1e8972fca871f011ae8af99e1045c30f", SYMLINK+="esp_port_04"
SUBSYSTEM=="tty", ATTRS{serial}=="263cd138a871f011bcecff9e1045c30f", SYMLINK+="esp_port_05"
SUBSYSTEM=="tty", ATTRS{serial}=="28e8c61a8523f011a56ac2f12a319464", SYMLINK+="esp_port_06"
SUBSYSTEM=="tty", ATTRS{serial}=="38717231a723f011a006c3f12a319464", SYMLINK+="esp_port_07"
SUBSYSTEM=="tty", ATTRS{serial}=="3e1afd689523f011b363baf12a319464", SYMLINK+="esp_port_08"
SUBSYSTEM=="tty", ATTRS{serial}=="4a6d2844a071f011bceaff9e1045c30f", SYMLINK+="esp_port_09"
SUBSYSTEM=="tty", ATTRS{serial}=="4e52608ba171f011af3ffb9e1045c30f", SYMLINK+="esp_port_10"
SUBSYSTEM=="tty", ATTRS{serial}=="4eaecac7a371f011be18fb9e1045c30f", SYMLINK+="esp_port_11"
SUBSYSTEM=="tty", ATTRS{serial}=="600faabf9a71f01195bbfb9e1045c30f", SYMLINK+="esp_port_12"
SUBSYSTEM=="tty", ATTRS{serial}=="640c56829723f01183c1bef12a319464", SYMLINK+="esp_port_13"
SUBSYSTEM=="tty", ATTRS{serial}=="904f28aede6ef011beac4d9b1045c30f", SYMLINK+="esp_port_14"
SUBSYSTEM=="tty", ATTRS{serial}=="A5069RR4", SYMLINK+="esp_port_15"
SUBSYSTEM=="tty", ATTRS{serial}=="a86592298f23f01199b0c2f12a319464", SYMLINK+="esp_port_16"
SUBSYSTEM=="tty", ATTRS{serial}=="ba2efc7fa071f0119aa1fb9e1045c30f", SYMLINK+="esp_port_17"
SUBSYSTEM=="tty", ATTRS{serial}=="fc641417fbf4ef11bd28a1a29ed47d52", SYMLINK+="esp_port_18"
SUBSYSTEM=="tty", ATTRS{serial}=="fedb86249723f01198ebc2f12a319464", SYMLINK+="esp_port_19"

View File

@ -1,167 +0,0 @@
#!/usr/bin/env python3
"""
Simple ESP32 WiFi Reconfiguration Tool
Sends WiFi config to all connected ESP32 devices via serial
"""
import serial
import time
import glob
import argparse
import sys
def reconfig_devices(ssid, password, start_ip, gateway="192.168.1.1",
netmask="255.255.255.0", verbose=False):
"""Reconfigure all connected devices"""
devices = sorted(glob.glob('/dev/ttyUSB*'))
num_devices = len(devices)
if num_devices == 0:
print("ERROR: No devices found!")
return 0
# Parse start IP
ip_parts = start_ip.split('.')
ip_base = '.'.join(ip_parts[:3])
ip_start = int(ip_parts[3])
ok_devices = 0
print(f"Found {num_devices} devices")
print(f"SSID: {ssid}")
print(f"Password: {'*' * len(password)}")
print(f"IP Range: {ip_base}.{ip_start} - {ip_base}.{ip_start + num_devices - 1}")
print()
for idx, dev in enumerate(devices):
ip = f"{ip_base}.{ip_start + idx}"
print(f"[{idx:2d}] Configuring {dev:14s}{ip}", end='')
try:
ser = serial.Serial(dev, 115200, timeout=1)
time.sleep(0.5) # Let serial port stabilize
# Send configuration
ser.write(b"CFG\n")
time.sleep(0.1)
ser.write(f"SSID:{ssid}\n".encode())
time.sleep(0.1)
ser.write(f"PASS:{password}\n".encode())
time.sleep(0.1)
ser.write(f"IP:{ip}\n".encode())
time.sleep(0.1)
ser.write(f"MASK:{netmask}\n".encode())
time.sleep(0.1)
ser.write(f"GW:{gateway}\n".encode())
time.sleep(0.1)
ser.write(b"DHCP:0\n")
time.sleep(0.1)
ser.write(b"END\n")
# Wait for OK response
time.sleep(0.5)
response = ser.read(100).decode('utf-8', errors='ignore')
if verbose and response.strip():
print(f"\n Response: {response[:80]}")
if 'OK' in response:
print("")
ok_devices += 1
else:
print(" ⚠ (no OK)")
ser.close()
except Exception as e:
print(f" ✗ Error: {e}")
time.sleep(0.5)
print()
print(f"{'='*60}")
print(f"Success: {ok_devices}/{num_devices}")
print(f"Failed: {num_devices - ok_devices}/{num_devices}")
print(f"{'='*60}")
return ok_devices
def main():
parser = argparse.ArgumentParser(
description='Reconfigure WiFi settings on all connected ESP32 devices',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic usage with defaults
%(prog)s
# Custom IP range
%(prog)s --start-ip 192.168.1.100
# Custom WiFi credentials
%(prog)s -s MyNetwork -p mypassword
# Different subnet
%(prog)s --start-ip 10.0.0.50 -g 10.0.0.1
# Verbose mode
%(prog)s -v
"""
)
parser.add_argument('-s', '--ssid', default='ClubHouse2G',
help='WiFi SSID (default: ClubHouse2G)')
parser.add_argument('-p', '--password', default='ez2remember',
help='WiFi password (default: ez2remember)')
parser.add_argument('--start-ip', default='192.168.1.51',
help='Starting IP address (default: 192.168.1.51)')
parser.add_argument('-g', '--gateway', default='192.168.1.1',
help='Gateway IP (default: 192.168.1.1)')
parser.add_argument('-m', '--netmask', default='255.255.255.0',
help='Network mask (default: 255.255.255.0)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Show device responses')
parser.add_argument('-w', '--wait', type=int, default=30,
help='Seconds to wait for connections (default: 30)')
args = parser.parse_args()
# Reconfigure all devices
ok_count = reconfig_devices(
ssid=args.ssid,
password=args.password,
start_ip=args.start_ip,
gateway=args.gateway,
netmask=args.netmask,
verbose=args.verbose
)
# Wait for connections
if ok_count > 0:
print(f"\nWaiting {args.wait}s for WiFi connections...")
time.sleep(args.wait)
print("Done!")
print()
print("Test commands:")
# Extract IP info
ip_parts = args.start_ip.split('.')
ip_base = '.'.join(ip_parts[:3])
ip_start = int(ip_parts[3])
num_devices = len(sorted(glob.glob('/dev/ttyUSB*')))
print(f" # Ping all devices")
print(f" for i in {{{ip_start}..{ip_start + num_devices - 1}}}; do ping -c 1 {ip_base}.$i & done; wait")
print()
print(f" # Check device status")
print(f" ./check_device_status.py --reset")
print()
print(f" # Test first device")
print(f" iperf -c {ip_base}.{ip_start}")
print()
sys.exit(0 if ok_count > 0 else 1)
if __name__ == '__main__':
main()

View File

@ -1,315 +0,0 @@
#!/usr/bin/env python3
import argparse
import glob
import re
import serial
import time
import sys
from serial.tools import list_ports
import json
import os
DEFAULT_PATTERN = "/dev/ttyUSB*"
MAP_FILE = os.path.expanduser("~/.reconfig_ipmap.json")
YELLOW_TOKENS = [
"NO WIFI CONFIG", "NO_WIFI_CONFIG", "NO CONFIG", "NO_CONFIG",
"YELLOW", "LED_STATE_NO_CONFIG"
]
IP_REGEX = re.compile(r'(?:(?:IP[ :]*|STA[ _-]*IP[ :]*|ADDR[ :]*|ADDRESS[ :]*))?(\d{1,3}(?:\.\d{1,3}){3})', re.IGNORECASE)
def eprint(*a, **kw):
print(*a, file=sys.stderr, **kw)
def detect_no_config(ser, verbose=False, settle=0.1, timeout=0.3, probes=(b"STATUS\n", b"IP\n"), deadline=None):
ser.timeout = timeout
ser.write_timeout = timeout
def now(): return time.time()
def read_and_collect(sleep_s=0.05):
buf = b""
# sleep but respect deadline
t_end = now() + sleep_s
while now() < t_end:
time.sleep(0.01)
try:
while True:
if deadline and now() >= deadline: break
chunk = ser.read(256)
if not chunk:
break
buf += chunk
except Exception:
pass
return buf.decode('utf-8', errors='ignore')
text = ""
# initial settle
t_end = now() + settle
while now() < t_end:
time.sleep(0.01)
text += read_and_collect(0.0)
# probes
for cmd in probes:
if deadline and now() >= deadline: break
try:
ser.write(cmd)
except Exception:
pass
text += read_and_collect(0.1)
if verbose and text.strip():
eprint("--- STATUS TEXT BEGIN ---")
eprint(text)
eprint("--- STATUS TEXT END ---")
utext = text.upper()
return any(tok in utext for tok in YELLOW_TOKENS), text
def parse_ip_from_text(text):
for m in IP_REGEX.finditer(text or ""):
ip = m.group(1)
try:
octs = [int(x) for x in ip.split(".")]
if all(0 <= x <= 255 for x in octs):
return ip
except Exception:
pass
return None
def next_free_ip(used_last_octets, start_ip_octet, max_octet=254):
x = start_ip_octet
while x <= max_octet:
if x not in used_last_octets:
used_last_octets.add(x)
return x
x += 1
raise RuntimeError("No free IPs left in the range")
def load_map(path):
if os.path.exists(path):
try:
with open(path, "r") as f:
return json.load(f)
except Exception:
return {}
return {}
def usb_serial_for_port(dev):
for p in list_ports.comports():
if p.device == dev:
return p.serial_number or p.hwid or dev
return dev
def configure_device(ser, ssid, password, ip, dhcp, verbose=False):
def writeln(s):
if isinstance(s, str):
s = s.encode()
ser.write(s + b"\n")
time.sleep(0.05)
time.sleep(0.15)
writeln("CFG")
writeln(f"SSID:{ssid}")
writeln(f"PASS:{password}")
if dhcp:
writeln("DHCP:1")
else:
writeln(f"IP:{ip}")
writeln("MASK:255.255.255.0")
writeln("GW:192.168.1.1")
writeln("DHCP:0")
writeln("END")
time.sleep(0.2)
resp = b""
try:
while True:
chunk = ser.read(256)
if not chunk:
break
resp += chunk
except Exception:
pass
text = resp.decode('utf-8', errors='ignore')
if verbose and text.strip():
eprint("--- CONFIG RESPONSE BEGIN ---")
eprint(text)
eprint("--- CONFIG RESPONSE END ---")
ok = ("OK" in text) or ("Saved" in text) or ("DONE" in text.upper())
return ok, text
def main():
parser = argparse.ArgumentParser(
description="Configure ESP32-S3 devices over serial. Fast, with strict per-device deadlines and exclude regex."
)
parser.add_argument("--ssid", default="ClubHouse2G", help="WiFi SSID")
parser.add_argument("--password", default="ez2remember", help="WiFi password")
parser.add_argument("--pattern", default=DEFAULT_PATTERN, help=f"Glob for serial ports (default: {DEFAULT_PATTERN})")
parser.add_argument("--exclude", default="", help="Regex of device paths to skip, e.g. 'ttyUSB10|ttyUSB11'")
parser.add_argument("--baud", type=int, default=115200, help="Serial baud rate")
parser.add_argument("--timeout", type=float, default=0.3, help="Serial read/write timeout (s)")
parser.add_argument("--settle", type=float, default=0.1, help="Settle delay before first read (s)")
parser.add_argument("--per-device-cap", type=float, default=1.2, help="Hard deadline seconds per device during probe")
parser.add_argument("--only-yellow", action="store_true",
help="Only program devices that appear to be in 'no WiFi config' (solid yellow) state")
parser.add_argument("--dhcp", action="store_true", help="Configure device for DHCP instead of static IP")
parser.add_argument("--start-ip", type=int, default=51, help="Starting host octet for static IPs (x in 192.168.1.x)")
parser.add_argument("--persist-map", action="store_true",
help=f"Persist USB-serial → IP assignments to {MAP_FILE} to keep continuity across runs")
parser.add_argument("--full-probes", action="store_true", help="Use extended probes (STATUS, STAT, GET STATUS, IP)")
parser.add_argument("--list", action="store_true", help="List ports with serial numbers and exit")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose status prints to stderr")
parser.add_argument("--dry-run", action="store_true", help="Do not send CFG/END; just print what would happen")
args = parser.parse_args()
if args.list:
print("Ports:")
for p in list_ports.comports():
print(f" {p.device:>12} sn={p.serial_number} desc={p.description}")
return
devices = sorted(glob.glob(args.pattern))
if args.exclude:
devices = [d for d in devices if not re.search(args.exclude, d)]
print(f"Found {len(devices)} devices matching {args.pattern}", flush=True)
if args.exclude:
print(f"Excluding devices matching /{args.exclude}/", flush=True)
ip_map = load_map(MAP_FILE) if args.persist_map else {}
used_last_octets = set()
prepass_info = {}
for i, dev in enumerate(devices):
print(f"[pre] {i+1}/{len(devices)} probing {dev}", flush=True)
start_t = time.time()
already_ip = None
no_cfg = False
try:
ser = serial.Serial(
dev,
args.baud,
timeout=args.timeout,
write_timeout=args.timeout,
rtscts=False,
dsrdtr=False,
xonxoff=False,
)
# gentle DTR/RTS toggle
try:
ser.dtr = False; ser.rts = False; time.sleep(0.02)
ser.dtr = True; ser.rts = True; time.sleep(0.02)
except Exception:
pass
probes = (b"STATUS\n", b"IP\n") if not args.full_probes else (b"STATUS\n", b"STAT\n", b"GET STATUS\n", b"IP\n")
deadline = start_t + max(0.4, args.per_device_cap)
no_cfg, text = detect_no_config(
ser, verbose=args.verbose, settle=args.settle,
timeout=args.timeout, probes=probes, deadline=deadline
)
already_ip = parse_ip_from_text(text)
ser.close()
except Exception as e:
eprint(f" [warn] {dev} probe error: {e}")
dur = time.time() - start_t
print(f" → no_cfg={no_cfg} ip={already_ip} ({dur:.2f}s)", flush=True)
prepass_info[dev] = {"no_cfg": no_cfg, "ip": already_ip}
if already_ip and not args.dhcp:
try:
last = int(already_ip.split(".")[-1])
used_last_octets.add(last)
except Exception:
pass
ok_devices = 0
skipped = 0
errors = 0
for idx, dev in enumerate(devices):
info = prepass_info.get(dev, {})
already_ip = info.get("ip")
no_cfg = info.get("no_cfg", False)
usb_key = usb_serial_for_port(dev)
if already_ip and not args.dhcp:
print(f"[cfg] {idx+1}/{len(devices)} {dev}: already has {already_ip} → skip", flush=True)
skipped += 1
if args.persist_map:
ip_map[usb_key] = already_ip
continue
if args.only_yellow and not no_cfg:
print(f"[cfg] {idx+1}/{len(devices)} {dev}: not yellow/no-config → skip", flush=True)
skipped += 1
continue
# pick target IP
if args.dhcp:
target_ip = None
mode = "DHCP"
else:
target_last_octet = None
if args.persist_map and usb_key in ip_map:
try:
prev_ip = ip_map[usb_key]
target_last_octet = int(prev_ip.split(".")[-1])
if target_last_octet in used_last_octets:
target_last_octet = None
except Exception:
target_last_octet = None
if target_last_octet is None:
target_last_octet = next_free_ip(used_last_octets, args.start_ip, 254)
target_ip = f"192.168.1.{target_last_octet}"
mode = f"Static {target_ip}"
print(f"[cfg] {idx+1}/{len(devices)} {dev}: configuring ({mode})", flush=True)
if args.dry_run:
print(" (dry-run) Would send CFG/END", flush=True)
ok = True
else:
try:
ser = serial.Serial(dev, args.baud, timeout=args.timeout, write_timeout=args.timeout)
ok, resp = configure_device(ser, args.ssid, args.password, target_ip, args.dhcp, verbose=args.verbose)
ser.close()
except Exception as e:
print(f" ✗ Error opening/configuring: {e}", flush=True)
ok = False
if ok:
print(" ✓ OK", flush=True)
ok_devices += 1
if not args.dhcp and args.persist_map and target_ip:
ip_map[usb_key] = target_ip
else:
print(" ✗ Failed", flush=True)
errors += 1
time.sleep(0.05)
if args.persist_map:
try:
with open(MAP_FILE, "w") as f:
json.dump(ip_map, f, indent=2, sort_keys=True)
print(f"Persisted mapping to {MAP_FILE}", flush=True)
except Exception as e:
print(f"Warning: could not save mapping to {MAP_FILE}: {e}", flush=True)
print(f"Summary: OK={ok_devices} Skipped={skipped} Errors={errors} Total={len(devices)}", flush=True)
if __name__ == "__main__":
main()