more on scaling

This commit is contained in:
Bob 2025-12-15 16:56:14 -08:00
parent 40f77d324d
commit 090c66f649
4 changed files with 160 additions and 152 deletions

View File

@ -71,7 +71,26 @@ static iperf_fsm_state_t s_current_fsm_state = IPERF_STATE_IDLE;
static esp_event_handler_instance_t instance_any_id; static esp_event_handler_instance_t instance_any_id;
static esp_event_handler_instance_t instance_got_ip; static esp_event_handler_instance_t instance_got_ip;
// --- Status Reporting --- // --- Helper: Pattern Initialization ---
// Fills buffer with 0-9 cyclic ASCII pattern (matches iperf2 "pattern" function)
static void iperf_pattern(uint8_t *buf, uint32_t len) {
for (uint32_t i = 0; i < len; i++) {
buf[i] = (i % 10) + '0';
}
}
// --- Helper: Generate Client Header ---
// Modified to set all zeros except HEADER_SEQNO64B
static void iperf_generate_client_hdr(iperf_cfg_t *cfg, client_hdr_v1 *hdr) {
// Zero out the entire structure
memset(hdr, 0, sizeof(client_hdr_v1));
// Set only the SEQNO64B flag (Server will detect 64-bit seqno in UDP header)
hdr->flags = htonl(HEADER_SEQNO64B);
}
// ... [Existing Status Reporting & Event Handler Code] ...
void iperf_get_stats(iperf_stats_t *stats) { void iperf_get_stats(iperf_stats_t *stats) {
if (stats) { if (stats) {
s_stats.config_pps = (s_iperf_ctrl.cfg.pacing_period_us > 0) ? s_stats.config_pps = (s_iperf_ctrl.cfg.pacing_period_us > 0) ?
@ -237,6 +256,7 @@ uint32_t iperf_get_pps(void) {
return 1000000 / s_iperf_ctrl.cfg.pacing_period_us; return 1000000 / s_iperf_ctrl.cfg.pacing_period_us;
} }
// --- UPDATED UDP Client ---
static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
if (!iperf_wait_for_ip()) { if (!iperf_wait_for_ip()) {
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
@ -262,19 +282,23 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
status_led_set_state(LED_STATE_TRANSMITTING_SLOW); status_led_set_state(LED_STATE_TRANSMITTING_SLOW);
// --- Prepare Headers ---
udp_datagram *udp_hdr = (udp_datagram *)ctrl->buffer;
client_hdr_v1 *client_hdr = (client_hdr_v1 *)(ctrl->buffer + sizeof(udp_datagram));
// Fill Client Header once with just the FLAG set.
// This is idempotent, so sending it every time is fine and allows server to detect settings.
iperf_generate_client_hdr(&ctrl->cfg, client_hdr);
// --- Session Start --- // --- Session Start ---
s_stats.running = true; s_stats.running = true;
s_session_start_time = esp_timer_get_time(); s_session_start_time = esp_timer_get_time();
s_session_end_time = 0; // Clear previous end time s_session_end_time = 0;
s_session_packets = 0; s_session_packets = 0;
// Reset FSM Counters // Reset FSM
s_time_tx_us = 0; s_time_tx_us = 0; s_time_slow_us = 0; s_time_stalled_us = 0;
s_time_slow_us = 0; s_edge_tx = 0; s_edge_slow = 0; s_edge_stalled = 0;
s_time_stalled_us = 0;
s_edge_tx = 0;
s_edge_slow = 0;
s_edge_stalled = 0;
s_current_fsm_state = IPERF_STATE_IDLE; s_current_fsm_state = IPERF_STATE_IDLE;
printf("IPERF_STARTED\n"); printf("IPERF_STARTED\n");
@ -284,8 +308,9 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
int64_t last_rate_check = esp_timer_get_time(); int64_t last_rate_check = esp_timer_get_time();
uint32_t packets_since_check = 0; uint32_t packets_since_check = 0;
int32_t packet_id = 0;
// --- Use 64-bit integer for Packet ID ---
int64_t packet_id = 0;
struct timespec ts; struct timespec ts;
while (!ctrl->finish && esp_timer_get_time() < end_time) { while (!ctrl->finish && esp_timer_get_time() < end_time) {
@ -296,13 +321,17 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
else while (esp_timer_get_time() < next_send_time) taskYIELD(); else while (esp_timer_get_time() < next_send_time) taskYIELD();
for (int k = 0; k < ctrl->cfg.burst_count; k++) { for (int k = 0; k < ctrl->cfg.burst_count; k++) {
udp_datagram *hdr = (udp_datagram *)ctrl->buffer; // Update UDP Header (Seq/Time)
int64_t current_id = packet_id++;
// --- SPLIT 64-BIT ID ---
// Server logic: packetID = ntohl(id) | (ntohl(id2) << 32)
udp_hdr->id = htonl((uint32_t)(current_id & 0xFFFFFFFF));
udp_hdr->id2 = htonl((uint32_t)((current_id >> 32) & 0xFFFFFFFF));
hdr->id = htonl(packet_id++);
clock_gettime(CLOCK_REALTIME, &ts); clock_gettime(CLOCK_REALTIME, &ts);
hdr->tv_sec = htonl(ts.tv_sec); udp_hdr->tv_sec = htonl((uint32_t)ts.tv_sec);
hdr->tv_usec = htonl(ts.tv_nsec / 1000); udp_hdr->tv_usec = htonl(ts.tv_nsec / 1000);
hdr->id2 = hdr->id;
int sent = sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr)); int sent = sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr));
@ -323,31 +352,20 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
if (now - last_rate_check > RATE_CHECK_INTERVAL_US) { if (now - last_rate_check > RATE_CHECK_INTERVAL_US) {
uint32_t interval_us = (uint32_t)(now - last_rate_check); uint32_t interval_us = (uint32_t)(now - last_rate_check);
if (interval_us > 0) { if (interval_us > 0) {
// Calculate Instantaneous PPS
s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us); s_stats.actual_pps = (uint32_t)((uint64_t)packets_since_check * 1000000 / interval_us);
// --- FSM Logic ---
uint32_t config_pps = iperf_get_pps(); uint32_t config_pps = iperf_get_pps();
uint32_t threshold = (config_pps * 3) / 4; uint32_t threshold = (config_pps * 3) / 4;
iperf_fsm_state_t next_state; iperf_fsm_state_t next_state;
if (s_stats.actual_pps == 0) { if (s_stats.actual_pps == 0) next_state = IPERF_STATE_TX_STALLED;
next_state = IPERF_STATE_TX_STALLED; else if (s_stats.actual_pps >= threshold) next_state = IPERF_STATE_TX;
} else if (s_stats.actual_pps >= threshold) { else next_state = IPERF_STATE_TX_SLOW;
next_state = IPERF_STATE_TX;
} else {
next_state = IPERF_STATE_TX_SLOW;
}
// Update Duration
switch (next_state) { switch (next_state) {
case IPERF_STATE_TX: s_time_tx_us += interval_us; break; case IPERF_STATE_TX: s_time_tx_us += interval_us; break;
case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break; case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break;
case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break; case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break;
default: break; default: break;
} }
// Detect Edges
if (next_state != s_current_fsm_state) { if (next_state != s_current_fsm_state) {
switch (next_state) { switch (next_state) {
case IPERF_STATE_TX: s_edge_tx++; break; case IPERF_STATE_TX: s_edge_tx++; break;
@ -357,12 +375,9 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
} }
s_current_fsm_state = next_state; s_current_fsm_state = next_state;
} }
// Update LED based on state
led_state_t led_target = (s_current_fsm_state == IPERF_STATE_TX) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW; led_state_t led_target = (s_current_fsm_state == IPERF_STATE_TX) ? LED_STATE_TRANSMITTING : LED_STATE_TRANSMITTING_SLOW;
if (status_led_get_state() != led_target) status_led_set_state(led_target); if (status_led_get_state() != led_target) status_led_set_state(led_target);
} }
last_rate_check = now; last_rate_check = now;
packets_since_check = 0; packets_since_check = 0;
} }
@ -373,28 +388,27 @@ exit:
// Termination Packets // Termination Packets
{ {
udp_datagram *hdr = (udp_datagram *)ctrl->buffer; udp_datagram *hdr = (udp_datagram *)ctrl->buffer;
int32_t final_id = -packet_id;
hdr->id = htonl(final_id); // --- NEGATED 64-BIT TERMINATION ID ---
hdr->id2 = hdr->id; // Server logic: if (packetID < 0) terminate = true;
int64_t final_id = -packet_id;
hdr->id = htonl((uint32_t)(final_id & 0xFFFFFFFF));
hdr->id2 = htonl((uint32_t)((final_id >> 32) & 0xFFFFFFFF));
clock_gettime(CLOCK_REALTIME, &ts); clock_gettime(CLOCK_REALTIME, &ts);
hdr->tv_sec = htonl(ts.tv_sec); hdr->tv_sec = htonl((uint32_t)ts.tv_sec);
hdr->tv_usec = htonl(ts.tv_nsec / 1000); hdr->tv_usec = htonl(ts.tv_nsec / 1000);
for(int i=0; i<10; i++) { for(int i=0; i<10; i++) {
sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr)); sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr));
vTaskDelay(pdMS_TO_TICKS(2)); vTaskDelay(pdMS_TO_TICKS(2));
} }
ESP_LOGI(TAG, "Sent termination packets (ID: %ld)", (long)final_id); ESP_LOGI(TAG, "Sent termination packets (ID: %" PRId64 ")", final_id);
} }
close(sockfd); close(sockfd);
s_stats.running = false; s_stats.running = false;
// --- Session Stop ---
s_session_end_time = esp_timer_get_time(); s_session_end_time = esp_timer_get_time();
s_stats.actual_pps = 0; s_stats.actual_pps = 0;
status_led_set_state(LED_STATE_CONNECTED); status_led_set_state(LED_STATE_CONNECTED);
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
return ESP_OK; return ESP_OK;
@ -404,70 +418,57 @@ static void iperf_task(void *arg) {
iperf_ctrl_t *ctrl = (iperf_ctrl_t *)arg; iperf_ctrl_t *ctrl = (iperf_ctrl_t *)arg;
do { do {
// 1. Reset State for the new run s_reload_req = false;
s_reload_req = false; // Clear the reload flag ctrl->finish = false;
ctrl->finish = false; // Clear the stop flag xEventGroupClearBits(s_iperf_event_group, IPERF_STOP_REQ_BIT);
xEventGroupClearBits(s_iperf_event_group, IPERF_STOP_REQ_BIT); // Clear event bits
// 2. Run the workload (Blocking until finished or stopped)
if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) { if (ctrl->cfg.flag & IPERF_FLAG_UDP && ctrl->cfg.flag & IPERF_FLAG_CLIENT) {
iperf_start_udp_client(ctrl); iperf_start_udp_client(ctrl);
} }
// 3. Check if we are dying or reloading
if (s_reload_req) { if (s_reload_req) {
ESP_LOGI(TAG, "Hot reloading iperf task with new config..."); ESP_LOGI(TAG, "Hot reloading iperf task with new config...");
// Load the new config we staged in iperf_start
ctrl->cfg = s_next_cfg; ctrl->cfg = s_next_cfg;
// Optional: small delay to let the socket stack cool down
vTaskDelay(pdMS_TO_TICKS(100)); vTaskDelay(pdMS_TO_TICKS(100));
} }
} while (s_reload_req); // Loop back if reload was requested } while (s_reload_req);
// 4. Cleanup and Death (Only reached if NO reload requested)
free(ctrl->buffer); free(ctrl->buffer);
s_iperf_task_handle = NULL; s_iperf_task_handle = NULL;
vTaskDelete(NULL); vTaskDelete(NULL);
} }
void iperf_start(iperf_cfg_t *cfg) { void iperf_start(iperf_cfg_t *cfg) {
// 1. Prepare the Configuration
iperf_cfg_t new_cfg = *cfg; iperf_cfg_t new_cfg = *cfg;
iperf_read_nvs_config(&new_cfg); iperf_read_nvs_config(&new_cfg);
// Apply defaults
if (new_cfg.send_len == 0) new_cfg.send_len = 1470; if (new_cfg.send_len == 0) new_cfg.send_len = 1470;
if (new_cfg.pacing_period_us == 0) new_cfg.pacing_period_us = 10000; if (new_cfg.pacing_period_us == 0) new_cfg.pacing_period_us = 10000;
if (new_cfg.burst_count == 0) new_cfg.burst_count = 1; if (new_cfg.burst_count == 0) new_cfg.burst_count = 1;
// 2. Check if Task is Already Running
if (s_iperf_task_handle) { if (s_iperf_task_handle) {
ESP_LOGI(TAG, "Task running. Staging hot reload."); ESP_LOGI(TAG, "Task running. Staging hot reload.");
// Stage the new config globally
s_next_cfg = new_cfg; s_next_cfg = new_cfg;
s_reload_req = true; s_reload_req = true;
// Signal the current task to stop (it will see s_reload_req when it exits)
iperf_stop(); iperf_stop();
// RETURN IMMEDIATELY - Do not wait!
printf("IPERF_RELOADING\n"); printf("IPERF_RELOADING\n");
return; return;
} }
// 3. Fresh Start (No existing task)
s_iperf_ctrl.cfg = new_cfg; s_iperf_ctrl.cfg = new_cfg;
s_iperf_ctrl.finish = false; s_iperf_ctrl.finish = false;
// Allocate buffer if it doesn't exist (it might be freed if task died previously)
if (s_iperf_ctrl.buffer == NULL) { if (s_iperf_ctrl.buffer == NULL) {
s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128; s_iperf_ctrl.buffer_len = s_iperf_ctrl.cfg.send_len + 128;
s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len); s_iperf_ctrl.buffer = calloc(1, s_iperf_ctrl.buffer_len);
} }
// Initialize Buffer Pattern
if (s_iperf_ctrl.buffer) {
iperf_pattern(s_iperf_ctrl.buffer, s_iperf_ctrl.buffer_len);
}
if (s_iperf_event_group == NULL) { if (s_iperf_event_group == NULL) {
s_iperf_event_group = xEventGroupCreate(); s_iperf_event_group = xEventGroupCreate();
} }

View File

@ -11,6 +11,12 @@
#define IPERF_FLAG_TCP (1 << 2) #define IPERF_FLAG_TCP (1 << 2)
#define IPERF_FLAG_UDP (1 << 3) #define IPERF_FLAG_UDP (1 << 3)
// --- Standard Iperf2 Header Flags (from payloads.h) ---
#define HEADER_VERSION1 0x80000000
#define HEADER_EXTEND 0x40000000
#define HEADER_UDPTESTS 0x20000000
#define HEADER_SEQNO64B 0x08000000
// --- Defaults --- // --- Defaults ---
#define IPERF_DEFAULT_PORT 5001 #define IPERF_DEFAULT_PORT 5001
#define IPERF_DEFAULT_INTERVAL 3 #define IPERF_DEFAULT_INTERVAL 3
@ -48,27 +54,41 @@ typedef struct {
float error_rate; float error_rate;
} iperf_stats_t; } iperf_stats_t;
// --- Header --- // --- Wire Formats (Strict Layout) ---
// 1. Basic UDP Datagram Header (16 bytes)
// Corresponds to 'struct UDP_datagram' in payloads.h
typedef struct { typedef struct {
int32_t id; int32_t id; // Lower 32 bits of seqno
uint32_t tv_sec; uint32_t tv_sec; // Seconds
uint32_t tv_usec; uint32_t tv_usec; // Microseconds
uint32_t id2; int32_t id2; // Upper 32 bits of seqno (when HEADER_SEQNO64B is set)
} udp_datagram; } udp_datagram;
// 2. Client Header V1 (Used for First Packet Exchange)
// Corresponds to 'struct client_hdr_v1' in payloads.h
typedef struct {
int32_t flags;
int32_t numThreads;
int32_t mPort;
int32_t mBufLen;
int32_t mWinBand;
int32_t mAmount;
} client_hdr_v1;
// --- API --- // --- API ---
void iperf_init_led(led_strip_handle_t handle); void iperf_init_led(led_strip_handle_t handle);
void iperf_set_pps(uint32_t pps); void iperf_set_pps(uint32_t pps);
uint32_t iperf_get_pps(void); uint32_t iperf_get_pps(void);
// New: Get snapshot of current stats // Get snapshot of current stats
void iperf_get_stats(iperf_stats_t *stats); void iperf_get_stats(iperf_stats_t *stats);
// New: Print formatted status to stdout (for CLI/Python) // Print formatted status to stdout (for CLI/Python)
void iperf_print_status(void); void iperf_print_status(void);
void iperf_start(iperf_cfg_t *cfg); void iperf_start(iperf_cfg_t *cfg);
void iperf_stop(void); void iperf_stop(void);
#endif // IPERF_H #endif

