#!/usr/bin/env python3
"""Discover and send Apple DFU vendor defined messages through a BrainStem port.

The script connects to a BrainStem module, verifies that the module and the
selected port support the required Port / USBSystem / PowerDelivery
features, sets the power role required for the requested SOP, and then either
discovers the Apple DFU actions (and their arguments) the attached device
reports, or builds (and optionally sends) a "perform action" VDM.

NOTE(review): this revision was produced from a copy of the file whose line
breaks and indentation had been lost. Block nesting was reconstructed from
the syntax; places where the nesting was ambiguous are marked NOTE(review).
Part of the text was missing altogether - see the "SOURCE TEXT LOST" banner
above construct_apple_dfu_vdm().
"""

#System
import time
import sys
import os
import struct

#BrainStem
import brainstem
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
from brainstem.pd_channel_logger import PDChannelLogger
from brainstem.result import Result

#Local files
from argument_parser import *

#Power Delivery Utilities
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities/PowerDelivery"))
sys.path.insert(0, parent_dir)
from bs_pd_packet_filtering import *

#Generic Utilities
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities"))
sys.path.insert(0, parent_dir)
from brainstem_helpers import *

#Maximum number of polling iterations before a wait loop gives up.
BAIL_COUNTER = 40

#Note: Apple DFU related information was acquired from here:
#https://github.com/AsahiLinux/docs/wiki/HW:USB-PD
#Users should keep in mind that this information was NOT publicly distributed
#by Apple and no guarantee of accuracy can be made.
#Users should use this code at their own risk and Acroname accepts
#no responsibility for any damage caused by this code.
CMD_GET_ACTION = 0x10
CMD_GET_ACTION_INFO = 0x11
CMD_PREFORM_ACTION = 0x12 #(sic) Name kept as-is so existing references keep working.

#Apple Vendor ID
APPLE_VID = 0x05AC

#Apple DFU command number to string.
COMMAND_LOOKUP = {
    0x0103: "PD Reset",
    0x0105: "Reboot",
    0x0106: "DFU / Hold Mode",
    0x0306: "Debug UART",
    0x0606: "DFU USB",
    0x4606: "Debug USB",
    0x0803: "I2C Bus (1)",
    0x0809: "I2C Bus (2)",
}


def is_pd_established(stem, port_index):
    """Return True when either side of the port reports a non-zero RDO.

    A non-zero Request Data Object on the local or the remote partner is
    treated as proof that power delivery negotiation has completed.

    Args:
        stem: Connected BrainStem module.
        port_index: Index of the PowerDelivery entity to query.

    Returns:
        True if a local or remote RDO was read without error and is non-zero,
        otherwise False.
    """
    pd = brainstem.entity.PowerDelivery(stem, port_index)

    #Local RDO - This will be set if the port is SINKING power.
    rdo_local = pd.getRequestDataObject(_BS_C.powerdeliveryPartnerLocal)
    if not basic_error_handling(stem, rdo_local) and rdo_local.value != 0:
        return True

    #Remote RDO - This will be set if the port is SOURCING power.
    rdo_remote = pd.getRequestDataObject(_BS_C.powerdeliveryPartnerRemote)
    if not basic_error_handling(stem, rdo_remote) and rdo_remote.value != 0:
        return True

    return False


#Turn the indicated port off and confirm that the voltage bleeds off.
def turn_port_off_and_confirm_voltage(stem, port_index, max_voltage=200000): #0.2VDC
    """Disable the port and poll Vbus until it reads below ``max_voltage``.

    Exits the process with code 9 if the port cannot be disabled and with
    code 5 if the voltage has not dropped after BAIL_COUNTER polls
    (0.2 seconds apart).

    Args:
        stem: Connected BrainStem module.
        port_index: Index of the Port entity to disable.
        max_voltage: Vbus reading that must be undercut; the default of
            200000 corresponds to 0.2 VDC.
    """
    port = brainstem.entity.Port(stem, port_index)
    error = port.setEnabled(False)
    s = "Exiting: Failed to disable port: %d - Error: %d" % (port_index, error)
    basic_error_handling(stem, error, s, exit=True, exit_code=9)

    bail_count = 0
    while True:
        result = port.getVbusVoltage()
        if not basic_error_handling(stem, result) and result.value < max_voltage:
            break

        bail_count += 1
        if bail_count > BAIL_COUNTER:
            print("Exiting Failed to turn off port: %d - Result: %s - Bail Count: %d" % (port_index, result, bail_count))
            sys.exit(5)
        time.sleep(.2)


