#!/usr/bin/env python3
"""Send ``iperf start`` / ``iperf stop`` to ESP32 devices over serial ports.

The file holds two command-line front ends built on the same idea:

* a multi-device one (``main``) that drives many ports concurrently, and
* a single-port one (``run_control``) that also echoes the firmware output.

Both open the port with ``serial_asyncio``, send the command, and wait for the
firmware to print a confirmation keyword.
"""
import argparse
import asyncio
import re
import sys

import serial_asyncio

# Serial speed used for every connection opened by this file.
_BAUDRATE = 115200

# Seconds to wait for the confirmation keyword before giving up.
_CONFIRM_TIMEOUT = 5.0

# "<prefix><start>-<end>", e.g. "/dev/ttyUSB0-29". The prefix is matched
# lazily so that all trailing digits before the dash end up in <start>.
_RANGE_RE = re.compile(r'^(.*?)(\d+)-(\d+)$')


class SerialController(asyncio.Protocol):
    """Protocol that sends ``iperf <command>`` and reports through a future.

    The future passed in is resolved with ``True`` once the confirmation
    keyword shows up in the received text, or gets an exception if the
    connection goes away first.
    """

    def __init__(self, port_name, command, loop, completion_future):
        """Store the connection parameters.

        Args:
            port_name: Name of the serial port (kept for reference only).
            command: Sub-command appended to ``iperf`` ("start" or "stop").
            loop: Event loop used to schedule the delayed send.
            completion_future: Future resolved when the outcome is known.
        """
        self.port_name = port_name
        self.command = command
        self.loop = loop
        self.transport = None
        self.response_buffer = ""
        self.completion_future = completion_future
        # Any command containing "start" waits for IPERF_STARTED; everything
        # else waits for IPERF_STOPPED.
        self.target_keyword = "IPERF_STARTED" if "start" in command else "IPERF_STOPPED"
        # Strong reference to the send task: the event loop only keeps weak
        # references, so an unreferenced task may be collected before it runs.
        self._send_task = None

    def connection_made(self, transport):
        """Flush the line with a newline and schedule the real command."""
        self.transport = transport
        # 1. Clear line noise
        transport.write(b'\n')
        # 2. Schedule command
        self._send_task = self.loop.create_task(self.send_command())

    async def send_command(self):
        """Send ``iperf <command>`` after a short delay."""
        await asyncio.sleep(0.1)
        # The port may have been closed while we were sleeping.
        if self.transport is None or self.transport.is_closing():
            return
        # SPACE separated subcommand: "iperf start" or "iperf stop"
        full_cmd = f"iperf {self.command}\n"
        self.transport.write(full_cmd.encode())

    def data_received(self, data):
        """Accumulate incoming text and look for the confirmation keyword."""
        # The keyword can be split across reads, so search the whole buffer.
        self.response_buffer += data.decode(errors='ignore')
        if self.target_keyword in self.response_buffer:
            if not self.completion_future.done():
                self.completion_future.set_result(True)
            self.transport.close()

    def connection_lost(self, exc):
        """Fail the future if the connection ended before confirmation."""
        if self._send_task is not None:
            # No-op when the task already finished.
            self._send_task.cancel()
        if not self.completion_future.done():
            # If we closed it intentionally (set_result called), the future is
            # already done. Otherwise the close was unexpected.
            self.completion_future.set_exception(
                exc if exc else ConnectionError("Connection closed without confirmation")
            )


# The name ``SerialController`` is declared a second time further down with a
# different constructor. Bind the four-argument class here so that
# ``run_single_device`` keeps using it no matter when it is called.
_MultiDeviceController = SerialController


