diff --git a/components/iperf/iperf.c b/components/iperf/iperf.c index 1fe20df..cd67c84 100644 --- a/components/iperf/iperf.c +++ b/components/iperf/iperf.c @@ -71,7 +71,26 @@ static iperf_fsm_state_t s_current_fsm_state = IPERF_STATE_IDLE; static esp_event_handler_instance_t instance_any_id; static esp_event_handler_instance_t instance_got_ip; -// --- Status Reporting --- +// --- Helper: Pattern Initialization --- +// Fills buffer with 0-9 cyclic ASCII pattern (matches iperf2 "pattern" function) +static void iperf_pattern(uint8_t *buf, uint32_t len) { + for (uint32_t i = 0; i < len; i++) { + buf[i] = (i % 10) + '0'; + } +} + +// --- Helper: Generate Client Header --- +// Modified to set all zeros except HEADER_SEQNO64B +static void iperf_generate_client_hdr(iperf_cfg_t *cfg, client_hdr_v1 *hdr) { + // Zero out the entire structure + memset(hdr, 0, sizeof(client_hdr_v1)); + + // Set only the SEQNO64B flag (Server will detect 64-bit seqno in UDP header) + hdr->flags = htonl(HEADER_SEQNO64B); +} + +// ... [Existing Status Reporting & Event Handler Code] ... + void iperf_get_stats(iperf_stats_t *stats) { if (stats) { s_stats.config_pps = (s_iperf_ctrl.cfg.pacing_period_us > 0) ? @@ -237,6 +256,7 @@ uint32_t iperf_get_pps(void) { return 1000000 / s_iperf_ctrl.cfg.pacing_period_us; } +// --- UPDATED UDP Client --- static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { if (!iperf_wait_for_ip()) { printf("IPERF_STOPPED\n"); @@ -262,19 +282,23 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { status_led_set_state(LED_STATE_TRANSMITTING_SLOW); + // --- Prepare Headers --- + udp_datagram *udp_hdr = (udp_datagram *)ctrl->buffer; + client_hdr_v1 *client_hdr = (client_hdr_v1 *)(ctrl->buffer + sizeof(udp_datagram)); + + // Fill Client Header once with just the FLAG set. + // This is idempotent, so sending it every time is fine and allows server to detect settings. + iperf_generate_client_hdr(&ctrl->cfg, client_hdr); + // --- Session Start --- s_stats.running = true; s_session_start_time = esp_timer_get_time(); - s_session_end_time = 0; // Clear previous end time + s_session_end_time = 0; s_session_packets = 0; - // Reset FSM Counters - s_time_tx_us = 0; - s_time_slow_us = 0; - s_time_stalled_us = 0; - s_edge_tx = 0; - s_edge_slow = 0; - s_edge_stalled = 0; + // Reset FSM + s_time_tx_us = 0; s_time_slow_us = 0; s_time_stalled_us = 0; + s_edge_tx = 0; s_edge_slow = 0; s_edge_stalled = 0; s_current_fsm_state = IPERF_STATE_IDLE; printf("IPERF_STARTED\n"); @@ -284,8 +308,9 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { int64_t last_rate_check = esp_timer_get_time(); uint32_t packets_since_check = 0; - int32_t packet_id = 0; + // --- Use 64-bit integer for Packet ID --- + int64_t packet_id = 0; struct timespec ts; while (!ctrl->finish && esp_timer_get_time() < end_time) { @@ -296,13 +321,17 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { else while (esp_timer_get_time() < next_send_time) taskYIELD(); for (int k = 0; k < ctrl->cfg.burst_count; k++) { - udp_datagram *hdr = (udp_datagram *)ctrl->buffer; + // Update UDP Header (Seq/Time) + int64_t current_id = packet_id++; + + // --- SPLIT 64-BIT ID --- + // Server logic: packetID = ntohl(id) | (ntohl(id2) << 32) + udp_hdr->id = htonl((uint32_t)(current_id & 0xFFFFFFFF)); + udp_hdr->id2 = htonl((uint32_t)((current_id >> 32) & 0xFFFFFFFF)); - hdr->id = htonl(packet_id++); clock_gettime(CLOCK_REALTIME, &ts); - hdr->tv_sec = htonl(ts.tv_sec); - hdr->tv_usec = htonl(ts.tv_nsec / 1000); - hdr->id2 = hdr->id; + udp_hdr->tv_sec = htonl((uint32_t)ts.tv_sec); + udp_hdr->tv_usec = htonl(ts.tv_nsec / 1000); int sent = sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr)); @@ -323,31 +352,20 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { if (now - last_rate_check > RATE_CHECK_INTERVAL_US) { uint32_t interval_us = (uint32_t)(now - last_rate_check); if (interval_us > 0) { - // Calculate Instantaneous PPS s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us); - - // --- FSM Logic --- uint32_t config_pps = iperf_get_pps(); uint32_t threshold = (config_pps * 3) / 4; - iperf_fsm_state_t next_state; - if (s_stats.actual_pps == 0) { - next_state = IPERF_STATE_TX_STALLED; - } else if (s_stats.actual_pps >= threshold) { - next_state = IPERF_STATE_TX; - } else { - next_state = IPERF_STATE_TX_SLOW; - } + if (s_stats.actual_pps == 0) next_state = IPERF_STATE_TX_STALLED; + else if (s_stats.actual_pps >= threshold) next_state = IPERF_STATE_TX; + else next_state = IPERF_STATE_TX_SLOW; - // Update Duration switch (next_state) { case IPERF_STATE_TX: s_time_tx_us += interval_us; break; case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break; case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break; default: break; } - - // Detect Edges if (next_state != s_current_fsm_state) { switch (next_state) { case IPERF_STATE_TX: s_edge_tx++; break; @@ -357,12 +375,9 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { } s_current_fsm_state = next_state; } - - // Update LED based on state led_state_t led_target = (s_current_fsm_state == IPERF_STATE_TX) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW; if (status_led_get_state() != led_target) status_led_set_state(led_target); } - last_rate_check = now; packets_since_check = 0; } @@ -373,28 +388,27 @@ exit: // Termination Packets { udp_datagram *hdr = (udp_datagram *)ctrl->buffer; - int32_t final_id = -packet_id; - hdr->id = htonl(final_id); - hdr->id2 = hdr->id; + + // --- NEGATED 64-BIT TERMINATION ID --- + // Server logic: if (packetID < 0) terminate = true; + int64_t final_id = -packet_id; + hdr->id = htonl((uint32_t)(final_id & 0xFFFFFFFF)); + hdr->id2 = htonl((uint32_t)((final_id >> 32) & 0xFFFFFFFF)); clock_gettime(CLOCK_REALTIME, &ts); - hdr->tv_sec = htonl(ts.tv_sec); + hdr->tv_sec = htonl((uint32_t)ts.tv_sec); hdr->tv_usec = htonl(ts.tv_nsec / 1000); - for(int i=0; i<10; i++) { sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr)); vTaskDelay(pdMS_TO_TICKS(2)); } - ESP_LOGI(TAG, "Sent termination packets (ID: %ld)", (long)final_id); + ESP_LOGI(TAG, "Sent termination packets (ID: %" PRId64 ")", final_id); } close(sockfd); s_stats.running = false; - - // --- Session Stop --- s_session_end_time = esp_timer_get_time(); s_stats.actual_pps = 0; - status_led_set_state(LED_STATE_CONNECTED); printf("IPERF_STOPPED\n"); return ESP_OK; @@ -404,70 +418,57 @@ static void iperf_task(void *arg) { iperf_ctrl_t *ctrl = (iperf_ctrl_t *)arg; do { - // 1. Reset State for the new run - s_reload_req = false; // Clear the reload flag - ctrl->finish = false; // Clear the stop flag - xEventGroupClearBits(s_iperf_event_group, IPERF_STOP_REQ_BIT); // Clear event bits + s_reload_req = false; + ctrl->finish = false; + xEventGroupClearBits(s_iperf_event_group, IPERF_STOP_REQ_BIT); - // 2. Run the workload (Blocking until finished or stopped) if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) { iperf_start_udp_client(ctrl); } - // 3. Check if we are dying or reloading if (s_reload_req) { ESP_LOGI(TAG, "Hot reloading iperf task with new config..."); - // Load the new config we staged in iperf_start ctrl->cfg = s_next_cfg; - - // Optional: small delay to let the socket stack cool down vTaskDelay(pdMS_TO_TICKS(100)); } - } while (s_reload_req); // Loop back if reload was requested + } while (s_reload_req); - // 4. Cleanup and Death (Only reached if NO reload requested) free(ctrl->buffer); s_iperf_task_handle = NULL; vTaskDelete(NULL); } void iperf_start(iperf_cfg_t *cfg) { - // 1. Prepare the Configuration iperf_cfg_t new_cfg = *cfg; iperf_read_nvs_config(&new_cfg); - // Apply defaults if (new_cfg.send_len == 0) new_cfg.send_len = 1470; if (new_cfg.pacing_period_us == 0) new_cfg.pacing_period_us = 10000; if (new_cfg.burst_count == 0) new_cfg.burst_count = 1; - // 2. Check if Task is Already Running if (s_iperf_task_handle) { ESP_LOGI(TAG, "Task running. Staging hot reload."); - - // Stage the new config globally s_next_cfg = new_cfg; s_reload_req = true; - - // Signal the current task to stop (it will see s_reload_req when it exits) iperf_stop(); - - // RETURN IMMEDIATELY - Do not wait! printf("IPERF_RELOADING\n"); return; } - // 3. Fresh Start (No existing task) s_iperf_ctrl.cfg = new_cfg; s_iperf_ctrl.finish = false; - // Allocate buffer if it doesn't exist (it might be freed if task died previously) if (s_iperf_ctrl.buffer == NULL) { s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128; s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len); } + // Initialize Buffer Pattern + if (s_iperf_ctrl.buffer) { + iperf_pattern(s_iperf_ctrl.buffer, s_iperf_ctrl.buffer_len); + } + if (s_iperf_event_group == NULL) { s_iperf_event_group = xEventGroupCreate(); } diff --git a/components/iperf/iperf.h b/components/iperf/iperf.h index 23c3d99..d7771eb 100644 --- a/components/iperf/iperf.h +++ b/components/iperf/iperf.h @@ -11,6 +11,12 @@ #define IPERF_FLAG_TCP (1 << 2) #define IPERF_FLAG_UDP (1 << 3) +// --- Standard Iperf2 Header Flags (from payloads.h) --- +#define HEADER_VERSION1 0x80000000 +#define HEADER_EXTEND 0x40000000 +#define HEADER_UDPTESTS 0x20000000 +#define HEADER_SEQNO64B 0x08000000 + // --- Defaults --- #define IPERF_DEFAULT_PORT 5001 #define IPERF_DEFAULT_INTERVAL 3 @@ -48,27 +54,41 @@ typedef struct { float error_rate; } iperf_stats_t; -// --- Header --- +// --- Wire Formats (Strict Layout) --- + +// 1. Basic UDP Datagram Header (16 bytes) +// Corresponds to 'struct UDP_datagram' in payloads.h typedef struct { - int32_t id; - uint32_t tv_sec; - uint32_t tv_usec; - uint32_t id2; + int32_t id; // Lower 32 bits of seqno + uint32_t tv_sec; // Seconds + uint32_t tv_usec; // Microseconds + int32_t id2; // Upper 32 bits of seqno (when HEADER_SEQNO64B is set) } udp_datagram; +// 2. Client Header V1 (Used for First Packet Exchange) +// Corresponds to 'struct client_hdr_v1' in payloads.h +typedef struct { + int32_t flags; + int32_t numThreads; + int32_t mPort; + int32_t mBufLen; + int32_t mWinBand; + int32_t mAmount; +} client_hdr_v1; + // --- API --- void iperf_init_led(led_strip_handle_t handle); void iperf_set_pps(uint32_t pps); uint32_t iperf_get_pps(void); -// New: Get snapshot of current stats +// Get snapshot of current stats void iperf_get_stats(iperf_stats_t *stats); -// New: Print formatted status to stdout (for CLI/Python) +// Print formatted status to stdout (for CLI/Python) void iperf_print_status(void); void iperf_start(iperf_cfg_t *cfg); void iperf_stop(void); -#endif // IPERF_H +#endif diff --git a/control_iperf.py b/control_iperf.py index c0221c8..ebbf15f 100755 --- a/control_iperf.py +++ b/control_iperf.py @@ -8,9 +8,10 @@ import glob # --- 1. Protocol Class (Async Logic) --- class SerialController(asyncio.Protocol): - def __init__(self, port_name, args, loop, completion_future, sync_event): + def __init__(self, port_name, cmd_type, value, loop, completion_future, sync_event): self.port_name = port_name - self.args = args + self.cmd_type = cmd_type # 'start', 'stop', 'pps', 'status' + self.value = value self.loop = loop self.transport = None self.buffer = "" @@ -19,16 +20,16 @@ class SerialController(asyncio.Protocol): self.status_data = {} # Command Construction - if args.action == 'pps': - self.cmd_str = f"iperf pps {args.value}\n" + if self.cmd_type == 'pps': + self.cmd_str = f"iperf pps {self.value}\n" self.target_key = "IPERF_PPS_UPDATED" - elif args.action == 'status': + elif self.cmd_type == 'status': self.cmd_str = "iperf status\n" self.target_key = "IPERF_STATUS" - elif args.action == 'start': + elif self.cmd_type == 'start': self.cmd_str = "iperf start\n" self.target_key = "IPERF_STARTED" - elif args.action == 'stop': + elif self.cmd_type == 'stop': self.cmd_str = "iperf stop\n" self.target_key = "IPERF_STOPPED" @@ -40,8 +41,9 @@ class SerialController(asyncio.Protocol): self.loop.create_task(self.await_trigger_and_send()) async def await_trigger_and_send(self): - # 3. Wait here until ALL devices are connected and ready - await self.sync_event.wait() + # 3. Wait here until trigger is fired (allows sync start/stop) + if self.sync_event: + await self.sync_event.wait() # 4. FIRE! self.transport.write(self.cmd_str.encode()) @@ -54,7 +56,7 @@ class SerialController(asyncio.Protocol): line = line.strip() # --- Status Parsing --- - if self.args.action == 'status': + if self.cmd_type == 'status': if "IPERF_STATUS" in line: m = re.search(r'Src=([\d\.]+), Dst=([\d\.]+), Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%, Pkts=(\d+), AvgBW=([\d\.]+) Mbps', line) if m: @@ -97,7 +99,7 @@ class SerialController(asyncio.Protocol): def connection_lost(self, exc): if not self.completion_future.done(): - if self.args.action == 'status' and 'main' in self.status_data: + if self.cmd_type == 'status' and 'main' in self.status_data: d = self.status_data['main'] output = (f"{d['src']} -> {d['dst']} | {d['run']}, " f"Cfg:{d['cfg']}, Act:{d['act']}, BW:{d['bw']}M (Partial)") @@ -108,10 +110,9 @@ class SerialController(asyncio.Protocol): # --- 2. Helper Functions --- def parse_arguments(): parser = argparse.ArgumentParser() - parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) + parser.add_argument('action', choices=['start', 'stop', 'pps', 'status', 'step-all']) parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') parser.add_argument('--value', type=int, help='Value for PPS') - # Updated help text parser.add_argument('--devices', required=True, help="List (e.g. /dev/ttyUSB0, /dev/ttyUSB1), Range (/dev/ttyUSB0-29), or 'all'") args = parser.parse_args() @@ -122,16 +123,12 @@ def parse_arguments(): return args def natural_sort_key(s): - """Sorts strings with numbers naturally (USB2 before USB10)""" return [int(text) if text.isdigit() else text.lower() for text in re.split('([0-9]+)', s)] def expand_devices(device_str): - # NEW: Handle "all" case if device_str.lower() == 'all': - # Find all ttyUSB devices devices = glob.glob('/dev/ttyUSB*') - # Sort them numerically so output is clean devices.sort(key=natural_sort_key) if not devices: print("Error: No /dev/ttyUSB* devices found!") @@ -151,51 +148,83 @@ def expand_devices(device_str): devices.append(part) return devices -# --- 3. Async Entry Point --- -async def run_device(port, args, sync_event): +# --- 3. Async Execution Core --- +async def run_single_cmd(port, cmd_type, value, sync_event=None): + """Runs a single command on a single port.""" loop = asyncio.get_running_loop() fut = loop.create_future() try: await serial_asyncio.create_serial_connection( - loop, lambda: SerialController(port, args, loop, fut, sync_event), port, baudrate=115200) - - # Wait for the result (includes the wait for the event inside the protocol) + loop, + lambda: SerialController(port, cmd_type, value, loop, fut, sync_event), + port, + baudrate=115200 + ) return await asyncio.wait_for(fut, timeout=5.0) except asyncio.TimeoutError: return None except Exception: return None -async def async_main(args, devices): - print(f"Initializing {len(devices)} devices for '{args.action}'...") - - # Create the Global Trigger +async def run_parallel_action(devices, action, value): + """Runs the specified action on all devices in parallel.""" + print(f"Initializing {len(devices)} devices for '{action}'...") sync_event = asyncio.Event() + tasks = [run_single_cmd(d, action, value, sync_event) for d in devices] - # Create all tasks, passing the shared event - # Connection setup happens immediately here - tasks = [run_device(d, args, sync_event) for d in devices] - - # Give a brief moment for all serial ports to open and "settle" + # Allow connections to settle await asyncio.sleep(0.5) - if len(devices) > 1: - print(f">>> TRIGGERING {len(devices)} DEVICES <<<") - else: - print(f">>> TRIGGERING {devices[0]} <<<") - - sync_event.set() # Unblocks all devices instantly - + # Fire all commands at once + sync_event.set() results = await asyncio.gather(*tasks) print("\nResults:") for dev, res in zip(devices, results): - if args.action == 'status': + if action == 'status': print(f"{dev}: {res if res else 'TIMEOUT'}") else: status = "OK" if res is True else "FAIL" print(f"{dev}: {status}") +async def run_step_all(devices): + """Stops all, then starts/stops devices one by one.""" + print("\n>>> STEP-ALL PHASE 1: STOPPING ALL DEVICES <<<") + await run_parallel_action(devices, 'stop', None) + + print("\n>>> STEP-ALL PHASE 2: SEQUENTIAL TEST <<<") + for i, dev in enumerate(devices): + print(f"\n[{i+1}/{len(devices)}] Testing {dev}...") + + # Start + print(f" -> Starting {dev}...") + res_start = await run_single_cmd(dev, 'start', None, None) # No sync needed for single + if res_start is not True: + print(f" -> FAILED to start {dev}. Skipping.") + continue + + # Wait (Run Traffic) + print(" -> Running traffic (5 seconds)...") + await asyncio.sleep(5) + + # Stop + print(f" -> Stopping {dev}...") + res_stop = await run_single_cmd(dev, 'stop', None, None) + if res_stop is not True: + print(f" -> Warning: Failed to stop {dev}") + else: + print(f" -> {dev} OK") + + # Wait (Gap between devices) + print(" -> Waiting 1 second...") + await asyncio.sleep(1) + +async def async_main(args, devices): + if args.action == 'step-all': + await run_step_all(devices) + else: + await run_parallel_action(devices, args.action, args.value) + # --- 4. Main Execution Block --- if __name__ == '__main__': args = parse_arguments() diff --git a/esp32_deploy.py b/esp32_deploy.py index 8a20521..7caa508 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -1,48 +1,6 @@ #!/usr/bin/env python3 import sys import os - -# --- AUTO-BOOTSTRAP ESP-IDF ENVIRONMENT --- -# This block detects if the script is running without 'get_idf' -# and automatically re-launches it using the ESP-IDF Python environment. -try: - import serial_asyncio -except ImportError: - # 1. Define the location of your ESP-IDF Python Env (from your .bashrc) - home = os.path.expanduser("~") - base_env_dir = os.path.join(home, ".espressif", "python_env") - idf_python_path = None - - # Search for the v6.0 environment - if os.path.exists(base_env_dir): - for d in os.listdir(base_env_dir): - if d.startswith("idf6.0") and "py3" in d: - candidate = os.path.join(base_env_dir, d, "bin", "python") - if os.path.exists(candidate): - idf_python_path = candidate - break - - if idf_python_path: - print(f"\033[93m[Bootstrap] Missing libraries. Switching to ESP-IDF Python: {idf_python_path}\033[0m") - - # 2. Update PATH to include 'esptool.py' (which lives in the venv bin) - venv_bin = os.path.dirname(idf_python_path) - os.environ["PATH"] = venv_bin + os.pathsep + os.environ["PATH"] - - # 3. Optional: Set IDF_PATH if missing - if "IDF_PATH" not in os.environ: - default_idf = os.path.join(home, "Code", "esp32", "esp-idf-v6") - if os.path.exists(default_idf): - os.environ["IDF_PATH"] = default_idf - - # 4. Re-execute the script using the correct interpreter - os.execv(idf_python_path, [idf_python_path] + sys.argv) - else: - print("\033[91mError: Could not find 'serial_asyncio' or the ESP-IDF environment.\033[0m") - print("Please run 'get_idf' manually.") - sys.exit(1) -# ------------------------------------------ - import asyncio import serial_asyncio import argparse