#Turn the indicated port on and confirm power delivery negotiations are completed (defined RDO)
def turn_port_on_and_confirm_power_delivery(stem, port_index):
    """Enable the port and poll until is_pd_established() reports True.

    Exits the process with code 9 if the port cannot be enabled and with
    code 5 if no PD connection is seen after BAIL_COUNTER polls
    (0.2 seconds apart).

    Args:
        stem: Connected BrainStem module.
        port_index: Index of the Port entity to enable.
    """
    port = brainstem.entity.Port(stem, port_index)
    error = port.setEnabled(True)
    s = "Exiting: Failed to enable port: %d - Error: %d" % (port_index, error)
    basic_error_handling(stem, error, s, exit=True, exit_code=9)

    bail_count = 0
    while not is_pd_established(stem, port_index):
        bail_count += 1
        if bail_count > BAIL_COUNTER:
            print("Exiting Failed to find a pd connection on port: %d - Bail Count: %d" % (port_index, bail_count))
            sys.exit(5)
        time.sleep(.2)


#Convenience function for combining the off confirm with the on confirm functions
def toggle_and_wait_for_device(stem, port_index):
    """Power cycle the port: wait for Vbus to drop, then for PD to come back."""
    turn_port_off_and_confirm_voltage(stem, port_index)
    turn_port_on_and_confirm_power_delivery(stem, port_index)


#=============================================================================
# NOTE(review): SOURCE TEXT LOST
# In the copy of the file this revision was made from, everything between
# the opening quote of the struct.pack() format string in
# construct_apple_dfu_vdm() and the timeout comparison near the end of
# handle_send_receive_apple_dfu_vdm() is missing. The two functions below
# therefore keep the surviving original text as comments and raise
# NotImplementedError instead of running a guessed implementation (these
# functions build and send VDMs to attached hardware, so a wrong guess is
# worse than a loud failure). Restore both bodies from the original file.
#=============================================================================

#Helper function for constructing basic apple dfu related vdm's
def construct_apple_dfu_vdm(sop, command, action=None, action_arg=None):
    """Build the VDM buffer for an Apple DFU command.

    Callers in this file pass CMD_GET_ACTION, CMD_GET_ACTION_INFO (with
    ``action``) and CMD_PREFORM_ACTION (with ``action`` and ``action_arg``).

    Raises:
        NotImplementedError: always, until the lost body is restored.
    """
    #Surviving original text (remainder lost):
    #    if CMD_GET_ACTION == command:
    #        return struct.pack('
    raise NotImplementedError(
        "construct_apple_dfu_vdm(): the body of this function was lost from "
        "this copy of the file and must be restored from the original source.")


#NOTE(review): the original header of this function was lost. The first two
#parameter names appear in the surviving text; "sop", "vdm" and "logger" are
#inferred from the positional arguments used by the callers in this file.
#The surviving text also references "max_time_seconds", which may have been
#an additional parameter or a local - confirm against the original.
def handle_send_receive_apple_dfu_vdm(stem, port_index, sop, vdm, logger):
    """Send an Apple DFU VDM and return the matching response packet.

    Callers in this file treat the return value as either None or an object
    with ``.error`` and ``.value.payload``.

    Raises:
        NotImplementedError: always, until the lost body is restored.
    """
    #Surviving original text (beginning lost, indentation unknown):
    #    ... >= max_time_seconds:
    #        print("Exiting: Failed to send/receive apple dfu vdm's")
    #        sys.exit(5)
    #    time.sleep(.1)
    #    #Toggle the port one more time to ensure it is in a working state.
    #    toggle_and_wait_for_device(stem, port_index)
    #    return None
    raise NotImplementedError(
        "handle_send_receive_apple_dfu_vdm(): most of this function was lost "
        "from this copy of the file and must be restored from the original source.")


