446 lines
20 KiB
Python
Executable File
446 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#System
|
|
import time
|
|
import sys
|
|
import os
|
|
|
|
#BrainStem
|
|
import brainstem
|
|
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
|
|
from brainstem.pd_channel_logger import PDChannelLogger
|
|
from brainstem.result import Result
|
|
|
|
#Local files
|
|
from argument_parser import *
|
|
|
|
#Power Delivery Utilities
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities/PowerDelivery"))
|
|
sys.path.insert(0, parent_dir)
|
|
from bs_pd_packet_filtering import *
|
|
|
|
#Generic Utilities
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.abspath(os.path.join(script_dir, "../utilities"))
|
|
sys.path.insert(0, parent_dir)
|
|
from brainstem_helpers import *
|
|
|
|
#Number of polls a port on/off wait loop tolerates before giving up.
BAIL_COUNTER = 40

#Note: Apple DFU related information was acquired from here:
#https://github.com/AsahiLinux/docs/wiki/HW:USB-PD
#Users should keep in mind that this information was NOT publicly distributed
#by Apple and no guarantee of accuracy can be made.
#Users should use this code at their own risk and Acroname accepts
#no responsibility for any damage caused by this code.

#Apple DFU VDM command identifiers.
CMD_GET_ACTION = 0x10
CMD_GET_ACTION_INFO = 0x11
CMD_PREFORM_ACTION = 0x12 #Spelling kept as-is: the name is referenced elsewhere in this file.

#Apple Vendor ID
APPLE_VID = 0x05AC

#Apple DFU action number to human readable string.
COMMAND_LOOKUP = {
    0x0103: "PD Reset",
    0x0105: "Reboot",
    0x0106: "DFU / Hold Mode",
    0x0306: "Debug UART",
    0x0606: "DFU USB",
    0x4606: "Debug USB",
    0x0803: "I2C Bus (1)",
    0x0809: "I2C Bus (2)",
}
|
|
|
|
|
|
def is_pd_established(stem, port_index):
    """Return True when a non-zero Request Data Object (RDO) is present on the port.

    The local RDO is queried first and the remote RDO is only queried when
    the local one did not qualify.

    NOTE(review): basic_error_handling() is defined outside this file; it is
    used here as "truthy means the result carries an error" - confirm.
    """
    pd = brainstem.entity.PowerDelivery(stem, port_index)

    #Local RDO - This will be set if the port is SINKING power.
    #Remote RDO - This will be set if the port is SOURCING power.
    partners = (_BS_C.powerdeliveryPartnerLocal, _BS_C.powerdeliveryPartnerRemote)
    for partner in partners:
        rdo = pd.getRequestDataObject(partner)
        if not basic_error_handling(stem, rdo) and rdo.value != 0:
            return True

    return False
|
|
|
|
|
|
#Turn the indicated port off and confirm that the voltage bleeds off.
def turn_port_off_and_confirm_voltage(stem, port_index, max_voltage=200000): #0.2VDC
    """Disable a port and poll Vbus until it reads below max_voltage.

    stem        -- connected BrainStem module object.
    port_index  -- index of the port to disable.
    max_voltage -- threshold the Vbus reading must drop below (the default is
                   annotated as 0.2VDC).

    Exits the process with code 5 when the voltage has not dropped after more
    than BAIL_COUNTER polls (0.2 seconds apart). A failure to disable the port
    is handed to basic_error_handling() with exit=True, exit_code=9.
    """
    port = brainstem.entity.Port(stem, port_index)
    error = port.setEnabled(False)
    basic_error_handling(
        stem,
        error,
        "Exiting: Failed to disable port: %d - Error: %d" % (port_index, error),
        exit=True,
        exit_code=9)

    attempts = 0
    while True:
        reading = port.getVbusVoltage()
        bled_off = not basic_error_handling(stem, reading) and reading.value < max_voltage
        if bled_off:
            return

        attempts += 1
        if attempts > BAIL_COUNTER:
            print("Exiting Failed to turn off port: %d - Result: %s - Bail Count: %d" % (port_index, reading, attempts))
            sys.exit(5)

        time.sleep(.2)
