more on robustness, also mitigate thundering herd

This commit is contained in:
Bob 2025-12-15 19:25:30 -08:00
parent 090c66f649
commit 954075ea6e
4 changed files with 311 additions and 178 deletions

View File

@ -25,7 +25,6 @@ void cmd_transport_register_listener(cmd_line_handler_t handler) {
} }
} }
// Trim trailing whitespace (CR, LF)
static void trim_trailing(char *s) { static void trim_trailing(char *s) {
int n = strlen(s); int n = strlen(s);
while (n > 0 && (s[n-1] == '\r' || s[n-1] == '\n' || isspace((unsigned char)s[n-1]))) { while (n > 0 && (s[n-1] == '\r' || s[n-1] == '\n' || isspace((unsigned char)s[n-1]))) {
@ -33,11 +32,12 @@ static void trim_trailing(char *s) {
} }
} }
// Dispatch line to listeners, then to ESP Console // --- UPDATED DISPATCH LOGIC ---
static void dispatch_line(char *line, cmd_reply_func_t reply_func, void *reply_ctx) { static void dispatch_line(char *line, cmd_reply_func_t reply_func, void *reply_ctx) {
bool handled = false; bool handled = false;
// 1. Offer to registered listeners (e.g. wifi_cfg) // 1. Offer to registered listeners (e.g. wifi_cfg)
// Note: The listener (wifi_cfg) is now responsible for sending "OK" if it handles the line.
for (int i = 0; i < s_listener_count; i++) { for (int i = 0; i < s_listener_count; i++) {
if (s_listeners[i] && s_listeners[i](line, reply_func, reply_ctx)) { if (s_listeners[i] && s_listeners[i](line, reply_func, reply_ctx)) {
handled = true; handled = true;
@ -45,14 +45,26 @@ static void dispatch_line(char *line, cmd_reply_func_t reply_func, void *reply_c
} }
} }
// 2. If not handled, pass to system console (for commands like 'mode_monitor') // 2. If not handled, pass to system console
if (!handled && strlen(line) > 0) { if (!handled && strlen(line) > 0) {
int ret; int ret;
esp_err_t err = esp_console_run(line, &ret); esp_err_t err = esp_console_run(line, &ret);
if (err == ESP_ERR_NOT_FOUND) { if (err == ESP_ERR_NOT_FOUND) {
// Unrecognized command - silent ignore or reply error // Robustness: Always reply, even for unknown commands
} else if (err != ESP_OK) { if (reply_func) reply_func("ERROR: Unknown Command\n", reply_ctx);
ESP_LOGE(TAG, "Console run error: %s", esp_err_to_name(err)); } else if (err == ESP_OK) {
if (ret == 0) {
// Command Success -> Send ACK so Python knows to proceed
if (reply_func) reply_func("OK\n", reply_ctx);
} else {
// Command logic failed (e.g., iperf returned 1)
char buf[64];
snprintf(buf, sizeof(buf), "ERROR: Command failed (ret=%d)\n", ret);
if (reply_func) reply_func(buf, reply_ctx);
}
} else {
if (reply_func) reply_func("ERROR: System execution failed\n", reply_ctx);
} }
} }
} }
@ -66,7 +78,6 @@ static void uart_reply(const char *msg, void *ctx) {
static void uart_listener_task(void *arg) { static void uart_listener_task(void *arg) {
char line[256]; char line[256];
// Disable buffering
setvbuf(stdin, NULL, _IONBF, 0); setvbuf(stdin, NULL, _IONBF, 0);
setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stdout, NULL, _IONBF, 0);
@ -120,11 +131,7 @@ static void usb_listener_task(void *arg) {
void cmd_transport_init(void) { void cmd_transport_init(void) {
if (s_inited) return; if (s_inited) return;
s_inited = true; s_inited = true;
// Start UART Listener
xTaskCreatePinnedToCore(uart_listener_task, "cmd_uart", 4096, NULL, 5, NULL, tskNO_AFFINITY); xTaskCreatePinnedToCore(uart_listener_task, "cmd_uart", 4096, NULL, 5, NULL, tskNO_AFFINITY);
// Start USB Listener (if supported)
#if SOC_USB_SERIAL_JTAG_SUPPORTED #if SOC_USB_SERIAL_JTAG_SUPPORTED
xTaskCreatePinnedToCore(usb_listener_task, "cmd_usb", 4096, NULL, 5, NULL, tskNO_AFFINITY); xTaskCreatePinnedToCore(usb_listener_task, "cmd_usb", 4096, NULL, 5, NULL, tskNO_AFFINITY);
#endif #endif

View File

@ -256,7 +256,7 @@ uint32_t iperf_get_pps(void) {
return 1000000 / s_iperf_ctrl.cfg.pacing_period_us; return 1000000 / s_iperf_ctrl.cfg.pacing_period_us;
} }
// --- UPDATED UDP Client ---
static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) { static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
if (!iperf_wait_for_ip()) { if (!iperf_wait_for_ip()) {
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
@ -282,15 +282,10 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
status_led_set_state(LED_STATE_TRANSMITTING_SLOW); status_led_set_state(LED_STATE_TRANSMITTING_SLOW);
// --- Prepare Headers ---
udp_datagram *udp_hdr = (udp_datagram *)ctrl->buffer; udp_datagram *udp_hdr = (udp_datagram *)ctrl->buffer;
client_hdr_v1 *client_hdr = (client_hdr_v1 *)(ctrl->buffer + sizeof(udp_datagram)); client_hdr_v1 *client_hdr = (client_hdr_v1 *)(ctrl->buffer + sizeof(udp_datagram));
// Fill Client Header once with just the FLAG set.
// This is idempotent, so sending it every time is fine and allows server to detect settings.
iperf_generate_client_hdr(&ctrl->cfg, client_hdr); iperf_generate_client_hdr(&ctrl->cfg, client_hdr);
// --- Session Start ---
s_stats.running = true; s_stats.running = true;
s_session_start_time = esp_timer_get_time(); s_session_start_time = esp_timer_get_time();
s_session_end_time = 0; s_session_end_time = 0;
@ -308,8 +303,6 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
int64_t last_rate_check = esp_timer_get_time(); int64_t last_rate_check = esp_timer_get_time();
uint32_t packets_since_check = 0; uint32_t packets_since_check = 0;
// --- Use 64-bit integer for Packet ID ---
int64_t packet_id = 0; int64_t packet_id = 0;
struct timespec ts; struct timespec ts;
@ -321,11 +314,8 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
else while (esp_timer_get_time() < next_send_time) taskYIELD(); else while (esp_timer_get_time() < next_send_time) taskYIELD();
for (int k = 0; k < ctrl->cfg.burst_count; k++) { for (int k = 0; k < ctrl->cfg.burst_count; k++) {
// Update UDP Header (Seq/Time)
int64_t current_id = packet_id++; int64_t current_id = packet_id++;
// --- SPLIT 64-BIT ID ---
// Server logic: packetID = ntohl(id) | (ntohl(id2) << 32)
udp_hdr->id = htonl((uint32_t)(current_id & 0xFFFFFFFF)); udp_hdr->id = htonl((uint32_t)(current_id & 0xFFFFFFFF));
udp_hdr->id2 = htonl((uint32_t)((current_id >> 32) & 0xFFFFFFFF)); udp_hdr->id2 = htonl((uint32_t)((current_id >> 32) & 0xFFFFFFFF));
@ -339,12 +329,16 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
packets_since_check++; packets_since_check++;
s_session_packets++; s_session_packets++;
} else { } else {
if (errno != 12) { // --- ROBUST FIX: Never Abort ---
ESP_LOGE(TAG, "Send failed: %d", errno); // If send fails (buffer full, routing issue, etc.), we just yield and retry next loop.
status_led_set_state(LED_STATE_FAILED); // We do NOT goto exit.
goto exit; if (errno != 12) {
} // Log rarely to avoid spamming serial
vTaskDelay(pdMS_TO_TICKS(10)); if ((packet_id % 100) == 0) {
ESP_LOGW(TAG, "Send error: %d (Ignored)", errno);
}
}
vTaskDelay(pdMS_TO_TICKS(10));
} }
} }
@ -361,17 +355,17 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
else next_state = IPERF_STATE_TX_SLOW; else next_state = IPERF_STATE_TX_SLOW;
switch (next_state) { switch (next_state) {
case IPERF_STATE_TX: s_time_tx_us += interval_us; break; case IPERF_STATE_TX: s_time_tx_us += interval_us; break;
case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break; case IPERF_STATE_TX_SLOW: s_time_slow_us += interval_us; break;
case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break; case IPERF_STATE_TX_STALLED: s_time_stalled_us += interval_us; break;
default: break; default: break;
} }
if (next_state != s_current_fsm_state) { if (next_state != s_current_fsm_state) {
switch (next_state) { switch (next_state) {
case IPERF_STATE_TX: s_edge_tx++; break; case IPERF_STATE_TX: s_edge_tx++; break;
case IPERF_STATE_TX_SLOW: s_edge_slow++; break; case IPERF_STATE_TX_SLOW: s_edge_slow++; break;
case IPERF_STATE_TX_STALLED: s_edge_stalled++; break; case IPERF_STATE_TX_STALLED: s_edge_stalled++; break;
default: break; default: break;
} }
s_current_fsm_state = next_state; s_current_fsm_state = next_state;
} }
@ -384,32 +378,25 @@ static esp_err_t iperf_start_udp_client(iperf_ctrl_t *ctrl) {
next_send_time += ctrl->cfg.pacing_period_us; next_send_time += ctrl->cfg.pacing_period_us;
} }
exit: udp_datagram *hdr = (udp_datagram *)ctrl->buffer;
// Termination Packets int64_t final_id = -packet_id;
{ hdr->id = htonl((uint32_t)(final_id & 0xFFFFFFFF));
udp_datagram *hdr = (udp_datagram *)ctrl->buffer; hdr->id2 = htonl((uint32_t)((final_id >> 32) & 0xFFFFFFFF));
// --- NEGATED 64-BIT TERMINATION ID --- clock_gettime(CLOCK_REALTIME, &ts);
// Server logic: if (packetID < 0) terminate = true; hdr->tv_sec = htonl((uint32_t)ts.tv_sec);
int64_t final_id = -packet_id; hdr->tv_usec = htonl(ts.tv_nsec / 1000);
hdr->id = htonl((uint32_t)(final_id & 0xFFFFFFFF)); for(int i=0; i<10; i++) {
hdr->id2 = htonl((uint32_t)((final_id >> 32) & 0xFFFFFFFF)); sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr));
vTaskDelay(pdMS_TO_TICKS(2));
clock_gettime(CLOCK_REALTIME, &ts);
hdr->tv_sec = htonl((uint32_t)ts.tv_sec);
hdr->tv_usec = htonl(ts.tv_nsec / 1000);
for(int i=0; i<10; i++) {
sendto(sockfd, ctrl->buffer, ctrl->cfg.send_len, 0, (struct sockaddr *)&addr, sizeof(addr));
vTaskDelay(pdMS_TO_TICKS(2));
}
ESP_LOGI(TAG, "Sent termination packets (ID: %" PRId64 ")", final_id);
} }
ESP_LOGI(TAG, "Sent termination packets (ID: %" PRId64 ")", final_id);
close(sockfd); close(sockfd);
s_stats.running = false; s_stats.running = false;
s_session_end_time = esp_timer_get_time(); s_session_end_time = esp_timer_get_time();
s_stats.actual_pps = 0; s_stats.actual_pps = 0;
status_led_set_state(LED_STATE_CONNECTED); status_led_set_state(LED_STATE_CONNECTED); // <--- This is your "Solid Green"
printf("IPERF_STOPPED\n"); printf("IPERF_STOPPED\n");
return ESP_OK; return ESP_OK;
} }

View File

@ -306,6 +306,9 @@ static bool wifi_cfg_cmd_handler(const char *line, cmd_reply_func_t reply_func,
strcpy(s.iperf_dest, "192.168.1.50"); strcpy(s.iperf_dest, "192.168.1.50");
strcpy(s.iperf_role, "CLIENT"); strcpy(s.iperf_role, "CLIENT");
strcpy(s.iperf_proto, "UDP"); strcpy(s.iperf_proto, "UDP");
// --- NEW: ACK Start of CFG ---
if (reply_func) reply_func("OK\n", reply_ctx);
return true; return true;
} }
return false; return false;
@ -329,7 +332,8 @@ static bool wifi_cfg_cmd_handler(const char *line, cmd_reply_func_t reply_func,
csi_mgr_save_enable_state(s.csi_enable); csi_mgr_save_enable_state(s.csi_enable);
#endif #endif
if (reply_func) reply_func("Config saved. Reconnecting...\n", reply_ctx); // --- NEW: ACK End of CFG ---
if (reply_func) reply_func("OK Config Saved\n", reply_ctx);
// Apply changes immediately // Apply changes immediately
wifi_cfg_apply_from_nvs(); wifi_cfg_apply_from_nvs();
@ -339,6 +343,10 @@ static bool wifi_cfg_cmd_handler(const char *line, cmd_reply_func_t reply_func,
} }
on_cfg_line(line, &s); on_cfg_line(line, &s);
// --- NEW: ACK Intermediate Line ---
if (reply_func) reply_func("OK\n", reply_ctx);
return true; return true;
} }

