#!/usr/bin/env python3
"""Send an iperf control command to one or more serial devices in parallel.

Each device is opened at 115200 baud, sent a single ``iperf ...`` command and
watched for the acknowledgement keyword that belongs to that command.  The
per-device outcome is printed once every device has answered or timed out.
"""
import argparse
import asyncio
import re
import sys

try:
    import serial_asyncio
except ImportError:
    # Keep the module importable (and ``--help`` usable) without the serial
    # backend; main() refuses to talk to devices when it is missing.
    serial_asyncio = None

BAUDRATE = 115200
RESPONSE_TIMEOUT_S = 2.0
# Pause between the wake-up newline and the real command.
COMMAND_DELAY_S = 0.1

# action -> (command template, keyword expected in the acknowledgement line)
_ACTIONS = {
    'pps': ("iperf pps {value}\n", "IPERF_PPS_UPDATED"),
    'status': ("iperf status\n", "IPERF_STATUS"),
    'start': ("iperf start\n", "IPERF_STARTED"),
    'stop': ("iperf stop\n", "IPERF_STOPPED"),
}

# The error field may be negative, hence '-' inside the character class.
_STATUS_RE = re.compile(
    r'Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%')
# "<prefix><start>-<end>", e.g. "/dev/ttyUSB0-29".
_RANGE_RE = re.compile(r'^(.*?)(\d+)-(\d+)$')


class SerialController(asyncio.Protocol):
    """Protocol that sends one iperf command and waits for its acknowledgement.

    The outcome is delivered through ``completion_future``:

    * ``True`` for ``start`` / ``stop`` / ``pps`` once the keyword is seen,
    * a formatted summary string (or a ``"Parse Error on line: ..."`` string)
      for ``status``,
    * an ``Exception("Closed")`` if the connection drops before any answer.
    """

    def __init__(self, port_name, args, loop, completion_future):
        """Store the context and derive the command for ``args.action``.

        Args:
            port_name: Name of the serial port this protocol is attached to.
            args: Parsed CLI namespace; ``action`` and (for ``pps``) ``value``
                are read.
            loop: Event loop used to schedule the delayed command write.
            completion_future: Future resolved with the device's answer.

        Raises:
            ValueError: If ``args.action`` is not a known action.
        """
        self.port_name = port_name
        self.args = args
        self.loop = loop
        self.transport = None
        self.buffer = ""
        self.completion_future = completion_future
        self._send_task = None

        try:
            template, self.target_key = _ACTIONS[args.action]
        except KeyError:
            raise ValueError(f"Unknown action: {args.action!r}") from None
        if args.action == 'pps':
            self.cmd_str = template.format(value=args.value)
        else:
            self.cmd_str = template

    def connection_made(self, transport):
        """Send a bare newline, then schedule the real command."""
        self.transport = transport
        transport.write(b'\n')
        # Hold a reference: the event loop only keeps weak references to
        # tasks, so an unreferenced task may be collected before it runs.
        self._send_task = self.loop.create_task(self.send_command())

    async def send_command(self):
        """Write the command after a short delay, unless the port is closing."""
        await asyncio.sleep(COMMAND_DELAY_S)
        transport = self.transport
        if transport is not None and not transport.is_closing():
            transport.write(self.cmd_str.encode())

    def data_received(self, data):
        """Accumulate input and resolve the future on the first matching line.

        Only complete (newline-terminated) lines are examined so that the
        status regex never runs against a fragment of a response.
        """
        self.buffer += data.decode(errors='ignore')

        while '\n' in self.buffer:
            line, self.buffer = self.buffer.split('\n', 1)
            line = line.strip()
            if self.target_key not in line:
                continue

            if not self.completion_future.done():
                self.completion_future.set_result(self._result_for(line))
            self.transport.close()
            return

    def _result_for(self, line):
        """Return the future's result for an acknowledgement ``line``."""
        if self.args.action != 'status':
            return True

        m = _STATUS_RE.search(line)
        if m is None:
            # The line is complete, so this is a real format mismatch.
            return f"Parse Error on line: {line}"
        state = "Running" if m.group(1) == '1' else "Stopped"
        return (f"{state}, Cfg: {m.group(2)}, Act: {m.group(3)}, "
                f"Err: {m.group(4)}%")

    def connection_lost(self, exc):
        """Fail the future if the port closed before an answer arrived."""
        if not self.completion_future.done():
            self.completion_future.set_exception(Exception("Closed"))