|
|
|
|
|
|
#Turn the indicated port on and confirm power delivery negotiations are completed (defined RDO)
def turn_port_on_and_confirm_power_delivery(stem, port_index):
    """Enable a port and poll until is_pd_established() reports a PD connection.

    stem       -- connected BrainStem module object.
    port_index -- index of the port to enable.

    Exits the process with code 5 when no PD connection is seen after more
    than BAIL_COUNTER polls (0.2 seconds apart). A failure to enable the port
    is handed to basic_error_handling() with exit=True, exit_code=9.
    """
    port = brainstem.entity.Port(stem, port_index)
    error = port.setEnabled(True)
    basic_error_handling(
        stem,
        error,
        "Exiting: Failed to enable port: %d - Error: %d" % (port_index, error),
        exit=True,
        exit_code=9)

    attempts = 0
    while True:
        if is_pd_established(stem, port_index):
            return

        attempts += 1
        if attempts > BAIL_COUNTER:
            print("Exiting Failed to find a pd connection on port: %d - Bail Count: %d" % (port_index, attempts))
            sys.exit(5)

        time.sleep(.2)
|
|
|
|
|
|
#Convenience function for combining the off confirm with the on confirm functions
def toggle_and_wait_for_device(stem, port_index):
    """Power cycle a port: wait for Vbus to bleed off, then wait for PD to come back.

    Either step may exit the process when its wait loop gives up.
    """
    turn_port_off_and_confirm_voltage(stem, port_index)
    turn_port_on_and_confirm_power_delivery(stem, port_index)
|
|
|
|
|
|
import struct
|
|
#Helper function for constructing basic apple dfu related vdm's
def construct_apple_dfu_vdm(sop, command, action=None, action_arg=None):
    """Pack an Apple DFU VDM request into a little-endian byte buffer.

    Every buffer starts with: sop (4 bytes), command (1 byte), the constant
    0x80 (1 byte) and APPLE_VID (2 bytes). What follows depends on command:

    CMD_GET_ACTION      -- a 4 byte zero.
    CMD_GET_ACTION_INFO -- action (4 bytes); action is required.
    CMD_PREFORM_ACTION  -- action (4 bytes), a 2 byte zero, action_arg (2 bytes);
                           action and action_arg are both required.

    Returns the packed bytes.
    Raises RuntimeError for a missing required parameter or an unsupported command.
    """
    header = (sop, command, 0x80, APPLE_VID)

    if command == CMD_GET_ACTION:
        return struct.pack('<LBBHL', *header, 0)

    if command == CMD_GET_ACTION_INFO:
        if action is None:
            raise RuntimeError("Missing action parameter for this command type")
        return struct.pack('<LBBHL', *header, action)

    if command == CMD_PREFORM_ACTION:
        if action is None or action_arg is None:
            raise RuntimeError("Missing action or action_arg parameter for this command type.")
        return struct.pack('<LBBHLHH', *header, action, 0x00, action_arg)

    raise RuntimeError("Unsupported vdm command: %d" % (command))
|
|
|
|
|
|
#Convenience function for sending VDM's
def send_vdm(stem, port_index, buffer):
    """Write buffer to the PowerDelivery entity's VDM option for the given port.

    The returned error is passed to basic_error_handling() without requesting
    an exit; nothing is returned to the caller.
    """
    power_delivery = brainstem.entity.PowerDelivery(stem, port_index)
    status = power_delivery.set_UEIBytes(_BS_C.powerdeliveryVDM, buffer)
    basic_error_handling(stem, status)
