iperf stats after stop

This commit is contained in:
Bob 2025-12-14 21:59:13 -08:00
parent d1be832790
commit b86bb81110
2 changed files with 217 additions and 56 deletions

View File

@ -43,6 +43,30 @@ static TaskHandle_t s_iperf_task_handle = NULL;
// Global Stats Tracker // Global Stats Tracker
static iperf_stats_t s_stats = {0}; static iperf_stats_t s_stats = {0};
// --- Session Persistence Variables ---
// These persist after the task stops for post-analysis
static int64_t s_session_start_time = 0;
static int64_t s_session_end_time = 0;
static uint64_t s_session_packets = 0;
// --- State Duration & Edge Counters ---
typedef enum {
IPERF_STATE_IDLE = 0,
IPERF_STATE_TX,
IPERF_STATE_TX_SLOW,
IPERF_STATE_TX_STALLED
} iperf_fsm_state_t;
static int64_t s_time_tx_us = 0;
static int64_t s_time_slow_us = 0;
static int64_t s_time_stalled_us = 0;
static uint32_t s_edge_tx = 0;
static uint32_t s_edge_slow = 0;
static uint32_t s_edge_stalled = 0;
static iperf_fsm_state_t s_current_fsm_state = IPERF_STATE_IDLE;
static esp_event_handler_instance_t instance_any_id; static esp_event_handler_instance_t instance_any_id;
static esp_event_handler_instance_t instance_got_ip; static esp_event_handler_instance_t instance_got_ip;
@ -57,14 +81,56 @@ void iperf_get_stats(iperf_stats_t *stats) {
void iperf_print_status(void) { void iperf_print_status(void) {
iperf_get_stats(&s_stats); iperf_get_stats(&s_stats);
// 1. Get Source IP
char src_ip[32] = "0.0.0.0";
esp_netif_t *netif = esp_netif_get_handle_from_ifkey("WIFI_STA_DEF");
if (netif) {
esp_netif_ip_info_t ip_info;
if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) {
inet_ntop(AF_INET, &ip_info.ip, src_ip, sizeof(src_ip));
}
}
// 2. Get Destination IP
char dst_ip[32] = "0.0.0.0";
struct in_addr daddr;
daddr.s_addr = s_iperf_ctrl.cfg.dip;
inet_ntop(AF_INET, &daddr, dst_ip, sizeof(dst_ip));
float err = 0.0f; float err = 0.0f;
if (s_stats.running && s_stats.config_pps > 0) { if (s_stats.running && s_stats.config_pps > 0) {
int32_t diff = (int32_t)s_stats.config_pps - (int32_t)s_stats.actual_pps; int32_t diff = (int32_t)s_stats.config_pps - (int32_t)s_stats.actual_pps;
err = (float)diff * 100.0f / (float)s_stats.config_pps; err = (float)diff * 100.0f / (float)s_stats.config_pps;
} }
printf("IPERF_STATUS: Running=%d, Config=%" PRIu32 ", Actual=%" PRIu32 ", Err=%.1f%%\n", // 3. Compute Session Bandwidth
s_stats.running, s_stats.config_pps, s_stats.actual_pps, err); // Logic: If running, use current time. If stopped, use stored end time.
float avg_bw_mbps = 0.0f;
if (s_session_start_time > 0) {
int64_t end_t = (s_stats.running) ? esp_timer_get_time() : s_session_end_time;
if (end_t > s_session_start_time) {
double duration_sec = (double)(end_t - s_session_start_time) / 1000000.0;
// Only calc if duration is significant to avoid divide-by-tiny-number
if (duration_sec > 0.001) {
// Total bits = packets * packet_size * 8
double total_bits = (double)s_session_packets * (double)s_iperf_ctrl.cfg.send_len * 8.0;
avg_bw_mbps = (float)(total_bits / duration_sec / 1000000.0);
}
}
}
// New Format: Standard Stats
printf("IPERF_STATUS: Src=%s, Dst=%s, Running=%d, Config=%" PRIu32 ", Actual=%" PRIu32 ", Err=%.1f%%, Pkts=%" PRIu64 ", AvgBW=%.2f Mbps\n",
src_ip, dst_ip, s_stats.running, s_stats.config_pps, s_stats.actual_pps, err, s_session_packets, avg_bw_mbps);
// New Format: State Durations & Edges
printf("IPERF_STATES: TX=%.2fs (%lu), SLOW=%.2fs (%lu), STALLED=%.2fs (%lu)\n",
(double)s_time_tx_us/1000000.0, (unsigned long)s_edge_tx,
(double)s_time_slow_us/1000000.0, (unsigned long)s_edge_slow,
(double)s_time_stalled_us/1000000.0, (unsigned long)s_edge_stalled);
} }
// --- Network Events --- // --- Network Events ---
@ -168,7 +234,6 @@ uint32_t iperf_get_pps(void) {
} }
static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
// FIX 1: If wait is aborted (stop requested), print STOPPED so controller knows
if (!iperf_wait_for_ip()) { if (!iperf_wait_for_ip()) {
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
return ESP_OK; return ESP_OK;
@ -187,13 +252,27 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
if (sockfd < 0) { if (sockfd < 0) {
status_led_set_state(LED_STATE_FAILED); status_led_set_state(LED_STATE_FAILED);
ESP_LOGE(TAG, "Socket creation failed: %d", errno); ESP_LOGE(TAG, "Socket creation failed: %d", errno);
// FIX 2: Print STOPPED on failure so controller doesn't timeout
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
return ESP_FAIL; return ESP_FAIL;
} }
status_led_set_state(LED_STATE_TRANSMITTING_SLOW); status_led_set_state(LED_STATE_TRANSMITTING_SLOW);
// --- Session Start ---
s_stats.running = true; s_stats.running = true;
s_session_start_time = esp_timer_get_time();
s_session_end_time = 0; // Clear previous end time
s_session_packets = 0;
// Reset FSM Counters
s_time_tx_us = 0;
s_time_slow_us = 0;
s_time_stalled_us = 0;
s_edge_tx = 0;
s_edge_slow = 0;
s_edge_stalled = 0;
s_current_fsm_state = IPERF_STATE_IDLE;
printf("IPERF_STARTED\n"); printf("IPERF_STARTED\n");
int64_t next_send_time = esp_timer_get_time(); int64_t next_send_time = esp_timer_get_time();
@ -225,6 +304,7 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
if (sent > 0) { if (sent > 0) {
packets_since_check++; packets_since_check++;
s_session_packets++;
} else { } else {
if (errno != 12) { if (errno != 12) {
ESP_LOGE(TAG, "Send failed: %d", errno); ESP_LOGE(TAG, "Send failed: %d", errno);
@ -239,12 +319,45 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
if (now - last_rate_check > RATE_CHECK_INTERVAL_US) { if (now - last_rate_check > RATE_CHECK_INTERVAL_US) {
uint32_t interval_us = (uint32_t)(now - last_rate_check); uint32_t interval_us = (uint32_t)(now - last_rate_check);
if (interval_us > 0) { if (interval_us > 0) {
// Calculate Instantaneous PPS
s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us); s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us);
}
// --- FSM Logic ---
uint32_t config_pps = iperf_get_pps(); uint32_t config_pps = iperf_get_pps();
uint32_t threshold = (config_pps * 3) / 4; uint32_t threshold = (config_pps * 3) / 4;
led_state_t target = (s_stats.actual_pps >= threshold) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW;
if (status_led_get_state() != target) status_led_set_state(target); iperf_fsm_state_t next_state;
if (s_stats.actual_pps == 0) {
next_state = IPERF_STATE_TX_STALLED;
} else if (s_stats.actual_pps >= threshold) {
next_state = IPERF_STATE_TX;
} else {
next_state = IPERF_STATE_TX_SLOW;
}
// Update Duration
switch (next_state) {
case IPERF_STATE_TX: s_time_tx_us += interval_us; break;
case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break;
case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break;
default: break;
}
// Detect Edges
if (next_state != s_current_fsm_state) {
switch (next_state) {
case IPERF_STATE_TX: s_edge_tx++; break;
case IPERF_STATE_TX_SLOW: s_edge_slow++; break;
case IPERF_STATE_TX_STALLED: s_edge_stalled++; break;
default: break;
}
s_current_fsm_state = next_state;
}
// Update LED based on state
led_state_t led_target = (s_current_fsm_state == IPERF_STATE_TX) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW;
if (status_led_get_state() != led_target) status_led_set_state(led_target);
}
last_rate_check = now; last_rate_check = now;
packets_since_check = 0; packets_since_check = 0;
@ -273,7 +386,13 @@ exit:
close(sockfd); close(sockfd);
s_stats.running = false; s_stats.running = false;
// --- Session Stop ---
// Capture time exactly when loop exits.
// This allows accurate bandwidth calculation after the test stops.
s_session_end_time = esp_timer_get_time();
s_stats.actual_pps = 0; s_stats.actual_pps = 0;
status_led_set_state(LED_STATE_CONNECTED); status_led_set_state(LED_STATE_CONNECTED);
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
return ESP_OK; return ESP_OK;

View File

@ -5,6 +5,7 @@ import serial_asyncio
import sys import sys
import re import re
# --- 1. Protocol Class (Async Logic) ---
class SerialController(asyncio.Protocol): class SerialController(asyncio.Protocol):
def __init__(self, port_name, args, loop, completion_future): def __init__(self, port_name, args, loop, completion_future):
self.port_name = port_name self.port_name = port_name
@ -13,7 +14,9 @@ class SerialController(asyncio.Protocol):
self.transport = None self.transport = None
self.buffer = "" self.buffer = ""
self.completion_future = completion_future self.completion_future = completion_future
self.status_data = {}
# Command Construction
if args.action == 'pps': if args.action == 'pps':
self.cmd_str = f"iperf pps {args.value}\n" self.cmd_str = f"iperf pps {args.value}\n"
self.target_key = "IPERF_PPS_UPDATED" self.target_key = "IPERF_PPS_UPDATED"
@ -29,51 +32,86 @@ class SerialController(asyncio.Protocol):
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
transport.write(b'\n') transport.write(b'\n') # Wake up/Clear
self.loop.create_task(self.send_command()) self.loop.create_task(self.send_command())
async def send_command(self): async def send_command(self):
await asyncio.sleep(0.1) await asyncio.sleep(0.1) # Stabilization delay
self.transport.write(self.cmd_str.encode()) self.transport.write(self.cmd_str.encode())
def data_received(self, data): def data_received(self, data):
self.buffer += data.decode(errors='ignore') self.buffer += data.decode(errors='ignore')
# FIX: Process complete lines only to avoid partial regex matching
while '\n' in self.buffer: while '\n' in self.buffer:
line, self.buffer = self.buffer.split('\n', 1) line, self.buffer = self.buffer.split('\n', 1)
line = line.strip() line = line.strip()
# --- Status Parsing ---
if self.args.action == 'status':
if "IPERF_STATUS" in line:
m = re.search(r'Src=([\d\.]+), Dst=([\d\.]+), Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%, Pkts=(\d+), AvgBW=([\d\.]+) Mbps', line)
if m:
self.status_data['main'] = {
'src': m.group(1), 'dst': m.group(2),
'run': "Run" if m.group(3) == '1' else "Stop",
'cfg': m.group(4), 'act': m.group(5),
'err': m.group(6), 'pkts': m.group(7), 'bw': m.group(8)
}
elif "IPERF_STATES" in line:
m = re.search(r'TX=([\d\.]+)s \((\d+)\), SLOW=([\d\.]+)s \((\d+)\), STALLED=([\d\.]+)s \((\d+)\)', line)
if m:
self.status_data['states'] = {
'tx_t': m.group(1), 'tx_c': m.group(2),
'sl_t': m.group(3), 'sl_c': m.group(4),
'st_t': m.group(5), 'st_c': m.group(6)
}
if 'main' in self.status_data and 'states' in self.status_data:
if not self.completion_future.done():
d = self.status_data['main']
s = self.status_data['states']
output = (f"{d['src']} -> {d['dst']} | {d['run']}, "
f"Cfg:{d['cfg']}, Act:{d['act']}, Err:{d['err']}%, Pkts:{d['pkts']}, BW:{d['bw']}M | "
f"TX:{s['tx_t']}s({s['tx_c']}) "
f"SL:{s['sl_t']}s({s['sl_c']}) "
f"ST:{s['st_t']}s({s['st_c']})")
self.completion_future.set_result(output)
self.transport.close()
return
# --- Simple Command Parsing ---
else:
if self.target_key in line: if self.target_key in line:
if not self.completion_future.done(): if not self.completion_future.done():
if self.args.action == 'status':
# FIX: Added [-]? to allow negative error rates (overshoot)
# Regex: Err=([-\d\.]+)%
m = re.search(r'Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%', line)
if m:
state = "Running" if m.group(1) == '1' else "Stopped"
self.completion_future.set_result(f"{state}, Cfg: {m.group(2)}, Act: {m.group(3)}, Err: {m.group(4)}%")
else:
# Now if it fails, it's a true format mismatch, not fragmentation
self.completion_future.set_result(f"Parse Error on line: {line}")
else:
self.completion_future.set_result(True) self.completion_future.set_result(True)
self.transport.close() self.transport.close()
return return
def connection_lost(self, exc): def connection_lost(self, exc):
if not self.completion_future.done(): if not self.completion_future.done():
if self.args.action == 'status' and 'main' in self.status_data:
d = self.status_data['main']
output = (f"{d['src']} -> {d['dst']} | {d['run']}, "
f"Cfg:{d['cfg']}, Act:{d['act']}, BW:{d['bw']}M (Partial)")
self.completion_future.set_result(output)
else:
self.completion_future.set_exception(Exception("Closed")) self.completion_future.set_exception(Exception("Closed"))
async def run_device(port, args): # --- 2. Helper Functions ---
loop = asyncio.get_running_loop() def parse_arguments():
fut = loop.create_future() parser = argparse.ArgumentParser()
try: parser.add_argument('action', choices=['start', 'stop', 'pps', 'status'])
await serial_asyncio.create_serial_connection( parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS')
loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200) parser.add_argument('--value', type=int, help='Value for PPS')
return await asyncio.wait_for(fut, timeout=2.0) parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29")
except:
return None args = parser.parse_args()
if args.value_arg is not None: args.value = args.value_arg
if args.action == 'pps' and args.value is None:
print("Error: 'pps' action requires a value")
sys.exit(1)
return args
def expand_devices(device_str): def expand_devices(device_str):
devices = [] devices = []
@ -89,34 +127,38 @@ def expand_devices(device_str):
devices.append(part) devices.append(part)
return devices return devices
async def main(): # --- 3. Async Entry Point ---
parser = argparse.ArgumentParser() async def run_device(port, args):
parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) loop = asyncio.get_running_loop()
parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') fut = loop.create_future()
parser.add_argument('--value', type=int, help='Value for PPS') try:
parser.add_argument('--devices', required=True, help="/dev/ttyUSB0-29") await serial_asyncio.create_serial_connection(
loop, lambda: SerialController(port, args, loop, fut), port, baudrate=115200)
return await asyncio.wait_for(fut, timeout=2.0)
except:
return None
args = parser.parse_args() async def async_main(args, devices):
if args.value_arg is not None: args.value = args.value_arg print(f"Executing '{args.action}' on {len(devices)} devices...")
if args.action == 'pps' and args.value is None: tasks = [run_device(d, args) for d in devices]
print("Error: 'pps' action requires a value")
sys.exit(1)
if sys.platform == 'win32': asyncio.set_event_loop(asyncio.ProactorEventLoop())
devs = expand_devices(args.devices)
print(f"Executing '{args.action}' on {len(devs)} devices...")
tasks = [run_device(d, args) for d in devs]
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
print("\nResults:") print("\nResults:")
for dev, res in zip(devs, results): for dev, res in zip(devices, results):
if args.action == 'status': if args.action == 'status':
print(f"{dev}: {res if res else 'TIMEOUT'}") print(f"{dev}: {res if res else 'TIMEOUT'}")
else: else:
status = "OK" if res is True else "FAIL" status = "OK" if res is True else "FAIL"
print(f"{dev}: {status}") print(f"{dev}: {status}")
# --- 4. Main Execution Block ---
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(main()) # Phase 1: Synchronous Setup
args = parse_arguments()
dev_list = expand_devices(args.devices)
# Phase 2: Asynchronous Execution
if sys.platform == 'win32':
asyncio.set_event_loop(asyncio.ProactorEventLoop())
asyncio.run(async_main(args, dev_list))