async def run_device(port, args, timeout=RESPONSE_TIMEOUT_S):
    """Run the requested action on a single serial port.

    Args:
        port: Serial device name.
        args: Parsed CLI namespace forwarded to ``SerialController``.
        timeout: Seconds to wait for the device's acknowledgement.

    Returns:
        The controller's result (``True`` or a status string), or ``None`` if
        the port could not be opened, the connection dropped, or the device
        did not answer in time.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    transport = None
    try:
        transport, _ = await serial_asyncio.create_serial_connection(
            loop, lambda: SerialController(port, args, loop, fut),
            port, baudrate=BAUDRATE)
        return await asyncio.wait_for(fut, timeout=timeout)
    except Exception:
        # Best effort per device: any failure is reported as "no result".
        # Cancellation and KeyboardInterrupt are deliberately not swallowed.
        return None
    finally:
        # Release the port even when the device never answered.
        if transport is not None and not transport.is_closing():
            transport.close()


def expand_devices(device_str):
    """Expand a comma-separated device list with optional numeric ranges.

    ``"/dev/ttyUSB0-2, COM7"`` becomes
    ``["/dev/ttyUSB0", "/dev/ttyUSB1", "/dev/ttyUSB2", "COM7"]``.  A range
    whose end is below its start counts downwards.  Empty entries (such as
    those produced by a trailing comma) are ignored.

    Args:
        device_str: The raw ``--devices`` argument.

    Returns:
        List of device names in the order given.
    """
    devices = []
    for part in (p.strip() for p in device_str.split(',')):
        if not part:
            continue
        match = _RANGE_RE.match(part)
        if match is None:
            devices.append(part)
            continue
        prefix = match.group(1)
        start, end = int(match.group(2)), int(match.group(3))
        step = 1 if end >= start else -1
        devices.extend(f"{prefix}{i}" for i in range(start, end + step, step))
    return devices


async def main():
    """Parse the command line, run the action on all devices, print results."""
    parser = argparse.ArgumentParser()
    parser.add_argument('action', choices=['start', 'stop', 'pps', 'status'])
    parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS')
    parser.add_argument('--value', type=int, help='Value for PPS')
    parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29")
    args = parser.parse_args()

    # The positional value takes precedence over --value.
    if args.value_arg is not None:
        args.value = args.value_arg
    if args.action == 'pps' and args.value is None:
        print("Error: 'pps' action requires a value")
        sys.exit(1)

    if serial_asyncio is None:
        print("Error: the 'serial_asyncio' module (pyserial-asyncio) is "
              "required", file=sys.stderr)
        sys.exit(1)

    if sys.platform == 'win32':
        # NOTE(review): this runs inside the loop asyncio.run() already
        # started, so it does not change the loop the devices run on; kept
        # as-is pending confirmation that it can be dropped.
        asyncio.set_event_loop(asyncio.ProactorEventLoop())

    devs = expand_devices(args.devices)
    print(f"Executing '{args.action}' on {len(devs)} devices...")
    results = await asyncio.gather(*(run_device(d, args) for d in devs))

    print("\nResults:")
    for dev, res in zip(devs, results):
        if args.action == 'status':
            print(f"{dev}: {res if res else 'TIMEOUT'}")
        else:
            status = "OK" if res is True else "FAIL"
            print(f"{dev}: {status}")


if __name__ == '__main__':
    asyncio.run(main())