#Discovers the actions a device supports
def discover_actions(stem, port_index, sop, logger):
    """Query the device for its supported actions and print them.

    Returns:
        List of action numbers parsed from the response payload (two bytes
        each, starting at payload offset 6); empty if no valid response was
        received.
    """
    supported_actions = []
    req = construct_apple_dfu_vdm(sop, CMD_GET_ACTION)
    result_packet = handle_send_receive_apple_dfu_vdm(stem, port_index, sop, req, logger)

    if result_packet and result_packet.error == Result.NO_ERROR:
        supported_actions = get_two_byte_list_from_buffer(result_packet.value.payload, start_offset=6)
        print("Discovered Actions: ", format_list_as_hex_string_list(supported_actions, width=4))
        for action in supported_actions:
            print("Action: 0x%04X - %s" % (action, COMMAND_LOOKUP.get(action, "Unknown")))
        #NOTE(review): nesting of this separator was ambiguous in the damaged copy.
        print("----------------------------------------------------------")

    return supported_actions


#Discovers the actions arguments that a device action supports.
def discover_action_arguments(stem, port_index, sop, action, logger):
    """Query the device for the arguments of one action and print them.

    Returns:
        List of argument values parsed from the response payload (two bytes
        each, starting at payload offset 6); empty if no valid response was
        received.
    """
    supported_args = []
    print("Discovering Arguments for Action: 0x%04X - %s" % (action, COMMAND_LOOKUP.get(action, "Unknown")))

    req = construct_apple_dfu_vdm(sop, CMD_GET_ACTION_INFO, action)
    result_packet = handle_send_receive_apple_dfu_vdm(stem, port_index, sop, req, logger)

    if result_packet and result_packet.error == Result.NO_ERROR:
        supported_args = get_two_byte_list_from_buffer(result_packet.value.payload, start_offset=6)
        print("Discovered Arguments: ", format_list_as_hex_string_list(supported_args, width=4))

    #NOTE(review): nesting of this separator was ambiguous in the damaged copy.
    print("----------------------------------------------------------")
    return supported_args


#Checks to see if the device can execute the required commands for the defined port.
#This also includes some functionality from software features.
def check_device_capability(stem, port_index):
    """Verify the module exposes every entity/option this script relies on.

    Each failed check prints a message and exits the process with code 2.

    Args:
        stem: Connected BrainStem module.
        port_index: Port the user asked to operate on.
    """
    #Note: We don't check cmdSYSTEM because all Acroname devices support this Entity.

    #---------------------------------------------------------------------------------
    #cmdPORT Checks
    #---------------------------------------------------------------------------------
    #Check how many Port Entities this devices has.
    result = stem.classQuantity(_BS_C.cmdPORT)
    s = "Exiting: Could not acquire class quantity for cmdPORT - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that the supplied port does not exceed the amount of Port entities this devices has.
    #Indices are zero based, so an index equal to the quantity is already out of range.
    if port_index >= result.value:
        print("The provided port is out of range of supported ports for this device (cmdPORT). - %s" % (result))
        sys.exit(2)

    #Check that we can enable/disable a port
    result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portPortEnabled, port_index, (_BS_C.ueiOPTION_SET))
    s = "Exiting: There is an error with this devices ability to enable port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can get vbus voltage
    result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portVbusVoltage, port_index, (_BS_C.ueiOPTION_GET))
    s = "Exiting: There is an error with this devices ability to get vbus voltage on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)
    #---------------------------------------------------------------------------------

    #---------------------------------------------------------------------------------
    #cmdUSBSYSTEM Checks
    #---------------------------------------------------------------------------------
    #Check how many USBSystem Entities this devices has.
    result = stem.classQuantity(_BS_C.cmdUSBSYSTEM)
    s = "Exiting: Could not acquire class quantity for cmdUSBSYSTEM - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that the device has at least one USBSystem entity (index 0 is used below).
    if result.value < 1:
        print("Exiting The USBSystem index is out of range for this device (cmdUSBSYSTEM). - %s" % (result))
        sys.exit(2)

    #Check that we can get the upstream port
    result = stem.hasUEI(_BS_C.cmdUSBSYSTEM, _BS_C.usbsystemUpstreamPort, 0, (_BS_C.ueiOPTION_GET))
    s = "Exiting: There is an error with this devices ability to get the upstream port - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)
    #---------------------------------------------------------------------------------

    #---------------------------------------------------------------------------------
    #cmdPOWERDELIVERY Checks
    #---------------------------------------------------------------------------------
    #Check how many PowerDelivery Entities this devices has.
    result = stem.classQuantity(_BS_C.cmdPOWERDELIVERY)
    s = "Exiting: Could not acquire class quantity for cmdPOWERDELIVERY - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that the supplied port does not exceed the amount of PowerDelivery entities this devices has.
    #Indices are zero based, so an index equal to the quantity is already out of range.
    if port_index >= result.value:
        print("Exiting The provided port is out of range of supported ports for this device (cmdPOWERDELIVERY). - %s" % (result))
        sys.exit(2)

    #Check that we can send VDM's
    #(This capability used to be checked twice; one check is sufficient.)
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryVDM, port_index, (_BS_C.ueiOPTION_SET))
    #TODO: indicate in the error message that this might require a software features update.
    s = "Exiting: There is an error with this devices ability to send VDM messages on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can get RDO's
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestDataObject, port_index, (_BS_C.ueiOPTION_GET))
    s = "Exiting: There is an error with this devices ability to get RDOs on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can send PD Requests
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestCommand, port_index, (_BS_C.ueiOPTION_SET))
    s = "Exiting: There is an error with this devices ability to send PD requests on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can enable PD Logging
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryLogEnable, port_index, (_BS_C.ueiOPTION_SET))
    s = "Exiting: There is an error with this devices ability to send Power Delivery Logging Enable requests on port: %d - %s" % (port_index, result)
    s += "\nDoes this devices have the PD Logging software feature?"
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #TODO: This check doesn't exist in firmware yet.
    # #Check that we can register for PD Logging events
    # result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryLogEvent, port_index, (_BS_C.ueiOPTION_SET))
    # s = "Exiting: There is an error with this devices ability to register for Power Delivery Events on port: %d - %s" % (port_index, result)
    # s += "\nDoes this devices have the PD Logging software feature?"
    # basic_error_handling(stem, result, s, exit=True, exit_code=2)
    #---------------------------------------------------------------------------------