|
|
|
|
|
|
#Simple send/receive with retry for a VDM sending.
def handle_send_receive_apple_dfu_vdm(stem, port_index, sop, vdm, logger, max_time_seconds=5):
    """Send a VDM and wait for the matching Apple DFU response, retrying on silence.

    Each attempt power cycles the port, waits for the PD traffic to go quiet,
    sends vdm and then filters the logged packets for the tx/rx pair.

    stem             -- connected BrainStem module object.
    port_index       -- port the device under test is attached to.
    sop              -- SOP value forwarded to pd_packet_filter().
    vdm              -- request buffer to send (see construct_apple_dfu_vdm()).
    logger           -- PD logger the packets are read from.
    max_time_seconds -- retry budget. The elapsed time is sampled at the START
                        of each attempt, so a failed attempt only ends the
                        retries when it began at or after this many seconds.

    Returns the truthy packet produced by pd_packet_filter().
    Exits the process with code 5 when the retry budget is exhausted; this
    function never returns a falsy value.
    """
    #Monotonic clock: elapsed time must not be affected by wall clock adjustments.
    start_time = time.monotonic()
    while True:
        elapsed_time = time.monotonic() - start_time

        #Some DFU supporting devices will only respond to one command per connection.
        #This function will toggle the port off in the hopes we can request again.
        toggle_and_wait_for_device(stem, port_index)

        #Some devices are chatty when they come online (especially Apple devices).
        #Let them finish before we start asking questions.
        wait_for_silence(logger, silence_time_seconds=1)

        send_vdm(stem, port_index, vdm)
        result_packet = pd_packet_filter(logger, sop, filter_tx_apple_dfu_vdm, filter_rx_apple_dfu_vdm)

        if result_packet:
            return result_packet

        if elapsed_time >= max_time_seconds:
            print("Exiting: Failed to send/receive apple dfu vdm's")
            sys.exit(5)

        time.sleep(.1)
|
|
|
|
|
|
#Discovers the actions a device supports
def discover_actions(stem, port_index, sop, logger):
    """Ask the device for its supported DFU actions and print them.

    Sends a CMD_GET_ACTION request and, when the response carries no error,
    decodes the payload from byte offset 6 as a list of two byte values.

    Returns the list of action numbers, or an empty list when the response
    is missing or reports an error.
    """
    request = construct_apple_dfu_vdm(sop, CMD_GET_ACTION)
    response = handle_send_receive_apple_dfu_vdm(stem, port_index, sop, request, logger)

    if not (response and response.error == Result.NO_ERROR):
        return []

    actions = get_two_byte_list_from_buffer(response.value.payload, start_offset=6)
    print("Discovered Actions: ", format_list_as_hex_string_list(actions, width=4))
    for action in actions:
        print("Action: 0x%04X - %s" % (action, COMMAND_LOOKUP.get(action, "Unknown")))
    print("----------------------------------------------------------")

    return actions
|
|
|
|
|
|
#Discovers the action arguments that a device action supports.
def discover_action_arguments(stem, port_index, sop, action, logger):
    """Ask the device which arguments it accepts for one DFU action and print them.

    Sends a CMD_GET_ACTION_INFO request for action and, when the response
    carries no error, decodes the payload from byte offset 6 as a list of
    two byte values.

    Returns the list of argument values, or an empty list when the response
    is missing or reports an error.
    """
    print("Discovering Arguments for Action: 0x%04X - %s" % (action, COMMAND_LOOKUP.get(action, "Unknown")))

    request = construct_apple_dfu_vdm(sop, CMD_GET_ACTION_INFO, action)
    response = handle_send_receive_apple_dfu_vdm(stem, port_index, sop, request, logger)

    if not (response and response.error == Result.NO_ERROR):
        return []

    arguments = get_two_byte_list_from_buffer(response.value.payload, start_offset=6)
    print("Discovered Arguments: ", format_list_as_hex_string_list(arguments, width=4))
    print("----------------------------------------------------------")

    return arguments
|
|
|
|
|
|
#Checks to see if the device can execute the required commands for the defined port.
#This also includes some functionality from software features.
def check_device_capability(stem, port_index):
    """Verify the connected device exposes every entity and option this tool uses.

    stem       -- connected BrainStem module object.
    port_index -- zero based port index supplied by the user.

    Checks, in order: cmdPORT (quantity, port enable, Vbus voltage),
    cmdUSBSYSTEM (quantity, upstream port) and cmdPOWERDELIVERY (quantity,
    VDM, RDO, request command, log enable).

    Returns nothing. An out of range index exits the process with code 2;
    every other failure is handed to basic_error_handling() with exit=True,
    exit_code=2.
    """
    #Note: We don't check cmdSYSTEM because all Acroname devices support this Entity.

    #---------------------------------------------------------------------------------
    #cmdPORT Checks
    #---------------------------------------------------------------------------------
    #Check how many Port Entities this devices has.
    result = stem.classQuantity(_BS_C.cmdPORT)
    s = "Exiting: Could not acquire class quantity for cmdPORT - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that the supplied port does not exceed the amount of Port entities this devices has.
    #Indexes start at 0 (the USBSystem check below uses index 0 when the quantity is 1),
    #so an index equal to the quantity is already out of range.
    if port_index >= result.value:
        print("The provided port is out of range of supported ports for this device (cmdPORT). - %s" % (result))
        sys.exit(2)

    #Check that we can enable/disable a port
    result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portPortEnabled, port_index, (_BS_C.ueiOPTION_SET))
    s = "Exiting: There is an error with this devices ability to enable port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can get vbus voltage
    result = stem.hasUEI(_BS_C.cmdPORT, _BS_C.portVbusVoltage, port_index, (_BS_C.ueiOPTION_GET))
    s = "Exiting: There is an error with this devices ability to get vbus voltage on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)
    #---------------------------------------------------------------------------------

    #---------------------------------------------------------------------------------
    #cmdUSBSYSTEM Checks
    #---------------------------------------------------------------------------------
    #Check how many USBSystem Entities this devices has.
    result = stem.classQuantity(_BS_C.cmdUSBSYSTEM)
    s = "Exiting: Could not acquire class quantity for cmdUSBSYSTEM - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that the device has at least one USBSystem entity (index 0 is used below).
    if result.value < 1:
        print("Exiting The USBSystem index is out of range for this device (cmdUSBSYSTEM). - %s" % (result))
        sys.exit(2)

    #Check that we can get the upstream port
    result = stem.hasUEI(_BS_C.cmdUSBSYSTEM, _BS_C.usbsystemUpstreamPort, 0, (_BS_C.ueiOPTION_GET))
    s = "Exiting: There is an error with this devices ability to get the upstream port - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)
    #---------------------------------------------------------------------------------

    #---------------------------------------------------------------------------------
    #cmdPOWERDELIVERY Checks
    #---------------------------------------------------------------------------------
    #Check how many PowerDelivery Entities this devices has.
    result = stem.classQuantity(_BS_C.cmdPOWERDELIVERY)
    s = "Exiting: Could not acquire class quantity for cmdPOWERDELIVERY - %s" % (result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that the supplied port does not exceed the amount of PowerDelivery entities this devices has.
    if port_index >= result.value:
        print("Exiting The provided port is out of range of supported ports for this device (cmdPOWERDELIVERY). - %s" % (result))
        sys.exit(2)

    #Check that we can send VDM's
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryVDM, port_index, (_BS_C.ueiOPTION_SET))
    #TODO: indicate in the error message that this might require a software features update.
    s = "Exiting: There is an error with this devices ability to send VDM messages on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can get RDO's
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestDataObject, port_index, (_BS_C.ueiOPTION_GET))
    s = "Exiting: There is an error with this devices ability to get RDOs on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can send PD Requests
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryRequestCommand, port_index, (_BS_C.ueiOPTION_SET))
    s = "Exiting: There is an error with this devices ability to send PD requests on port: %d - %s" % (port_index, result)
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #Check that we can enable PD Logging
    result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryLogEnable, port_index, (_BS_C.ueiOPTION_SET))
    s = "Exiting: There is an error with this devices ability to send Power Delivery Logging Enable requests on port: %d - %s" % (port_index, result)
    s += "\nDoes this devices have the PD Logging software feature?"
    basic_error_handling(stem, result, s, exit=True, exit_code=2)

    #TODO: This check doesn't exist in firmware yet.
    # #Check that we can register for PD Logging events
    # result = stem.hasUEI(_BS_C.cmdPOWERDELIVERY, _BS_C.powerdeliveryLogEvent, port_index, (_BS_C.ueiOPTION_SET))
    # s = "Exiting: There is an error with this devices ability to register for Power Delivery Events on port: %d - %s" % (port_index, result)
    # s += "\nDoes this devices have the PD Logging software feature?"
    # basic_error_handling(stem, result, s, exit=True, exit_code=2)
    #---------------------------------------------------------------------------------
|
|
|
|
|
|
#Ensure the software library is >=2.11.2
def check_software_version_capability():
    """Exit the process with code 14 unless the BrainStem library is at least 2.11.2."""
    if brainstem.version.isAtLeast(2, 11, 2).value:
        return

    print("Exiting: This application requires BrainStem >= 2.11.2")
    sys.exit(14)
|
|
|
|
|
|
#Context manager that handles cleanup of the stem and logger.
#It also attempts to return the power mode.
class CLI_Manager:
    """Owns the stem, the PD logger and the original power role for one CLI run.

    The attributes are filled in by the caller inside the with-block; on exit
    the logger is disabled, the saved power role is restored (when one was
    saved) and the stem is disconnected.
    """

    def __init__(self):
        self.stem = None        #brainstem.module.Module objects
        self.logger = None      #PDChannelLogger objects
        self.power_role = None  #values acquired by getPowerRole()
        self.port = None        #port number associated with the value above.

    def __enter__(self): #You must have this function for the context manager to work correctly
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release everything that was attached; never suppresses the exception."""
        if self.logger:
            self.logger.setEnabled(False)
            self.logger = None

        if self.stem:
            try:
                #Attempts to restore the devices original power mode if it was changed.
                #The role is only known once the caller stored it; an early exit
                #(e.g. a failed capability check) leaves it as None, in which case
                #there is nothing to restore.
                if self.power_role is not None and self.port is not None:
                    pd = brainstem.entity.PowerDelivery(self.stem, self.port) #We only support one stem
                    result = pd.setPowerRole(self.power_role)
                    basic_error_handling(self.stem, result, "Error Applying user's original power role.")
            finally:
                #Always release the connection, even if the restore above raised.
                self.stem.disconnect()
                self.stem = None

        return False # Ensure exception propagates
|
|
|
|
|
|
#Acquire the current power role of the device so that it can be restored and
#set the needed power role based on the sop.
def handle_power_role(stem, port, sop):
    """Switch the port to the power role required for sop and report the old role.

    stem -- connected BrainStem module object.
    port -- index of the port to reconfigure.
    sop  -- 4 selects the Source role, 3 selects the Sink role.

    Returns the value read by getPowerRole() before the change.
    Exits the process with code 76 for any other sop value. Read and write
    failures are handed to basic_error_handling() with exit codes 10 and 11.
    """
    pd = brainstem.entity.PowerDelivery(stem, port)
    original_role = pd.getPowerRole()
    basic_error_handling(stem, original_role, "pd.getPowerRole()", exit=True, exit_code=10)

    if sop == 4:
        wanted_role = _BS_C.powerdeliveryPowerRoleSource
    elif sop == 3:
        wanted_role = _BS_C.powerdeliveryPowerRoleSink
    else: #Arg parser should be handling this.
        print("Unsupported SOP")
        sys.exit(76)

    status = pd.setPowerRole(wanted_role)
    basic_error_handling(stem, status, ("pd.setPowerRole(%d)" % (wanted_role)), exit=True, exit_code=11)

    #Power should be on; however, if the power role changed then we should wait for PD to be established.
    turn_port_on_and_confirm_power_delivery(stem, port)

    return original_role.value #return the original value
|
|
|
|
|
|
#Check that the requested port is not our upstream port. We don't want to cut our head off.
def check_upstream_port(stem, port):
    """Exit the process with code 78 when port is the device's current upstream port.

    A failure to read the upstream port is handed to basic_error_handling()
    with exit=True, exit_code=77.
    """
    upstream = brainstem.entity.USBSystem(stem, 0).getUpstream()
    failed = basic_error_handling(stem, upstream, "usb_system.getUpstream()", exit=True, exit_code=77)
    if failed:
        return

    if upstream.value == port:
        print("Unable to Discover DFU capabilities on the upstream port. Select a different port")
        sys.exit(78)
|
|
|
|
|
|
def main(argv):
    """Run the CLI: parse argv, optionally connect to the device, then discover or send.

    argv -- full argument list (as in sys.argv), handed to CustomArgumentParser.

    Returns the process exit status: 0 on success, otherwise the code carried
    by the SystemExit raised anywhere below (argument errors, capability
    checks, timeouts, ...).
    """
    try:
        print("Provided Arguments:")
        print(argv)
        arg_parser = CustomArgumentParser(argv)

        with CLI_Manager() as cli:

            cli.port = arg_parser.port

            #Setup
            #/////////////////////////////////////////////////////////////////////
            #Only connect if we need to.
            if (arg_parser.CLI_ACTION_DISCOVER == arg_parser.cli_action) or \
               (arg_parser.CLI_ACTION_SEND == arg_parser.cli_action and arg_parser.send):
                #---------------------------------------------------------------------
                #Note: This code doesn't use the typical BrainStem object, but instead uses a Module object.
                #The Module class is the base class for all BrainStem Objects like the USBHub3c, USBHub3p, USBCSwitch etc.
                #This allows our code to be more generic; however, we don't really know what we are or what
                #we are capable of and that is why we do a handful of capability checks here.
                cli.stem = create_and_connect_stem(arg_parser.sn)
                check_device_capability(cli.stem, arg_parser.port)
                check_software_version_capability()
                #---------------------------------------------------------------------

                check_upstream_port(cli.stem, arg_parser.port)

                cli.power_role = handle_power_role(cli.stem, cli.port, arg_parser.sop)

                cli.logger = create_logger(cli.stem, cli.port) #Create Logger object for given port
            #/////////////////////////////////////////////////////////////////////

            #Work
            #/////////////////////////////////////////////////////////////////////
            if arg_parser.CLI_ACTION_DISCOVER == arg_parser.cli_action:
                if arg_parser.action is not None:
                    #The logger lives on the context manager; there is no local "logger" name.
                    discover_action_arguments(cli.stem, cli.port, arg_parser.sop, arg_parser.action, cli.logger)
                else:
                    supported_actions = discover_actions(cli.stem, cli.port, arg_parser.sop, cli.logger)
                    for action in supported_actions:
                        discover_action_arguments(cli.stem, cli.port, arg_parser.sop, action, cli.logger)

            elif arg_parser.CLI_ACTION_SEND == arg_parser.cli_action:
                vdm = construct_apple_dfu_vdm(arg_parser.sop, CMD_PREFORM_ACTION, arg_parser.action, arg_parser.action_argument)
                print("Provided Action: 0x%04X" % (arg_parser.action))
                print("Provided Action Argument: 0x%04X" % (arg_parser.action_argument))
                #vdm is a bytes object; list() shows the individual byte values the label promises.
                print("Requested VDM as integer list: ", list(vdm))
                print("Requested VDM as hex string list: ", format_list_as_hex_string_list(vdm, width=2))
                if arg_parser.send:
                    handle_send_receive_apple_dfu_vdm(cli.stem, cli.port, arg_parser.sop, vdm, cli.logger)
                else:
                    print("Provide the '-x' argument to send this VDM")

            else: #Arg parser should be handling this.
                print("Unsupported Action")
                sys.exit(79)
            #/////////////////////////////////////////////////////////////////////

    except SystemExit as e:
        #Hand back the exit status itself. Returning the exception object would make
        #the caller's sys.exit() print it and exit with status 1 instead.
        return e.code

    return 0
|
|
|
|
|
|
#Script entry point: the value returned by main() becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
|
|
|
|
|