UmberHubManager/api/examples/python/speed_test_cli/speed_test_cli.py

717 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
#System
import time
import sys
import os
from enum import IntEnum
#BrainStem
import brainstem
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
from brainstem.result import Result
#Local files
from argument_parser import *
#Generic Utilities
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities"))
sys.path.insert(0, parent_dir)
from brainstem_helpers import *
from poll_until import poll_until
#=============================================================================
# Device Speed
#=============================================================================
class DeviceSpeed(IntEnum):
"""USB device speed enumeration matching DeviceNode speed values."""
UNKNOWN = 0
LOW_SPEED = 1 # 1.5M
FULL_SPEED = 2 # 12M
HIGH_SPEED = 3 # 480M
SUPER_SPEED = 4 # 5G
SUPER_SPEED_PLUS = 5 # 10G
#=============================================================================
# Constants
#=============================================================================
DATA_RATE_CONFIGURATIONS = [
# USB 2.0 (HS) USB 3.0 (SS) Expected Speed Label
(_BS_C.usbsystemDataHSMaxDatarate_None, _BS_C.usbsystemDataSSMaxDatarate_SuperSpeedPlus, DeviceSpeed.SUPER_SPEED_PLUS, "10G"),
(_BS_C.usbsystemDataHSMaxDatarate_None, _BS_C.usbsystemDataSSMaxDatarate_SuperSpeed, DeviceSpeed.SUPER_SPEED, "5G"),
(_BS_C.usbsystemDataHSMaxDatarate_HighSpeed, _BS_C.usbsystemDataSSMaxDatarate_None, DeviceSpeed.HIGH_SPEED, "480M"),
(_BS_C.usbsystemDataHSMaxDatarate_FullSpeed, _BS_C.usbsystemDataSSMaxDatarate_None, DeviceSpeed.FULL_SPEED, "12M"),
# At this time no Acroname products support this configuration. It exists for completeness.
(_BS_C.usbsystemDataHSMaxDatarate_LowSpeed, _BS_C.usbsystemDataSSMaxDatarate_None, DeviceSpeed.LOW_SPEED, "1.5M"),
]
# Errors that indicate a range/index error.
ALLOWED_RANGE_ERRORS = [
Result.INDEX_RANGE_ERROR,
Result.RANGE_ERROR,
Result.UNIMPLEMENTED_ERROR,
Result.PARAMETER_ERROR,
]
# Errors that indicate an unsupported configuration
UNSUPPORTED_CONFIG_ERRORS = [
Result.PARAMETER_ERROR,
Result.RANGE_ERROR,
Result.UNIMPLEMENTED_ERROR,
]
MAX_ACRONAME_PORTS = 30 # A value that is greater than any Acroname port count.
CONTROL_PORT_MIN_MICRO_VOLTS = 4000000 # Minimum voltage (μV) to consider control port connected
#=============================================================================
# Exit Codes
#=============================================================================
class ExitCode(IntEnum):
"""Exit codes for the Speed Test script."""
SUCCESS = 0
CONNECTION_ERROR = 1
CAPABILITY_CHECK_FAILED = 2
CONFIGURATION_ERROR = 3
TEST_FAILED = 4
UNKNOWN = 255
EXIT_CODE_DESCRIPTIONS = {
ExitCode.SUCCESS: "All speed tests passed",
ExitCode.CONNECTION_ERROR: "Failed to connect to device",
ExitCode.CAPABILITY_CHECK_FAILED: "Device does not support required capabilities",
ExitCode.CONFIGURATION_ERROR: "Failed to configure hub data rate settings",
ExitCode.TEST_FAILED: "One or more speed tests failed",
ExitCode.UNKNOWN: "Unknown error occurred",
}
class ProgramExit(Exception):
"""
Custom exception to signal program exit with a specific code.
This exception is used instead of sys.exit() to allow context managers
to properly clean up before the program exits.
"""
def __init__(self, code, message=None):
self.code = code
self.message = message
super().__init__(message)
def exit_with_code(code, additional_message=None):
"""Print exit code message and raise ProgramExit exception.
Raises ProgramExit instead of calling sys.exit() directly to ensure
context managers can properly clean up.
"""
if isinstance(code, ExitCode):
message = "EXIT CODE %d: %s" % (code.value, EXIT_CODE_DESCRIPTIONS[code])
if additional_message:
message += " - %s" % additional_message
print(message, file=sys.stderr)
raise ProgramExit(code, message)
else:
print("EXIT CODE %d: Unknown exit code" % code, file=sys.stderr)
raise ProgramExit(code, additional_message)
#=============================================================================
# Configuration Result
#=============================================================================
class ConfigResult(IntEnum):
"""Result of attempting to configure a data rate setting."""
SUCCESS = 0
UNSUPPORTED = 1 # Device does not support this configuration
ERROR = 2 # Other error occurred
def check_device_capability(stem, test_ports=None):
"""
Checks to see if the device can execute the required commands for the test.
Uses hasUEI to verify the device supports the necessary operations.
Args:
stem: The BrainStem module connection
test_ports: List of ports to validate. If None, only checks USBSystem capabilities.
"""
#---------------------------------------------------------------------------------
#cmdUSBSYSTEM Checks
#---------------------------------------------------------------------------------
#Check how many USBSystem Entities this device has.
result = stem.classQuantity(_BS_C.cmdUSBSYSTEM)
if basic_error_handling(stem, result, "Could not acquire class quantity for cmdUSBSYSTEM"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "cmdUSBSYSTEM class quantity")
if result.value < 1:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "Device does not have USBSystem capabilities")
#Check that we can set HS max data rate
result = stem.hasUEI(_BS_C.cmdUSBSYSTEM, _BS_C.usbsystemDataHSMaxDatarate, 0, (_BS_C.ueiOPTION_SET))
if basic_error_handling(stem, result, "Device does not support setting HS max data rate"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "setDataHSMaxDatarate")
#Check that we can set SS max data rate
result = stem.hasUEI(_BS_C.cmdUSBSYSTEM, _BS_C.usbsystemDataSSMaxDatarate, 0, (_BS_C.ueiOPTION_SET))
if basic_error_handling(stem, result, "Device does not support setting SS max data rate"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "setDataSSMaxDatarate")
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
#cmdPORT Checks
#---------------------------------------------------------------------------------
#Check how many Port Entities this device has.
result = stem.classQuantity(_BS_C.cmdPORT)
if basic_error_handling(stem, result, "Could not acquire class quantity for cmdPORT"):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "cmdPORT class quantity")
# If no test_ports provided, skip per-port validation
if test_ports is None:
return
#Check that each test port is valid
for port in test_ports:
if port >= result.value:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
"Port %d is out of range. Device has %d ports." % (port, result.value))
#Check that we can enable/disable the port
result_uei = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portPortEnabled, port, (_BS_C.ueiOPTION_SET))
if basic_error_handling(stem, result_uei, "Cannot enable/disable port %d" % port):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d setEnabled" % port)
#Check that we can get data speed
result_uei = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portDataSpeed, port, (_BS_C.ueiOPTION_GET))
if basic_error_handling(stem, result_uei, "Cannot get data speed on port %d" % port):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "port %d getDataSpeed" % port)
#---------------------------------------------------------------------------------
def verify_control_port_connected(stem):
"""
Verify that the control port is connected with adequate voltage.
Finds the port with data role portDataRole_Control_Value and checks
that it has voltage > CONTROL_PORT_MIN_MICRO_VOLTS.
Exits with error if control port is not found or voltage is too low.
"""
print("\n--- Verifying Control Port Connection ---")
control_port_idx = None
# Find the control port
for port_idx in range(MAX_ACRONAME_PORTS):
port_entity = brainstem.entity.Port(stem, port_idx)
result = port_entity.getDataRole()
# Stop if we've exceeded valid ports
if result.error in ALLOWED_RANGE_ERRORS:
break
if basic_error_handling(stem, result, "Error getting data role for port %d" % port_idx):
continue
if result.value == _BS_C.portDataRole_Control_Value:
control_port_idx = port_idx
print("Found control port: Port %d" % port_idx)
break
if control_port_idx is None:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "No control port found on device")
# Check voltage on control port
port_entity = brainstem.entity.Port(stem, control_port_idx)
result = port_entity.getVbusVoltage()
if basic_error_handling(stem, result, "Error getting voltage for control port %d" % control_port_idx):
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "Cannot read control port voltage")
if result.value < CONTROL_PORT_MIN_MICRO_VOLTS:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "The control port does not appear to be connected. Please connect it and try again." )
def validate_ports_not_upstream(stem, ports):
"""
Validate that none of the specified ports are upstream ports.
Exits with error if any port is an upstream port.
Args:
stem: The BrainStem module connection
ports: List of port indices to validate
"""
for port in ports:
port_entity = brainstem.entity.Port(stem, port)
result = port_entity.getDataRole()
if basic_error_handling(stem, result, "Warning: Could not get data role for port %d" % port):
continue
if result.value == _BS_C.portDataRole_Upstream_Value:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED,
"Port %d is an upstream port and cannot be tested" % port)
print("Port %d: Data role validated (role=%d)" % (port, result.value))
def discover_downstream_ports(stem):
"""
Discover all downstream ports on the hub by iterating through ports
and checking their data role.
Iterates from port 0 up to MAX_ACRONAME_PORTS, stopping when an index
out of range error is encountered.
Returns:
List of port indices that are downstream ports.
"""
downstream_ports = []
for port_idx in range(MAX_ACRONAME_PORTS):
try:
port_entity = brainstem.entity.Port(stem, port_idx)
result = port_entity.getDataRole()
# Check if this is an index out of range error (indicates we've exceeded valid ports)
if result.error in ALLOWED_RANGE_ERRORS:
print("Port discovery complete: found %d total ports" % port_idx)
break
# Handle other errors
if basic_error_handling(stem, result, "Warning: Error getting data role for port %d" % port_idx):
continue
if result.value == _BS_C.portDataRole_Downstream_Value:
downstream_ports.append(port_idx)
print("Port %d: Downstream" % port_idx)
else:
print("Port %d: Not downstream (role=%d)" % (port_idx, result.value))
except Exception as e:
# Catch any unexpected exceptions and stop iteration
print("Port discovery stopped at port %d: %s" % (port_idx, str(e)))
break
print("Discovered %d downstream ports: %s" % (len(downstream_ports), downstream_ports))
return downstream_ports
def filter_ports_with_devices(stem, ports):
"""
Filter the list of ports to only include those with connected devices.
Checks each port's data speed - if it has a value (> 0), a device is enumerated.
Also captures the maximum speed each device is capable of.
Returns:
tuple: (list of port indices with devices, dict mapping port -> max DeviceSpeed)
"""
ports_with_devices = []
device_max_speeds = {}
for port in ports:
port_entity = brainstem.entity.Port(stem, port)
result = port_entity.getDataSpeed()
if basic_error_handling(stem, result, "Port %d: Error getting data speed" % port):
continue
if result.value > 0:
speed = decode_data_speed(result.value)
print("Port %d: Device connected at %s" % (port, speed.name))
ports_with_devices.append(port)
device_max_speeds[port] = speed
else:
print("Port %d: No device enumerated" % port)
print("Found %d ports with devices: %s" % (len(ports_with_devices), ports_with_devices))
return ports_with_devices, device_max_speeds
def decode_data_speed(speed_value):
"""
Decode raw port data speed bits into a DeviceSpeed enum.
The raw speed value is a bitmask from the hub's getDataSpeed() call.
This function translates it to the DeviceNode speed enumeration format.
"""
# Test USB 3.0 first so that we find the highest speed first.
# USB Hubs enumerate at both USB 2.0 and USB 3.0 speeds.
if speed_value & (1 << _BS_C.portDataSpeed_Connected_3p0_Bit):
print("Device enumerated as USB 3.0")
if speed_value & (1 << _BS_C.portDataSpeed_ss_5G_Bit):
print("Data speed is 5G")
return DeviceSpeed.SUPER_SPEED
elif speed_value & (1 << _BS_C.portDataSpeed_ss_10G_Bit):
print("Data speed is 10G")
return DeviceSpeed.SUPER_SPEED_PLUS
elif speed_value & (1 << _BS_C.portDataSpeed_Connected_2p0_Bit):
print("Device enumerated as USB 2.0")
if speed_value & (1 << _BS_C.portDataSpeed_ls_1p5M_Bit):
print("Data speed is 1.5M")
return DeviceSpeed.LOW_SPEED
elif speed_value & (1 << _BS_C.portDataSpeed_fs_12M_Bit):
print("Data speed is 12M")
return DeviceSpeed.FULL_SPEED
elif speed_value & (1 << _BS_C.portDataSpeed_hs_480M_Bit):
print("Data speed is 480M")
return DeviceSpeed.HIGH_SPEED
print("Device enumerated as unknown")
return DeviceSpeed.UNKNOWN
def compare_device_speeds(hub_speed, os_speed, expected_speed):
"""
Compare the speed reported by the hub vs the OS vs the expected speed.
Returns:
True if all speeds match expected, False otherwise.
"""
if os_speed == DeviceSpeed.UNKNOWN:
print("Cannot compare: OS speed not available.")
return False
if hub_speed != os_speed:
print("FAIL: Hub/OS mismatch - Hub: %s, OS: %s" % (hub_speed.name, os_speed.name))
return False
if hub_speed != expected_speed:
print("FAIL: Speed mismatch - Got: %s, Expected: %s" % (hub_speed.name, expected_speed.name))
return False
print("PASS: Device speed matches expected: %s" % (expected_speed.name))
return True
def get_device_speed_from_os(port, serial_number=0, timeout=8.0):
"""
Get the device speed as reported by the OS for a specific port.
Polls the hub until a device enumerates or timeout is reached.
"""
def check_devices():
# Note: getDownstreamDevices() is a static function and does not require a stem connection;
# however, it only returns devices which are physically connected to Acroname hubs.
devices = brainstem.discover.getDownstreamDevices()
for device in devices.value:
if device.hub_port != port:
continue
if serial_number != 0 and device.hub_serial_number != serial_number:
continue
return True, DeviceSpeed(device.speed)
return False, DeviceSpeed.UNKNOWN
result = poll_until(check_devices, timeout=timeout, interval=0.3)
if result.timed_out:
print("Timeout waiting for OS to enumerate device")
return DeviceSpeed.UNKNOWN
return result.value
def prepare_hub_max_datarate_configuration(stem, ports, hs_speed, ss_speed, settle_time=0.5):
"""
Prepare the hub for a speed test by configuring data rate limits.
Disables all specified ports, configures the HS/SS max data rates, then re-enables all ports.
Since max data rate settings are hub-wide, all ports must be cycled together.
Returns:
ConfigResult.SUCCESS: Configuration applied successfully
ConfigResult.UNSUPPORTED: Device does not support this data rate configuration
ConfigResult.ERROR: Other error occurred
"""
usb_system = brainstem.entity.USBSystem(stem, 0)
# Disable all ports
for port in ports:
port_entity = brainstem.entity.Port(stem, port)
err = port_entity.setEnabled(False)
if basic_error_handling(stem, err, "Failed to disable port %d" % port):
return ConfigResult.ERROR
# Configure hub-wide max HS data rates
err = usb_system.setDataHSMaxDatarate(hs_speed)
if err in UNSUPPORTED_CONFIG_ERRORS:
print("Configuration not supported by device (HS): %s" % Result.getErrorText(err))
# Re-enable ports before returning
for port in ports:
brainstem.entity.Port(stem, port).setEnabled(True)
return ConfigResult.UNSUPPORTED
if basic_error_handling(stem, err, "Failed to set HS max data rate"):
return ConfigResult.ERROR
# Configure hub-wide max SS data rates
err = usb_system.setDataSSMaxDatarate(ss_speed)
if err in UNSUPPORTED_CONFIG_ERRORS:
print("Configuration not supported by device (SS): %s" % Result.getErrorText(err))
# Re-enable ports before returning
for port in ports:
brainstem.entity.Port(stem, port).setEnabled(True)
return ConfigResult.UNSUPPORTED
if basic_error_handling(stem, err, "Failed to set SS max data rate"):
return ConfigResult.ERROR
# Re-enable all ports
for port in ports:
port_entity = brainstem.entity.Port(stem, port)
err = port_entity.setEnabled(True)
if basic_error_handling(stem, err, "Failed to enable port %d" % port):
return ConfigResult.ERROR
time.sleep(settle_time) # Allow time for devices to enumerate.
return ConfigResult.SUCCESS
def get_device_speed_from_hub(stem, port, timeout=5.0, max_retries=5):
"""
Get the device speed as reported by the hub for a specific port.
Polls the hub until a device enumerates or timeout/max retries is reached.
"""
retry_count = 0 # Will be used as a "nonlocal" variable.
port_entity = brainstem.entity.Port(stem, port)
def check_speed():
nonlocal retry_count # Uses scope variable to keep state through multiple calls.
speed_result = port_entity.getDataSpeed()
if speed_result.error:
print("Error getting data speed: %s - retry %d" % (speed_result.error, retry_count))
retry_count += 1
if retry_count >= max_retries:
return True, speed_result # Stop polling due to max retries
return False, speed_result
elif speed_result.value > 0:
print("Device enumeration detected")
return True, speed_result
return False, speed_result
result = poll_until(check_speed, timeout=timeout, interval=0.3)
if result.timed_out:
print("Timeout waiting for device to enumerate")
speed_value = result.value.value if result.value else 0
return decode_data_speed(speed_value)
def print_results_table(results, test_ports, speed_labels):
"""
Print a summary table of test results.
Results can be:
True - Test passed
False - Test failed
None - Configuration not supported by device (N/A)
"""
# Calculate column widths
port_col_width = 6 # "Port X"
speed_col_width = max(len(label) for label in speed_labels) + 2
# Print header
print("\n" + "=" * 60)
print("TEST RESULTS SUMMARY")
print("=" * 60)
# Print column headers
header = "Port".ljust(port_col_width) + " | "
header += " | ".join(label.center(speed_col_width) for label in speed_labels)
print(header)
print("-" * len(header))
# Print each row (port)
all_passed = True
for port in test_ports:
row = ("%s" % port).ljust(port_col_width) + " | "
cells = []
for label in speed_labels:
passed = results.get((port, label), "--")
if passed is True:
cells.append("PASS".center(speed_col_width))
elif passed is False:
cells.append("FAIL".center(speed_col_width))
all_passed = False
elif passed is None:
# Configuration not supported by device - not a failure
cells.append("N/A".center(speed_col_width))
else:
cells.append("--".center(speed_col_width))
row += " | ".join(cells)
print(row)
print("=" * 60)
# Print overall result
if all_passed:
print("OVERALL: ALL TESTS PASSED")
else:
print("OVERALL: SOME TESTS FAILED")
print("=" * 60)
return all_passed
#Context manager that handles cleanup of the stem.
class CLI_Manager:
def __init__(self):
self.stem = None # brainstem.module.Module object
self.test_ports = None
self.device_max_speeds = {} # Maps port -> max DeviceSpeed
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.stem and self.test_ports:
# Restore hub to default configuration before disconnecting
print("\n--- Restoring hub to default configuration ---")
prepare_hub_max_datarate_configuration(
self.stem,
self.test_ports,
_BS_C.usbsystemDataHSMaxDatarate_HighSpeed,
_BS_C.usbsystemDataSSMaxDatarate_SuperSpeedPlus
)
if self.stem:
print("\n--- Disconnecting from device ---")
self.stem.disconnect()
self.stem = None
return False # Ensure exception propagates
def main(argv):
exit_code = ExitCode.UNKNOWN
try:
print("Provided Arguments:")
print(argv)
arg_parser = CustomArgumentParser(argv)
with CLI_Manager() as cli:
#Setup
#/////////////////////////////////////////////////////////////////////
#---------------------------------------------------------------------
# Note: This code uses the base Module class instead of a specific device type.
# The Module class is the base class for all BrainStem Objects like the USBHub3c, USBHub3p, USBCSwitch etc.
# This allows our code to be more generic; however, we don't really know what we are or what
# we are capable of so we must do a handful of capability checks.
# 1. Connect to device
cli.stem = create_and_connect_stem(arg_parser.sn)
if cli.stem is None:
exit_with_code(ExitCode.CONNECTION_ERROR, "Serial: 0x%08X" % arg_parser.sn if arg_parser.sn else "first found")
# 2. Check basic device capabilities before calling any other APIs
check_device_capability(cli.stem)
# 3. Verify control port is connected before proceeding
verify_control_port_connected(cli.stem)
# 4. Determine candidate ports based on mode
if arg_parser.automatic:
print("\n--- Automatic Port Discovery ---")
candidate_ports = discover_downstream_ports(cli.stem)
if not candidate_ports:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "No downstream ports found on device")
else:
# Manual mode - validate specified ports aren't upstream
validate_ports_not_upstream(cli.stem, arg_parser.test_ports)
candidate_ports = arg_parser.test_ports
# 5. Ensure ports are enabled and the max data rates are set.
print("\n--- Resetting hub to default configuration ---")
prepare_hub_max_datarate_configuration(
cli.stem,
candidate_ports,
_BS_C.usbsystemDataHSMaxDatarate_HighSpeed,
_BS_C.usbsystemDataSSMaxDatarate_SuperSpeedPlus,
settle_time=3.0 # Some device might need more or less time to enumerate.
)
# 6. Filter to ports with connected devices (common to both modes)
ports_with_devices, device_max_speeds = filter_ports_with_devices(cli.stem, candidate_ports)
if not ports_with_devices:
exit_with_code(ExitCode.CAPABILITY_CHECK_FAILED, "No devices connected to downstream ports")
cli.test_ports = ports_with_devices
cli.device_max_speeds = device_max_speeds
# 7. Validate capabilities for the specific test ports
check_device_capability(cli.stem, cli.test_ports)
print("\n--- Setup complete: testing ports %s ---" % cli.test_ports)
#---------------------------------------------------------------------
#/////////////////////////////////////////////////////////////////////
#Work
#/////////////////////////////////////////////////////////////////////
# Track test results: (port, speed_label) -> True/False/None
# None indicates the configuration is unsupported by the device
results = {}
speed_labels = [config[3] for config in DATA_RATE_CONFIGURATIONS]
# Loop through the data rate configurations
for hs_speed, ss_speed, expected_speed, speed_label in DATA_RATE_CONFIGURATIONS:
config_result = prepare_hub_max_datarate_configuration(cli.stem, cli.test_ports, hs_speed, ss_speed)
if config_result == ConfigResult.ERROR:
exit_with_code(ExitCode.CONFIGURATION_ERROR, "Failed to prepare hub for %s" % speed_label)
if config_result == ConfigResult.UNSUPPORTED:
# Mark all ports as unsupported for this configuration
print("--- Skipping %s tests (configuration not supported by BrainStem device) ---" % speed_label)
for port in cli.test_ports:
results[(port, speed_label)] = None # None = unsupported/N/A
continue
# Test each port with the current configuration
for port in cli.test_ports:
# Check if the device is capable of the expected speed
if port in cli.device_max_speeds:
device_max = cli.device_max_speeds[port]
if expected_speed > device_max:
print("\n--- Skipping Port %d @ %s (device max speed is %s) ---" % (port, speed_label, device_max.name))
results[(port, speed_label)] = None # None = N/A
continue
print("\n--- Testing Port %d @ %s (expecting %s) ---" % (port, speed_label, expected_speed.name))
hub_speed = get_device_speed_from_hub(cli.stem, port)
os_speed = get_device_speed_from_os(port, arg_parser.sn)
success = compare_device_speeds(hub_speed, os_speed, expected_speed)
results[(port, speed_label)] = success
#/////////////////////////////////////////////////////////////////////
# Print summary table
all_passed = print_results_table(results, cli.test_ports, speed_labels)
if all_passed:
exit_code = ExitCode.SUCCESS
else:
exit_code = ExitCode.TEST_FAILED
except ProgramExit as e:
exit_code = e.code
# Exit at the end, after context manager cleanup is complete
return exit_code
if __name__ == '__main__':
sys.exit(main(sys.argv))