#Ensure the software library is >= 2.11.2
def check_software_version_capability():
    """Exit with code 14 unless the installed BrainStem library is >= 2.11.2."""
    if not brainstem.version.isAtLeast(2, 11, 2).value:
        print("Exiting: This application requires BrainStem >= 2.11.2")
        sys.exit(14)


#Context manager that handles cleanup of the stem and logger.
#It also attempts to return the power mode.
class CLI_Manager:
    """Owns the stem and logger for one CLI run and cleans them up on exit.

    The attributes are filled in by main() as the run progresses; any of
    them may still be None when __exit__ runs (for example when a capability
    check exits before the power role was captured).
    """

    def __init__(self):
        self.stem = None        #brainstem.module.Module objects
        self.logger = None      #PDChannelLogger objects
        self.power_role = None  #values acquired by getPowerRole()
        self.port = None        #port number associated with the value above.

    def __enter__(self):
        #You must have this function for the context manager to work correctly
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.logger:
            self.logger.setEnabled(False)
            self.logger = None

        if self.stem:
            try:
                #Attempts to restore the devices original power mode if it was changed.
                #Skip when no role was captured: passing None to setPowerRole()
                #is never a valid "restore".
                if self.power_role is not None and self.port is not None:
                    pd = brainstem.entity.PowerDelivery(self.stem, self.port) #We only support one stem
                    result = pd.setPowerRole(self.power_role)
                    basic_error_handling(self.stem, result, "Error Applying user's original power role.")
            finally:
                #Always release the connection, even if the restore above raised.
                self.stem.disconnect()
                self.stem = None

        return False # Ensure exception propagates


#Acquire the current power role of the device so that it can be restored and
#set the needed power role based on the sop.
def handle_power_role(stem, port, sop):
    """Switch the port to the power role required for ``sop``.

    ``sop`` 4 selects the source role and ``sop`` 3 the sink role; any other
    value exits with code 76. Failing to read the current role exits with
    code 10, failing to set the new role exits with code 11.

    Returns:
        The power role that was active before the change, so the caller can
        restore it later.
    """
    pd = brainstem.entity.PowerDelivery(stem, port)
    pr_result = pd.getPowerRole()
    basic_error_handling(stem, pr_result, "pd.getPowerRole()", exit=True, exit_code=10)

    set_power_role = 0
    if 4 == sop:
        set_power_role = _BS_C.powerdeliveryPowerRoleSource
    elif 3 == sop:
        set_power_role = _BS_C.powerdeliveryPowerRoleSink
    else:
        #Arg parser should be handling this.
        print("Unsupported SOP")
        sys.exit(76)

    error = pd.setPowerRole(set_power_role)
    basic_error_handling(stem, error, ("pd.setPowerRole(%d)" % (set_power_role)), exit=True, exit_code=11)

    #Power should be on; however, if the power role changed then we should wait for PD to be established.
    turn_port_on_and_confirm_power_delivery(stem, port)

    return pr_result.value #return the original value


