#!/usr/bin/env python3
"""Send one iperf control command to several serial devices at the same time.

Every device is opened first; the command itself is only written once a
shared ``asyncio.Event`` is set, so all devices receive it as close together
as the event loop allows.  Supported actions: ``start``, ``stop``, ``pps`` and
``status``.
"""
import argparse
import asyncio
import glob
import re
import sys

try:
    import serial_asyncio
except ImportError:
    # Keep argument parsing / --help usable without the dependency; main()
    # and run_device() report the missing package explicitly.
    serial_asyncio = None

BAUD_RATE = 115200
RESPONSE_TIMEOUT_S = 5.0   # Per-device budget, counted from connection start.
SETTLE_DELAY_S = 0.5       # Time given to the ports to open before triggering.

_MISSING_DEP_MSG = ("Error: the 'serial_asyncio' module is required "
                    "(pip install pyserial-asyncio)")

# action -> (command template, token that marks the device's acknowledgement)
_COMMANDS = {
    'pps': ("iperf pps {value}\n", "IPERF_PPS_UPDATED"),
    'status': ("iperf status\n", "IPERF_STATUS"),
    'start': ("iperf start\n", "IPERF_STARTED"),
    'stop': ("iperf stop\n", "IPERF_STOPPED"),
}

# Compiled once: data_received() runs these for every received line.
_STATUS_RE = re.compile(
    r'Src=([\d\.]+), Dst=([\d\.]+), Running=(\d+), Config=(\d+), '
    r'Actual=(\d+), Err=([-\d\.]+)%, Pkts=(\d+), AvgBW=([\d\.]+) Mbps')
_STATES_RE = re.compile(
    r'TX=([\d\.]+)s/([\d\.]+)% \((\d+)\), '
    r'SLOW=([\d\.]+)s/([\d\.]+)% \((\d+)\), '
    r'STALLED=([\d\.]+)s/([\d\.]+)% \((\d+)\)')
_RANGE_RE = re.compile(r'^(.*?)(\d+)-(\d+)$')
_DIGITS_RE = re.compile(r'([0-9]+)')


