UmberHubManager/api/examples/python/rdo_test_cli/rdo_test_cli.py

589 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
#System
import time
import sys
import os
from enum import IntEnum
#BrainStem
import brainstem
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
from brainstem.result import Result
#Local files
from argument_parser import *
#Generic Utilities
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities"))
sys.path.insert(0, parent_dir)
from brainstem_helpers import *
from poll_until import poll_until
#Power Delivery Utilities
pd_dir = os.path.abspath(os.path.join(script_dir, "../utilities/powerdelivery"))
sys.path.insert(0, pd_dir)
from bs_pd_packet_filtering import (
validate_rdo_handshake,
clear_logger_packets,
)
#=============================================================================
# Constants
#=============================================================================
BAIL_COUNTER = 20
VOLTAGE_TOLERANCE_PERCENT = 5 # USB PD spec allows ±5% for fixed PDOs
MAX_TEST_RETRIES = 1 # Number of retries for failed tests
#=============================================================================
# Exit Codes
#=============================================================================
class ExitCode(IntEnum):
"""Exit codes for the RDO Test CLI script."""
SUCCESS = 0
CONNECTION_ERROR = 1
CAPABILITY_CHECK_FAILED = 2
PDO_TEST_FAILED = 3
RDO_TEST_FAILED = 4
PORT_DISABLE_FAILED = 5
PORT_ENABLE_FAILED = 6
UNKNOWN = 255
EXIT_CODE_DESCRIPTIONS = {
ExitCode.SUCCESS: "All tests passed",
ExitCode.CONNECTION_ERROR: "Failed to connect to device",
ExitCode.CAPABILITY_CHECK_FAILED: "Device does not support required capabilities",
ExitCode.PDO_TEST_FAILED: "One or more PDO tests failed",
ExitCode.RDO_TEST_FAILED: "One or more RDO tests failed",
ExitCode.PORT_DISABLE_FAILED: "Failed to disable port",
ExitCode.PORT_ENABLE_FAILED: "Failed to enable port",
ExitCode.UNKNOWN: "Unknown error occurred",
}
class ProgramExit(Exception):
"""
Custom exception to signal program exit with a specific code.
This exception is used instead of sys.exit() to allow context managers
to properly clean up before the program exits.
"""
def __init__(self, code, message=None):
self.code = code
self.message = message
super().__init__(message)
def exit_with_code(code, additional_message=None):
"""Print exit code message and raise ProgramExit exception.
Raises ProgramExit instead of calling sys.exit() directly to ensure
context managers can properly clean up.
"""
if isinstance(code, ExitCode):
message = "EXIT CODE %d: %s" % (code.value, EXIT_CODE_DESCRIPTIONS[code])
if additional_message:
message += " - %s" % additional_message
print(message, file=sys.stderr)
raise ProgramExit(code, message)
else:
print("EXIT CODE %d: Unknown exit code" % code, file=sys.stderr)
raise ProgramExit(code, additional_message)
#=============================================================================
# Device Capability Check
#=============================================================================
def verify_test_port(stem, port_index):
"""
Verify that the test port is valid for RDO testing.
Checks:
- Port is a downstream port (not upstream/control)
- Port is not the input power source
Exits with error if validation fails.
"""
# Check that the port is downstream
port_entity = brainstem.entity.Port(stem, port_index)
result = port_entity.getDataRole()
if basic_error_handling(stem, result, "Could not get data role for port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "getDataRole port %d" % port_index)
if result.value != _BS_C.portDataRole_Downstream_Value:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
"Port %d is not a downstream port (role=%d) and cannot be tested" % (port_index, result.value))
print("Port %d: Data role validated as downstream" % port_index)
# Check that the port is not the input power source
system = brainstem.entity.System(stem, 0)
result = system.getInputPowerSource()
if basic_error_handling(stem, result, "Could not get input power source"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "getInputPowerSource")
if result.value == port_index:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
"Port %d is the input power source and cannot be tested." % port_index)
print("Port %d: Verified not the input power source (power source=%d)" % (port_index, result.value))
#=============================================================================
# Test Functions
#=============================================================================
def test_voltage(stem, port_index, expected_voltage):
"""
Checks the current voltage vs the expected voltage using VOLTAGE_TOLERANCE_PERCENT.
Returns:
tuple: (passed: bool, actual_voltage: int, voltage_min: int, voltage_max: int)
"""
port_entity = brainstem.entity.Port(stem, port_index)
voltage_result = port_entity.getVbusVoltage()
if voltage_result.error:
return (False, 0, 0, 0)
# Calculate tolerance based on percentage of expected voltage
tolerance = expected_voltage * VOLTAGE_TOLERANCE_PERCENT / 100
voltage_min = int(expected_voltage - tolerance)
voltage_max = int(expected_voltage + tolerance)
passed = voltage_min <= voltage_result.value <= voltage_max
return (passed, voltage_result.value, voltage_min, voltage_max)
def wait_for_voltage_to_establish(stem, port_index, expected_voltage):
"""Polls until voltage reaches expected value within tolerance."""
def check_voltage():
passed, actual, v_min, v_max = test_voltage(stem, port_index, expected_voltage)
return (passed, (actual, v_min, v_max))
result = poll_until(check_voltage, timeout=BAIL_COUNTER * 0.2, interval=0.2)
actual, v_min, v_max = result.value if result.value else (0, 0, 0)
if result.timed_out:
print("Voltage out of range: %duV (expected: %duV ±%d%% = %duV to %duV)" % (
actual, expected_voltage, VOLTAGE_TOLERANCE_PERCENT, v_min, v_max))
return 1
print("Voltage: %duV (expected: %duV ±%d%% = %duV to %duV)" % (
actual, expected_voltage, VOLTAGE_TOLERANCE_PERCENT, v_min, v_max))
return 0
def is_pd_established(stem, port_index):
"""
Check if PD communication has been established by confirming a valid
RDO exists (either local when sinking or remote when sourcing).
"""
pd_entity = brainstem.entity.PowerDelivery(stem, port_index)
#Local RDO - This will be set if the port is SINKING power.
rdo_local = pd_entity.getRequestDataObject(_BS_C.powerdeliveryPartnerLocal)
if not basic_error_handling(stem, rdo_local) and rdo_local.value != 0:
return 0
#Remote RDO - This will be set if the port is SOURCING power.
rdo_remote = pd_entity.getRequestDataObject(_BS_C.powerdeliveryPartnerRemote)
if not basic_error_handling(stem, rdo_remote) and rdo_remote.value != 0:
return 0
return 1
def wait_for_pd_to_establish(stem, port_index):
"""Polls until PD communication is established or timeout."""
def check_pd():
error = is_pd_established(stem, port_index)
return (error == 0, error)
result = poll_until(check_pd, timeout=BAIL_COUNTER * 0.2, interval=0.2)
if result.timed_out:
return 1
return 0
def wait_for_rdo_to_set(stem, port_index, rdo):
"""Wait for RDO to be set and verified successfully."""
pd_entity = brainstem.entity.PowerDelivery(stem, port_index)
def set_rdo():
err_result = pd_entity.setRequestDataObject(rdo)
return (not err_result, err_result)
result = poll_until(set_rdo, timeout=BAIL_COUNTER * 0.1, interval=0.1)
if result.timed_out:
return 1
return 0
def test_rdo(stem, logger, port_index, rdo, expected_voltage):
"""Test a specific RDO configuration."""
pd_entity = brainstem.entity.PowerDelivery(stem, port_index)
err_result = pd_entity.setPowerRole(_BS_C.powerdeliveryPowerRoleSink)
if basic_error_handling(stem, err_result, "Failed to set Power Role"):
return 1
# Clear logger of any stale PD packets before starting the RDO transaction
if logger:
clear_logger_packets(logger)
error = wait_for_rdo_to_set(stem, port_index, rdo)
if error:
print("Failed to set RDO %08X" % rdo)
return 1
# If the user has the PD Logging software feature we can confirm that the RDO was accepted
# by inspecting the PD logging traffic and validating the specific RDO value was sent.
if logger:
error = validate_rdo_handshake(logger, rdo)
if error == 1:
print("RDO was rejected by the power supply")
return 1
elif error == 2:
print("Timeout waiting for power supply ready")
return 1
error = wait_for_pd_to_establish(stem, port_index)
if error:
print("Failed to establish PD connection")
return 1
error = wait_for_voltage_to_establish(stem, port_index, expected_voltage)
if error:
print("Voltage is not as expected")
return 1
#TODO: Test loading of the RDO via External Load
# Enable Rail for "port_index"
# Check for expected current.
# Disable Rail for "port_index"
return 0
def toggle_port(stem, port_index, settle_time=0.2):
"""Disable and re-enable a port to reset the connection."""
port_entity = brainstem.entity.Port(stem, port_index)
err = port_entity.setEnabled(False)
if basic_error_handling(stem, err, "Failed to disable port %d" % port_index):
exit_with_code(ExitCode.PORT_DISABLE_FAILED, "port %d" % port_index)
time.sleep(settle_time)
err = port_entity.setEnabled(True)
if basic_error_handling(stem, err, "Failed to enable port %d" % port_index):
exit_with_code(ExitCode.PORT_ENABLE_FAILED, "port %d" % port_index)
time.sleep(settle_time)
def test_dut_pdo(stem, port_index, pdo_index, expected_pdo):
"""Test a specific PDO value."""
pd_entity = brainstem.entity.PowerDelivery(stem, port_index)
result = pd_entity.getPowerDataObject(_BS_C.powerdeliveryPartnerRemote, _BS_C.powerdeliveryPowerRoleSource, pdo_index)
if basic_error_handling(stem, result, "Failed to get PDO %d on port %d" % (pdo_index, port_index)):
return 1
print("PDO:%d: 0x%08X : Expected PDO: 0x%08X" % (pdo_index, result.value, expected_pdo))
if result.value != expected_pdo:
print("FAIL - Unexpected Host PDO: 0x%08X : Expected PDO: 0x%08X" % (result.value, expected_pdo))
return 1
return 0
def run_with_retry(test_func, test_name, max_retries=MAX_TEST_RETRIES):
"""
Run a test function with optional retries.
Args:
test_func: Callable that returns 0 on success, non-zero on failure
test_name: Name of the test for logging
max_retries: Number of retries after initial failure (default: MAX_TEST_RETRIES)
Returns:
0 if test passed (on any attempt), non-zero if all attempts failed
"""
result = test_func()
if result == 0:
return 0
for retry in range(max_retries):
print(" Retry %d/%d: %s" % (retry + 1, max_retries, test_name))
result = test_func()
if result == 0:
return 0
return result
#=============================================================================
# CLI Manager - Context manager for cleanup
#=============================================================================
class CLI_Manager:
"""Context manager that handles cleanup of the stem and restores device state."""
def __init__(self, test_port):
self.stem = None # brainstem.module.Module object
self.test_port = test_port # Port index being tested
self.original_power_role = None # Original power role to restore on exit
self.logger = None # PDChannelLogger object
self.pd_logging_enabled = False
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.restore_port_power_role()
if self.logger:
self.logger.setEnabled(False)
self.logger = None
if self.stem:
self.stem.disconnect()
self.stem = None
return False # Ensure exception propagates
def check_device_capability(self):
"""
Checks to see if the device can execute the required commands for the test.
Uses hasUEI to verify the device supports the necessary operations.
Sets self.pd_logging_enabled based on device capabilities.
Uses self.test_port for the port index.
"""
stem = self.stem
port_index = self.test_port
#---------------------------------------------------------------------------------
#cmdPORT Checks
#---------------------------------------------------------------------------------
#Check how many Port Entities this device has.
result = stem.classQuantity(_BS_C.cmdPORT)
if basic_error_handling(stem, result, "Could not acquire class quantity for cmdPORT"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "cmdPORT class quantity")
if port_index >= result.value:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
"Port %d is out of range. Device has %d ports." % (port_index, result.value))
#Check that we can enable/disable the port
result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portPortEnabled, port_index, (_BS_C.ueiOPTION_SET))
if basic_error_handling(stem, result, "Cannot enable/disable port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d setEnabled" % port_index)
#Check that we can get vbus voltage
result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portVbusVoltage, port_index, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result, "Cannot get vbus voltage on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getVbusVoltage" % port_index)
#Check that we can get data role
result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portDataRole, port_index, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result, "Cannot get data role on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getDataRole" % port_index)
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
#cmdSYSTEM Checks
#---------------------------------------------------------------------------------
#Check that we can get input power source
result = stem.hasUEI(_BS_C.cmdSYSTEM, _BS_C.systemInputPowerSource, 0, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result, "Cannot get input power source"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "getInputPowerSource")
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
#cmdPOWERDELIVERY Checks
#---------------------------------------------------------------------------------
#Check how many PowerDelivery Entities this device has.
result = stem.classQuantity(_BS_C.cmdPOWERDELIVERY)
if basic_error_handling(stem, result, "Could not acquire class quantity for cmdPOWERDELIVERY"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "cmdPOWERDELIVERY class quantity")
if port_index >= result.value:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
"Port %d is out of range for PowerDelivery. Device has %d PD ports." % (port_index, result.value))
#Check that we can get RDO's
result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestDataObject, port_index, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result, "Cannot get RDOs on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getRDO" % port_index)
#Check that we can set RDO's
result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestDataObject, port_index, (_BS_C.ueiOPTION_SET))
if basic_error_handling(stem, result, "Cannot set RDOs on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d setRDO" % port_index)
#Check that we can get PDO's
result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryPowerDataObject, port_index, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result, "Cannot get PDOs on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getPDO" % port_index)
#Check that we can set power role
result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryPowerRole, port_index, (_BS_C.ueiOPTION_SET))
if basic_error_handling(stem, result, "Cannot set power role on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d setPowerRole" % port_index)
#Check that we can get power role
result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryPowerRole, port_index, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result, "Cannot get power role on port %d" % port_index):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getPowerRole" % port_index)
#Check that we can enable PD Logging
#In this example this is optional, but offers a more complete test.
pd = brainstem.entity.PowerDelivery(stem, port_index)
result =pd.get_UEI8(_BS_C.powerdeliveryLogEnable)
if result.error == Result.NO_ERROR:
self.pd_logging_enabled = True
else:
print("This device does not have the PD Logging software feature. That portion of this example will be bypassed.")
self.pd_logging_enabled = False
#---------------------------------------------------------------------------------
def save_port_power_role(self):
"""
Save the port's power role for later restoration.
Sets self.original_power_role.
Uses self.test_port for the port index.
"""
pd_entity = brainstem.entity.PowerDelivery(self.stem, self.test_port)
result = pd_entity.getPowerRole()
if basic_error_handling(self.stem, result, "Could not get current power role for port %d" % self.test_port):
print("Warning: Will not be able to restore power role on exit")
self.original_power_role = None
else:
self.original_power_role = result.value
print("Port %d: Saved original power role: %d" % (self.test_port, self.original_power_role))
def restore_port_power_role(self):
"""
Restore the port's power role that was saved by save_port_power_role.
Uses self.stem, self.test_port, and self.original_power_role.
"""
if self.stem is None or self.test_port is None or self.original_power_role is None:
return
print("\n--- Restoring port power role ---")
pd_entity = brainstem.entity.PowerDelivery(self.stem, self.test_port)
err = pd_entity.setPowerRole(self.original_power_role)
if basic_error_handling(self.stem, err, "Failed to restore power role"):
print("Warning: Failed to restore power role to %d" % self.original_power_role)
else:
print("Restored power role to %d" % self.original_power_role)
#=============================================================================
# Main
#=============================================================================
def main(argv):
exit_code = ExitCode.UNKNOWN
try:
print("Provided Arguments:")
print(argv)
arg_parser = CustomArgumentParser(argv)
with CLI_Manager(arg_parser.test_port) as cli:
#Setup
#/////////////////////////////////////////////////////////////////////
# Note: This code uses the base Module class instead of a specific device type.
# The Module class is the base class for all BrainStem Objects like the USBHub3c, USBHub3p, USBCSwitch etc.
# This allows our code to be more generic; however, we don't really know what we are or what
# we are capable of so we must do a handful of capability checks.
cli.stem = create_and_connect_stem(arg_parser.sn)
if cli.stem is None:
exit_with_code(ExitCode.CONNECTION_ERROR, "Serial: 0x%08X" % arg_parser.sn if arg_parser.sn else "first found")
cli.check_device_capability()
verify_test_port(cli.stem, cli.test_port)
cli.save_port_power_role()
if cli.pd_logging_enabled:
cli.logger = create_logger(cli.stem, cli.test_port) #Create Logger object for given port
#/////////////////////////////////////////////////////////////////////
#Work
#/////////////////////////////////////////////////////////////////////
expected_host_pdos = arg_parser.expected_host_pdos
expected_host_pdos_voltage = arg_parser.expected_host_pdos_voltage
expected_rdos = arg_parser.expected_rdos
pdo_failures = 0
rdo_failures = 0
print("\n--- Testing PDO's ---")
for x in range(len(expected_host_pdos)):
#+1 PDO start from index 1
result = test_dut_pdo(cli.stem, cli.test_port, x+1, expected_host_pdos[x])
if result:
print(" Error: testing PDO: %d - 0x%08X" % (x+1, expected_host_pdos[x]))
pdo_failures += 1
else:
print(" Success testing PDO: %d" % (x+1))
print("\n")
toggle_port(cli.stem, cli.test_port)
print("--- Testing RDO's ---")
for x in range(len(expected_rdos)):
rdo_index = x + 1 # RDO indices start from 1
test_name = "RDO %d (0x%08X)" % (rdo_index, expected_rdos[x])
# Some power supplies can't keep up with rapid cycling through RDO's.
result = run_with_retry(
lambda idx=x: test_rdo(cli.stem, cli.logger, cli.test_port, expected_rdos[idx], expected_host_pdos_voltage[idx]),
test_name
)
if result:
print(" Error: testing %s" % test_name)
rdo_failures += 1
else:
print(" Success testing RDO: %d" % rdo_index)
#/////////////////////////////////////////////////////////////////////
# Print summary
print("\n" + "=" * 60)
print("TEST RESULTS SUMMARY")
print("=" * 60)
print("PDO Tests: %d/%d passed" % (len(expected_host_pdos) - pdo_failures, len(expected_host_pdos)))
print("RDO Tests: %d/%d passed" % (len(expected_rdos) - rdo_failures, len(expected_rdos)))
print("=" * 60)
if pdo_failures > 0:
exit_code = ExitCode.PDO_TEST_FAILED
elif rdo_failures > 0:
exit_code = ExitCode.RDO_TEST_FAILED
else:
exit_code = ExitCode.SUCCESS
except ProgramExit as e:
exit_code = e.code
# Exit at the end, after context manager cleanup is complete
return exit_code
if __name__ == '__main__':
sys.exit(main(sys.argv))