async def run_single_device(port, action, timeout=_CONFIRM_TIMEOUT):
    """Run one action on one port and report the outcome on stdout.

    Args:
        port: Serial port to open.
        action: "start" or "stop".
        timeout: Seconds to wait for the confirmation keyword.

    Returns:
        True when the confirmation keyword was received, False on timeout or
        on any error (the reason is printed).
    """
    loop = asyncio.get_running_loop()
    completion_future = loop.create_future()
    transport = None

    try:
        transport, _protocol = await serial_asyncio.create_serial_connection(
            loop,
            lambda: _MultiDeviceController(port, action, loop, completion_future),
            port,
            baudrate=_BAUDRATE,
        )
        # Wait for success or timeout
        await asyncio.wait_for(completion_future, timeout=timeout)
        print(f"[{port}] {action.upper()} SUCCESS")
        return True
    except asyncio.TimeoutError:
        print(f"[{port}] TIMEOUT (No confirmation received)")
        return False
    except Exception as e:
        # Per-device boundary: one bad port must not abort the other devices.
        print(f"[{port}] FAILED: {e}")
        return False
    finally:
        if transport is not None and not transport.is_closing():
            transport.close()


def expand_devices(device_str):
    """Expand a comma-separated device specification into a list of ports.

    Examples:
        - "/dev/ttyUSB0, /dev/ttyUSB1" -> ['/dev/ttyUSB0', '/dev/ttyUSB1']
        - "/dev/ttyUSB0-5" -> ['/dev/ttyUSB0', ... '/dev/ttyUSB5']
        - "/dev/ttyUSB5-3" -> ['/dev/ttyUSB5', '/dev/ttyUSB4', '/dev/ttyUSB3']

    Empty entries (for instance from a trailing comma) are skipped and
    duplicates are dropped, keeping the first occurrence.

    Args:
        device_str: The raw specification string.

    Returns:
        The list of device names in the order they were specified.
    """
    devices = []
    for raw_part in device_str.split(','):
        part = raw_part.strip()
        if not part:
            # "a,,b" or "a," would otherwise yield an empty port name.
            continue

        match = _RANGE_RE.match(part)
        if match is None:
            devices.append(part)
            continue

        prefix = match.group(1)
        start = int(match.group(2))
        end = int(match.group(3))
        # Support descending ranges as well as ascending ones.
        step = 1 if end >= start else -1
        devices.extend(f"{prefix}{i}" for i in range(start, end + step, step))

    # Opening the same port twice at the same time cannot work, so de-duplicate
    # while preserving order.
    return list(dict.fromkeys(devices))


async def main():
    """Parse the multi-device command line and run the action on every port."""
    parser = argparse.ArgumentParser(description='Control ESP32 iperf concurrently')
    parser.add_argument('action', choices=['start', 'stop'], help='Action to perform')
    parser.add_argument('--devices', required=True,
                        help='Device list (e.g., "/dev/ttyUSB0-29" or "/dev/ttyUSB0,/dev/ttyUSB1")')
    args = parser.parse_args()

    # No event loop is installed by hand here: this coroutine already runs on
    # the loop created by asyncio.run(), so a loop created and set at this
    # point would never be used (and never closed).

    # 1. Expand device list
    device_list = expand_devices(args.devices)
    print(f"Targeting {len(device_list)} devices for '{args.action.upper()}'...")

    # 2./3. Create one coroutine per device and run them all concurrently
    results = await asyncio.gather(
        *(run_single_device(dev, args.action) for dev in device_list)
    )

    # 4. Summary
    success_count = results.count(True)
    print(f"\nSummary: {success_count}/{len(device_list)} Succeeded")


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass


# ---------------------------------------------------------------------------
# Single-port variant
#
# NOTE(review): everything below re-declares ``SerialController`` and adds a
# second ``__main__`` entry point whose arguments differ from the one above.
# It looks like a separate script that was appended to this file - confirm
# whether it should live in its own module.
# ---------------------------------------------------------------------------


