154 lines
6.1 KiB
Python
Executable File
154 lines
6.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
#System
|
|
import time
|
|
import sys
|
|
import os
|
|
|
|
#BrainStem
|
|
import brainstem
|
|
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
|
|
from brainstem.result import Result
|
|
|
|
#Local files
|
|
from argument_parser import *
|
|
|
|
#Power Delivery Utilities
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities/PowerDelivery"))
|
|
sys.path.insert(0, parent_dir)
|
|
from bs_pd_packet_filtering import *
|
|
|
|
#Generic Utilities
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities"))
|
|
sys.path.insert(0, parent_dir)
|
|
from brainstem_helpers import *
|
|
|
|
|
|
def check_device_capability(stem, port_index):
    """Verify that the device can run the Power Delivery commands this tool needs.

    This also covers functionality that comes from software features
    (PD logging). System commands are not checked because all devices have
    a system class.

    Args:
        stem: Connected BrainStem module object to query.
        port_index: Zero-based index of the PowerDelivery entity (port)
            the request will be sent on.

    Raises:
        SystemExit: With code 2 when ``port_index`` is outside the range of
            PowerDelivery entities reported by the device. Failures of the
            individual queries are handed to ``basic_error_handling`` with
            ``exit=True, exit_code=2``.
    """
    # Check how many PowerDelivery entities this device has.
    result = stem.classQuantity(_BS_C.cmdPOWERDELIVERY)
    s = "Exiting: Could not acquire class quantity for cmdPOWERDELIVERY - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    # Entity indices are zero-based, so with N entities the valid ports are
    # 0 .. N-1. The index equal to the quantity is already out of range, and
    # negative indices are never valid.
    if not 0 <= port_index < result.value:
        print("Exiting The provided port is out of range of supported ports for this device (cmdPOWERDELIVERY). - %s" % (result))
        sys.exit(2)

    # Check that we can send PD requests.
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestCommand, port_index, _BS_C.ueiOPTION_SET)
    s = "Exiting: There is an error with this devices ability to send PD requests on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    # Check that we can enable PD logging.
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryLogEnable, port_index, _BS_C.ueiOPTION_SET)
    s = "Exiting: There is an error with this devices ability to send Power Delivery Logging Enable requests on port: %d - %s" % (port_index, result)
    s += "\nDoes this devices have the PD Logging software feature?"
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    # TODO: This check doesn't exist in firmware yet.
    # #Check that we can register for PD Logging events
    # result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryLogEvent, port_index, (_BS_C.ueiOPTION_SET))
    # s = "Exiting: There is an error with this devices ability to register for Power Delivery Events on port: %d - %s" % (port_index, result)
    # s += "\nDoes this devices have the PD Logging software feature?"
    # basic_error_handling(stem, result, s, exit=True, exit_code=2)
|
|
|
|
|
|
def check_software_version_capability():
    """Exit with code 14 unless the BrainStem library is at least 2.11.2.

    Returns None when the installed library satisfies the minimum version.
    """
    meets_minimum = brainstem.version.isAtLeast(2, 11, 2).value
    if meets_minimum:
        return

    print("Exiting: This application requires BrainStem >= 2.11.2")
    sys.exit(14)
|
|
|
|
|
|
def handle_request(stem, port_index, request, logger, sop):
    """Send a PD request on a port and decode the matching logged response.

    Args:
        stem: Connected BrainStem module object.
        port_index: Index of the PowerDelivery entity to send the request on.
        request: Request identifier; used as the key into the table
            returned by ``get_request_dict()``.
        logger: PD logger object handed to ``pd_packet_filter``.
        sop: SOP selector handed to ``pd_packet_filter``.

    Returns:
        The value produced by the request's decode function, or None when
        the request is unsupported, its table entry is incomplete, or the
        packet filter did not return a result with ``Result.NO_ERROR``.
    """
    entry = get_request_dict().get(request, None)
    if not entry:
        print("The following request is not currently supported: %d" % (request))
        return None

    # Each table entry holds (tx filter, rx filter, decoder) in that order.
    func_tx = entry[0]
    func_rx = entry[1]
    func_decode = entry[2]
    if not (func_tx and func_rx and func_decode):
        return None

    pd = brainstem.entity.PowerDelivery(stem, port_index)
    send_result = pd.request(request)
    basic_error_handling(stem, send_result, ("pd.sendRequest(%d)" % (request)))

    filtered = pd_packet_filter(logger, sop, func_tx, func_rx)
    if filtered and filtered.error == Result.NO_ERROR:
        return func_decode(filtered.value)

    return None
|
|
|
|
|
|
class CLI_Manager:
    """Context manager that handles cleanup of the stem and logger.

    The caller assigns ``stem`` and ``logger`` inside the ``with`` block.
    On exit, whichever of them is set is shut down and cleared.
    """

    def __init__(self):
        self.stem = None  # brainstem.module.Module object
        self.logger = None  # PDChannelLogger object

    def __enter__(self):
        # Required for the context manager protocol to work correctly.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Disable the logger, disconnect the stem, and clear both references.

        The stem is disconnected even if disabling the logger raises, so a
        logger failure cannot leave the device connection open.

        Returns:
            False, so any exception raised inside the ``with`` body
            propagates to the caller.
        """
        try:
            if self.logger:
                self.logger.setEnabled(False)
        finally:
            self.logger = None
            try:
                if self.stem:
                    self.stem.disconnect()
            finally:
                self.stem = None

        return False  # Ensure exception propagates
|
|
|
|
|
|
def main(argv):
    """Run the PD request CLI and return the process exit code.

    Args:
        argv: Full argument vector (e.g. ``sys.argv``), handed to
            ``CustomArgumentParser``.

    Returns:
        0 on success. If any step raises SystemExit (for example the
        capability checks, which exit with 2 or 14), the code carried by
        that exception is returned instead.
    """
    try:
        print("Provided Arguments:")
        print(argv)
        arg_parser = CustomArgumentParser(argv)

        with CLI_Manager() as cli:

            # Setup
            # /////////////////////////////////////////////////////////////////////
            # ---------------------------------------------------------------------
            # Note: This code doesn't use the typical BrainStem object, but instead uses a Module object.
            # The Module class is the base class for all BrainStem Objects like the USBHub3c, USBHub3p, USBCSwitch etc.
            # This allows our code to be more generic; however, we don't really know what we are or what
            # we are capable of and that is why we do a handful of capability checks here.
            cli.stem = create_and_connect_stem(arg_parser.sn)
            check_device_capability(cli.stem, arg_parser.port)
            check_software_version_capability()
            # ---------------------------------------------------------------------

            cli.logger = create_logger(cli.stem, arg_parser.port)  # Create Logger object for given port
            # /////////////////////////////////////////////////////////////////////

            # Work
            # /////////////////////////////////////////////////////////////////////
            request_result = handle_request(cli.stem, arg_parser.port, arg_parser.request, cli.logger, arg_parser.sop)
            print(request_result)
            # /////////////////////////////////////////////////////////////////////

    except SystemExit as e:
        # Return the exit code, not the exception object. Passing the
        # exception itself to sys.exit() makes the interpreter print it and
        # exit with status 1, losing the intended code.
        return e.code

    return 0
|
|
|
|
|
|
# Script entry point: run the CLI and use main()'s return value as the exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)
|
|
|