717 lines
29 KiB
Python
Executable File
717 lines
29 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
#System
|
|
import time
|
|
import sys
|
|
import os
|
|
from enum import IntEnum
|
|
|
|
#BrainStem
|
|
import brainstem
|
|
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
|
|
from brainstem.result import Result
|
|
|
|
#Local files
|
|
from argument_parser import *
|
|
|
|
#Generic Utilities
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities"))
|
|
sys.path.insert(0, parent_dir)
|
|
from brainstem_helpers import *
|
|
from poll_until import poll_until
|
|
|
|
|
|
#=============================================================================
|
|
# Device Speed
|
|
#=============================================================================
|
|
class DeviceSpeed(IntEnum):
|
|
"""USB device speed enumeration matching DeviceNode speed values."""
|
|
UNKNOWN = 0
|
|
LOW_SPEED = 1 # 1.5M
|
|
FULL_SPEED = 2 # 12M
|
|
HIGH_SPEED = 3 # 480M
|
|
SUPER_SPEED = 4 # 5G
|
|
SUPER_SPEED_PLUS = 5 # 10G
|
|
|
|
|
|
#=============================================================================
|
|
# Constants
|
|
#=============================================================================
|
|
DATA_RATE_CONFIGURATIONS = [
|
|
# USB 2.0 (HS) USB 3.0 (SS) Expected Speed Label
|
|
(_BS_C.usbsystemDataHSMaxDatarate_None, _BS_C.usbsystemDataSSMaxDatarate_SuperSpeedPlus, DeviceSpeed.SUPER_SPEED_PLUS, "10G"),
|
|
(_BS_C.usbsystemDataHSMaxDatarate_None, _BS_C.usbsystemDataSSMaxDatarate_SuperSpeed, DeviceSpeed.SUPER_SPEED, "5G"),
|
|
(_BS_C.usbsystemDataHSMaxDatarate_HighSpeed, _BS_C.usbsystemDataSSMaxDatarate_None, DeviceSpeed.HIGH_SPEED, "480M"),
|
|
(_BS_C.usbsystemDataHSMaxDatarate_FullSpeed, _BS_C.usbsystemDataSSMaxDatarate_None, DeviceSpeed.FULL_SPEED, "12M"),
|
|
|
|
# At this time no Acroname products support this configuration. It exists for completeness.
|
|
(_BS_C.usbsystemDataHSMaxDatarate_LowSpeed, _BS_C.usbsystemDataSSMaxDatarate_None, DeviceSpeed.LOW_SPEED, "1.5M"),
|
|
]
|
|
|
|
# Errors that indicate a range/index error.
|
|
ALLOWED_RANGE_ERRORS = [
|
|
Result.INDEX_RANGE_ERROR,
|
|
Result.RANGE_ERROR,
|
|
Result.UNIMPLEMENTED_ERROR,
|
|
Result.PARAMETER_ERROR,
|
|
]
|
|
|
|
# Errors that indicate an unsupported configuration
|
|
UNSUPPORTED_CONFIG_ERRORS = [
|
|
Result.PARAMETER_ERROR,
|
|
Result.RANGE_ERROR,
|
|
Result.UNIMPLEMENTED_ERROR,
|
|
]
|
|
|
|
MAX_ACRONAME_PORTS = 30 # A value that is greater than any Acroname port count.
|
|
CONTROL_PORT_MIN_MICRO_VOLTS = 4000000 # Minimum voltage (μV) to consider control port connected
|
|
|
|
|
|
#=============================================================================
|
|
# Exit Codes
|
|
#=============================================================================
|
|
class ExitCode(IntEnum):
|
|
"""Exit codes for the Speed Test script."""
|
|
SUCCESS = 0
|
|
CONNECTION_ERROR = 1
|
|
CAPABILITY_CHECK_FAILED = 2
|
|
CONFIGURATION_ERROR = 3
|
|
TEST_FAILED = 4
|
|
UNKNOWN = 255
|
|
|
|
|
|
EXIT_CODE_DESCRIPTIONS = {
|
|
ExitCode.SUCCESS: "All speed tests passed",
|
|
ExitCode.CONNECTION_ERROR: "Failed to connect to device",
|
|
ExitCode.CAPABILITY_CHECK_FAILED: "Device does not support required capabilities",
|
|
ExitCode.CONFIGURATION_ERROR: "Failed to configure hub data rate settings",
|
|
ExitCode.TEST_FAILED: "One or more speed tests failed",
|
|
ExitCode.UNKNOWN: "Unknown error occurred",
|
|
}
|
|
|
|
|
|
class ProgramExit(Exception):
|
|
"""
|
|
Custom exception to signal program exit with a specific code.
|
|
|
|
This exception is used instead of sys.exit() to allow context managers
|
|
to properly clean up before the program exits.
|
|
"""
|
|
def __init__(self, code, message=None):
|
|
self.code = code
|
|
self.message = message
|
|
super().__init__(message)
|
|
|
|
|
|
def exit_with_code(code, additional_message=None):
|
|
"""Print exit code message and raise ProgramExit exception.
|
|
|
|
Raises ProgramExit instead of calling sys.exit() directly to ensure
|
|
context managers can properly clean up.
|
|
"""
|
|
if isinstance(code, ExitCode):
|
|
message = "EXIT CODE %d: %s" % (code.value, EXIT_CODE_DESCRIPTIONS[code])
|
|
if additional_message:
|
|
message += " - %s" % additional_message
|
|
print(message, file=sys.stderr)
|
|
raise ProgramExit(code, message)
|
|
else:
|
|
print("EXIT CODE %d: Unknown exit code" % code, file=sys.stderr)
|
|
raise ProgramExit(code, additional_message)
|
|
|
|
|
|
#=============================================================================
|
|
# Configuration Result
|
|
#=============================================================================
|
|
class ConfigResult(IntEnum):
|
|
"""Result of attempting to configure a data rate setting."""
|
|
SUCCESS = 0
|
|
UNSUPPORTED = 1 # Device does not support this configuration
|
|
ERROR = 2 # Other error occurred
|
|
|
|
|
|
|
|
def check_device_capability(stem, test_ports=None):
|
|
"""
|
|
Checks to see if the device can execute the required commands for the test.
|
|
Uses hasUEI to verify the device supports the necessary operations.
|
|
|
|
Args:
|
|
stem: The BrainStem module connection
|
|
test_ports: List of ports to validate. If None, only checks USBSystem capabilities.
|
|
"""
|
|
#---------------------------------------------------------------------------------
|
|
#cmdUSBSYSTEM Checks
|
|
#---------------------------------------------------------------------------------
|
|
#Check how many USBSystem Entities this device has.
|
|
result = stem.classQuantity(_BS_C.cmdUSBSYSTEM)
|
|
if basic_error_handling(stem, result, "Could not acquire class quantity for cmdUSBSYSTEM"):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "cmdUSBSYSTEM class quantity")
|
|
|
|
if result.value < 1:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "Device does not have USBSystem capabilities")
|
|
|
|
#Check that we can set HS max data rate
|
|
result = stem.hasUEI(_BS_C.cmdUSBSYSTEM, _BS_C.usbsystemDataHSMaxDatarate, 0, (_BS_C.ueiOPTION_SET))
|
|
if basic_error_handling(stem, result, "Device does not support setting HS max data rate"):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "setDataHSMaxDatarate")
|
|
|
|
#Check that we can set SS max data rate
|
|
result = stem.hasUEI(_BS_C.cmdUSBSYSTEM, _BS_C.usbsystemDataSSMaxDatarate, 0, (_BS_C.ueiOPTION_SET))
|
|
if basic_error_handling(stem, result, "Device does not support setting SS max data rate"):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "setDataSSMaxDatarate")
|
|
#---------------------------------------------------------------------------------
|
|
|
|
#---------------------------------------------------------------------------------
|
|
#cmdPORT Checks
|
|
#---------------------------------------------------------------------------------
|
|
#Check how many Port Entities this device has.
|
|
result = stem.classQuantity(_BS_C.cmdPORT)
|
|
if basic_error_handling(stem, result, "Could not acquire class quantity for cmdPORT"):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "cmdPORT class quantity")
|
|
|
|
# If no test_ports provided, skip per-port validation
|
|
if test_ports is None:
|
|
return
|
|
|
|
#Check that each test port is valid
|
|
for port in test_ports:
|
|
if port >= result.value:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
|
|
"Port %d is out of range. Device has %d ports." % (port, result.value))
|
|
|
|
#Check that we can enable/disable the port
|
|
result_uei = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portPortEnabled, port, (_BS_C.ueiOPTION_SET))
|
|
if basic_error_handling(stem, result_uei, "Cannot enable/disable port %d" % port):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d setEnabled" % port)
|
|
|
|
#Check that we can get data speed
|
|
result_uei = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portDataSpeed, port, (_BS_C.ueiOPTION_GET))
|
|
if basic_error_handling(stem, result_uei, "Cannot get data speed on port %d" % port):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getDataSpeed" % port)
|
|
#---------------------------------------------------------------------------------
|
|
|
|
|
|
def verify_control_port_connected(stem):
|
|
"""
|
|
Verify that the control port is connected with adequate voltage.
|
|
|
|
Finds the port with data role portDataRole_Control_Value and checks
|
|
that it has voltage > CONTROL_PORT_MIN_MICRO_VOLTS.
|
|
|
|
Exits with error if control port is not found or voltage is too low.
|
|
"""
|
|
print("\n--- Verifying Control Port Connection ---")
|
|
|
|
control_port_idx = None
|
|
|
|
# Find the control port
|
|
for port_idx in range(MAX_ACRONAME_PORTS):
|
|
port_entity = brainstem.entity.Port(stem, port_idx)
|
|
result = port_entity.getDataRole()
|
|
|
|
# Stop if we've exceeded valid ports
|
|
if result.error in ALLOWED_RANGE_ERRORS:
|
|
break
|
|
|
|
if basic_error_handling(stem, result, "Error getting data role for port %d" % port_idx):
|
|
continue
|
|
|
|
if result.value == _BS_C.portDataRole_Control_Value:
|
|
control_port_idx = port_idx
|
|
print("Found control port: Port %d" % port_idx)
|
|
break
|
|
|
|
if control_port_idx is None:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "No control port found on device")
|
|
|
|
# Check voltage on control port
|
|
port_entity = brainstem.entity.Port(stem, control_port_idx)
|
|
result = port_entity.getVbusVoltage()
|
|
|
|
if basic_error_handling(stem, result, "Error getting voltage for control port %d" % control_port_idx):
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "Cannot read control port voltage")
|
|
|
|
if result.value < CONTROL_PORT_MIN_MICRO_VOLTS:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "The control port does not appear to be connected. Please connect it and try again." )
|
|
|
|
|
|
def validate_ports_not_upstream(stem, ports):
|
|
"""
|
|
Validate that none of the specified ports are upstream ports.
|
|
|
|
Exits with error if any port is an upstream port.
|
|
|
|
Args:
|
|
stem: The BrainStem module connection
|
|
ports: List of port indices to validate
|
|
"""
|
|
for port in ports:
|
|
port_entity = brainstem.entity.Port(stem, port)
|
|
result = port_entity.getDataRole()
|
|
|
|
if basic_error_handling(stem, result, "Warning: Could not get data role for port %d" % port):
|
|
continue
|
|
|
|
if result.value == _BS_C.portDataRole_Upstream_Value:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
|
|
"Port %d is an upstream port and cannot be tested" % port)
|
|
|
|
print("Port %d: Data role validated (role=%d)" % (port, result.value))
|
|
|
|
|
|
def discover_downstream_ports(stem):
|
|
"""
|
|
Discover all downstream ports on the hub by iterating through ports
|
|
and checking their data role.
|
|
|
|
Iterates from port 0 up to MAX_ACRONAME_PORTS, stopping when an index
|
|
out of range error is encountered.
|
|
|
|
Returns:
|
|
List of port indices that are downstream ports.
|
|
"""
|
|
downstream_ports = []
|
|
for port_idx in range(MAX_ACRONAME_PORTS):
|
|
try:
|
|
port_entity = brainstem.entity.Port(stem, port_idx)
|
|
result = port_entity.getDataRole()
|
|
|
|
# Check if this is an index out of range error (indicates we've exceeded valid ports)
|
|
if result.error in ALLOWED_RANGE_ERRORS:
|
|
print("Port discovery complete: found %d total ports" % port_idx)
|
|
break
|
|
|
|
# Handle other errors
|
|
if basic_error_handling(stem, result, "Warning: Error getting data role for port %d" % port_idx):
|
|
continue
|
|
|
|
if result.value == _BS_C.portDataRole_Downstream_Value:
|
|
downstream_ports.append(port_idx)
|
|
print("Port %d: Downstream" % port_idx)
|
|
else:
|
|
print("Port %d: Not downstream (role=%d)" % (port_idx, result.value))
|
|
|
|
except Exception as e:
|
|
# Catch any unexpected exceptions and stop iteration
|
|
print("Port discovery stopped at port %d: %s" % (port_idx, str(e)))
|
|
break
|
|
|
|
print("Discovered %d downstream ports: %s" % (len(downstream_ports), downstream_ports))
|
|
return downstream_ports
|
|
|
|
|
|
def filter_ports_with_devices(stem, ports):
|
|
"""
|
|
Filter the list of ports to only include those with connected devices.
|
|
|
|
Checks each port's data speed - if it has a value (> 0), a device is enumerated.
|
|
Also captures the maximum speed each device is capable of.
|
|
|
|
Returns:
|
|
tuple: (list of port indices with devices, dict mapping port -> max DeviceSpeed)
|
|
"""
|
|
ports_with_devices = []
|
|
device_max_speeds = {}
|
|
|
|
for port in ports:
|
|
port_entity = brainstem.entity.Port(stem, port)
|
|
result = port_entity.getDataSpeed()
|
|
|
|
if basic_error_handling(stem, result, "Port %d: Error getting data speed" % port):
|
|
continue
|
|
|
|
if result.value > 0:
|
|
speed = decode_data_speed(result.value)
|
|
print("Port %d: Device connected at %s" % (port, speed.name))
|
|
ports_with_devices.append(port)
|
|
device_max_speeds[port] = speed
|
|
else:
|
|
print("Port %d: No device enumerated" % port)
|
|
|
|
print("Found %d ports with devices: %s" % (len(ports_with_devices), ports_with_devices))
|
|
return ports_with_devices, device_max_speeds
|
|
|
|
|
|
def decode_data_speed(speed_value):
|
|
"""
|
|
Decode raw port data speed bits into a DeviceSpeed enum.
|
|
|
|
The raw speed value is a bitmask from the hub's getDataSpeed() call.
|
|
This function translates it to the DeviceNode speed enumeration format.
|
|
"""
|
|
# Test USB 3.0 first so that we find the highest speed first.
|
|
# USB Hubs enumerate at both USB 2.0 and USB 3.0 speeds.
|
|
if speed_value & (1 << _BS_C.portDataSpeed_Connected_3p0_Bit):
|
|
print("Device enumerated as USB 3.0")
|
|
|
|
if speed_value & (1 << _BS_C.portDataSpeed_ss_5G_Bit):
|
|
print("Data speed is 5G")
|
|
return DeviceSpeed.SUPER_SPEED
|
|
elif speed_value & (1 << _BS_C.portDataSpeed_ss_10G_Bit):
|
|
print("Data speed is 10G")
|
|
return DeviceSpeed.SUPER_SPEED_PLUS
|
|
|
|
elif speed_value & (1 << _BS_C.portDataSpeed_Connected_2p0_Bit):
|
|
print("Device enumerated as USB 2.0")
|
|
|
|
if speed_value & (1 << _BS_C.portDataSpeed_ls_1p5M_Bit):
|
|
print("Data speed is 1.5M")
|
|
return DeviceSpeed.LOW_SPEED
|
|
elif speed_value & (1 << _BS_C.portDataSpeed_fs_12M_Bit):
|
|
print("Data speed is 12M")
|
|
return DeviceSpeed.FULL_SPEED
|
|
elif speed_value & (1 << _BS_C.portDataSpeed_hs_480M_Bit):
|
|
print("Data speed is 480M")
|
|
return DeviceSpeed.HIGH_SPEED
|
|
|
|
print("Device enumerated as unknown")
|
|
return DeviceSpeed.UNKNOWN
|
|
|
|
|
|
def compare_device_speeds(hub_speed, os_speed, expected_speed):
|
|
"""
|
|
Compare the speed reported by the hub vs the OS vs the expected speed.
|
|
|
|
Returns:
|
|
True if all speeds match expected, False otherwise.
|
|
"""
|
|
if os_speed == DeviceSpeed.UNKNOWN:
|
|
print("Cannot compare: OS speed not available.")
|
|
return False
|
|
|
|
if hub_speed != os_speed:
|
|
print("FAIL: Hub/OS mismatch - Hub: %s, OS: %s" % (hub_speed.name, os_speed.name))
|
|
return False
|
|
|
|
if hub_speed != expected_speed:
|
|
print("FAIL: Speed mismatch - Got: %s, Expected: %s" % (hub_speed.name, expected_speed.name))
|
|
return False
|
|
|
|
print("PASS: Device speed matches expected: %s" % (expected_speed.name))
|
|
return True
|
|
|
|
|
|
def get_device_speed_from_os(port, serial_number=0, timeout=8.0):
|
|
"""
|
|
Get the device speed as reported by the OS for a specific port.
|
|
|
|
Polls the hub until a device enumerates or timeout is reached.
|
|
"""
|
|
def check_devices():
|
|
# Note: getDownstreamDevices() is a static function and does not require a stem connection;
|
|
# however, it only returns devices which are physically connected to Acroname hubs.
|
|
devices = brainstem.discover.getDownstreamDevices()
|
|
for device in devices.value:
|
|
if device.hub_port != port:
|
|
continue
|
|
if serial_number != 0 and device.hub_serial_number != serial_number:
|
|
continue
|
|
return True, DeviceSpeed(device.speed)
|
|
return False, DeviceSpeed.UNKNOWN
|
|
|
|
result = poll_until(check_devices, timeout=timeout, interval=0.3)
|
|
|
|
if result.timed_out:
|
|
print("Timeout waiting for OS to enumerate device")
|
|
return DeviceSpeed.UNKNOWN
|
|
|
|
return result.value
|
|
|
|
|
|
def prepare_hub_max_datarate_configuration(stem, ports, hs_speed, ss_speed, settle_time=0.5):
|
|
"""
|
|
Prepare the hub for a speed test by configuring data rate limits.
|
|
|
|
Disables all specified ports, configures the HS/SS max data rates, then re-enables all ports.
|
|
Since max data rate settings are hub-wide, all ports must be cycled together.
|
|
|
|
Returns:
|
|
ConfigResult.SUCCESS: Configuration applied successfully
|
|
ConfigResult.UNSUPPORTED: Device does not support this data rate configuration
|
|
ConfigResult.ERROR: Other error occurred
|
|
"""
|
|
usb_system = brainstem.entity.USBSystem(stem, 0)
|
|
# Disable all ports
|
|
for port in ports:
|
|
port_entity = brainstem.entity.Port(stem, port)
|
|
err = port_entity.setEnabled(False)
|
|
if basic_error_handling(stem, err, "Failed to disable port %d" % port):
|
|
return ConfigResult.ERROR
|
|
|
|
# Configure hub-wide max HS data rates
|
|
err = usb_system.setDataHSMaxDatarate(hs_speed)
|
|
if err in UNSUPPORTED_CONFIG_ERRORS:
|
|
print("Configuration not supported by device (HS): %s" % Result.getErrorText(err))
|
|
# Re-enable ports before returning
|
|
for port in ports:
|
|
brainstem.entity.Port(stem, port).setEnabled(True)
|
|
return ConfigResult.UNSUPPORTED
|
|
if basic_error_handling(stem, err, "Failed to set HS max data rate"):
|
|
return ConfigResult.ERROR
|
|
|
|
# Configure hub-wide max SS data rates
|
|
err = usb_system.setDataSSMaxDatarate(ss_speed)
|
|
if err in UNSUPPORTED_CONFIG_ERRORS:
|
|
print("Configuration not supported by device (SS): %s" % Result.getErrorText(err))
|
|
# Re-enable ports before returning
|
|
for port in ports:
|
|
brainstem.entity.Port(stem, port).setEnabled(True)
|
|
return ConfigResult.UNSUPPORTED
|
|
if basic_error_handling(stem, err, "Failed to set SS max data rate"):
|
|
return ConfigResult.ERROR
|
|
|
|
# Re-enable all ports
|
|
for port in ports:
|
|
port_entity = brainstem.entity.Port(stem, port)
|
|
err = port_entity.setEnabled(True)
|
|
if basic_error_handling(stem, err, "Failed to enable port %d" % port):
|
|
return ConfigResult.ERROR
|
|
|
|
time.sleep(settle_time) # Allow time for devices to enumerate.
|
|
|
|
return ConfigResult.SUCCESS
|
|
|
|
|
|
def get_device_speed_from_hub(stem, port, timeout=5.0, max_retries=5):
|
|
"""
|
|
Get the device speed as reported by the hub for a specific port.
|
|
|
|
Polls the hub until a device enumerates or timeout/max retries is reached.
|
|
"""
|
|
retry_count = 0 # Will be used as a "nonlocal" variable.
|
|
port_entity = brainstem.entity.Port(stem, port)
|
|
|
|
def check_speed():
|
|
nonlocal retry_count # Uses scope variable to keep state through multiple calls.
|
|
speed_result = port_entity.getDataSpeed()
|
|
|
|
if speed_result.error:
|
|
print("Error getting data speed: %s - retry %d" % (speed_result.error, retry_count))
|
|
retry_count += 1
|
|
if retry_count >= max_retries:
|
|
return True, speed_result # Stop polling due to max retries
|
|
return False, speed_result
|
|
elif speed_result.value > 0:
|
|
print("Device enumeration detected")
|
|
return True, speed_result
|
|
return False, speed_result
|
|
|
|
result = poll_until(check_speed, timeout=timeout, interval=0.3)
|
|
|
|
if result.timed_out:
|
|
print("Timeout waiting for device to enumerate")
|
|
|
|
speed_value = result.value.value if result.value else 0
|
|
return decode_data_speed(speed_value)
|
|
|
|
|
|
def print_results_table(results, test_ports, speed_labels):
|
|
"""
|
|
Print a summary table of test results.
|
|
|
|
Results can be:
|
|
True - Test passed
|
|
False - Test failed
|
|
None - Configuration not supported by device (N/A)
|
|
"""
|
|
# Calculate column widths
|
|
port_col_width = 6 # "Port X"
|
|
speed_col_width = max(len(label) for label in speed_labels) + 2
|
|
|
|
# Print header
|
|
print("\n" + "=" * 60)
|
|
print("TEST RESULTS SUMMARY")
|
|
print("=" * 60)
|
|
|
|
# Print column headers
|
|
header = "Port".ljust(port_col_width) + " | "
|
|
header += " | ".join(label.center(speed_col_width) for label in speed_labels)
|
|
print(header)
|
|
print("-" * len(header))
|
|
|
|
# Print each row (port)
|
|
all_passed = True
|
|
for port in test_ports:
|
|
row = ("%s" % port).ljust(port_col_width) + " | "
|
|
cells = []
|
|
for label in speed_labels:
|
|
passed = results.get((port, label), "--")
|
|
if passed is True:
|
|
cells.append("PASS".center(speed_col_width))
|
|
elif passed is False:
|
|
cells.append("FAIL".center(speed_col_width))
|
|
all_passed = False
|
|
elif passed is None:
|
|
# Configuration not supported by device - not a failure
|
|
cells.append("N/A".center(speed_col_width))
|
|
else:
|
|
cells.append("--".center(speed_col_width))
|
|
row += " | ".join(cells)
|
|
print(row)
|
|
|
|
print("=" * 60)
|
|
|
|
# Print overall result
|
|
if all_passed:
|
|
print("OVERALL: ALL TESTS PASSED")
|
|
else:
|
|
print("OVERALL: SOME TESTS FAILED")
|
|
print("=" * 60)
|
|
|
|
return all_passed
|
|
|
|
|
|
#Context manager that handles cleanup of the stem.
|
|
class CLI_Manager:
|
|
def __init__(self):
|
|
self.stem = None # brainstem.module.Module object
|
|
self.test_ports = None
|
|
self.device_max_speeds = {} # Maps port -> max DeviceSpeed
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
if self.stem and self.test_ports:
|
|
# Restore hub to default configuration before disconnecting
|
|
print("\n--- Restoring hub to default configuration ---")
|
|
prepare_hub_max_datarate_configuration(
|
|
self.stem,
|
|
self.test_ports,
|
|
_BS_C.usbsystemDataHSMaxDatarate_HighSpeed,
|
|
_BS_C.usbsystemDataSSMaxDatarate_SuperSpeedPlus
|
|
)
|
|
|
|
if self.stem:
|
|
print("\n--- Disconnecting from device ---")
|
|
self.stem.disconnect()
|
|
self.stem = None
|
|
|
|
return False # Ensure exception propagates
|
|
|
|
|
|
def main(argv):
|
|
exit_code = ExitCode.UNKNOWN
|
|
|
|
try:
|
|
print("Provided Arguments:")
|
|
print(argv)
|
|
arg_parser = CustomArgumentParser(argv)
|
|
|
|
with CLI_Manager() as cli:
|
|
#Setup
|
|
#/////////////////////////////////////////////////////////////////////
|
|
#---------------------------------------------------------------------
|
|
# Note: This code uses the base Module class instead of a specific device type.
|
|
# The Module class is the base class for all BrainStem Objects like the USBHub3c, USBHub3p, USBCSwitch etc.
|
|
# This allows our code to be more generic; however, we don't really know what we are or what
|
|
# we are capable of so we must do a handful of capability checks.
|
|
|
|
# 1. Connect to device
|
|
cli.stem = create_and_connect_stem(arg_parser.sn)
|
|
if cli.stem is None:
|
|
exit_with_code(ExitCode.CONNECTION_ERROR, "Serial: 0x%08X" % arg_parser.sn if arg_parser.sn else "first found")
|
|
|
|
# 2. Check basic device capabilities before calling any other APIs
|
|
check_device_capability(cli.stem)
|
|
|
|
# 3. Verify control port is connected before proceeding
|
|
verify_control_port_connected(cli.stem)
|
|
|
|
# 4. Determine candidate ports based on mode
|
|
if arg_parser.automatic:
|
|
print("\n--- Automatic Port Discovery ---")
|
|
candidate_ports = discover_downstream_ports(cli.stem)
|
|
if not candidate_ports:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "No downstream ports found on device")
|
|
else:
|
|
# Manual mode - validate specified ports aren't upstream
|
|
validate_ports_not_upstream(cli.stem, arg_parser.test_ports)
|
|
candidate_ports = arg_parser.test_ports
|
|
|
|
# 5. Ensure ports are enabled and the max data rates are set.
|
|
print("\n--- Resetting hub to default configuration ---")
|
|
prepare_hub_max_datarate_configuration(
|
|
cli.stem,
|
|
candidate_ports,
|
|
_BS_C.usbsystemDataHSMaxDatarate_HighSpeed,
|
|
_BS_C.usbsystemDataSSMaxDatarate_SuperSpeedPlus,
|
|
settle_time=3.0 # Some device might need more or less time to enumerate.
|
|
)
|
|
|
|
# 6. Filter to ports with connected devices (common to both modes)
|
|
ports_with_devices, device_max_speeds = filter_ports_with_devices(cli.stem, candidate_ports)
|
|
if not ports_with_devices:
|
|
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "No devices connected to downstream ports")
|
|
|
|
cli.test_ports = ports_with_devices
|
|
cli.device_max_speeds = device_max_speeds
|
|
|
|
# 7. Validate capabilities for the specific test ports
|
|
check_device_capability(cli.stem, cli.test_ports)
|
|
|
|
print("\n--- Setup complete: testing ports %s ---" % cli.test_ports)
|
|
#---------------------------------------------------------------------
|
|
#/////////////////////////////////////////////////////////////////////
|
|
|
|
#Work
|
|
#/////////////////////////////////////////////////////////////////////
|
|
# Track test results: (port, speed_label) -> True/False/None
|
|
# None indicates the configuration is unsupported by the device
|
|
results = {}
|
|
speed_labels = [config[3] for config in DATA_RATE_CONFIGURATIONS]
|
|
|
|
# Loop through the data rate configurations
|
|
for hs_speed, ss_speed, expected_speed, speed_label in DATA_RATE_CONFIGURATIONS:
|
|
config_result = prepare_hub_max_datarate_configuration(cli.stem, cli.test_ports, hs_speed, ss_speed)
|
|
|
|
if config_result == ConfigResult.ERROR:
|
|
exit_with_code(ExitCode.CONFIGURATION_ERROR, "Failed to prepare hub for %s" % speed_label)
|
|
|
|
if config_result == ConfigResult.UNSUPPORTED:
|
|
# Mark all ports as unsupported for this configuration
|
|
print("--- Skipping %s tests (configuration not supported by BrainStem device) ---" % speed_label)
|
|
for port in cli.test_ports:
|
|
results[(port, speed_label)] = None # None = unsupported/N/A
|
|
continue
|
|
|
|
# Test each port with the current configuration
|
|
for port in cli.test_ports:
|
|
# Check if the device is capable of the expected speed
|
|
if port in cli.device_max_speeds:
|
|
device_max = cli.device_max_speeds[port]
|
|
if expected_speed > device_max:
|
|
print("\n--- Skipping Port %d @ %s (device max speed is %s) ---" % (port, speed_label, device_max.name))
|
|
results[(port, speed_label)] = None # None = N/A
|
|
continue
|
|
|
|
print("\n--- Testing Port %d @ %s (expecting %s) ---" % (port, speed_label, expected_speed.name))
|
|
|
|
hub_speed = get_device_speed_from_hub(cli.stem, port)
|
|
os_speed = get_device_speed_from_os(port, arg_parser.sn)
|
|
|
|
success = compare_device_speeds(hub_speed, os_speed, expected_speed)
|
|
results[(port, speed_label)] = success
|
|
#/////////////////////////////////////////////////////////////////////
|
|
|
|
# Print summary table
|
|
all_passed = print_results_table(results, cli.test_ports, speed_labels)
|
|
|
|
if all_passed:
|
|
exit_code = ExitCode.SUCCESS
|
|
else:
|
|
exit_code = ExitCode.TEST_FAILED
|
|
|
|
except ProgramExit as e:
|
|
exit_code = e.code
|
|
|
|
# Exit at the end, after context manager cleanup is complete
|
|
return exit_code
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main(sys.argv))
|
|
|