class SerialController(asyncio.Protocol):
    """Protocol that sends ``iperf <command>`` and echoes the replies.

    Completion is signalled through ``completion_future`` when one is given.
    Without a future the protocol falls back to stopping ``loop``, which is
    only appropriate when the caller drives the loop with ``run_forever()``.
    """

    def __init__(self, command, loop, completion_future=None):
        """Store the connection parameters.

        Args:
            command: Sub-command appended to ``iperf`` ("start" or "stop").
            loop: Event loop used to schedule the delayed send.
            completion_future: Optional future resolved with True when the
                keyword was seen, or False when the connection was lost first.
        """
        self.command = command
        self.loop = loop
        self.completion_future = completion_future
        self.transport = None
        self.response_buffer = ""
        # Keywords the firmware will print to confirm action
        self.target_keyword = "IPERF_STARTED" if "start" in command else "IPERF_STOPPED"
        # Strong reference so the pending send task cannot be collected.
        self._send_task = None

    def connection_made(self, transport):
        """Flush the line with a newline and schedule the real command."""
        self.transport = transport
        print(f"Connected. Sending: iperf {self.command}")
        # 1. Clear line noise
        transport.write(b'\n')
        # 2. Schedule command
        self._send_task = self.loop.create_task(self.send_command())

    async def send_command(self):
        """Send ``iperf <command>`` after a short delay."""
        await asyncio.sleep(0.1)
        # The port may have been closed while we were sleeping.
        if self.transport is None or self.transport.is_closing():
            return
        # SPACE separated subcommand: "iperf start" or "iperf stop"
        full_cmd = f"iperf {self.command}\n"
        self.transport.write(full_cmd.encode())

    def data_received(self, data):
        """Echo incoming text and look for the confirmation keyword."""
        text = data.decode(errors='ignore')
        sys.stdout.write(text)  # Echo firmware output to console
        self.response_buffer += text

        # Check for confirmation keyword
        if self.target_keyword in self.response_buffer:
            print(f"\n[SUCCESS] Confirmed: {self.target_keyword}")
            self.transport.close()
            self._finish(True)

    def connection_lost(self, exc):
        """Report an unexpected disconnect and signal completion."""
        if exc:
            print(f"Serial connection lost: {exc}")
        if self._send_task is not None:
            # No-op when the task already finished.
            self._send_task.cancel()
        self._finish(False)

    def _finish(self, confirmed):
        """Signal the waiting side that the exchange is over."""
        if self.completion_future is None:
            # Legacy path for callers that run the loop with run_forever().
            # Under asyncio.run() this would abort the run with RuntimeError.
            self.loop.stop()
        elif not self.completion_future.done():
            self.completion_future.set_result(confirmed)


async def run_control(port, action):
    """Open ``port``, send the action, and wait for confirmation.

    Prints a message when the port cannot be opened or when no confirmation
    arrives within the timeout.

    Args:
        port: Serial port to open.
        action: "start" or "stop".
    """
    loop = asyncio.get_running_loop()
    completion_future = loop.create_future()

    try:
        transport, _protocol = await serial_asyncio.create_serial_connection(
            loop,
            lambda: SerialController(action, loop, completion_future),
            port,
            baudrate=_BAUDRATE,
        )
    except Exception as e:
        print(f"Failed to open port {port}: {e}")
        return

    try:
        # Safety timeout: the protocol resolves the future on confirmation
        # or when the connection is lost.
        await asyncio.wait_for(completion_future, timeout=_CONFIRM_TIMEOUT)
    except asyncio.TimeoutError:
        print("\n[TIMEOUT] Firmware did not respond with confirmation keyword.")
    finally:
        # Also runs on cancellation (e.g. Ctrl-C), which is left to propagate.
        if transport is not None and not transport.is_closing():
            transport.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Control ESP32 iperf via USB')
    parser.add_argument('port', help='Serial port (e.g., /dev/ttyUSB0)')
    parser.add_argument('action', choices=['start', 'stop'], help='Action to perform')
    args = parser.parse_args()

    # asyncio.run() creates and closes its own event loop, so none is
    # installed by hand here.
    try:
        asyncio.run(run_control(args.port, args.action))
    except KeyboardInterrupt:
        pass