fixes to deploy to update udev rules
This commit is contained in:
parent
538d203185
commit
c640bc4df7
125
esp32_deploy.py
125
esp32_deploy.py
|
|
@ -12,6 +12,8 @@ import logging
|
||||||
import glob
|
import glob
|
||||||
import random
|
import random
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from serial.tools import list_ports
|
||||||
|
import subprocess
|
||||||
|
|
||||||
# Ensure detection script is available
|
# Ensure detection script is available
|
||||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
|
@ -56,13 +58,35 @@ def generate_config_suffix(target, csi, ampdu):
|
||||||
return f"{target}_{csi_str}_{ampdu_str}"
|
return f"{target}_{csi_str}_{ampdu_str}"
|
||||||
|
|
||||||
def auto_detect_devices():
|
def auto_detect_devices():
|
||||||
"""Prioritizes static udev paths (/dev/esp_port_XX) if they exist."""
|
"""Prioritizes static udev paths (/dev/esp_port_XX) and removes duplicates."""
|
||||||
try:
|
try:
|
||||||
ports = glob.glob('/dev/esp_port_*')
|
ports = glob.glob('/dev/esp_port_*')
|
||||||
if ports:
|
if ports:
|
||||||
|
# --- New Deduplication Logic ---
|
||||||
|
unique_map = {}
|
||||||
|
for p in ports:
|
||||||
|
try:
|
||||||
|
# Resolve symlink (e.g., /dev/esp_port_01 -> /dev/ttyUSB0)
|
||||||
|
real_path = os.path.realpath(p)
|
||||||
|
|
||||||
|
if real_path not in unique_map:
|
||||||
|
unique_map[real_path] = p
|
||||||
|
else:
|
||||||
|
# Conflict! We have both esp_port_1 and esp_port_01.
|
||||||
|
# Keep the "shorter" one (esp_port_1) to match your new scheme.
|
||||||
|
current_alias = unique_map[real_path]
|
||||||
|
if len(p) < len(current_alias):
|
||||||
|
unique_map[real_path] = p
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Use the filtered list
|
||||||
|
ports = list(unique_map.values())
|
||||||
|
# -------------------------------
|
||||||
|
|
||||||
# Sort by suffix number
|
# Sort by suffix number
|
||||||
ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0)
|
ports.sort(key=lambda x: int(re.search(r'(\d+)$', x).group(1)) if re.search(r'(\d+)$', x) else 0)
|
||||||
print(f"{Colors.CYAN}Auto-detected {len(ports)} devices using static udev rules.{Colors.RESET}")
|
print(f"{Colors.CYAN}Auto-detected {len(ports)} devices (filtered from {len(unique_map) + (len(glob.glob('/dev/esp_port_*')) - len(unique_map))} aliases).{Colors.RESET}")
|
||||||
return [type('obj', (object,), {'device': p}) for p in ports]
|
return [type('obj', (object,), {'device': p}) for p in ports]
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
@ -344,6 +368,81 @@ class UnifiedDeployWorker:
|
||||||
self.log.error(f"Flash Prep Error: {e}")
|
self.log.error(f"Flash Prep Error: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def update_udev_map(dry_run=False):
|
||||||
|
"""
|
||||||
|
Scans all USB serial devices, sorts them by physical topology (Bus/Port),
|
||||||
|
and generates a udev rule file to map them to /dev/esp_port_XX.
|
||||||
|
"""
|
||||||
|
print(f"{Colors.BLUE}Scanning USB topology to generate stable port maps...{Colors.RESET}")
|
||||||
|
|
||||||
|
# Get all USB serial devices
|
||||||
|
devices = list(list_ports.grep("USB|ACM|CP210|FT232"))
|
||||||
|
|
||||||
|
if not devices:
|
||||||
|
print(f"{Colors.RED}No devices found.{Colors.RESET}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Sort by "location" (Physical USB path: e.g., 1-1.2.3)
|
||||||
|
# This guarantees esp_port_01 is always the first physical port.
|
||||||
|
devices.sort(key=lambda x: x.location if x.location else x.device)
|
||||||
|
|
||||||
|
generated_rules = []
|
||||||
|
print(f"{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}")
|
||||||
|
print("-" * 65)
|
||||||
|
|
||||||
|
for i, dev in enumerate(devices):
|
||||||
|
port_num = i + 1
|
||||||
|
symlink = f"esp_port_{port_num}" # e.g., esp_port_1
|
||||||
|
|
||||||
|
# Get detailed udev info to find the stable physical path ID
|
||||||
|
try:
|
||||||
|
cmd = ['udevadm', 'info', '--name', dev.device, '--query=property']
|
||||||
|
proc = subprocess.run(cmd, capture_output=True, text=True)
|
||||||
|
props = dict(line.split('=', 1) for line in proc.stdout.splitlines() if '=' in line)
|
||||||
|
|
||||||
|
# ID_PATH is the robust physical identifier (e.g., pci-0000:00:14.0-usb-0:1.4.3:1.0)
|
||||||
|
dev_path = props.get('ID_PATH', '')
|
||||||
|
|
||||||
|
if not dev_path:
|
||||||
|
print(f"{Colors.YELLOW}Skipping {dev.device} (No ID_PATH found){Colors.RESET}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Generate the rule
|
||||||
|
rule = f'SUBSYSTEM=="tty", ENV{{ID_PATH}}=="{dev_path}", SYMLINK+="{symlink}"'
|
||||||
|
generated_rules.append(rule)
|
||||||
|
|
||||||
|
print(f"{dev.location:<20} | {dev.device:<15} | {symlink}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error inspecting {dev.device}: {e}")
|
||||||
|
|
||||||
|
print("-" * 65)
|
||||||
|
|
||||||
|
rules_content = "# Auto-generated by esp32_deploy.py\n" + "\n".join(generated_rules) + "\n"
|
||||||
|
rule_file = "/etc/udev/rules.d/99-esp32-stable.rules"
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
print(f"\n{Colors.YELLOW}--- DRY RUN: Rules that would be written to {rule_file} ---{Colors.RESET}")
|
||||||
|
print(rules_content)
|
||||||
|
else:
|
||||||
|
if os.geteuid() != 0:
|
||||||
|
print(f"\n{Colors.RED}ERROR: Root privileges required to write udev rules.{Colors.RESET}")
|
||||||
|
print(f"Run: sudo ./esp32_deploy.py --map-ports")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"\nWriting rules to {rule_file}...")
|
||||||
|
try:
|
||||||
|
with open(rule_file, 'w') as f:
|
||||||
|
f.write(rules_content)
|
||||||
|
|
||||||
|
print("Reloading udev rules...")
|
||||||
|
subprocess.run(['udevadm', 'control', '--reload-rules'], check=True)
|
||||||
|
subprocess.run(['udevadm', 'trigger'], check=True)
|
||||||
|
print(f"{Colors.GREEN}Success! Devices re-mapped.{Colors.RESET}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{Colors.RED}Failed to write rules: {e}{Colors.RESET}")
|
||||||
|
|
||||||
def parse_args():
|
def parse_args():
|
||||||
parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool')
|
parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool')
|
||||||
parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options')
|
parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options')
|
||||||
|
|
@ -380,8 +479,9 @@ def parse_args():
|
||||||
parser.add_argument('-M', '--mode', default='STA')
|
parser.add_argument('-M', '--mode', default='STA')
|
||||||
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
|
parser.add_argument('-mc', '--monitor-channel', type=int, default=36)
|
||||||
parser.add_argument('--csi', dest='csi_enable', action='store_true')
|
parser.add_argument('--csi', dest='csi_enable', action='store_true')
|
||||||
|
parser.add_argument('--map-ports', action='store_true', help="Rescan USB topology and generate udev rules for esp_port_xx")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
if args.target != 'all' and not args.start_ip and not args.check_version:
|
if args.target != 'all' and not args.start_ip and not args.check_version and not args.map_ports:
|
||||||
parser.error("the following arguments are required: --start-ip")
|
parser.error("the following arguments are required: --start-ip")
|
||||||
if args.config_only and args.flash_only: parser.error("Conflicting modes")
|
if args.config_only and args.flash_only: parser.error("Conflicting modes")
|
||||||
return args
|
return args
|
||||||
|
|
@ -553,9 +653,22 @@ async def run_deployment(args):
|
||||||
print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}")
|
print(f"\n{Colors.BLUE}Summary: {success}/{len(devs)} Success{Colors.RESET}")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
if os.name == 'nt': asyncio.set_event_loop(asyncio.ProactorEventLoop())
|
args = parse_args()
|
||||||
try: asyncio.run(run_deployment(parse_args()))
|
|
||||||
except KeyboardInterrupt: sys.exit(1)
|
# --- INTERCEPT --map-ports HERE ---
|
||||||
|
if args.map_ports:
|
||||||
|
# Run synchronously, no async loop needed
|
||||||
|
update_udev_map(dry_run=False)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Standard async deployment flow
|
||||||
|
if os.name == 'nt':
|
||||||
|
asyncio.set_event_loop(asyncio.ProactorEventLoop())
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(run_deployment(args))
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue