From 538d2031857d18474f3f33298f5133a9ef66bde7 Mon Sep 17 00:00:00 2001 From: Robert McMahon Date: Tue, 16 Dec 2025 15:01:27 -0800 Subject: [PATCH] serial console --- esp32_deploy.py | 240 ++++++++++++------------------------------------ 1 file changed, 59 insertions(+), 181 deletions(-) diff --git a/esp32_deploy.py b/esp32_deploy.py index c89f646..e184360 100755 --- a/esp32_deploy.py +++ b/esp32_deploy.py @@ -79,9 +79,9 @@ class UnifiedDeployWorker: self.log = DeviceLoggerAdapter(logger, {'connid': port}) self.regex_chip_type = re.compile(r'Detecting chip type... (ESP32\S*)') - self.regex_ready = re.compile(r'Entering idle loop|esp32>', re.IGNORECASE) + # Updated regex to look for the Shell Prompt + self.regex_prompt = re.compile(r'esp32>', re.IGNORECASE) self.regex_got_ip = re.compile(r'got ip:(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) - self.regex_status_ip = re.compile(r'Src=(\d+\.\d+\.\d+\.\d+)', re.IGNORECASE) self.regex_version = re.compile(r'APP_VERSION:\s*([0-9\.]+)', re.IGNORECASE) async def run(self): @@ -89,12 +89,8 @@ class UnifiedDeployWorker: if self.args.check_version: return await self._query_version() - # --- CHANGE: Acquire Semaphore EARLY to protect Chip ID Detection --- - # This prevents 30 concurrent 'esptool chip_id' calls from crashing the USB bus. + # --- Acquire Semaphore EARLY to protect Chip ID Detection --- async with self.flash_sem: - - # 1. Logic to Determine Target (Auto-Detect) - # We do this here so it is protected by the semaphore. detected_target = None if self.args.target == 'auto' and not self.args.config_only: detected_target = await self._identify_chip() @@ -102,30 +98,24 @@ class UnifiedDeployWorker: self.log.error("Failed to auto-detect chip type.") return False self.log.info(f"Auto-detected: {Colors.CYAN}{detected_target}{Colors.RESET}") - # We temporarily override self.args.target just for this worker instance - # (Note: In a real object we might want a separate attribute, but this works for the flow) target_to_use = detected_target else: target_to_use = self.args.target - # 2. Flash Firmware (if needed) if not self.args.config_only: if self.args.flash_erase: if not await self._erase_flash(): return False - # Pass the determined target explicitly to flash_firmware helper - # (We need to slightly modify _flash_firmware to accept this arg, or set it on self) self.target_for_flash = target_to_use if not await self._flash_firmware(): return False - # --- Semaphore Released Here (Config can run in parallel) --- + # --- Semaphore Released Here --- await asyncio.sleep(2.0) if not self.args.flash_only: if self.args.ssid and self.args.password: - - # Thundering Herd Mitigation (WiFi Association) + # Thundering Herd Mitigation if self.total_devs > 1: delay = random.uniform(0, self.total_devs * 0.5) self.log.info(f"Staggering config start by {delay:.1f}s...") @@ -157,33 +147,52 @@ class UnifiedDeployWorker: except Exception as e: return False try: - # Reset DTR/RTS + # Reset DTR/RTS logic writer.transport.serial.dtr = False writer.transport.serial.rts = True await asyncio.sleep(0.1) writer.transport.serial.rts = False writer.transport.serial.dtr = False - # 1. Clear Boot Logs (Best Effort) - await self._wait_for_boot(reader, writer) - - # 2. Send Config (Robust Loop) - if not await self._send_config_await_ack(reader, writer): - self.log.error("Configuration failed: Device did not ACK commands.") - return False - - self.log.info(f"{Colors.GREEN}Config accepted. Waiting for WiFi Association...{Colors.RESET}") - - # 3. Wait for Association (Polling) - ip_found = await self._wait_for_association(reader, writer) - - if ip_found: - self.log.info(f"{Colors.GREEN}Connected: {self.target_ip}{Colors.RESET}") - return True - else: - self.log.warning(f"{Colors.YELLOW}Config saved, but WiFi association timed out.{Colors.RESET}") + # 1. Wait for Shell Prompt + if not await self._wait_for_prompt(reader, writer, timeout=15): + self.log.error("Shell prompt not detected.") return False + # 2. Send Configuration via CLI + # Command: wifi_config -s "SSID" -p "PASS" -i "IP" + # Note: The Shell will auto-reboot after this command. + cmd = f'wifi_config -s "{self.args.ssid}" -p "{self.args.password}" -i "{self.target_ip}"' + if not self.args.iperf_client and not self.args.iperf_server: + # If just connecting, maybe we want DHCP? + # But if target_ip is set, we force static. + pass + + self.log.info(f"Sending: {cmd}") + writer.write(f"{cmd}\n".encode()) + await writer.drain() + + # 3. Wait for the reboot and new prompt + # The device prints "Rebooting..." then restarts. + self.log.info("Waiting for reboot...") + await asyncio.sleep(3.0) # Give it time to actually reset + + if not await self._wait_for_prompt(reader, writer, timeout=20): + self.log.error("Device did not return to prompt after reboot.") + return False + + self.log.info(f"{Colors.GREEN}Reboot complete. Shell Ready.{Colors.RESET}") + + # 4. (Optional) Start iperf if requested + # The new firmware does not auto-start iperf on boot unless commanded. + if not self.args.no_iperf: + self.log.info("Starting iperf listener...") + writer.write(b"iperf start\n") + await writer.drain() + await asyncio.sleep(0.5) + + return True + except Exception as e: self.log.error(f"Config Error: {e}") return False @@ -191,151 +200,29 @@ class UnifiedDeployWorker: writer.close() await writer.wait_closed() - async def _wait_for_boot(self, reader, writer): - end_time = time.time() + 12 - last_poke = time.time() - while time.time() < end_time: - try: - if time.time() - last_poke > 1.5: - writer.write(b'\n') - await writer.drain() - last_poke = time.time() - try: - line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1) - line = line_bytes.decode('utf-8', errors='ignore').strip() - if not line: continue - if self.regex_ready.search(line): return True - except asyncio.TimeoutError: continue - except Exception as e: - return False - return False - - async def _send_config_await_ack(self, reader, writer): - try: - while not reader.at_eof(): - await asyncio.wait_for(reader.read(1000), timeout=0.01) - except (asyncio.TimeoutError, asyncio.LimitOverrunError): - pass - - csi_val = '1' if self.args.csi_enable else '0' - role_str = "SERVER" if self.args.iperf_server else "CLIENT" - iperf_enable_val = '0' if self.args.no_iperf else '1' - period_us = int(self.args.iperf_period * 1000000) - - commands = [ - (f"SSID:{self.args.ssid}", 2.0), - (f"PASS:{self.args.password}", 2.0), - (f"IP:{self.target_ip}", 2.0), - (f"MASK:{self.args.netmask}", 2.0), - (f"GW:{self.args.gateway}", 2.0), - (f"DHCP:0", 2.0), - (f"BAND:{self.args.band}", 2.0), - (f"BW:{self.args.bandwidth}", 2.0), - (f"POWERSAVE:{self.args.powersave}", 2.0), - (f"MODE:{self.args.mode}", 2.0), - (f"MON_CH:{self.args.monitor_channel}", 2.0), - (f"CSI:{csi_val}", 2.0), - (f"IPERF_PERIOD_US:{period_us}", 2.0), - (f"IPERF_ROLE:{role_str}", 2.0), - (f"IPERF_PROTO:{self.args.iperf_proto}", 2.0), - (f"IPERF_DST_IP:{self.args.iperf_dest_ip}", 2.0), - (f"IPERF_PORT:{self.args.iperf_port}", 2.0), - (f"IPERF_BURST:{self.args.iperf_burst}", 2.0), - (f"IPERF_LEN:{self.args.iperf_len}", 2.0), - (f"IPERF_ENABLED:{iperf_enable_val}", 2.0), - ("END", 5.0) - ] - - entered_cfg = False - for attempt in range(5): - writer.write(b"CFG\n") - await writer.drain() - try: - await self._wait_for_ack_token(reader, "OK", timeout=1.0) - entered_cfg = True - break - except asyncio.TimeoutError: - pass - await asyncio.sleep(0.5) - - if not entered_cfg: - self.log.error("Failed to enter CFG mode (No ACK on CFG command)") - return False - - for cmd_str, timeout in commands: - cmd = cmd_str + "\n" - writer.write(cmd.encode('utf-8')) - await writer.drain() - try: - await self._wait_for_ack_token(reader, token="OK", timeout=timeout) - except asyncio.TimeoutError: - self.log.error(f"Timeout: Device did not ACK command '{cmd_str}'") - return False - except Exception as e: - self.log.error(f"Error while configuring: {e}") - return False - return True - - async def _wait_for_ack_token(self, reader, token, timeout): + async def _wait_for_prompt(self, reader, writer, timeout): end_time = time.time() + timeout + last_poke = time.time() + while time.time() < end_time: + # Poke 'enter' occasionally to solicit a prompt + if time.time() - last_poke > 1.0: + writer.write(b'\n') + await writer.drain() + last_poke = time.time() + try: - line_bytes = await asyncio.wait_for(reader.readline(), timeout=timeout) - line = line_bytes.decode('utf-8', errors='ignore').strip() - if token in line: return True - if "ERROR" in line: raise Exception(f"Device reported ERROR: {line}") - except asyncio.TimeoutError: - raise - raise asyncio.TimeoutError - - async def _wait_for_association(self, reader, writer): - """ - Polls 'iperf status' to check if the Static IP is active. - Drains buffer aggressively to avoid lag. - """ - # Timeout covers Association + Handshake time (30s is safe for 30 devices) - timeout = time.time() + 30 - last_poll = 0 - - while time.time() < timeout: - now = time.time() - # Send Poll every 2 seconds - if now - last_poll > 2.0: - try: - writer.write(b"iperf status\n") - await writer.drain() - last_poll = now - except Exception: - pass - - # Aggressively read all lines available in buffer - try: - # Read with a very short timeout to keep loop spinning - line_bytes = await asyncio.wait_for(reader.readline(), timeout=0.1) - line = line_bytes.decode('utf-8', errors='ignore').strip() - - if line: - # Check for spontaneous event or poll response - m1 = self.regex_got_ip.search(line) - m2 = self.regex_status_ip.search(line) - - found_ip = None - if m1: found_ip = m1.group(1) - if m2: found_ip = m2.group(1) - - if found_ip == self.target_ip: - return True - + line_bytes = await asyncio.wait_for(reader.read(1024), timeout=0.1) + output = line_bytes.decode('utf-8', errors='ignore') + if "esp32>" in output: + return True except asyncio.TimeoutError: continue - except Exception as e: - self.log.error(f"Assoc Wait Error: {e}") - return False - + except Exception: + break return False - # [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware as they were] - # (Rest of methods omitted for brevity as they are unchanged) + # [Keep _query_version, _identify_chip, _erase_flash, _flash_firmware AS IS] async def _query_version(self): try: reader, writer = await serial_asyncio.open_serial_connection(url=self.port, baudrate=115200) @@ -393,10 +280,8 @@ class UnifiedDeployWorker: return False async def _flash_firmware(self): - # Use the target determined securely in run(), or fallback to args if manual target_to_use = getattr(self, 'target_for_flash', self.args.target) - # Safety check: run() should have resolved 'auto' before calling this if target_to_use == 'auto': self.log.error("Logic Error: Target is still 'auto' inside flash firmware.") return False @@ -405,7 +290,6 @@ class UnifiedDeployWorker: firmware_dir = self.project_dir / "firmware" unique_app = None - # 1. Find the specific app binary for this config if firmware_dir.exists(): for f in os.listdir(firmware_dir): if f.endswith(f"_{suffix}.bin") and not f.startswith("bootloader") and not f.startswith("partition") and not f.startswith("ota_data") and not f.startswith("phy_init"): @@ -415,7 +299,6 @@ class UnifiedDeployWorker: self.log.error(f"Binary for config '{suffix}' not found in firmware/.") return False - # 2. Define paths for bootloader, partition table, etc. unique_boot = f"bootloader_{suffix}.bin" unique_part = f"partition-table_{suffix}.bin" unique_ota = f"ota_data_initial_{suffix}.bin" @@ -426,7 +309,6 @@ class UnifiedDeployWorker: return False try: - # 3. Parse flash_args to construct the exact esptool command with open(flash_args_path, 'r') as f: content = f.read().replace('\n', ' ').strip() raw_args = [x for x in content.split(' ') if x] @@ -448,7 +330,6 @@ class UnifiedDeployWorker: self.log.info(f"Flashing {firmware_dir / unique_app}...") - # 4. Execute Flashing proc = await asyncio.create_subprocess_exec(*cmd, cwd=self.project_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) @@ -642,13 +523,10 @@ async def run_deployment(args): for i, dev in enumerate(devs): raw_port_number = extract_device_number(dev.device) - # --- NEW LOGIC: Handling Zero-Based (ttyUSB) vs One-Based (esp_port) --- if args.ip_device_based: if "esp_port" in dev.device: - # 1-based naming (esp_port_1 -> Offset 0) offset = raw_port_number - 1 else: - # 0-based naming (ttyUSB0 -> Offset 0) offset = raw_port_number target_ip = str(start_ip + offset)