#Check that the requested port is not our upstream port. We don't want to cut our head off.
def check_upstream_port(stem, port):
    """Exit with code 78 if ``port`` is the module's current upstream port.

    Failing to read the upstream port exits with code 77.
    """
    usb_system = brainstem.entity.USBSystem(stem, 0)
    upstream_result = usb_system.getUpstream()
    if not basic_error_handling(stem, upstream_result, "usb_system.getUpstream()", exit=True, exit_code=77):
        if upstream_result.value == port:
            print("Unable to Discover DFU capabilities on the upstream port. Select a different port")
            sys.exit(78)


def main(argv):
    """Run the CLI.

    Args:
        argv: Full argument vector (as in ``sys.argv``).

    Returns:
        0 on success, otherwise the code carried by the SystemExit that
        ended the run, suitable for passing straight to ``sys.exit()``.
    """
    try:
        print("Provided Arguments:")
        print(argv)
        arg_parser = CustomArgumentParser(argv)

        with CLI_Manager() as cli:
            cli.port = arg_parser.port

            #Setup
            #/////////////////////////////////////////////////////////////////////
            #Only connect if we need to.
            if (arg_parser.CLI_ACTION_DISCOVER == arg_parser.cli_action) or \
               (arg_parser.CLI_ACTION_SEND == arg_parser.cli_action and arg_parser.send):
                #---------------------------------------------------------------------
                #Note: This code doesn't use the typical BrainStem object, but instead uses a Module object.
                #The Module class is the base class for all BrainStem Objects like the USBHub3c, USBHub3p, USBCSwitch etc.
                #This allows our code to be more generic; however, we don't really know what we are or what
                #we are capable of and that is why we do a handful of capability checks here.
                cli.stem = create_and_connect_stem(arg_parser.sn)
                check_device_capability(cli.stem, arg_parser.port)
                check_software_version_capability()
                #---------------------------------------------------------------------

                check_upstream_port(cli.stem, arg_parser.port)
                cli.power_role = handle_power_role(cli.stem, cli.port, arg_parser.sop)
                cli.logger = create_logger(cli.stem, cli.port) #Create Logger object for given port
            #/////////////////////////////////////////////////////////////////////

            #Work
            #/////////////////////////////////////////////////////////////////////
            if arg_parser.CLI_ACTION_DISCOVER == arg_parser.cli_action:
                if arg_parser.action is not None:
                    #Use the logger owned by the context manager; there is no
                    #free-standing "logger" name in this scope.
                    discover_action_arguments(cli.stem, cli.port, arg_parser.sop, arg_parser.action, cli.logger)
                else:
                    supported_actions = discover_actions(cli.stem, cli.port, arg_parser.sop, cli.logger)
                    for action in supported_actions:
                        discover_action_arguments(cli.stem, cli.port, arg_parser.sop, action, cli.logger)

            elif arg_parser.CLI_ACTION_SEND == arg_parser.cli_action:
                vdm = construct_apple_dfu_vdm(arg_parser.sop, CMD_PREFORM_ACTION, arg_parser.action, arg_parser.action_argument)
                print("Provided Action: 0x%04X" % (arg_parser.action))
                print("Provided Action Argument: 0x%04X" % (arg_parser.action_argument))
                print("Requested VDM as integer list: ", vdm)
                print("Requested VDM as hex string list: ", format_list_as_hex_string_list(vdm, width=2))

                if arg_parser.send:
                    handle_send_receive_apple_dfu_vdm(cli.stem, cli.port, arg_parser.sop, vdm, cli.logger)
                else:
                    print("Provide the '-x' argument to send this VDM")

            else:
                #Arg parser should be handling this.
                print("Unsupported Action")
                sys.exit(79)
            #/////////////////////////////////////////////////////////////////////

    except SystemExit as e:
        #Hand back the exit *code*, not the exception object: sys.exit() given a
        #non-integer object prints it and exits with status 1, which would hide
        #the specific exit codes used throughout this script.
        return e.code

    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))