View File

@ -10,6 +10,7 @@ import time
import shutil import shutil
import logging import logging
import glob import glob
import random
from pathlib import Path from pathlib import Path
# Ensure detection script is available # Ensure detection script is available
@ -68,35 +69,68 @@ def auto_detect_devices():
return detect_esp32.detect_esp32_devices() return detect_esp32.detect_esp32_devices()
class UnifiedDeployWorker: class UnifiedDeployWorker:
def __init__(self, port, target_ip, args, project_dir, flash_sem): def __init__(self, port, target_ip, args, project_dir, flash_sem, total_devs):
self.port = port self.port = port
self.target_ip = target_ip self.target_ip = target_ip
self.args = args self.args = args
self.project_dir = Path(project_dir) self.project_dir = Path(project_dir)
self.flash_sem = flash_sem self.flash_sem = flash_sem
self.total_devs = total_devs
self.log = DeviceLoggerAdapter(logger, {'connid': port}) self.log = DeviceLoggerAdapter(logger, {'connid': port})
self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)')
self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE) self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE)
self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
self.regex_csi_saved = re.compile(r'CSI enable state saved|Config saved', re.IGNORECASE) self.regex_status_ip = re.compile(r'Src=(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE)
self.regex_version = re.compile(r'APP_VERSION:\s*([0-9\.]+)', re.IGNORECASE) self.regex_version = re.compile(r'APP_VERSION:\s*([0-9\.]+)', re.IGNORECASE)
async def run(self): async def run(self):
try: try:
if self.args.check_version: if self.args.check_version:
ver = await self._query_version() return await self._query_version()
return ver
if not self.args.config_only: # --- CHANGE: Acquire Semaphore EARLY to protect Chip ID Detection ---
async with self.flash_sem: # This prevents 30 concurrent 'esptool chip_id' calls from crashing the USB bus.
async with self.flash_sem:
# 1. Logic to Determine Target (Auto-Detect)
# We do this here so it is protected by the semaphore.
detected_target = None
if self.args.target == 'auto' and not self.args.config_only:
detected_target = await self._identify_chip()
if not detected_target:
self.log.error("Failed to auto-detect chip type.")
return False
self.log.info(f"Auto-detected: {Colors.CYAN}{detected_target}{Colors.RESET}")
# We temporarily override self.args.target just for this worker instance
# (Note: In a real object we might want a separate attribute, but this works for the flow)
target_to_use = detected_target
else:
target_to_use = self.args.target
# 2. Flash Firmware (if needed)
if not self.args.config_only:
if self.args.flash_erase: if self.args.flash_erase:
if not await self._erase_flash(): return False if not await self._erase_flash(): return False
# Pass the determined target explicitly to flash_firmware helper
# (We need to slightly modify _flash_firmware to accept this arg, or set it on self)
self.target_for_flash = target_to_use
if not await self._flash_firmware(): return False if not await self._flash_firmware(): return False
await asyncio.sleep(2.0)
# --- Semaphore Released Here (Config can run in parallel) ---
await asyncio.sleep(2.0)
if not self.args.flash_only: if not self.args.flash_only:
if self.args.ssid and self.args.password: if self.args.ssid and self.args.password:
# Thundering Herd Mitigation (WiFi Association)
if self.total_devs > 1:
delay = random.uniform(0, self.total_devs * 0.5)
self.log.info(f"Staggering config start by {delay:.1f}s...")
await asyncio.sleep(delay)
success = False success = False
for attempt in range(1, 4): for attempt in range(1, 4):
self.log.info(f"Configuring (Attempt {attempt}/3)...") self.log.info(f"Configuring (Attempt {attempt}/3)...")
@ -111,13 +145,197 @@ class UnifiedDeployWorker:
return False return False
else: else:
self.log.warning("No SSID/Password provided, skipping config") self.log.warning("No SSID/Password provided, skipping config")
if self.args.config_only: return False
return True return True
except Exception as e: except Exception as e:
self.log.error(f"Worker Exception: {e}") self.log.error(f"Worker Exception: {e}")
return False return False
async def _configure_device(self):
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e:
return False
try:
# Reset DTR/RTS
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
writer.transport.serial.dtr = False
# 1. Clear Boot Logs (Best Effort)
await self._wait_for_boot(reader, writer)
# 2. Send Config (Robust Loop)
if not await self._send_config_await_ack(reader, writer):
self.log.error("Configuration failed: Device did not ACK commands.")
return False
self.log.info(f"{Colors.GREEN}Config accepted. Waiting for WiFi Association...{Colors.RESET}")
# 3. Wait for Association (Polling)
ip_found = await self._wait_for_association(reader, writer)
if ip_found:
self.log.info(f"{Colors.GREEN}Connected: {self.target_ip}{Colors.RESET}")
return True
else:
self.log.warning(f"{Colors.YELLOW}Config saved, but WiFi association timed out.{Colors.RESET}")
return False
except Exception as e:
self.log.error(f"Config Error: {e}")
return False
finally:
writer.close()
await writer.wait_closed()
async def _wait_for_boot(self, reader, writer):
end_time = time.time() + 12
last_poke = time.time()
while time.time() < end_time:
try:
if time.time() - last_poke > 1.5:
writer.write(b'\n')
await writer.drain()
last_poke = time.time()
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
if self.regex_ready.search(line): return True
except asyncio.TimeoutError: continue
except Exception as e:
return False
return False
async def _send_config_await_ack(self, reader, writer):
try:
while not reader.at_eof():
await asyncio.wait_for(reader.read(1000), timeout=0.01)
except (asyncio.TimeoutError, asyncio.LimitOverrunError):
pass
csi_val = '1' if self.args.csi_enable else '0'
role_str = "SERVER" if self.args.iperf_server else "CLIENT"
iperf_enable_val = '0' if self.args.no_iperf else '1'
period_us = int(self.args.iperf_period * 1000000)
commands = [
(f"SSID:{self.args.ssid}", 2.0),
(f"PASS:{self.args.password}", 2.0),
(f"IP:{self.target_ip}", 2.0),
(f"MASK:{self.args.netmask}", 2.0),
(f"GW:{self.args.gateway}", 2.0),
(f"DHCP:0", 2.0),
(f"BAND:{self.args.band}", 2.0),
(f"BW:{self.args.bandwidth}", 2.0),
(f"POWERSAVE:{self.args.powersave}", 2.0),
(f"MODE:{self.args.mode}", 2.0),
(f"MON_CH:{self.args.monitor_channel}", 2.0),
(f"CSI:{csi_val}", 2.0),
(f"IPERF_PERIOD_US:{period_us}", 2.0),
(f"IPERF_ROLE:{role_str}", 2.0),
(f"IPERF_PROTO:{self.args.iperf_proto}", 2.0),
(f"IPERF_DST_IP:{self.args.iperf_dest_ip}", 2.0),
(f"IPERF_PORT:{self.args.iperf_port}", 2.0),
(f"IPERF_BURST:{self.args.iperf_burst}", 2.0),
(f"IPERF_LEN:{self.args.iperf_len}", 2.0),
(f"IPERF_ENABLED:{iperf_enable_val}", 2.0),
("END", 5.0)
]
entered_cfg = False
for attempt in range(5):
writer.write(b"CFG\n")
await writer.drain()
try:
await self._wait_for_ack_token(reader, "OK", timeout=1.0)
entered_cfg = True
break
except asyncio.TimeoutError:
pass
await asyncio.sleep(0.5)
if not entered_cfg:
self.log.error("Failed to enter CFG mode (No ACK on CFG command)")
return False
for cmd_str, timeout in commands:
cmd = cmd_str + "\n"
writer.write(cmd.encode('utf-8'))
await writer.drain()
try:
await self._wait_for_ack_token(reader, token="OK", timeout=timeout)
except asyncio.TimeoutError:
self.log.error(f"Timeout: Device did not ACK command '{cmd_str}'")
return False
except Exception as e:
self.log.error(f"Error while configuring: {e}")
return False
return True
async def _wait_for_ack_token(self, reader, token, timeout):
end_time = time.time() + timeout
while time.time() < end_time:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=timeout)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if token in line: return True
if "ERROR" in line: raise Exception(f"Device reported ERROR: {line}")
except asyncio.TimeoutError:
raise
raise asyncio.TimeoutError
async def _wait_for_association(self, reader, writer):
"""
Polls 'iperf status' to check if the Static IP is active.
Drains buffer aggressively to avoid lag.
"""
# Timeout covers Association + Handshake time (30s is safe for 30 devices)
timeout = time.time() + 30
last_poll = 0
while time.time() < timeout:
now = time.time()
# Send Poll every 2 seconds
if now - last_poll > 2.0:
try:
writer.write(b"iperf status\n")
await writer.drain()
last_poll = now
except Exception:
pass
# Aggressively read all lines available in buffer
try:
# Read with a very short timeout to keep loop spinning
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if line:
# Check for spontaneous event or poll response
m1 = self.regex_got_ip.search(line)
m2 = self.regex_status_ip.search(line)
found_ip = None
if m1: found_ip = m1.group(1)
if m2: found_ip = m2.group(1)
if found_ip == self.target_ip:
return True
except asyncio.TimeoutError:
continue
except Exception as e:
self.log.error(f"Assoc Wait Error: {e}")
return False
return False
# [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware as they were]
# (Rest of methods omitted for brevity as they are unchanged)
async def _query_version(self): async def _query_version(self):
try: try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
@ -152,13 +370,18 @@ class UnifiedDeployWorker:
return "Error" return "Error"
async def _identify_chip(self): async def _identify_chip(self):
cmd = ['esptool.py', '-p', self.port, 'chip_id'] for attempt in range(1, 4):
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) cmd = ['esptool.py', '-p', self.port, 'chip_id']
stdout, stderr = await proc.communicate() try:
output = stdout.decode() + stderr.decode() proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
match = self.regex_chip_type.search(output) stdout, stderr = await proc.communicate()
if match: output = stdout.decode() + stderr.decode()
return match.group(1).lower().replace('-', '') match = self.regex_chip_type.search(output)
if match:
return match.group(1).lower().replace('-', '')
if attempt < 3: await asyncio.sleep(1.0)
except Exception as e:
self.log.warning(f"Chip ID check exception (Attempt {attempt}): {e}")
return None return None
async def _erase_flash(self): async def _erase_flash(self):
@ -170,20 +393,19 @@ class UnifiedDeployWorker:
return False return False
async def _flash_firmware(self): async def _flash_firmware(self):
detected_target = None # Use the target determined securely in run(), or fallback to args if manual
if self.args.target == 'auto': target_to_use = getattr(self, 'target_for_flash', self.args.target)
detected_target = await self._identify_chip()
if not detected_target: # Safety check: run() should have resolved 'auto' before calling this
self.log.error("Failed to auto-detect chip type.") if target_to_use == 'auto':
return False self.log.error("Logic Error: Target is still 'auto' inside flash firmware.")
self.log.info(f"Auto-detected: {Colors.CYAN}{detected_target}{Colors.RESET}") return False
target_to_use = detected_target
else:
target_to_use = self.args.target
suffix = generate_config_suffix(target_to_use, self.args.csi_enable, self.args.ampdu) suffix = generate_config_suffix(target_to_use, self.args.csi_enable, self.args.ampdu)
firmware_dir = self.project_dir / "firmware" firmware_dir = self.project_dir / "firmware"
unique_app = None unique_app = None
# 1. Find the specific app binary for this config
if firmware_dir.exists(): if firmware_dir.exists():
for f in os.listdir(firmware_dir): for f in os.listdir(firmware_dir):
if f.endswith(f"_{suffix}.bin") and not f.startswith("bootloader") and not f.startswith("partition") and not f.startswith("ota_data") and not f.startswith("phy_init"): if f.endswith(f"_{suffix}.bin") and not f.startswith("bootloader") and not f.startswith("partition") and not f.startswith("ota_data") and not f.startswith("phy_init"):
@ -193,6 +415,7 @@ class UnifiedDeployWorker:
self.log.error(f"Binary for config '{suffix}' not found in firmware/.") self.log.error(f"Binary for config '{suffix}' not found in firmware/.")
return False return False
# 2. Define paths for bootloader, partition table, etc.
unique_boot = f"bootloader_{suffix}.bin" unique_boot = f"bootloader_{suffix}.bin"
unique_part = f"partition-table_{suffix}.bin" unique_part = f"partition-table_{suffix}.bin"
unique_ota = f"ota_data_initial_{suffix}.bin" unique_ota = f"ota_data_initial_{suffix}.bin"
@ -203,10 +426,12 @@ class UnifiedDeployWorker:
return False return False
try: try:
# 3. Parse flash_args to construct the exact esptool command
with open(flash_args_path, 'r') as f: with open(flash_args_path, 'r') as f:
content = f.read().replace('\n', ' ').strip() content = f.read().replace('\n', ' ').strip()
raw_args = [x for x in content.split(' ') if x] raw_args = [x for x in content.split(' ') if x]
final_args = [] final_args = []
for arg in raw_args: for arg in raw_args:
if arg.endswith('bootloader.bin'): final_args.append(str(firmware_dir / unique_boot)) if arg.endswith('bootloader.bin'): final_args.append(str(firmware_dir / unique_boot))
elif arg.endswith('partition-table.bin'): final_args.append(str(firmware_dir / unique_part)) elif arg.endswith('partition-table.bin'): final_args.append(str(firmware_dir / unique_part))
@ -222,6 +447,8 @@ class UnifiedDeployWorker:
'write_flash'] + final_args 'write_flash'] + final_args
self.log.info(f"Flashing {firmware_dir / unique_app}...") self.log.info(f"Flashing {firmware_dir / unique_app}...")
# 4. Execute Flashing
proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
try: try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
@ -236,102 +463,6 @@ class UnifiedDeployWorker:
self.log.error(f"Flash Prep Error: {e}") self.log.error(f"Flash Prep Error: {e}")
return False return False
async def _configure_device(self):
try:
reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200)
except Exception as e:
return False
try:
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
writer.transport.serial.dtr = False
if not await self._wait_for_boot(reader, writer):
self.log.warning("Boot prompt missed (sending blindly)...")
await self._send_config(writer)
is_configured = await self._verify_configuration(reader)
if is_configured:
self.log.info(f"{Colors.GREEN}Config verified. IP: {self.target_ip}{Colors.RESET}")
writer.transport.serial.dtr = False
writer.transport.serial.rts = True
await asyncio.sleep(0.1)
writer.transport.serial.rts = False
return True
else:
self.log.error(f"{Colors.RED}Config verification failed.{Colors.RESET}")
return False
except Exception as e:
self.log.error(f"Config Error: {e}")
return False
finally:
writer.close()
await writer.wait_closed()
async def _wait_for_boot(self, reader, writer):
end_time = time.time() + 12
last_poke = time.time()
while time.time() < end_time:
try:
if time.time() - last_poke > 1.5:
writer.write(b'\n')
await writer.drain()
last_poke = time.time()
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
if self.regex_ready.search(line): return True
except asyncio.TimeoutError: continue
except Exception as e:
self.log.error(f"Read error: {e}")
return False
return False
async def _send_config(self, writer):
await asyncio.sleep(0.5)
writer.write(b'\n')
await writer.drain()
await asyncio.sleep(0.2)
csi_val = '1' if self.args.csi_enable else '0'
role_str = "SERVER" if self.args.iperf_server else "CLIENT"
iperf_enable_val = '0' if self.args.no_iperf else '1'
period_us = int(self.args.iperf_period * 1000000)
config_lines = [
"CFG",
f"SSID:{self.args.ssid}", f"PASS:{self.args.password}",
f"IP:{self.target_ip}", f"MASK:{self.args.netmask}", f"GW:{self.args.gateway}",
f"DHCP:0", f"BAND:{self.args.band}", f"BW:{self.args.bandwidth}",
f"POWERSAVE:{self.args.powersave}", f"MODE:{self.args.mode}",
f"MON_CH:{self.args.monitor_channel}", f"CSI:{csi_val}",
f"IPERF_PERIOD_US:{period_us}", f"IPERF_ROLE:{role_str}",
f"IPERF_PROTO:{self.args.iperf_proto}", f"IPERF_DST_IP:{self.args.iperf_dest_ip}",
f"IPERF_PORT:{self.args.iperf_port}", f"IPERF_BURST:{self.args.iperf_burst}",
f"IPERF_LEN:{self.args.iperf_len}", f"IPERF_ENABLED:{iperf_enable_val}",
"END"
]
for line in config_lines:
cmd = line + "\r\n"
writer.write(cmd.encode('utf-8'))
await writer.drain()
await asyncio.sleep(0.1)
async def _verify_configuration(self, reader):
timeout = time.time() + 15
while time.time() < timeout:
try:
line_bytes = await asyncio.wait_for(reader.readline(), timeout=1.0)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line: continue
if self.regex_csi_saved.search(line): return True
m = self.regex_got_ip.search(line)
if m and m.group(1) == self.target_ip: return True
except asyncio.TimeoutError: continue
return False
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool') parser = argparse.ArgumentParser(description='ESP32 Unified Deployment Tool')
parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options') parser.add_argument('-i', '--interactive', action='store_true', help='Prompt for build options')
@ -529,7 +660,7 @@ async def run_deployment(args):
if not args.check_version: if not args.check_version:
print(f" [{dev.device}] Sequential IP: {target_ip} (Offset: +{offset})") print(f" [{dev.device}] Sequential IP: {target_ip} (Offset: +{offset})")
tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, project_dir, flash_sem).run()) tasks.append(UnifiedDeployWorker(dev.device, target_ip, args, project_dir, flash_sem, len(devs)).run())
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
if args.check_version: if args.check_version: