diff --git a/components/iperf/iperf.c b/components/iperf/iperf.c index 10adc42..b9532d1 100644 --- a/components/iperf/iperf.c +++ b/components/iperf/iperf.c @@ -43,6 +43,30 @@ static TaskHandle_t s_iperf_task_handle = NULL; // Global Stats Tracker static iperf_stats_t s_stats = {0}; +// --- Session Persistence Variables --- +// These persist after the task stops for post-analysis +static int64_t s_session_start_time = 0; +static int64_t s_session_end_time = 0; +static uint64_t s_session_packets = 0; + +// --- State Duration & Edge Counters --- +typedef enum { + IPERF_STATE_IDLE = 0, + IPERF_STATE_TX, + IPERF_STATE_TX_SLOW, + IPERF_STATE_TX_STALLED +} iperf_fsm_state_t; + +static int64_t s_time_tx_us = 0; +static int64_t s_time_slow_us = 0; +static int64_t s_time_stalled_us = 0; + +static uint32_t s_edge_tx = 0; +static uint32_t s_edge_slow = 0; +static uint32_t s_edge_stalled = 0; + +static iperf_fsm_state_t s_current_fsm_state = IPERF_STATE_IDLE; + static esp_event_handler_instance_t instance_any_id; static esp_event_handler_instance_t instance_got_ip; @@ -57,14 +81,56 @@ void iperf_get_stats(iperf_stats_t *stats) { void iperf_print_status(void) { iperf_get_stats(&s_stats); + + // 1. Get Source IP + char src_ip[32] = "0.0.0.0"; + esp_netif_t *netif = esp_netif_get_handle_from_ifkey("WIFI_STA_DEF"); + if (netif) { + esp_netif_ip_info_t ip_info; + if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) { + inet_ntop(AF_INET, &ip_info.ip, src_ip, sizeof(src_ip)); + } + } + + // 2. Get Destination IP + char dst_ip[32] = "0.0.0.0"; + struct in_addr daddr; + daddr.s_addr = s_iperf_ctrl.cfg.dip; + inet_ntop(AF_INET, &daddr, dst_ip, sizeof(dst_ip)); + float err = 0.0f; if (s_stats.running && s_stats.config_pps > 0) { int32_t diff = (int32_t)s_stats.config_pps - (int32_t)s_stats.actual_pps; err = (float)diff * 100.0f / (float)s_stats.config_pps; } - printf("IPERF_STATUS: Running=%d, Config=%" PRIu32 ", Actual=%" PRIu32 ", Err=%.1f%%\n", - s_stats.running, s_stats.config_pps, s_stats.actual_pps, err); + // 3. Compute Session Bandwidth + // Logic: If running, use current time. If stopped, use stored end time. + float avg_bw_mbps = 0.0f; + if (s_session_start_time > 0) { + int64_t end_t = (s_stats.running) ? esp_timer_get_time() : s_session_end_time; + + if (end_t > s_session_start_time) { + double duration_sec = (double)(end_t - s_session_start_time) / 1000000.0; + + // Only calc if duration is significant to avoid divide-by-tiny-number + if (duration_sec > 0.001) { + // Total bits = packets * packet_size * 8 + double total_bits = (double)s_session_packets * (double)s_iperf_ctrl.cfg.send_len * 8.0; + avg_bw_mbps = (float)(total_bits / duration_sec / 1000000.0); + } + } + } + + // New Format: Standard Stats + printf("IPERF_STATUS: Src=%s, Dst=%s, Running=%d, Config=%" PRIu32 ", Actual=%" PRIu32 ", Err=%.1f%%, Pkts=%" PRIu64 ", AvgBW=%.2f Mbps\n", + src_ip, dst_ip, s_stats.running, s_stats.config_pps, s_stats.actual_pps, err, s_session_packets, avg_bw_mbps); + + // New Format: State Durations & Edges + printf("IPERF_STATES: TX=%.2fs (%lu), SLOW=%.2fs (%lu), STALLED=%.2fs (%lu)\n", + (double)s_time_tx_us/1000000.0, (unsigned long)s_edge_tx, + (double)s_time_slow_us/1000000.0, (unsigned long)s_edge_slow, + (double)s_time_stalled_us/1000000.0, (unsigned long)s_edge_stalled); } // --- Network Events --- @@ -168,7 +234,6 @@ uint32_t iperf_get_pps(void) { } static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { - // FIX 1: If wait is aborted (stop requested), print STOPPED so controller knows if (!iperf_wait_for_ip()) { printf("IPERF_STOPPED\n"); return ESP_OK; @@ -187,13 +252,27 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { if (sockfd < 0) { status_led_set_state(LED_STATE_FAILED); ESP_LOGE(TAG, "Socket creation failed: %d", errno); - // FIX 2: Print STOPPED on failure so controller doesn't timeout printf("IPERF_STOPPED\n"); return ESP_FAIL; } status_led_set_state(LED_STATE_TRANSMITTING_SLOW); + + // --- Session Start --- s_stats.running = true; + s_session_start_time = esp_timer_get_time(); + s_session_end_time = 0; // Clear previous end time + s_session_packets = 0; + + // Reset FSM Counters + s_time_tx_us = 0; + s_time_slow_us = 0; + s_time_stalled_us = 0; + s_edge_tx = 0; + s_edge_slow = 0; + s_edge_stalled = 0; + s_current_fsm_state = IPERF_STATE_IDLE; + printf("IPERF_STARTED\n"); int64_t next_send_time = esp_timer_get_time(); @@ -225,6 +304,7 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { if (sent > 0) { packets_since_check++; + s_session_packets++; } else { if (errno != 12) { ESP_LOGE(TAG, "Send failed: %d", errno); @@ -239,12 +319,45 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { if (now - last_rate_check > RATE_CHECK_INTERVAL_US) { uint32_t interval_us = (uint32_t)(now - last_rate_check); if (interval_us > 0) { + // Calculate Instantaneous PPS s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us); + + // --- FSM Logic --- + uint32_t config_pps = iperf_get_pps(); + uint32_t threshold = (config_pps * 3) / 4; + + iperf_fsm_state_t next_state; + if (s_stats.actual_pps == 0) { + next_state = IPERF_STATE_TX_STALLED; + } else if (s_stats.actual_pps >= threshold) { + next_state = IPERF_STATE_TX; + } else { + next_state = IPERF_STATE_TX_SLOW; + } + + // Update Duration + switch (next_state) { + case IPERF_STATE_TX: s_time_tx_us += interval_us; break; + case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break; + case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break; + default: break; + } + + // Detect Edges + if (next_state != s_current_fsm_state) { + switch (next_state) { + case IPERF_STATE_TX: s_edge_tx++; break; + case IPERF_STATE_TX_SLOW: s_edge_slow++; break; + case IPERF_STATE_TX_STALLED: s_edge_stalled++; break; + default: break; + } + s_current_fsm_state = next_state; + } + + // Update LED based on state + led_state_t led_target = (s_current_fsm_state == IPERF_STATE_TX) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW; + if (status_led_get_state() != led_target) status_led_set_state(led_target); } - uint32_t config_pps = iperf_get_pps(); - uint32_t threshold = (config_pps * 3) / 4; - led_state_t target = (s_stats.actual_pps >= threshold) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW; - if (status_led_get_state() != target) status_led_set_state(target); last_rate_check = now; packets_since_check = 0; @@ -273,7 +386,13 @@ exit: close(sockfd); s_stats.running = false; + + // --- Session Stop --- + // Capture time exactly when loop exits. + // This allows accurate bandwidth calculation after the test stops. + s_session_end_time = esp_timer_get_time(); s_stats.actual_pps = 0; + status_led_set_state(LED_STATE_CONNECTED); printf("IPERF_STOPPED\n"); return ESP_OK; diff --git a/control_iperf.py b/control_iperf.py index 3682c34..2d1a8e2 100755 --- a/control_iperf.py +++ b/control_iperf.py @@ -5,6 +5,7 @@ import serial_asyncio import sys import re +# --- 1. Protocol Class (Async Logic) --- class SerialController(asyncio.Protocol): def __init__(self, port_name, args, loop, completion_future): self.port_name = port_name @@ -13,7 +14,9 @@ class SerialController(asyncio.Protocol): self.transport = None self.buffer = "" self.completion_future = completion_future + self.status_data = {} + # Command Construction if args.action == 'pps': self.cmd_str = f"iperf pps {args.value}\n" self.target_key = "IPERF_PPS_UPDATED" @@ -29,51 +32,86 @@ class SerialController(asyncio.Protocol): def connection_made(self, transport): self.transport = transport - transport.write(b'\n') + transport.write(b'\n') # Wake up/Clear self.loop.create_task(self.send_command()) async def send_command(self): - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) # Stabilization delay self.transport.write(self.cmd_str.encode()) def data_received(self, data): self.buffer += data.decode(errors='ignore') - # FIX: Process complete lines only to avoid partial regex matching while '\n' in self.buffer: line, self.buffer = self.buffer.split('\n', 1) line = line.strip() - if self.target_key in line: - if not self.completion_future.done(): - if self.args.action == 'status': - # FIX: Added [-]? to allow negative error rates (overshoot) - # Regex: Err=([-\d\.]+)% - m = re.search(r'Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%', line) - if m: - state = "Running" if m.group(1) == '1' else "Stopped" - self.completion_future.set_result(f"{state}, Cfg: {m.group(2)}, Act: {m.group(3)}, Err: {m.group(4)}%") - else: - # Now if it fails, it's a true format mismatch, not fragmentation - self.completion_future.set_result(f"Parse Error on line: {line}") - else: + # --- Status Parsing --- + if self.args.action == 'status': + if "IPERF_STATUS" in line: + m = re.search(r'Src=([\d\.]+), Dst=([\d\.]+), Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%, Pkts=(\d+), AvgBW=([\d\.]+) Mbps', line) + if m: + self.status_data['main'] = { + 'src': m.group(1), 'dst': m.group(2), + 'run': "Run" if m.group(3) == '1' else "Stop", + 'cfg': m.group(4), 'act': m.group(5), + 'err': m.group(6), 'pkts': m.group(7), 'bw': m.group(8) + } + + elif "IPERF_STATES" in line: + m = re.search(r'TX=([\d\.]+)s \((\d+)\), SLOW=([\d\.]+)s \((\d+)\), STALLED=([\d\.]+)s \((\d+)\)', line) + if m: + self.status_data['states'] = { + 'tx_t': m.group(1), 'tx_c': m.group(2), + 'sl_t': m.group(3), 'sl_c': m.group(4), + 'st_t': m.group(5), 'st_c': m.group(6) + } + + if 'main' in self.status_data and 'states' in self.status_data: + if not self.completion_future.done(): + d = self.status_data['main'] + s = self.status_data['states'] + output = (f"{d['src']} -> {d['dst']} | {d['run']}, " + f"Cfg:{d['cfg']}, Act:{d['act']}, Err:{d['err']}%, Pkts:{d['pkts']}, BW:{d['bw']}M | " + f"TX:{s['tx_t']}s({s['tx_c']}) " + f"SL:{s['sl_t']}s({s['sl_c']}) " + f"ST:{s['st_t']}s({s['st_c']})") + self.completion_future.set_result(output) + self.transport.close() + return + + # --- Simple Command Parsing --- + else: + if self.target_key in line: + if not self.completion_future.done(): self.completion_future.set_result(True) - self.transport.close() - return + self.transport.close() + return def connection_lost(self, exc): if not self.completion_future.done(): - self.completion_future.set_exception(Exception("Closed")) + if self.args.action == 'status' and 'main' in self.status_data: + d = self.status_data['main'] + output = (f"{d['src']} -> {d['dst']} | {d['run']}, " + f"Cfg:{d['cfg']}, Act:{d['act']}, BW:{d['bw']}M (Partial)") + self.completion_future.set_result(output) + else: + self.completion_future.set_exception(Exception("Closed")) -async def run_device(port, args): - loop = asyncio.get_running_loop() - fut = loop.create_future() - try: - await serial_asyncio.create_serial_connection( - loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200) - return await asyncio.wait_for(fut, timeout=2.0) - except: - return None +# --- 2. Helper Functions --- +def parse_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) + parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') + parser.add_argument('--value', type=int, help='Value for PPS') + parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29") + + args = parser.parse_args() + if args.value_arg is not None: args.value = args.value_arg + if args.action == 'pps' and args.value is None: + print("Error: 'pps' action requires a value") + sys.exit(1) + return args def expand_devices(device_str): devices = [] @@ -89,34 +127,38 @@ def expand_devices(device_str): devices.append(part) return devices -async def main(): - parser = argparse.ArgumentParser() - parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) - parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') - parser.add_argument('--value', type=int, help='Value for PPS') - parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29") +# --- 3. Async Entry Point --- +async def run_device(port, args): + loop = asyncio.get_running_loop() + fut = loop.create_future() + try: + await serial_asyncio.create_serial_connection( + loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200) + return await asyncio.wait_for(fut, timeout=2.0) + except: + return None - args = parser.parse_args() - if args.value_arg is not None: args.value = args.value_arg - if args.action == 'pps' and args.value is None: - print("Error: 'pps' action requires a value") - sys.exit(1) - - if sys.platform == 'win32': asyncio.set_event_loop(asyncio.ProactorEventLoop()) - - devs = expand_devices(args.devices) - print(f"Executing '{args.action}' on {len(devs)} devices...") - - tasks = [run_device(d, args) for d in devs] +async def async_main(args, devices): + print(f"Executing '{args.action}' on {len(devices)} devices...") + tasks = [run_device(d, args) for d in devices] results = await asyncio.gather(*tasks) print("\nResults:") - for dev, res in zip(devs, results): + for dev, res in zip(devices, results): if args.action == 'status': print(f"{dev}: {res if res else 'TIMEOUT'}") else: status = "OK" if res is True else "FAIL" print(f"{dev}: {status}") +# --- 4. Main Execution Block --- if __name__ == '__main__': - asyncio.run(main()) + # Phase 1: Synchronous Setup + args = parse_arguments() + dev_list = expand_devices(args.devices) + + # Phase 2: Asynchronous Execution + if sys.platform == 'win32': + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + + asyncio.run(async_main(args, dev_list))