more on synchronis start/stop

This commit is contained in:
Bob 2025-12-15 09:42:05 -08:00
parent 08d75fc645
commit cf33a8b19a
2 changed files with 117 additions and 50 deletions

View File

@ -27,7 +27,7 @@ static EventGroupHandle_t s_iperf_event_group = NULL;
#define IPERF_IP_READY_BIT (1 << 0) #define IPERF_IP_READY_BIT (1 << 0)
#define IPERF_STOP_REQ_BIT (1 << 1) #define IPERF_STOP_REQ_BIT (1 << 1)
#define MIN_RATE_CHECK_INTERVAL_US 250000 #define RATE_CHECK_INTERVAL_US 500000
#define MIN_PACING_INTERVAL_US 100 #define MIN_PACING_INTERVAL_US 100
typedef struct { typedef struct {
@ -39,12 +39,13 @@ typedef struct {
static iperf_ctrl_t s_iperf_ctrl = {0}; static iperf_ctrl_t s_iperf_ctrl = {0};
static TaskHandle_t s_iperf_task_handle = NULL; static TaskHandle_t s_iperf_task_handle = NULL;
static iperf_cfg_t s_next_cfg; // Holding area for the new config
static bool s_reload_req = false; // Flag to trigger internal restart
// Global Stats Tracker // Global Stats Tracker
static iperf_stats_t s_stats = {0}; static iperf_stats_t s_stats = {0};
// --- Session Persistence Variables --- // --- Session Persistence Variables ---
// These persist after the task stops for post-analysis
static int64_t s_session_start_time = 0; static int64_t s_session_start_time = 0;
static int64_t s_session_end_time = 0; static int64_t s_session_end_time = 0;
static uint64_t s_session_packets = 0; static uint64_t s_session_packets = 0;
@ -119,7 +120,7 @@ void iperf_print_status(void) {
// 4. Calculate State Percentages // 4. Calculate State Percentages
double total_us = (double)(s_time_tx_us + s_time_slow_us + s_time_stalled_us); double total_us = (double)(s_time_tx_us + s_time_slow_us + s_time_stalled_us);
if (total_us < 1.0) total_us = 1.0; // Prevent div/0 if (total_us < 1.0) total_us = 1.0;
double pct_tx = ((double)s_time_tx_us / total_us) * 100.0; double pct_tx = ((double)s_time_tx_us / total_us) * 100.0;
double pct_slow = ((double)s_time_slow_us / total_us) * 100.0; double pct_slow = ((double)s_time_slow_us / total_us) * 100.0;
@ -130,7 +131,6 @@ void iperf_print_status(void) {
src_ip, dst_ip, s_stats.running, s_stats.config_pps, s_stats.actual_pps, err, s_session_packets, avg_bw_mbps); src_ip, dst_ip, s_stats.running, s_stats.config_pps, s_stats.actual_pps, err, s_session_packets, avg_bw_mbps);
// New Format: Time + Percentage + Edges // New Format: Time + Percentage + Edges
// Example: TX=15.15s/28.5% (15)
printf("IPERF_STATES: TX=%.2fs/%.1f%% (%lu), SLOW=%.2fs/%.1f%% (%lu), STALLED=%.2fs/%.1f%% (%lu)\n", printf("IPERF_STATES: TX=%.2fs/%.1f%% (%lu), SLOW=%.2fs/%.1f%% (%lu), STALLED=%.2fs/%.1f%% (%lu)\n",
(double)s_time_tx_us/1000000.0, pct_tx, (unsigned long)s_edge_tx, (double)s_time_tx_us/1000000.0, pct_tx, (unsigned long)s_edge_tx,
(double)s_time_slow_us/1000000.0, pct_slow, (unsigned long)s_edge_slow, (double)s_time_slow_us/1000000.0, pct_slow, (unsigned long)s_edge_slow,
@ -285,7 +285,7 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
int64_t last_rate_check = esp_timer_get_time(); int64_t last_rate_check = esp_timer_get_time();
uint32_t packets_since_check = 0; uint32_t packets_since_check = 0;
int32_t packet_id = 0; int32_t packet_id = 0;
uint32_t current_rate_check_interval_us = MIN_RATE_CHECK_INTERVAL_US;
struct timespec ts; struct timespec ts;
while (!ctrl->finish && esp_timer_get_time() < end_time) { while (!ctrl->finish && esp_timer_get_time() < end_time) {
@ -320,19 +320,8 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
} }
now = esp_timer_get_time(); now = esp_timer_get_time();
// Modified check to use dynamic interval if (now - last_rate_check > RATE_CHECK_INTERVAL_US) {
if (now - last_rate_check > current_rate_check_interval_us) {
uint32_t interval_us = (uint32_t)(now - last_rate_check); uint32_t interval_us = (uint32_t)(now - last_rate_check);
// Dynamic Interval Calculation ---
uint32_t config_pps = iperf_get_pps();
if (config_pps > 0) {
// Calculate time needed to send 25 packets based on the CONFIG PPS
uint32_t needed_time_us = (25ULL * 1000000ULL) / config_pps;
// Set new interval: needed_time_us, but not lower than 250ms
current_rate_check_interval_us = (needed_time_us > MIN_RATE_CHECK_INTERVAL_US) ? needed_time_us : MIN_RATE_CHECK_INTERVAL_US;
}
if (interval_us > 0) { if (interval_us > 0) {
// Calculate Instantaneous PPS // Calculate Instantaneous PPS
s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us); s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us);
@ -403,8 +392,6 @@ exit:
s_stats.running = false; s_stats.running = false;
// --- Session Stop --- // --- Session Stop ---
// Capture time exactly when loop exits.
// This allows accurate bandwidth calculation after the test stops.
s_session_end_time = esp_timer_get_time(); s_session_end_time = esp_timer_get_time();
s_stats.actual_pps = 0; s_stats.actual_pps = 0;
@ -415,32 +402,76 @@ exit:
static void iperf_task(void *arg) { static void iperf_task(void *arg) {
iperf_ctrl_t *ctrl = (iperf_ctrl_t *)arg; iperf_ctrl_t *ctrl = (iperf_ctrl_t *)arg;
if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) {
iperf_start_udp_client(ctrl); do {
} // 1. Reset State for the new run
s_reload_req = false; // Clear the reload flag
ctrl->finish = false; // Clear the stop flag
xEventGroupClearBits(s_iperf_event_group, IPERF_STOP_REQ_BIT); // Clear event bits
// 2. Run the workload (Blocking until finished or stopped)
if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) {
iperf_start_udp_client(ctrl);
}
// 3. Check if we are dying or reloading
if (s_reload_req) {
ESP_LOGI(TAG, "Hot reloading iperf task with new config...");
// Load the new config we staged in iperf_start
ctrl->cfg = s_next_cfg;
// Optional: small delay to let the socket stack cool down
vTaskDelay(pdMS_TO_TICKS(100));
}
} while (s_reload_req); // Loop back if reload was requested
// 4. Cleanup and Death (Only reached if NO reload requested)
free(ctrl->buffer); free(ctrl->buffer);
s_iperf_task_handle = NULL; s_iperf_task_handle = NULL;
vTaskDelete(NULL); vTaskDelete(NULL);
} }
void iperf_start(iperf_cfg_t *cfg) { void iperf_start(iperf_cfg_t *cfg) {
// 1. Prepare the Configuration
iperf_cfg_t new_cfg = *cfg;
iperf_read_nvs_config(&new_cfg);
// Apply defaults
if (new_cfg.send_len == 0) new_cfg.send_len = 1470;
if (new_cfg.pacing_period_us == 0) new_cfg.pacing_period_us = 10000;
if (new_cfg.burst_count == 0) new_cfg.burst_count = 1;
// 2. Check if Task is Already Running
if (s_iperf_task_handle) { if (s_iperf_task_handle) {
ESP_LOGW(TAG, "Iperf already running"); ESP_LOGI(TAG, "Task running. Staging hot reload.");
// Stage the new config globally
s_next_cfg = new_cfg;
s_reload_req = true;
// Signal the current task to stop (it will see s_reload_req when it exits)
iperf_stop();
// RETURN IMMEDIATELY - Do not wait!
printf("IPERF_RELOADING\n");
return; return;
} }
s_iperf_ctrl.cfg = *cfg; // 3. Fresh Start (No existing task)
iperf_read_nvs_config(&s_iperf_ctrl.cfg); s_iperf_ctrl.cfg = new_cfg;
if (s_iperf_ctrl.cfg.send_len == 0) s_iperf_ctrl.cfg.send_len = 1470;
if (s_iperf_ctrl.cfg.pacing_period_us == 0) s_iperf_ctrl.cfg.pacing_period_us = 10000;
if (s_iperf_ctrl.cfg.burst_count == 0) s_iperf_ctrl.cfg.burst_count = 1;
s_iperf_ctrl.finish = false; s_iperf_ctrl.finish = false;
s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128;
s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len);
s_iperf_event_group = xEventGroupCreate(); // Allocate buffer if it doesn't exist (it might be freed if task died previously)
if (s_iperf_ctrl.buffer == NULL) {
s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128;
s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len);
}
if (s_iperf_event_group == NULL) {
s_iperf_event_group = xEventGroupCreate();
}
xTaskCreate(iperf_task, "iperf", 4096, &s_iperf_ctrl, 5, &s_iperf_task_handle); xTaskCreate(iperf_task, "iperf", 4096, &s_iperf_ctrl, 5, &s_iperf_task_handle);
} }

View File

@ -4,16 +4,18 @@ import argparse
import serial_asyncio import serial_asyncio
import sys import sys
import re import re
import glob
# --- 1. Protocol Class (Async Logic) --- # --- 1. Protocol Class (Async Logic) ---
class SerialController(asyncio.Protocol): class SerialController(asyncio.Protocol):
def __init__(self, port_name, args, loop, completion_future): def __init__(self, port_name, args, loop, completion_future, sync_event):
self.port_name = port_name self.port_name = port_name
self.args = args self.args = args
self.loop = loop self.loop = loop
self.transport = None self.transport = None
self.buffer = "" self.buffer = ""
self.completion_future = completion_future self.completion_future = completion_future
self.sync_event = sync_event # Global trigger for tight sync
self.status_data = {} self.status_data = {}
# Command Construction # Command Construction
@ -32,11 +34,16 @@ class SerialController(asyncio.Protocol):
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
transport.write(b'\n') # Wake up/Clear # 1. Wake up the device immediately
self.loop.create_task(self.send_command()) transport.write(b'\n')
# 2. Schedule the command to wait for the global trigger
self.loop.create_task(self.await_trigger_and_send())
async def send_command(self): async def await_trigger_and_send(self):
await asyncio.sleep(0.1) # Stabilization delay # 3. Wait here until ALL devices are connected and ready
await self.sync_event.wait()
# 4. FIRE!
self.transport.write(self.cmd_str.encode()) self.transport.write(self.cmd_str.encode())
def data_received(self, data): def data_received(self, data):
@ -59,8 +66,6 @@ class SerialController(asyncio.Protocol):
} }
elif "IPERF_STATES" in line: elif "IPERF_STATES" in line:
# NEW REGEX: Matches "TX=15.15s/28.5% (15)"
# Groups: 1=Time, 2=Pct, 3=Count (Repeated for SLOW and STALLED)
m = re.search(r'TX=([\d\.]+)s/([\d\.]+)% \((\d+)\), SLOW=([\d\.]+)s/([\d\.]+)% \((\d+)\), STALLED=([\d\.]+)s/([\d\.]+)% \((\d+)\)', line) m = re.search(r'TX=([\d\.]+)s/([\d\.]+)% \((\d+)\), SLOW=([\d\.]+)s/([\d\.]+)% \((\d+)\), STALLED=([\d\.]+)s/([\d\.]+)% \((\d+)\)', line)
if m: if m:
self.status_data['states'] = { self.status_data['states'] = {
@ -73,7 +78,6 @@ class SerialController(asyncio.Protocol):
if not self.completion_future.done(): if not self.completion_future.done():
d = self.status_data['main'] d = self.status_data['main']
s = self.status_data['states'] s = self.status_data['states']
# Updated Output Format
output = (f"{d['src']} -> {d['dst']} | {d['run']}, " output = (f"{d['src']} -> {d['dst']} | {d['run']}, "
f"Cfg:{d['cfg']}, Act:{d['act']}, Err:{d['err']}%, Pkts:{d['pkts']}, BW:{d['bw']}M | " f"Cfg:{d['cfg']}, Act:{d['act']}, Err:{d['err']}%, Pkts:{d['pkts']}, BW:{d['bw']}M | "
f"TX:{s['tx_t']}s/{s['tx_p']}%({s['tx_c']}) " f"TX:{s['tx_t']}s/{s['tx_p']}%({s['tx_c']}) "
@ -107,7 +111,8 @@ def parse_arguments():
parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) parser.add_argument('action', choices=['start', 'stop', 'pps', 'status'])
parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS')
parser.add_argument('--value', type=int, help='Value for PPS') parser.add_argument('--value', type=int, help='Value for PPS')
parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29") # Updated help text
parser.add_argument('--devices', required=True, help="List (e.g. /dev/ttyUSB0, /dev/ttyUSB1), Range (/dev/ttyUSB0-29), or 'all'")
args = parser.parse_args() args = parser.parse_args()
if args.value_arg is not None: args.value = args.value_arg if args.value_arg is not None: args.value = args.value_arg
@ -116,7 +121,23 @@ def parse_arguments():
sys.exit(1) sys.exit(1)
return args return args
def natural_sort_key(s):
"""Sorts strings with numbers naturally (USB2 before USB10)"""
return [int(text) if text.isdigit() else text.lower()
for text in re.split('([0-9]+)', s)]
def expand_devices(device_str): def expand_devices(device_str):
# NEW: Handle "all" case
if device_str.lower() == 'all':
# Find all ttyUSB devices
devices = glob.glob('/dev/ttyUSB*')
# Sort them numerically so output is clean
devices.sort(key=natural_sort_key)
if not devices:
print("Error: No /dev/ttyUSB* devices found!")
sys.exit(1)
return devices
devices = [] devices = []
parts = [d.strip() for d in device_str.split(',')] parts = [d.strip() for d in device_str.split(',')]
for part in parts: for part in parts:
@ -131,19 +152,36 @@ def expand_devices(device_str):
return devices return devices
# --- 3. Async Entry Point --- # --- 3. Async Entry Point ---
async def run_device(port, args): async def run_device(port, args, sync_event):
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
fut = loop.create_future() fut = loop.create_future()
try: try:
await serial_asyncio.create_serial_connection( await serial_asyncio.create_serial_connection(
loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200) loop, lambda: SerialController(port, args, loop, fut, sync_event), port, baudrate=115200)
return await asyncio.wait_for(fut, timeout=2.0)
except: # Wait for the result (includes the wait for the event inside the protocol)
return await asyncio.wait_for(fut, timeout=5.0)
except asyncio.TimeoutError:
return None
except Exception:
return None return None
async def async_main(args, devices): async def async_main(args, devices):
print(f"Executing '{args.action}' on {len(devices)} devices...") print(f"Initializing {len(devices)} devices for '{args.action}'...")
tasks = [run_device(d, args) for d in devices]
# Create the Global Trigger
sync_event = asyncio.Event()
# Create all tasks, passing the shared event
# Connection setup happens immediately here
tasks = [run_device(d, args, sync_event) for d in devices]
# Give a brief moment for all serial ports to open and "settle"
await asyncio.sleep(0.5)
print(">>> TRIGGERING ALL DEVICES <<<")
sync_event.set() # Unblocks all devices instantly
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
print("\nResults:") print("\nResults:")
@ -156,11 +194,9 @@ async def async_main(args, devices):
# --- 4. Main Execution Block --- # --- 4. Main Execution Block ---
if __name__ == '__main__': if __name__ == '__main__':
# Phase 1: Synchronous Setup
args = parse_arguments() args = parse_arguments()
dev_list = expand_devices(args.devices) dev_list = expand_devices(args.devices)
# Phase 2: Asynchronous Execution
if sys.platform == 'win32': if sys.platform == 'win32':
asyncio.set_event_loop(asyncio.ProactorEventLoop()) asyncio.set_event_loop(asyncio.ProactorEventLoop())