# --- 1. Protocol Class (Async Logic) ---
class SerialController(asyncio.Protocol):
    """Protocol that sends one command to a device and waits for its reply.

    The outcome is delivered through ``completion_future``: ``True`` for the
    simple actions, a formatted summary string for ``status``, or an
    ``Exception("Closed")`` if the connection drops before a reply arrived.

    Args:
        port_name: Name of the serial port (stored for reference only).
        args: Parsed arguments; ``action`` is required, ``value`` is used
            for the ``pps`` action.
        loop: Event loop used to schedule the deferred send.
        completion_future: Future that receives the result.
        sync_event: Shared event; the command is written once it is set.

    Raises:
        ValueError: If ``args.action`` is not a supported action.
    """

    def __init__(self, port_name, args, loop, completion_future, sync_event):
        self.port_name = port_name
        self.args = args
        self.loop = loop
        self.transport = None
        self.buffer = ""
        self.completion_future = completion_future
        self.sync_event = sync_event  # Global trigger for tight sync
        self.status_data = {}
        self._send_task = None

        try:
            template, self.target_key = _COMMANDS[args.action]
        except KeyError:
            raise ValueError(f"Unsupported action: {args.action!r}") from None
        self.cmd_str = template.format(value=getattr(args, 'value', None))

    def connection_made(self, transport):
        """Wake the device, then schedule the command behind the trigger."""
        self.transport = transport
        # 1. Wake up the device immediately.
        transport.write(b'\n')
        # 2. Keep a reference to the task: the event loop only holds weak
        #    references, so an unreferenced task may be garbage-collected.
        self._send_task = self.loop.create_task(self.await_trigger_and_send())

    async def await_trigger_and_send(self):
        """Wait for the shared trigger, then write the command."""
        # 3. Wait here until the caller releases all devices at once.
        await self.sync_event.wait()
        # The exchange may already be over (connection lost, timeout);
        # writing to a finished connection would be pointless.
        if self.transport is None or self.completion_future.done():
            return
        # 4. FIRE!
        self.transport.write(self.cmd_str.encode())

    def data_received(self, data):
        """Split incoming bytes into lines and complete on the expected reply."""
        self.buffer += data.decode(errors='ignore')
        while '\n' in self.buffer:
            line, self.buffer = self.buffer.split('\n', 1)
            result = self._parse_line(line.strip())
            if result is not None:
                if not self.completion_future.done():
                    self.completion_future.set_result(result)
                self.transport.close()
                return

    def _parse_line(self, line):
        """Return the final result if *line* completes the exchange, else None."""
        if self.args.action != 'status':
            # --- Simple Command Parsing ---
            return True if self.target_key in line else None

        # --- Status Parsing ---
        if "IPERF_STATUS" in line:
            m = _STATUS_RE.search(line)
            if m:
                self.status_data['main'] = {
                    'src': m.group(1), 'dst': m.group(2),
                    'run': "Run" if m.group(3) == '1' else "Stop",
                    'cfg': m.group(4), 'act': m.group(5), 'err': m.group(6),
                    'pkts': m.group(7), 'bw': m.group(8),
                }
        elif "IPERF_STATES" in line:
            m = _STATES_RE.search(line)
            if m:
                self.status_data['states'] = {
                    'tx_t': m.group(1), 'tx_p': m.group(2), 'tx_c': m.group(3),
                    'sl_t': m.group(4), 'sl_p': m.group(5), 'sl_c': m.group(6),
                    'st_t': m.group(7), 'st_p': m.group(8), 'st_c': m.group(9),
                }

        # A status report is complete only once both lines were parsed.
        if 'main' not in self.status_data or 'states' not in self.status_data:
            return None
        d = self.status_data['main']
        s = self.status_data['states']
        return (f"{d['src']} -> {d['dst']} | {d['run']}, "
                f"Cfg:{d['cfg']}, Act:{d['act']}, Err:{d['err']}%, "
                f"Pkts:{d['pkts']}, BW:{d['bw']}M | "
                f"TX:{s['tx_t']}s/{s['tx_p']}%({s['tx_c']}) "
                f"SL:{s['sl_t']}s/{s['sl_p']}%({s['sl_c']}) "
                f"ST:{s['st_t']}s/{s['st_p']}%({s['st_c']})")

    def connection_lost(self, exc):
        """Resolve the future if the link dropped before a full reply."""
        if self.completion_future.done():
            return
        d = self.status_data.get('main')
        if self.args.action == 'status' and d is not None:
            # Only the first status line arrived: report what is known.
            output = (f"{d['src']} -> {d['dst']} | {d['run']}, "
                      f"Cfg:{d['cfg']}, Act:{d['act']}, BW:{d['bw']}M (Partial)")
            self.completion_future.set_result(output)
        else:
            self.completion_future.set_exception(Exception("Closed"))


