UmberHubManager/api/examples/python/usbhub3c_streaming/usbhub3c_streaming.py

329 lines
13 KiB
Python
Executable File

# Copyright (c) 2018 Acroname Inc. - All Rights Reserved
#
# This file is part of the BrainStem development package.
# See file LICENSE or go to https://acroname.com/software/brainstem-development-kit for full license details.
import brainstem
from brainstem.result import Result
from brainstem import _BS_C #Gives access to aProtocolDef.h constants.
from brainstem import ffi #Allows use of C Style API's (async streaming)
from brainstem.link import StreamStatusEntry
import sys
import time
import queue
BUFFER_SIZE = 100
LOOPS = 100
SLEEP_TIME = .000001
STREAM_WILDCARD = 0xFF #Magic number for indicating all possible values.
PORT_CALLBACK_INDEX = 2
# Create a queue for use across threads.
# Will be used in a producer/consumer style.
queue_port_voltage = queue.Queue(BUFFER_SIZE)
#Callback related to example: async_single_port_voltage_with_ref
#This is the producer side of queue_port_voltage
@ffi.callback("unsigned char(aPacket*, void*)")
def callback_single_port_voltage_with_ref(packet, pRef):
# See reference for packet decoding.
# https://acroname.com/reference/brainstem/appendix/uei.html
port_index = packet.data[2] & 0x3F
if PORT_CALLBACK_INDEX != port_index:
raise IndexError("This callback is only configured for port: %d" % (PORT_CALLBACK_INDEX))
val = 0
val = val | packet.data[12] << 24
val = val | packet.data[13] << 16
val = val | packet.data[14] << 8
val = val | packet.data[15] << 0
queue_port_voltage_ref = ffi.from_handle(pRef)
if queue_port_voltage_ref == ffi.NULL:
raise MemoryError("Failed to retrieve user ref")
queue_port_voltage_ref.put(val)
return Result.NO_ERROR
#Callback related to example: async_port_voltage
#This callback shows how a single callback can be used for all ports.
@ffi.callback("unsigned char(aPacket*, void*)")
def callback_port_voltage(packet, pRef):
# See reference for packet decoding.
# https://acroname.com/reference/brainstem/appendix/uei.html
port_index = packet.data[2] & 0x3F
val = 0
val = val | packet.data[12] << 24
val = val | packet.data[13] << 16
val = val | packet.data[14] << 8
val = val | packet.data[15] << 0
print("Callback: Port: %d Voltage: %f VDC" % (port_index, (val/1000000)))
return Result.NO_ERROR
#Shows how to register a callback and object reference for a specific cmd, option and index.
def async_single_port_voltage_with_ref(stem):
#Create c handle from python object.
queue_port_voltage_handle = ffi.new_handle(queue_port_voltage)
stem.link.registerStreamCallback(stem.address, _BS_C.cmdPORT, _BS_C.portVbusVoltage, PORT_CALLBACK_INDEX, True, callback_single_port_voltage_with_ref, queue_port_voltage_handle)
#Loop for a defined amount of time.
LOOP_TIME = 5 #seconds
start_time = time.time()
while True:
elapsed_time = time.time() - start_time
if elapsed_time > LOOP_TIME:
break
print("async_single_port_voltage_with_ref: Time: %.02f/%.02f seconds" % (elapsed_time, LOOP_TIME))
while True: #Process all data in the queue.
try:
voltage = queue_port_voltage.get(block=False)
print("\tNew Voltage: %.06f VDC" % ((voltage/1000000)))
queue_port_voltage.task_done()
except queue.Empty:
break;
time.sleep(0.25)
stem.link.registerStreamCallback(stem.address, _BS_C.cmdPORT, _BS_C.portVbusVoltage, PORT_CALLBACK_INDEX, False, ffi.NULL, ffi.NULL)
#Shows how to register a callback for a specific cmd, option and ALL indexes.
#ie port voltage will stream from all ports that are capable.
def async_port_voltage(stem):
stem.link.registerStreamCallback(stem.address, _BS_C.cmdPORT, _BS_C.portVbusVoltage, STREAM_WILDCARD, True, callback_port_voltage, ffi.NULL)
#Do Stuff
print("Work happening in: \"async_port_voltage\"")
time.sleep(5)
stem.link.registerStreamCallback(stem.address, _BS_C.cmdPORT, _BS_C.portVbusVoltage, STREAM_WILDCARD, False, ffi.NULL, ffi.NULL)
#This function will enable streaming for ALL capable streaming values.
#This is done by using the wildcard values in place for cmd, option and index.
def selective_stream_enable_all(stem):
stem.link.enableStream(stem.address, STREAM_WILDCARD, STREAM_WILDCARD, STREAM_WILDCARD, True)
#///////////////////////////////////////////////////
#Do stuff here
print("Work happening in: \"selective_stream_enable_all\"")
time.sleep(2)
#///////////////////////////////////////////////////
stem.link.enableStream(stem.address, STREAM_WILDCARD, STREAM_WILDCARD, STREAM_WILDCARD, False)
#This function selectively enable a few channels for streaming.
#Various enabling filters are shown with use of the wildcard value.
def selective_stream_manual_enable(stem):
#PortClass: Enable all ports for streaming.
stem.link.enableStream(stem.address, _BS_C.cmdPORT, STREAM_WILDCARD, STREAM_WILDCARD, True) #On
#PowerDeliveryClass: Enable all ports for streaming.
stem.link.enableStream(stem.address, _BS_C.cmdPOWERDELIVERY, STREAM_WILDCARD, STREAM_WILDCARD, True) #On
#PortClass: Enable getVbusVoltage for all ports.
stem.link.enableStream(stem.address, _BS_C.cmdPORT, _BS_C.portVbusVoltage, STREAM_WILDCARD, True) #On
#PortClass: Enable getVconnVoltage for a specific port.
stem.link.enableStream(stem.address, _BS_C.cmdPORT, _BS_C.portVconnVoltage, 2, True) #On
#///////////////////////////////////////////////////
#Do stuff here
print("Work happening in: \"selective_stream_manual_enable\"")
time.sleep(2)
#///////////////////////////////////////////////////
#Turn everything off
stem.link.enableStream(stem.address, _BS_C.cmdPORT, STREAM_WILDCARD, STREAM_WILDCARD, False)
stem.link.enableStream(stem.address, _BS_C.cmdPOWERDELIVERY, STREAM_WILDCARD, STREAM_WILDCARD, False)
stem.link.enableStream(stem.address, _BS_C.cmdPORT, _BS_C.portVbusVoltage, STREAM_WILDCARD, False)
stem.link.enableStream(stem.address, _BS_C.cmdPORT, _BS_C.portVconnVoltage, 2, False)
def selective_stream_entity_enable(stem):
#Each entity can be selectively enabled for streaming functionality.
#In this example we enable the System Class (index 0) and 3x ports
#within the Port Class (indexes: 0, 5, 6). Other ports will NOT stream
#until explicitly enabled.
stream_state = True #Enable selective streams
stem.system.setStreamEnabled(stream_state)
stem.hub.port[0].setStreamEnabled(stream_state)
stem.hub.port[5].setStreamEnabled(stream_state)
stem.hub.port[6].setStreamEnabled(stream_state)
#///////////////////////////////////////////////////
#Do stuff here
print("Work happening in: \"selective_stream_entity_enable\"")
time.sleep(2)
#///////////////////////////////////////////////////
stream_state = False #Disable selective streams
stem.system.setStreamEnabled(stream_state)
stem.hub.port[0].setStreamEnabled(stream_state)
stem.hub.port[5].setStreamEnabled(stream_state)
stem.hub.port[6].setStreamEnabled(stream_state)
#This examples shows a comparison between times spent in a function call.
#This time is the time spent blocking. In non-streaming implementations
#each API call align with USB transaction time (1-4mS) per call. When
#streaming is enabled we can avoid this time penalty.
def basic_streaming_speed_test(stem):
non_streaming_total_time = 0
#///////////////////////////////////////////////////
#Non-Streaming test
#///////////////////////////////////////////////////
print("Speed test - Streaming disabled (default)")
for x in range(0, LOOPS):
start_time = time.time()
result = chub.system.getInputVoltage()
end_time = time.time()
elapsed_time = end_time - start_time
non_streaming_total_time += elapsed_time
print("\tLoop: %d Input Voltage(VDC): %.06f Error: %d Elapsed time: %.06f" % (x, (result.value/1000000), result.error, elapsed_time))
non_streaming_average = non_streaming_total_time / LOOPS
print("Average API time (non-streaming/blocking): %.06f" % (non_streaming_average))
#///////////////////////////////////////////////////
print("\n")
#///////////////////////////////////////////////////
#Streaming test
#///////////////////////////////////////////////////
#Enable streaming on the SystemClass
chub.system.setStreamEnabled(True)
time.sleep(1) #Allow time to settle
streaming_total_time = 0
streaming_average = 0
loop_counter = 0
actual_loops = 0
print("Speed test - Streaming enabled")
while loop_counter < LOOPS:
start_time = time.time()
result = chub.system.getInputVoltage()
end_time = time.time()
elapsed_time = end_time - start_time
streaming_total_time += elapsed_time
actual_loops += 1
if result.error == Result.STREAM_STALE_ERROR:
time.sleep(SLEEP_TIME) #Wait for a fresh value.
else:
loop_counter += 1 #Only increment the loop when we get a new stream value
print("\tLoop: %d Input Voltage(VDC): %.6f Error: %d Elapsed time: %.6f" % (loop_counter, (result.value/1000000), result.error, elapsed_time))
streaming_average = streaming_total_time / actual_loops
print("Average API time (streaming): %.6f" % (streaming_average))
#///////////////////////////////////////////////////
print("Result - Average time spent in API - Streaming: %.06f - Non-Streaming: %.06f Seconds" % (streaming_average, non_streaming_average))
chub.hub.port[5].setStreamEnabled(False)
time.sleep(1)
def print_stream_keys_raw(data):
for d in data:
print(f"Key: {d.key} - Value: {d.value}")
def print_stream_keys_decoded(data):
for entry in data:
module = StreamStatusEntry.getStreamKeyElement(entry.key, StreamStatusEntry.STREAM_KEY_MODULE_ADDRESS)
cmd = StreamStatusEntry.getStreamKeyElement(entry.key, StreamStatusEntry.STREAM_KEY_CMD)
option = StreamStatusEntry.getStreamKeyElement(entry.key, StreamStatusEntry.STREAM_KEY_OPTION)
index = StreamStatusEntry.getStreamKeyElement(entry.key, StreamStatusEntry.STREAM_KEY_INDEX)
subindex = StreamStatusEntry.getStreamKeyElement(entry.key, StreamStatusEntry.STREAM_KEY_SUBINDEX)
print(f"Module Address: {module.value}" \
f" - cmd: {cmd.value}" \
f" - option: {option.value}" \
f" - index: {index.value}" \
f" - subindex: {subindex.value}" \
f" - Value: {entry.value}")
#This example shows how to get a snapshot of current streaming status for all values
def basic_streaming_get_status_all(stem):
stem.link.enableStream(stem.address, STREAM_WILDCARD, STREAM_WILDCARD, STREAM_WILDCARD, True)
time.sleep(1) #Allow time for data to come in.
result = stem.link.getStreamStatus(stem.address, STREAM_WILDCARD, STREAM_WILDCARD, STREAM_WILDCARD, STREAM_WILDCARD)
if Result.NO_ERROR == result.error:
print_stream_keys_raw(result.value)
print_stream_keys_decoded(result.value);
stem.link.enableStream(stem.address, STREAM_WILDCARD, STREAM_WILDCARD, STREAM_WILDCARD, False)
#This example shows how to get a snapshot of current streaming status for a specific entity.
def basic_streaming_get_status_entity(stem):
stem.system.setStreamEnabled(True)
time.sleep(1) #Allow time for data to come in.
result = stem.system.getStreamStatus()
if Result.NO_ERROR == result.error:
print_stream_keys_raw(result.value)
print_stream_keys_decoded(result.value);
stem.system.setStreamEnabled(False)
#Main application.
if __name__ == '__main__':
# Create USBHub3c object
chub = brainstem.stem.USBHub3c()
#Locate and connect to the first USBHub3c you find on USB.
result = chub.discoverAndConnect(brainstem.link.Spec.USB)
#Verify we are connected
if result != (Result.NO_ERROR):
print ('Could not find a module. Exiting\n')
sys.exit(1)
result = chub.system.getSerialNumber()
print("Connected to USBHub3c: SN: 0x%08X - Error: %d" % (result.value, result.error))
#///////////////////////////////////////////////////
# Each function below represents a unique use case.
# For simplicity it can be helpful to only enable one at a time.
#///////////////////////////////////////////////////
basic_streaming_get_status_entity(chub)
# basic_streaming_get_status_all(chub)
# basic_streaming_speed_test(chub)
# selective_stream_enable_all(chub)
# selective_stream_manual_enable(chub)
# selective_stream_entity_enable(chub)
# async_port_voltage(chub)
# async_single_port_voltage_with_ref(chub)
#///////////////////////////////////////////////////
#Disconnect from device.
chub.disconnect()