250 lines
8.6 KiB
Python
Executable File
250 lines
8.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Unified udev rules generator for ESP32 devices.
|
|
Combines functionality from gen_udev_rules.py and append_new_rules.py.
|
|
|
|
Features:
|
|
- Generate rules from scratch (full scan)
|
|
- Append new rules to existing file (incremental update)
|
|
- Uses ID_PATH for stable device identification
|
|
- Sorts devices by physical topology for consistent ordering
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
from pathlib import Path
|
|
from serial.tools import list_ports
|
|
|
|
class Colors:
|
|
GREEN = '\033[92m'
|
|
RED = '\033[91m'
|
|
YELLOW = '\033[93m'
|
|
BLUE = '\033[94m'
|
|
CYAN = '\033[96m'
|
|
RESET = '\033[0m'
|
|
|
|
# Default paths
|
|
DEFAULT_RULES_FILE = "/etc/udev/rules.d/99-esp32-stable.rules"
|
|
TEMP_RULES_FILE = "new_rules.part"
|
|
|
|
def get_existing_rules(rules_path):
|
|
"""
|
|
Parse existing rules file to extract:
|
|
- Set of ID_PATH values already mapped
|
|
- Maximum port number used
|
|
Returns: (known_paths, max_port)
|
|
"""
|
|
known_paths = set()
|
|
max_port = 0
|
|
|
|
if not os.path.exists(rules_path):
|
|
return known_paths, 0
|
|
|
|
regex_path = re.compile(r'ENV\{ID_PATH\}=="([^"]+)"')
|
|
regex_port = re.compile(r'esp_port_(\d+)')
|
|
|
|
try:
|
|
with open(rules_path, 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
|
|
path_match = regex_path.search(line)
|
|
port_match = regex_port.search(line)
|
|
|
|
if path_match:
|
|
known_paths.add(path_match.group(1))
|
|
if port_match:
|
|
port_num = int(port_match.group(1))
|
|
if port_num > max_port:
|
|
max_port = port_num
|
|
except Exception as e:
|
|
print(f"{Colors.RED}Error reading rules file: {e}{Colors.RESET}")
|
|
return known_paths, 0
|
|
|
|
return known_paths, max_port
|
|
|
|
def scan_usb_devices():
|
|
"""
|
|
Scan all USB serial devices and return list of (device, id_path, location) tuples.
|
|
Uses udevadm to get stable ID_PATH identifiers.
|
|
"""
|
|
devices = []
|
|
|
|
# Get all USB serial devices
|
|
usb_devices = list(list_ports.grep("USB|ACM|CP210|FT232"))
|
|
|
|
if not usb_devices:
|
|
return devices
|
|
|
|
# Sort by physical location for consistent ordering
|
|
usb_devices.sort(key=lambda x: x.location if x.location else x.device)
|
|
|
|
for dev in usb_devices:
|
|
try:
|
|
# Get ID_PATH via udevadm (most stable identifier)
|
|
cmd = ['udevadm', 'info', '--name', dev.device, '--query=property']
|
|
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=2)
|
|
|
|
if proc.returncode != 0:
|
|
continue
|
|
|
|
props = dict(line.split('=', 1) for line in proc.stdout.splitlines() if '=' in line)
|
|
id_path = props.get('ID_PATH', '')
|
|
|
|
if not id_path:
|
|
continue
|
|
|
|
devices.append({
|
|
'device': dev.device,
|
|
'id_path': id_path,
|
|
'location': dev.location or '',
|
|
'serial': props.get('ID_SERIAL_SHORT', '')
|
|
})
|
|
except Exception as e:
|
|
print(f"{Colors.YELLOW}Warning: Could not inspect {dev.device}: {e}{Colors.RESET}")
|
|
continue
|
|
|
|
return devices
|
|
|
|
def generate_rules(devices, start_port=1):
|
|
"""
|
|
Generate udev rules for the given devices.
|
|
Returns list of rule strings.
|
|
"""
|
|
rules = []
|
|
port_num = start_port
|
|
|
|
for dev in devices:
|
|
symlink = f"esp_port_{port_num}"
|
|
rule = f'SUBSYSTEM=="tty", ENV{{ID_PATH}}=="{dev["id_path"]}", SYMLINK+="{symlink}"'
|
|
rules.append(rule)
|
|
port_num += 1
|
|
|
|
return rules
|
|
|
|
def main():
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description='Generate or update udev rules for ESP32 devices',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Generate rules from scratch (overwrites existing)
|
|
%(prog)s --full
|
|
|
|
# Append only new devices to existing rules
|
|
%(prog)s --append
|
|
|
|
# Dry run (show what would be generated)
|
|
%(prog)s --append --dry-run
|
|
"""
|
|
)
|
|
parser.add_argument('--full', action='store_true',
|
|
help='Generate complete rules file from scratch')
|
|
parser.add_argument('--append', action='store_true',
|
|
help='Append only new devices to existing rules')
|
|
parser.add_argument('--rules-file', default=DEFAULT_RULES_FILE,
|
|
help=f'Path to udev rules file (default: {DEFAULT_RULES_FILE})')
|
|
parser.add_argument('--dry-run', action='store_true',
|
|
help='Show what would be done without writing files')
|
|
parser.add_argument('--output', help='Write to custom file instead of rules file')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Default to append mode if neither specified
|
|
if not args.full and not args.append:
|
|
args.append = True
|
|
|
|
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}")
|
|
print(f" ESP32 Udev Rules Generator")
|
|
print(f"{Colors.BLUE}{'='*60}{Colors.RESET}\n")
|
|
|
|
# Scan connected devices
|
|
print(f"{Colors.CYAN}Scanning USB topology...{Colors.RESET}")
|
|
all_devices = scan_usb_devices()
|
|
|
|
if not all_devices:
|
|
print(f"{Colors.RED}No USB serial devices found.{Colors.RESET}")
|
|
return 1
|
|
|
|
print(f"Found {len(all_devices)} USB serial devices\n")
|
|
|
|
if args.full:
|
|
# Generate complete rules file
|
|
print(f"{Colors.CYAN}Generating complete rules file...{Colors.RESET}")
|
|
rules = generate_rules(all_devices, start_port=1)
|
|
output_file = args.output or args.rules_file
|
|
|
|
print(f"\n{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}")
|
|
print("-" * 65)
|
|
for i, dev in enumerate(all_devices):
|
|
port_num = i + 1
|
|
symlink = f"esp_port_{port_num}"
|
|
print(f"{dev['location']:<20} | {dev['device']:<15} | {symlink}")
|
|
|
|
if not args.dry_run:
|
|
rules_content = "# Auto-generated by gen_udev_rules.py\n"
|
|
rules_content += "# Stable port mapping based on USB physical topology (ID_PATH)\n\n"
|
|
rules_content += "\n".join(rules) + "\n"
|
|
|
|
if output_file == args.rules_file and os.geteuid() != 0:
|
|
# Write to temp file for user to copy
|
|
temp_file = "99-esp32-stable.rules"
|
|
with open(temp_file, 'w') as f:
|
|
f.write(rules_content)
|
|
print(f"\n{Colors.YELLOW}Rules written to: {temp_file}{Colors.RESET}")
|
|
print(f"To install, run:")
|
|
print(f" {Colors.GREEN}sudo cp {temp_file} {args.rules_file}{Colors.RESET}")
|
|
else:
|
|
with open(output_file, 'w') as f:
|
|
f.write(rules_content)
|
|
print(f"\n{Colors.GREEN}Rules written to: {output_file}{Colors.RESET}")
|
|
else:
|
|
print(f"\n{Colors.YELLOW}DRY RUN: Would generate {len(rules)} rules{Colors.RESET}")
|
|
|
|
else: # append mode
|
|
# Read existing rules
|
|
known_paths, max_port = get_existing_rules(args.rules_file)
|
|
print(f"Existing rules: {len(known_paths)} devices mapped (Max Port: {max_port})")
|
|
|
|
# Find new devices
|
|
new_devices = [d for d in all_devices if d['id_path'] not in known_paths]
|
|
|
|
if not new_devices:
|
|
print(f"\n{Colors.GREEN}All connected devices are already mapped. No changes needed.{Colors.RESET}")
|
|
return 0
|
|
|
|
print(f"\n{Colors.CYAN}Found {len(new_devices)} UNMAPPED devices:{Colors.RESET}")
|
|
print(f"{'Physical Path':<20} | {'Current Dev':<15} | {'Assigned Symlink'}")
|
|
print("-" * 65)
|
|
|
|
new_rules = generate_rules(new_devices, start_port=max_port + 1)
|
|
for i, dev in enumerate(new_devices):
|
|
port_num = max_port + 1 + i
|
|
symlink = f"esp_port_{port_num:02d}"
|
|
print(f"{dev['location']:<20} | {dev['device']:<15} | {symlink}")
|
|
|
|
if not args.dry_run:
|
|
# Write new rules to temp file
|
|
with open(TEMP_RULES_FILE, 'w') as f:
|
|
f.write("\n# --- Added by gen_udev_rules.py ---\n")
|
|
f.write("\n".join(new_rules) + "\n")
|
|
|
|
print(f"\n{Colors.CYAN}New rules saved to: {TEMP_RULES_FILE}{Colors.RESET}")
|
|
print(f"To apply, run:")
|
|
print(f" {Colors.GREEN}cat {TEMP_RULES_FILE} | sudo tee -a {args.rules_file}{Colors.RESET}")
|
|
print(f" {Colors.GREEN}sudo udevadm control --reload-rules && sudo udevadm trigger{Colors.RESET}")
|
|
else:
|
|
print(f"\n{Colors.YELLOW}DRY RUN: Would append {len(new_rules)} rules{Colors.RESET}")
|
|
|
|
print(f"\n{Colors.BLUE}{'='*60}{Colors.RESET}")
|
|
return 0
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|