# --- 2. Helper Functions ---
def parse_arguments(argv=None):
    """Parse the command line.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The parsed namespace.  ``value`` holds the PPS value whether it was
        given positionally or through ``--value`` (positional wins).

    Exits with status 1 if the ``pps`` action is given without a value.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('action', choices=['start', 'stop', 'pps', 'status'])
    parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS')
    parser.add_argument('--value', type=int, help='Value for PPS')
    parser.add_argument(
        '--devices', required=True,
        help="List (e.g. /dev/ttyUSB0, /dev/ttyUSB1), "
             "Range (/dev/ttyUSB0-29), or 'all'")

    args = parser.parse_args(argv)
    if args.value_arg is not None:
        args.value = args.value_arg
    if args.action == 'pps' and args.value is None:
        print("Error: 'pps' action requires a value")
        sys.exit(1)
    return args


def natural_sort_key(s):
    """Sorts strings with numbers naturally (USB2 before USB10)"""
    return [int(text) if text.isdigit() else text.lower()
            for text in _DIGITS_RE.split(s)]


def expand_devices(device_str):
    """Expand a ``--devices`` specification into a list of port names.

    Accepts ``all`` (every ``/dev/ttyUSB*`` found, naturally sorted), a
    comma-separated list, and ``<prefix><start>-<end>`` ranges (ascending or
    descending).  Empty entries are skipped and duplicates are dropped, keeping
    the first occurrence, so the same port is never opened twice.

    Exits with status 1 if no device results from the specification.
    """
    if device_str.strip().lower() == 'all':
        devices = sorted(glob.glob('/dev/ttyUSB*'), key=natural_sort_key)
        if not devices:
            print("Error: No /dev/ttyUSB* devices found!")
            sys.exit(1)
        return devices

    devices = []
    for part in device_str.split(','):
        part = part.strip()
        if not part:
            continue  # e.g. trailing comma
        match = _RANGE_RE.match(part)
        if match is None:
            devices.append(part)
            continue
        prefix = match.group(1)
        start, end = int(match.group(2)), int(match.group(3))
        step = 1 if end >= start else -1
        devices.extend(f"{prefix}{i}" for i in range(start, end + step, step))

    # dict preserves insertion order, so this de-duplicates stably.
    devices = list(dict.fromkeys(devices))
    if not devices:
        print("Error: No devices specified!")
        sys.exit(1)
    return devices


# --- 3. Async Entry Point ---
async def run_device(port, args, sync_event):
    """Open *port*, run the command exchange and return its result.

    Returns:
        The protocol's result (``True`` or a status string), or ``None`` on
        timeout or on any connection error.  Errors other than a timeout are
        reported on stderr.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    transport = None
    try:
        if serial_asyncio is None:
            raise RuntimeError(_MISSING_DEP_MSG)
        transport, _ = await serial_asyncio.create_serial_connection(
            loop,
            lambda: SerialController(port, args, loop, fut, sync_event),
            port,
            baudrate=BAUD_RATE)
        # Wait for the result (includes the wait for the event inside the
        # protocol).
        return await asyncio.wait_for(fut, timeout=RESPONSE_TIMEOUT_S)
    except asyncio.TimeoutError:
        return None
    except Exception as exc:
        # Best effort per device: report why it failed, keep the others going.
        print(f"{port}: {exc}", file=sys.stderr)
        return None
    finally:
        # Release the port on every path, including timeouts.
        if transport is not None:
            transport.close()


async def async_main(args, devices):
    """Connect to all *devices*, trigger them together and print the results."""
    print(f"Initializing {len(devices)} devices for '{args.action}'...")

    # Create the Global Trigger
    sync_event = asyncio.Event()

    # create_task() starts each connection right away.  Bare coroutine
    # objects would not run until gather(), i.e. only after the trigger
    # below had already been set, which defeats the synchronization.
    tasks = [asyncio.create_task(run_device(d, args, sync_event))
             for d in devices]

    # Give a brief moment for all serial ports to open and "settle"
    await asyncio.sleep(SETTLE_DELAY_S)

    if len(devices) == 1:
        print(f">>> TRIGGERING {devices[0]} <<<")
    else:
        print(f">>> TRIGGERING {len(devices)} DEVICES <<<")

    sync_event.set()  # Unblocks all waiting devices

    results = await asyncio.gather(*tasks)

    print("\nResults:")
    for dev, res in zip(devices, results):
        if args.action == 'status':
            print(f"{dev}: {res if res else 'TIMEOUT'}")
        else:
            status = "OK" if res is True else "FAIL"
            print(f"{dev}: {status}")


# --- 4. Main Execution Block ---
def main():
    """Script entry point."""
    args = parse_arguments()
    dev_list = expand_devices(args.devices)
    if serial_asyncio is None:
        print(_MISSING_DEP_MSG, file=sys.stderr)
        sys.exit(1)
    if sys.platform == 'win32':
        # NOTE(review): asyncio.run() creates its own event loop, so this
        # call probably does not affect the loop that actually runs --
        # kept as in the original; confirm on Windows.
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
    asyncio.run(async_main(args, dev_list))


if __name__ == '__main__':
    main()