diff --git a/components/iperf/iperf.c b/components/iperf/iperf.c index a5e047a..1fe20df 100644 --- a/components/iperf/iperf.c +++ b/components/iperf/iperf.c @@ -27,7 +27,7 @@ static EventGroupHandle_t s_iperf_event_group = NULL; #define IPERF_IP_READY_BIT (1 << 0) #define IPERF_STOP_REQ_BIT (1 << 1) -#define MIN_RATE_CHECK_INTERVAL_US 250000 +#define RATE_CHECK_INTERVAL_US 500000 #define MIN_PACING_INTERVAL_US 100 typedef struct { @@ -39,12 +39,13 @@ typedef struct { static iperf_ctrl_t s_iperf_ctrl = {0}; static TaskHandle_t s_iperf_task_handle = NULL; +static iperf_cfg_t s_next_cfg; // Holding area for the new config +static bool s_reload_req = false; // Flag to trigger internal restart // Global Stats Tracker static iperf_stats_t s_stats = {0}; // --- Session Persistence Variables --- -// These persist after the task stops for post-analysis static int64_t s_session_start_time = 0; static int64_t s_session_end_time = 0; static uint64_t s_session_packets = 0; @@ -119,7 +120,7 @@ void iperf_print_status(void) { // 4. Calculate State Percentages double total_us = (double)(s_time_tx_us + s_time_slow_us + s_time_stalled_us); - if (total_us < 1.0) total_us = 1.0; // Prevent div/0 + if (total_us < 1.0) total_us = 1.0; double pct_tx = ((double)s_time_tx_us / total_us) * 100.0; double pct_slow = ((double)s_time_slow_us / total_us) * 100.0; @@ -130,7 +131,6 @@ void iperf_print_status(void) { src_ip, dst_ip, s_stats.running, s_stats.config_pps, s_stats.actual_pps, err, s_session_packets, avg_bw_mbps); // New Format: Time + Percentage + Edges - // Example: TX=15.15s/28.5% (15) printf("IPERF_STATES: TX=%.2fs/%.1f%% (%lu), SLOW=%.2fs/%.1f%% (%lu), STALLED=%.2fs/%.1f%% (%lu)\n", (double)s_time_tx_us/1000000.0, pct_tx, (unsigned long)s_edge_tx, (double)s_time_slow_us/1000000.0, pct_slow, (unsigned long)s_edge_slow, @@ -285,7 +285,7 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { int64_t last_rate_check = esp_timer_get_time(); uint32_t packets_since_check = 0; int32_t packet_id = 0; - uint32_t current_rate_check_interval_us = MIN_RATE_CHECK_INTERVAL_US; + struct timespec ts; while (!ctrl->finish && esp_timer_get_time() < end_time) { @@ -320,19 +320,8 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { } now = esp_timer_get_time(); - // Modified check to use dynamic interval - if (now - last_rate_check > current_rate_check_interval_us) { + if (now - last_rate_check > RATE_CHECK_INTERVAL_US) { uint32_t interval_us = (uint32_t)(now - last_rate_check); - - // Dynamic Interval Calculation --- - uint32_t config_pps = iperf_get_pps(); - if (config_pps > 0) { - // Calculate time needed to send 25 packets based on the CONFIG PPS - uint32_t needed_time_us = (25ULL * 1000000ULL) / config_pps; - // Set new interval: needed_time_us, but not lower than 250ms - current_rate_check_interval_us = (needed_time_us > MIN_RATE_CHECK_INTERVAL_US) ? needed_time_us : MIN_RATE_CHECK_INTERVAL_US; - } - if (interval_us > 0) { // Calculate Instantaneous PPS s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us); @@ -403,8 +392,6 @@ exit: s_stats.running = false; // --- Session Stop --- - // Capture time exactly when loop exits. - // This allows accurate bandwidth calculation after the test stops. s_session_end_time = esp_timer_get_time(); s_stats.actual_pps = 0; @@ -415,32 +402,76 @@ exit: static void iperf_task(void *arg) { iperf_ctrl_t *ctrl = (iperf_ctrl_t *)arg; - if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) { - iperf_start_udp_client(ctrl); - } + + do { + // 1. Reset State for the new run + s_reload_req = false; // Clear the reload flag + ctrl->finish = false; // Clear the stop flag + xEventGroupClearBits(s_iperf_event_group, IPERF_STOP_REQ_BIT); // Clear event bits + + // 2. Run the workload (Blocking until finished or stopped) + if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) { + iperf_start_udp_client(ctrl); + } + + // 3. Check if we are dying or reloading + if (s_reload_req) { + ESP_LOGI(TAG, "Hot reloading iperf task with new config..."); + // Load the new config we staged in iperf_start + ctrl->cfg = s_next_cfg; + + // Optional: small delay to let the socket stack cool down + vTaskDelay(pdMS_TO_TICKS(100)); + } + + } while (s_reload_req); // Loop back if reload was requested + + // 4. Cleanup and Death (Only reached if NO reload requested) free(ctrl->buffer); s_iperf_task_handle = NULL; vTaskDelete(NULL); } void iperf_start(iperf_cfg_t *cfg) { + // 1. Prepare the Configuration + iperf_cfg_t new_cfg = *cfg; + iperf_read_nvs_config(&new_cfg); + + // Apply defaults + if (new_cfg.send_len == 0) new_cfg.send_len = 1470; + if (new_cfg.pacing_period_us == 0) new_cfg.pacing_period_us = 10000; + if (new_cfg.burst_count == 0) new_cfg.burst_count = 1; + + // 2. Check if Task is Already Running if (s_iperf_task_handle) { - ESP_LOGW(TAG, "Iperf already running"); + ESP_LOGI(TAG, "Task running. Staging hot reload."); + + // Stage the new config globally + s_next_cfg = new_cfg; + s_reload_req = true; + + // Signal the current task to stop (it will see s_reload_req when it exits) + iperf_stop(); + + // RETURN IMMEDIATELY - Do not wait! + printf("IPERF_RELOADING\n"); return; } - s_iperf_ctrl.cfg = *cfg; - iperf_read_nvs_config(&s_iperf_ctrl.cfg); - - if (s_iperf_ctrl.cfg.send_len == 0) s_iperf_ctrl.cfg.send_len = 1470; - if (s_iperf_ctrl.cfg.pacing_period_us == 0) s_iperf_ctrl.cfg.pacing_period_us = 10000; - if (s_iperf_ctrl.cfg.burst_count == 0) s_iperf_ctrl.cfg.burst_count = 1; - + // 3. Fresh Start (No existing task) + s_iperf_ctrl.cfg = new_cfg; s_iperf_ctrl.finish = false; - s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128; - s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len); - s_iperf_event_group = xEventGroupCreate(); + // Allocate buffer if it doesn't exist (it might be freed if task died previously) + if (s_iperf_ctrl.buffer == NULL) { + s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128; + s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len); + } + + if (s_iperf_event_group == NULL) { + s_iperf_event_group = xEventGroupCreate(); + } + xTaskCreate(iperf_task, "iperf", 4096, &s_iperf_ctrl, 5, &s_iperf_task_handle); } diff --git a/control_iperf.py b/control_iperf.py index 17fff15..71c4e65 100755 --- a/control_iperf.py +++ b/control_iperf.py @@ -4,16 +4,18 @@ import argparse import serial_asyncio import sys import re +import glob # --- 1. Protocol Class (Async Logic) --- class SerialController(asyncio.Protocol): - def __init__(self, port_name, args, loop, completion_future): + def __init__(self, port_name, args, loop, completion_future, sync_event): self.port_name = port_name self.args = args self.loop = loop self.transport = None self.buffer = "" self.completion_future = completion_future + self.sync_event = sync_event # Global trigger for tight sync self.status_data = {} # Command Construction @@ -32,11 +34,16 @@ class SerialController(asyncio.Protocol): def connection_made(self, transport): self.transport = transport - transport.write(b'\n') # Wake up/Clear - self.loop.create_task(self.send_command()) + # 1. Wake up the device immediately + transport.write(b'\n') + # 2. Schedule the command to wait for the global trigger + self.loop.create_task(self.await_trigger_and_send()) - async def send_command(self): - await asyncio.sleep(0.1) # Stabilization delay + async def await_trigger_and_send(self): + # 3. Wait here until ALL devices are connected and ready + await self.sync_event.wait() + + # 4. FIRE! self.transport.write(self.cmd_str.encode()) def data_received(self, data): @@ -59,8 +66,6 @@ class SerialController(asyncio.Protocol): } elif "IPERF_STATES" in line: - # NEW REGEX: Matches "TX=15.15s/28.5% (15)" - # Groups: 1=Time, 2=Pct, 3=Count (Repeated for SLOW and STALLED) m = re.search(r'TX=([\d\.]+)s/([\d\.]+)% \((\d+)\), SLOW=([\d\.]+)s/([\d\.]+)% \((\d+)\), STALLED=([\d\.]+)s/([\d\.]+)% \((\d+)\)', line) if m: self.status_data['states'] = { @@ -73,7 +78,6 @@ class SerialController(asyncio.Protocol): if not self.completion_future.done(): d = self.status_data['main'] s = self.status_data['states'] - # Updated Output Format output = (f"{d['src']} -> {d['dst']} | {d['run']}, " f"Cfg:{d['cfg']}, Act:{d['act']}, Err:{d['err']}%, Pkts:{d['pkts']}, BW:{d['bw']}M | " f"TX:{s['tx_t']}s/{s['tx_p']}%({s['tx_c']}) " @@ -107,7 +111,8 @@ def parse_arguments(): parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') parser.add_argument('--value', type=int, help='Value for PPS') - parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29") + # Updated help text + parser.add_argument('--devices', required=True, help="List (e.g. /dev/ttyUSB0, /dev/ttyUSB1), Range (/dev/ttyUSB0-29), or 'all'") args = parser.parse_args() if args.value_arg is not None: args.value = args.value_arg @@ -116,7 +121,23 @@ def parse_arguments(): sys.exit(1) return args +def natural_sort_key(s): + """Sorts strings with numbers naturally (USB2 before USB10)""" + return [int(text) if text.isdigit() else text.lower() + for text in re.split('([0-9]+)', s)] + def expand_devices(device_str): + # NEW: Handle "all" case + if device_str.lower() == 'all': + # Find all ttyUSB devices + devices = glob.glob('/dev/ttyUSB*') + # Sort them numerically so output is clean + devices.sort(key=natural_sort_key) + if not devices: + print("Error: No /dev/ttyUSB* devices found!") + sys.exit(1) + return devices + devices = [] parts = [d.strip() for d in device_str.split(',')] for part in parts: @@ -131,19 +152,36 @@ def expand_devices(device_str): return devices # --- 3. Async Entry Point --- -async def run_device(port, args): +async def run_device(port, args, sync_event): loop = asyncio.get_running_loop() fut = loop.create_future() try: await serial_asyncio.create_serial_connection( - loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200) - return await asyncio.wait_for(fut, timeout=2.0) - except: + loop, lambda: SerialController(port, args, loop, fut, sync_event), port, baudrate=115200) + + # Wait for the result (includes the wait for the event inside the protocol) + return await asyncio.wait_for(fut, timeout=5.0) + except asyncio.TimeoutError: + return None + except Exception: return None async def async_main(args, devices): - print(f"Executing '{args.action}' on {len(devices)} devices...") - tasks = [run_device(d, args) for d in devices] + print(f"Initializing {len(devices)} devices for '{args.action}'...") + + # Create the Global Trigger + sync_event = asyncio.Event() + + # Create all tasks, passing the shared event + # Connection setup happens immediately here + tasks = [run_device(d, args, sync_event) for d in devices] + + # Give a brief moment for all serial ports to open and "settle" + await asyncio.sleep(0.5) + + print(">>> TRIGGERING ALL DEVICES <<<") + sync_event.set() # Unblocks all devices instantly + results = await asyncio.gather(*tasks) print("\nResults:") @@ -156,11 +194,9 @@ async def async_main(args, devices): # --- 4. Main Execution Block --- if __name__ == '__main__': - # Phase 1: Synchronous Setup args = parse_arguments() dev_list = expand_devices(args.devices) - # Phase 2: Asynchronous Execution if sys.platform == 'win32': asyncio.set_event_loop(asyncio.ProactorEventLoop())