#!/usr/bin/env python3 import asyncio import argparse import serial_asyncio import sys import re class SerialController(asyncio.Protocol): def __init__(self, port_name, args, loop, completion_future): self.port_name = port_name self.args = args self.loop = loop self.transport = None self.buffer = "" self.completion_future = completion_future # Determine command string if args.action == 'pps': self.cmd_str = f"iperf pps {args.value}\n" self.target_key = "IPERF_PPS_UPDATED" elif args.action == 'status': self.cmd_str = "iperf status\n" self.target_key = "IPERF_STATUS" else: self.cmd_str = f"iperf {args.action}\n" self.target_key = f"IPERF_{args.action.upper()}ED" # STARTED / STOPPED def connection_made(self, transport): self.transport = transport transport.write(b'\n') # Clear noise self.loop.create_task(self.send_command()) async def send_command(self): await asyncio.sleep(0.1) self.transport.write(self.cmd_str.encode()) def data_received(self, data): self.buffer += data.decode(errors='ignore') if self.target_key in self.buffer: if not self.completion_future.done(): if self.args.action == 'status': m = re.search(r'PPS=(\d+)', self.buffer) val = m.group(1) if m else "Unknown" self.completion_future.set_result(val) else: self.completion_future.set_result(True) self.transport.close() def connection_lost(self, exc): if not self.completion_future.done(): self.completion_future.set_exception(Exception("Closed")) async def run_device(port, args): loop = asyncio.get_running_loop() fut = loop.create_future() try: await serial_asyncio.create_serial_connection( loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200) return await asyncio.wait_for(fut, timeout=2.0) except: return None def expand_devices(device_str): devices = [] parts = [d.strip() for d in device_str.split(',')] for part in parts: match = re.match(r'^(.*?)(\d+)-(\d+)$', part) if match: prefix, start, end = match.group(1), int(match.group(2)), int(match.group(3)) step = 1 if end >= start else -1 for i in range(start, end + step, step): devices.append(f"{prefix}{i}") else: devices.append(part) return devices async def main(): parser = argparse.ArgumentParser() parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) parser.add_argument('--value', type=int, help='Value for PPS command') parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29") args = parser.parse_args() if args.action == 'pps' and not args.value: print("Error: 'pps' action requires --value") sys.exit(1) if sys.platform == 'win32': asyncio.set_event_loop(asyncio.ProactorEventLoop()) devs = expand_devices(args.devices) print(f"Executing '{args.action}' on {len(devs)} devices...") tasks = [run_device(d, args) for d in devs] results = await asyncio.gather(*tasks) print("\nResults:") for dev, res in zip(devs, results): if args.action == 'status': print(f"{dev}: {res if res else 'TIMEOUT'} PPS") else: status = "OK" if res is True else "FAIL" print(f"{dev}: {status}") if __name__ == '__main__': asyncio.run(main())