View File

@ -8,9 +8,10 @@ import glob
# --- 1. Protocol Class (Async Logic) --- # --- 1. Protocol Class (Async Logic) ---
class SerialController(asyncio.Protocol): class SerialController(asyncio.Protocol):
def __init__(self, port_name, args, loop, completion_future, sync_event): def __init__(self, port_name, cmd_type, value, loop, completion_future, sync_event):
self.port_name = port_name self.port_name = port_name
self.args = args self.cmd_type = cmd_type # 'start', 'stop', 'pps', 'status'
self.value = value
self.loop = loop self.loop = loop
self.transport = None self.transport = None
self.buffer = "" self.buffer = ""
@ -19,16 +20,16 @@ class SerialController(asyncio.Protocol):
self.status_data = {} self.status_data = {}
# Command Construction # Command Construction
if args.action == 'pps': if self.cmd_type == 'pps':
self.cmd_str = f"iperf pps {args.value}\n" self.cmd_str = f"iperf pps {self.value}\n"
self.target_key = "IPERF_PPS_UPDATED" self.target_key = "IPERF_PPS_UPDATED"
elif args.action == 'status': elif self.cmd_type == 'status':
self.cmd_str = "iperf status\n" self.cmd_str = "iperf status\n"
self.target_key = "IPERF_STATUS" self.target_key = "IPERF_STATUS"
elif args.action == 'start': elif self.cmd_type == 'start':
self.cmd_str = "iperf start\n" self.cmd_str = "iperf start\n"
self.target_key = "IPERF_STARTED" self.target_key = "IPERF_STARTED"
elif args.action == 'stop': elif self.cmd_type == 'stop':
self.cmd_str = "iperf stop\n" self.cmd_str = "iperf stop\n"
self.target_key = "IPERF_STOPPED" self.target_key = "IPERF_STOPPED"
@ -40,8 +41,9 @@ class SerialController(asyncio.Protocol):
self.loop.create_task(self.await_trigger_and_send()) self.loop.create_task(self.await_trigger_and_send())
async def await_trigger_and_send(self): async def await_trigger_and_send(self):
# 3. Wait here until ALL devices are connected and ready # 3. Wait here until trigger is fired (allows sync start/stop)
await self.sync_event.wait() if self.sync_event:
await self.sync_event.wait()
# 4. FIRE! # 4. FIRE!
self.transport.write(self.cmd_str.encode()) self.transport.write(self.cmd_str.encode())
@ -54,7 +56,7 @@ class SerialController(asyncio.Protocol):
line = line.strip() line = line.strip()
# --- Status Parsing --- # --- Status Parsing ---
if self.args.action == 'status': if self.cmd_type == 'status':
if "IPERF_STATUS" in line: if "IPERF_STATUS" in line:
m = re.search(r'Src=([\d\.]+), Dst=([\d\.]+), Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%, Pkts=(\d+), AvgBW=([\d\.]+) Mbps', line) m = re.search(r'Src=([\d\.]+), Dst=([\d\.]+), Running=(\d+), Config=(\d+), Actual=(\d+), Err=([-\d\.]+)%, Pkts=(\d+), AvgBW=([\d\.]+) Mbps', line)
if m: if m:
@ -97,7 +99,7 @@ class SerialController(asyncio.Protocol):
def connection_lost(self, exc): def connection_lost(self, exc):
if not self.completion_future.done(): if not self.completion_future.done():
if self.args.action == 'status' and 'main' in self.status_data: if self.cmd_type == 'status' and 'main' in self.status_data:
d = self.status_data['main'] d = self.status_data['main']
output = (f"{d['src']} -> {d['dst']} | {d['run']}, " output = (f"{d['src']} -> {d['dst']} | {d['run']}, "
f"Cfg:{d['cfg']}, Act:{d['act']}, BW:{d['bw']}M (Partial)") f"Cfg:{d['cfg']}, Act:{d['act']}, BW:{d['bw']}M (Partial)")
@ -108,10 +110,9 @@ class SerialController(asyncio.Protocol):
# --- 2. Helper Functions --- # --- 2. Helper Functions ---
def parse_arguments(): def parse_arguments():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('action', choices=['start', 'stop', 'pps', 'status']) parser.add_argument('action', choices=['start', 'stop', 'pps', 'status', 'step-all'])
parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS') parser.add_argument('value_arg', nargs='?', type=int, help='Value for PPS')
parser.add_argument('--value', type=int, help='Value for PPS') parser.add_argument('--value', type=int, help='Value for PPS')
# Updated help text
parser.add_argument('--devices', required=True, help="List (e.g. /dev/ttyUSB0, /dev/ttyUSB1), Range (/dev/ttyUSB0-29), or 'all'") parser.add_argument('--devices', required=True, help="List (e.g. /dev/ttyUSB0, /dev/ttyUSB1), Range (/dev/ttyUSB0-29), or 'all'")
args = parser.parse_args() args = parser.parse_args()
@ -122,16 +123,12 @@ def parse_arguments():
return args return args
def natural_sort_key(s): def natural_sort_key(s):
"""Sorts strings with numbers naturally (USB2 before USB10)"""
return [int(text) if text.isdigit() else text.lower() return [int(text) if text.isdigit() else text.lower()
for text in re.split('([0-9]+)', s)] for text in re.split('([0-9]+)', s)]
def expand_devices(device_str): def expand_devices(device_str):
# NEW: Handle "all" case
if device_str.lower() == 'all': if device_str.lower() == 'all':
# Find all ttyUSB devices
devices = glob.glob('/dev/ttyUSB*') devices = glob.glob('/dev/ttyUSB*')
# Sort them numerically so output is clean
devices.sort(key=natural_sort_key) devices.sort(key=natural_sort_key)
if not devices: if not devices:
print("Error: No /dev/ttyUSB* devices found!") print("Error: No /dev/ttyUSB* devices found!")
@ -151,51 +148,83 @@ def expand_devices(device_str):
devices.append(part) devices.append(part)
return devices return devices
# --- 3. Async Entry Point --- # --- 3. Async Execution Core ---
async def run_device(port, args, sync_event): async def run_single_cmd(port, cmd_type, value, sync_event=None):
"""Runs a single command on a single port."""
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
fut = loop.create_future() fut = loop.create_future()
try: try:
await serial_asyncio.create_serial_connection( await serial_asyncio.create_serial_connection(
loop, lambda: SerialController(port, args, loop, fut, sync_event), port, baudrate=115200) loop,
lambda: SerialController(port, cmd_type, value, loop, fut, sync_event),
# Wait for the result (includes the wait for the event inside the protocol) port,
baudrate=115200
)
return await asyncio.wait_for(fut, timeout=5.0) return await asyncio.wait_for(fut, timeout=5.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return None return None
except Exception: except Exception:
return None return None
async def async_main(args, devices): async def run_parallel_action(devices, action, value):
print(f"Initializing {len(devices)} devices for '{args.action}'...") """Runs the specified action on all devices in parallel."""
print(f"Initializing {len(devices)} devices for '{action}'...")
# Create the Global Trigger
sync_event = asyncio.Event() sync_event = asyncio.Event()
tasks = [run_single_cmd(d, action, value, sync_event) for d in devices]
# Create all tasks, passing the shared event # Allow connections to settle
# Connection setup happens immediately here
tasks = [run_device(d, args, sync_event) for d in devices]
# Give a brief moment for all serial ports to open and "settle"
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
if len(devices) > 1: # Fire all commands at once
print(f">>> TRIGGERING {len(devices)} DEVICES <<<") sync_event.set()
else:
print(f">>> TRIGGERING {devices[0]} <<<")
sync_event.set() # Unblocks all devices instantly
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
print("\nResults:") print("\nResults:")
for dev, res in zip(devices, results): for dev, res in zip(devices, results):
if args.action == 'status': if action == 'status':
print(f"{dev}: {res if res else 'TIMEOUT'}") print(f"{dev}: {res if res else 'TIMEOUT'}")
else: else:
status = "OK" if res is True else "FAIL" status = "OK" if res is True else "FAIL"
print(f"{dev}: {status}") print(f"{dev}: {status}")
async def run_step_all(devices):
"""Stops all, then starts/stops devices one by one."""
print("\n>>> STEP-ALL PHASE 1: STOPPING ALL DEVICES <<<")
await run_parallel_action(devices, 'stop', None)
print("\n>>> STEP-ALL PHASE 2: SEQUENTIAL TEST <<<")
for i, dev in enumerate(devices):
print(f"\n[{i+1}/{len(devices)}] Testing {dev}...")
# Start
print(f" -> Starting {dev}...")
res_start = await run_single_cmd(dev, 'start', None, None) # No sync needed for single
if res_start is not True:
print(f" -> FAILED to start {dev}. Skipping.")
continue
# Wait (Run Traffic)
print(" -> Running traffic (5 seconds)...")
await asyncio.sleep(5)
# Stop
print(f" -> Stopping {dev}...")
res_stop = await run_single_cmd(dev, 'stop', None, None)
if res_stop is not True:
print(f" -> Warning: Failed to stop {dev}")
else:
print(f" -> {dev} OK")
# Wait (Gap between devices)
print(" -> Waiting 1 second...")
await asyncio.sleep(1)
async def async_main(args, devices):
if args.action == 'step-all':
await run_step_all(devices)
else:
await run_parallel_action(devices, args.action, args.value)
# --- 4. Main Execution Block --- # --- 4. Main Execution Block ---
if __name__ == '__main__': if __name__ == '__main__':
args = parse_arguments() args = parse_arguments()

View File

@ -1,48 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sys import sys
import os import os
# --- AUTO-BOOTSTRAP ESP-IDF ENVIRONMENT ---
# This block detects if the script is running without 'get_idf'
# and automatically re-launches it using the ESP-IDF Python environment.
try:
import serial_asyncio
except ImportError:
# 1. Define the location of your ESP-IDF Python Env (from your .bashrc)
home = os.path.expanduser("~")
base_env_dir = os.path.join(home, ".espressif", "python_env")
idf_python_path = None
# Search for the v6.0 environment
if os.path.exists(base_env_dir):
for d in os.listdir(base_env_dir):
if d.startswith("idf6.0") and "py3" in d:
candidate = os.path.join(base_env_dir, d, "bin", "python")
if os.path.exists(candidate):
idf_python_path = candidate
break
if idf_python_path:
print(f"\033[93m[Bootstrap] Missing libraries. Switching to ESP-IDF Python: {idf_python_path}\033[0m")
# 2. Update PATH to include 'esptool.py' (which lives in the venv bin)
venv_bin = os.path.dirname(idf_python_path)
os.environ["PATH"] = venv_bin + os.pathsep + os.environ["PATH"]
# 3. Optional: Set IDF_PATH if missing
if "IDF_PATH" not in os.environ:
default_idf = os.path.join(home, "Code", "esp32", "esp-idf-v6")
if os.path.exists(default_idf):
os.environ["IDF_PATH"] = default_idf
# 4. Re-execute the script using the correct interpreter
os.execv(idf_python_path, [idf_python_path] + sys.argv)
else:
print("\033[91mError: Could not find 'serial_asyncio' or the ESP-IDF environment.\033[0m")
print("Please run 'get_idf' manually.")
sys.exit(1)
# ------------------------------------------
import asyncio import asyncio
import serial_asyncio import serial_asyncio
import argparse import argparse