From c640bc4df746a749e87108f716a52178fbfb5899 Mon Sep 17 00:00:00 2001 From: Robert McMahon Date: Thu, 18 Dec 2025 17:53:11 -0800 Subject: [PATCH] fixes to deploy to update udev rules --- esp32_deploy.py | 125 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 119 insertions(+), 6 deletions(-) diff --git a/esp32_deploy.py b/esp32_deploy.py index e184360..f238c0b 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -12,6 +12,8 @@ import logging import glob import random from pathlib import Path +from serial.tools import list_ports +import subprocess # Ensure detection script is available sys.path.append(os.path.dirname(os.path.abspath(__file__))) @@ -56,13 +58,35 @@ def generate_config_suffix(target, csi, ampdu): return f"{target}_{csi_str}_{ampdu_str}" def auto_detect_devices(): - """Prioritizes static udev paths (/dev/esp_port_XX) if they exist.""" + """Prioritizes static udev paths (/dev/esp_port_XX) and removes duplicates.""" try: ports = glob.glob('/dev/esp_port_*') if ports: + # --- New Deduplication Logic --- + unique_map = {} + for p in ports: + try: + # Resolve symlink (e.g., /dev/esp_port_01 -> /dev/ttyUSB0) + real_path = os.path.realpath(p) + + if real_path not in unique_map: + unique_map[real_path] = p + else: + # Conflict! We have both esp_port_1 and esp_port_01. + # Keep the "shorter" one (esp_port_1) to match your new scheme. + current_alias = unique_map[real_path] + if len(p) < len(current_alias): + unique_map[real_path] = p + except OSError: + continue + + # Use the filtered list + ports = list(unique_map.values()) + # ------------------------------- + # Sort by suffix number ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0) - print(f"{Colors.CYAN}Auto-detected {len(ports)} devices using static udev rules.{Colors.RESET}") + print(f"{Colors.CYAN}Auto-detected {len(ports)} devices (filtered from {len(unique_map) + (len(glob.glob('/dev/esp_port_*')) - len(unique_map))} aliases).{Colors.RESET}") return [type('obj', (object,), {'device': p}) for p in ports] except Exception: pass @@ -344,6 +368,81 @@ class UnifiedDeployWorker: self.log.error(f"Flash Prep Error: {e}") return False +def update_udev_map(dry_run=False): + """ + Scans all USB serial devices, sorts them by physical topology (Bus/Port), + and generates a udev rule file to map them to /dev/esp_port_XX. + """ + print(f"{Colors.BLUE}Scanning USB topology to generate stable port maps...{Colors.RESET}") + + # Get all USB serial devices + devices = list(list_ports.grep("USB|ACM|CP210|FT232")) + + if not devices: + print(f"{Colors.RED}No devices found.{Colors.RESET}") + return + + # Sort by "location" (Physical USB path: e.g., 1-1.2.3) + # This guarantees esp_port_01 is always the first physical port. + devices.sort(key=lambda x: x.location if x.location else x.device) + + generated_rules = [] + print(f"{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}") + print("-" * 65) + + for i, dev in enumerate(devices): + port_num = i + 1 + symlink = f"esp_port_{port_num}" # e.g., esp_port_1 + + # Get detailed udev info to find the stable physical path ID + try: + cmd = ['udevadm', 'info', '--name', dev.device, '--query=property'] + proc = subprocess.run(cmd, capture_output=True, text=True) + props = dict(line.split('=', 1) for line in proc.stdout.splitlines() if '=' in line) + + # ID_PATH is the robust physical identifier (e.g., pci-0000:00:14.0-usb-0:1.4.3:1.0) + dev_path = props.get('ID_PATH', '') + + if not dev_path: + print(f"{Colors.YELLOW}Skipping {dev.device} (No ID_PATH found){Colors.RESET}") + continue + + # Generate the rule + rule = f'SUBSYSTEM=="tty", ENV{{ID_PATH}}=="{dev_path}", SYMLINK+="{symlink}"' + generated_rules.append(rule) + + print(f"{dev.location:<20} | {dev.device:<15} | {symlink}") + + except Exception as e: + print(f"Error inspecting {dev.device}: {e}") + + print("-" * 65) + + rules_content = "# Auto-generated by esp32_deploy.py\n" + "\n".join(generated_rules) + "\n" + rule_file = "/etc/udev/rules.d/99-esp32-stable.rules" + + if dry_run: + print(f"\n{Colors.YELLOW}--- DRY RUN: Rules that would be written to {rule_file} ---{Colors.RESET}") + print(rules_content) + else: + if os.geteuid() != 0: + print(f"\n{Colors.RED}ERROR: Root privileges required to write udev rules.{Colors.RESET}") + print(f"Run: sudo ./esp32_deploy.py --map-ports") + return + + print(f"\nWriting rules to {rule_file}...") + try: + with open(rule_file, 'w') as f: + f.write(rules_content) + + print("Reloading udev rules...") + subprocess.run(['udevadm', 'control', '--reload-rules'], check=True) + subprocess.run(['udevadm', 'trigger'], check=True) + print(f"{Colors.GREEN}Success! Devices re-mapped.{Colors.RESET}") + + except Exception as e: + print(f"{Colors.RED}Failed to write rules: {e}{Colors.RESET}") + def parse_args(): parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool') parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options') @@ -380,8 +479,9 @@ def parse_args(): parser.add_argument('-M', '--mode', default='STA') parser.add_argument('-mc', '--monitor-channel', type=int, default=36) parser.add_argument('--csi', dest='csi_enable', action='store_true') + parser.add_argument('--map-ports', action='store_true', help="Rescan USB topology and generate udev rules for esp_port_xx") args = parser.parse_args() - if args.target != 'all' and not args.start_ip and not args.check_version: + if args.target != 'all' and not args.start_ip and not args.check_version and not args.map_ports: parser.error("the following arguments are required: --start-ip") if args.config_only and args.flash_only: parser.error("Conflicting modes") return args @@ -553,9 +653,22 @@ async def run_deployment(args): print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}") def main(): - if os.name == 'nt': asyncio.set_event_loop(asyncio.ProactorEventLoop()) - try: asyncio.run(run_deployment(parse_args())) - except KeyboardInterrupt: sys.exit(1) + args = parse_args() + + # --- INTERCEPT --map-ports HERE --- + if args.map_ports: + # Run synchronously, no async loop needed + update_udev_map(dry_run=False) + sys.exit(0) + + # Standard async deployment flow + if os.name == 'nt': + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + + try: + asyncio.run(run_deployment(args)) + except KeyboardInterrupt: + sys.exit(1) if __name__ == '__main__': main()