import warnings
from UniTAP.libs.lib_tsi.tsi_private_types import *
from UniTAP.libs.lib_helper import OS_Requirements, lib_method_wrapper
from ctypes import c_char_p, c_uint32, c_void_p, c_int32, POINTER, create_string_buffer, byref, \
    sizeof, c_uint64, c_ubyte, c_bool
from UniTAP.utils import tsi_logging as logging

# ctypes types for which the config getters return ``.value`` instead of the
# ctypes object itself.
CTYPES_TYPE_LIST = [c_char_p, c_uint32, c_void_p, c_int32, c_uint64, c_ubyte, c_bool]

# Module-level reference to the TSI_USER_LOG_FUNCTION instance created by
# TSIWrapper and handed to TSIX_Init.
# NOTE(review): presumably held here so the ctypes callback object outlives the
# native library's use of it - confirm.
callback = None


class Singleton(type):
    """Metaclass that builds each class once and returns that instance afterwards."""

    # Maps class -> the single instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Create the instance on first call; later calls ignore their arguments."""
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class TSIWrapper(metaclass=Singleton):
    """Singleton facade over the native TSI library loaded through ``OS_Requirements``.

    The native entry points are bound lazily by :meth:`initialize`; until then the
    ``_TSI*`` attributes are ``None``.
    """

    # Version value passed to TSI_Init / stored in TSI_INIT_CONFIGURATION.version.
    _TSI_CURRENT_VERSION = 12

    # Number of leading items of an event record that are not passed to the
    # parser (see TSI_GetParsedEventData).
    # NOTE(review): presumably a fixed-size event header - confirm.
    _EVENT_DATA_OFFSET = 12

    def __init__(self):
        """Declare all native bindings as unbound, load the library and build the log callback."""
        self._TSIX_Init = None
        self._TSI_Init = None
        self._TSI_Clean = None
        self._TSI_DEV_GetDeviceCount = None
        self._TSI_DEV_GetDeviceName = None
        self._TSIX_DEV_GetDeviceName = None
        self._TSIX_DEV_OpenDevice = None
        self._TSIX_DEV_SelectRole = None
        self._TSIX_DEV_RescanDevices = None
        self._TSIX_DEV_GetDeviceRoleName = None
        self._TSIX_DEV_GetDeviceRoleCount = None
        self._TSIX_DEV_CloseDevice = None
        self._TSI2_TS_SetConfigItem = None
        self._TSIX_TS_SetConfigItem = None
        self._TSI2_TS_GetConfigItem = None
        self._TSIX_TS_GetConfigItem = None
        self._TSI_MISC_GetErrorDescription = None
        self._TSIX_PORT_Select = None
        self._TSIX_PORT_Deselect = None
        self._TSIX_TS_RunTest = None
        self._TSIX_TS_AbortTests = None
        self._TSIX_PORT_GetTestCount = None
        self._TSIX_PORT_GetTestInfo = None
        self._TSIX_STLOG_WaitMessage = None
        self._TSIX_STLOG_GetMessageData = None
        self._TSI_CreateEventParser = None
        self._TSI_RemoveEventParser = None
        self._TSI_GetParsedEventData = None
        self._TSI_DEV_SetSearchMask = None

        self.lib = OS_Requirements("TSI").get_lib()
        self.__setup_callback()

    def __del__(self):
        """Clean the library up on finalization, but only if it was ever bound.

        ``_TSI_Clean`` stays ``None`` until :meth:`initialize` runs; calling
        :meth:`deinitialize` in that state would raise ``TypeError`` inside the
        finalizer.
        """
        if getattr(self, "_TSI_Clean", None) is not None:
            self.deinitialize()

    def initialize(self):
        """Bind every native entry point and initialise the library.

        Returns:
            The result of ``TSIX_Init`` (with the log callback) when
            ``logging.is_enabled()`` is true, otherwise the result of ``TSI_Init``.
        """
        lib = self.lib
        wrap = lib_method_wrapper

        self._TSIX_Init = wrap(lib.TSIX_Init, [POINTER(TSI_INIT_CONFIGURATION)], TSI_RESULT)
        self._TSI_Init = wrap(lib.TSI_Init, [c_uint32], TSI_RESULT)
        self._TSI_Clean = wrap(lib.TSI_Clean, [], TSI_RESULT)

        # Device enumeration / lifetime.
        self._TSI_DEV_GetDeviceCount = wrap(lib.TSIX_DEV_GetDeviceCount, [], TSI_RESULT)
        self._TSIX_DEV_GetDeviceName = wrap(lib.TSIX_DEV_GetDeviceName,
                                            [TSI_DEVICE_ID, c_char_p, c_uint32], TSI_RESULT)
        self._TSIX_DEV_OpenDevice = wrap(lib.TSIX_DEV_OpenDevice,
                                         [TSI_DEVICE_ID, POINTER(TSI_RESULT)], TSI_HANDLE)
        self._TSIX_DEV_SelectRole = wrap(lib.TSIX_DEV_SelectRole, [TSI_HANDLE, c_int32], TSI_RESULT)
        self._TSIX_DEV_RescanDevices = wrap(lib.TSIX_DEV_RescanDevices,
                                            [c_int32, c_int32, c_int32], TSI_RESULT)
        self._TSIX_DEV_GetDeviceRoleName = wrap(lib.TSIX_DEV_GetDeviceRoleName,
                                                [TSI_HANDLE, c_int32, c_char_p, c_uint32],
                                                TSI_RESULT)
        self._TSIX_DEV_GetDeviceRoleCount = wrap(lib.TSIX_DEV_GetDeviceRoleCount,
                                                 [TSI_HANDLE, ], TSI_RESULT)
        self._TSIX_DEV_CloseDevice = wrap(lib.TSIX_DEV_CloseDevice, [TSI_HANDLE, ], TSI_RESULT)

        # Configuration items (TSI2_* variants take an additional logical port).
        self._TSI2_TS_SetConfigItem = wrap(lib.TSI2_TS_SetConfigItem,
                                           [TSI_HANDLE, TSI_LOGICAL_PORT, TSI_CONFIG_ID, c_void_p,
                                            c_uint32],
                                           TSI_RESULT)
        self._TSIX_TS_SetConfigItem = wrap(lib.TSIX_TS_SetConfigItem,
                                           [TSI_HANDLE, TSI_CONFIG_ID, c_void_p, c_uint32],
                                           TSI_RESULT)
        self._TSI2_TS_GetConfigItem = wrap(lib.TSI2_TS_GetConfigItem,
                                           [TSI_HANDLE, TSI_LOGICAL_PORT, TSI_CONFIG_ID, c_void_p,
                                            c_uint32],
                                           TSI_RESULT)
        self._TSIX_TS_GetConfigItem = wrap(lib.TSIX_TS_GetConfigItem,
                                           [TSI_HANDLE, TSI_CONFIG_ID, c_void_p, c_uint32],
                                           TSI_RESULT)

        self._TSI_MISC_GetErrorDescription = wrap(lib.TSI_MISC_GetErrorDescription,
                                                  [TSI_RESULT, c_char_p, c_uint32], TSI_RESULT)

        # Ports and tests.
        self._TSIX_PORT_Select = wrap(lib.TSIX_PORT_Select, [TSI_HANDLE, TSI_LOGICAL_PORT],
                                      TSI_RESULT)
        self._TSIX_PORT_Deselect = wrap(lib.TSIX_PORT_Deselect, [TSI_HANDLE, TSI_LOGICAL_PORT],
                                        TSI_RESULT)
        self._TSIX_TS_RunTest = wrap(lib.TSIX_TS_RunTest, [TSI_HANDLE, TSI_TEST_ID], TSI_RESULT)
        self._TSIX_TS_AbortTests = wrap(lib.TSIX_TS_AbortTests, [TSI_HANDLE], TSI_RESULT)
        self._TSIX_PORT_GetTestCount = wrap(lib.TSIX_PORT_GetTestCount,
                                            [TSI_HANDLE, TSI_LOGICAL_PORT], TSI_RESULT)
        self._TSIX_PORT_GetTestInfo = wrap(lib.TSIX_PORT_GetTestInfo,
                                           [TSI_HANDLE, TSI_LOGICAL_PORT, c_int32,
                                            POINTER(TSI_TEST_ID), POINTER(c_int32), c_char_p,
                                            c_uint32],
                                           TSI_RESULT)

        # Status log.
        self._TSIX_STLOG_WaitMessage = wrap(lib.TSIX_STLOG_WaitMessage, [TSI_HANDLE, c_int32],
                                            TSI_RESULT)
        self._TSIX_STLOG_GetMessageData = wrap(lib.TSIX_STLOG_GetMessageData,
                                               [TSI_HANDLE, c_char_p, c_uint32, POINTER(c_uint32)],
                                               TSI_RESULT)

        # Event parser.
        self._TSI_CreateEventParser = wrap(lib.TSI_CreateEventParser,
                                           [POINTER(TSI_RESULT), c_uint32], TSI_PARSER)
        self._TSI_RemoveEventParser = wrap(lib.TSI_RemoveEventParser, [TSI_PARSER, ], TSI_RESULT)
        self._TSI_GetParsedEventData = wrap(lib.TSI_GetParsedEventData,
                                            [TSI_PARSER, POINTER(c_ubyte), c_int32,
                                             POINTER(c_uint64),
                                             c_char_p, c_int32, c_char_p, c_int32,
                                             c_char_p, c_int32, c_char_p, c_int32],
                                            TSI_RESULT)

        self._TSI_DEV_SetSearchMask = wrap(lib.TSI_DEV_SetSearchMask,
                                           [TSI_DEVICE_CAPS, TSI_DEVICE_CAPS], TSI_RESULT)

        if logging.is_enabled():
            return self.__TSIX_Init()
        return self.__TSI_Init()

    def deinitialize(self):
        """Call the native ``TSI_Clean`` and return its result.

        Requires :meth:`initialize` to have been called first.
        """
        return self.__TSI_Clean()

    @staticmethod
    def __setup_callback():
        """Build the log callback and store it in the module-level ``callback``."""
        global callback

        def ofp_impl(level, message):
            # Forwards a native log record to the Python logger; ``level`` is
            # mapped to ``(level + 1) * 10``. Exceptions must not propagate back
            # into the native caller, so they are reported as warnings instead.
            try:
                logging.log((level + 1) * 10, message.decode())
            except Exception as exc:
                warnings.warn(f"[UniTAP] Logger function receive exception: {exc}")

        callback = TSI_USER_LOG_FUNCTION(ofp_impl)

    def __TSIX_Init(self):
        """Initialise through ``TSIX_Init`` with a configuration carrying the log callback."""
        global callback
        config = TSI_INIT_CONFIGURATION()
        config.size = sizeof(config)
        config.version = self._TSI_CURRENT_VERSION
        config.function = callback
        return self._TSIX_Init(byref(config))

    def __TSI_Init(self):
        """Initialise through ``TSI_Init`` passing only the client version."""
        return self._TSI_Init(self._TSI_CURRENT_VERSION)

    def __TSI_Clean(self):
        """Invoke the bound ``TSI_Clean`` entry point."""
        return self._TSI_Clean()

    def TSIX_DEV_GetDeviceCount(self):
        """Return the result of the native ``TSIX_DEV_GetDeviceCount``."""
        return self._TSI_DEV_GetDeviceCount()

    def TSIX_DEV_GetDeviceName(self, device_id: int):
        """Return ``(result, name)`` for the device with index ``device_id``."""
        name_buffer = create_string_buffer(TSI_NAME_SIZE)
        result = self._TSIX_DEV_GetDeviceName(device_id, name_buffer, TSI_NAME_SIZE)
        return result, from_cstr(name_buffer)

    def TSIX_DEV_OpenDevice(self, device_id: int):
        """Open a device and return the handle produced by the native call.

        The status written by the library through the out-parameter is not
        returned to the caller.
        """
        status = c_int32(TSI_SUCCESS)
        return self._TSIX_DEV_OpenDevice(c_uint32(device_id), byref(status))

    def TSIX_DEV_SelectRole(self, device, role_id: int):
        """Select role ``role_id`` on ``device``; returns the native result."""
        return self._TSIX_DEV_SelectRole(device, role_id)

    def TSIX_DEV_RescanDevices(self, search_options=1, required_caps=0, unallowed_caps=0):
        """Trigger a device rescan with the given options; returns the native result."""
        return self._TSIX_DEV_RescanDevices(search_options, required_caps, unallowed_caps)

    def TSIX_DEV_GetDeviceRoleName(self, device, role_id: int):
        """Return ``(result, role_name)`` for role ``role_id`` of ``device``."""
        buffer_size = c_uint32(65536)
        name_buffer = create_string_buffer(buffer_size.value)
        result = self._TSIX_DEV_GetDeviceRoleName(device, role_id, name_buffer, buffer_size)
        return result, from_cstr(name_buffer)

    def TSIX_DEV_GetDeviceRoleCount(self, device):
        """Return the result of the native ``TSIX_DEV_GetDeviceRoleCount``."""
        return self._TSIX_DEV_GetDeviceRoleCount(device)

    def TSIX_DEV_CloseDevice(self, device):
        """Close ``device``; returns the native result."""
        return self._TSIX_DEV_CloseDevice(device)

    def TSIX_TS_SetPortConfigItem(self, device_handle, port_id, config_id, data, data_type=c_uint32,
                                  data_count=1, data_size=0):
        """Write a port-scoped configuration item through ``TSI2_TS_SetConfigItem``.

        Args:
            device_handle: Handle of the opened device.
            port_id: Logical port the item belongs to.
            config_id: Identifier of the configuration item.
            data: Value to write. Supported: ``int``/``bool``/``float``,
                ``list``/``bytearray``/``tuple``, ``str``, ctypes function
                pointers and ``IntEnum`` members.
            data_type: ctypes type used to marshal scalars and array elements.
            data_count: Element count for sequences / buffer size for strings.
            data_size: When > 0, the byte size passed to the library instead of
                ``sizeof`` of the marshalled value (``int``/``bool``/``str`` only).

        Returns:
            The native result; ``TSI_ERROR_NOT_IMPLEMENTED`` for unsupported
            ``data`` types; ``0`` when ``data_size > 0`` and ``data`` is neither
            ``int``/``bool`` nor ``str`` (nothing is written in that case).
        """
        setter = self._TSI2_TS_SetConfigItem
        # Exact type comparison is intentional: IntEnum members are ints but
        # must be handled by their own branch.
        kind = type(data)

        if data_size > 0:
            if kind in (int, bool):
                value = data_type(data)
            elif kind is str:
                value = create_string_buffer(data_count)
                value.value = data.encode()
            else:
                return 0
            return setter(device_handle, port_id, config_id, byref(value), data_size)

        if kind in (int, bool, float):
            value = data_type(data)
        elif kind in (list, bytearray, tuple):
            value = (data_type * data_count)(*data)
        elif kind is str:
            value = create_string_buffer(data_count)
            value.value = data.encode()
        elif 'WINFUNCTYPE' in str(kind) or 'CFUNCTYPE' in str(kind):
            # Function pointers are passed as-is, not by reference.
            return setter(device_handle, port_id, config_id, data, sizeof(data))
        elif isinstance(data, IntEnum):
            # Fixed: this branch previously omitted ``port_id``.
            value = data_type(data.value)
        else:
            return TSI_ERROR_NOT_IMPLEMENTED
        return setter(device_handle, port_id, config_id, byref(value), sizeof(value))

    def TSIX_TS_SetConfigItem(self, device_handle, config_id, data, data_type=c_uint32, data_count=1,
                              data_size=0):
        """Write a device-scoped configuration item through ``TSIX_TS_SetConfigItem``.

        Args:
            device_handle: Handle of the opened device.
            config_id: Identifier of the configuration item.
            data: Value to write. Supported: ``int``/``bool``/``float``,
                ``list``/``bytearray``, ``str``, ctypes function pointers,
                ``IntEnum`` members and ctypes ``Structure`` instances.
            data_type: ctypes type used to marshal scalars and array elements.
            data_count: Element count for sequences / buffer size for strings.
            data_size: When > 0, the byte size passed to the library instead of
                ``sizeof`` of the marshalled value (``int``/``bool``/``str`` only).

        Returns:
            The native result; ``TSI_ERROR_NOT_IMPLEMENTED`` for unsupported
            ``data`` types; ``0`` when ``data_size > 0`` and ``data`` is neither
            ``int``/``bool`` nor ``str`` (nothing is written in that case).
        """
        setter = self._TSIX_TS_SetConfigItem
        # Exact type comparison is intentional: IntEnum members are ints but
        # must be handled by their own branch.
        kind = type(data)

        if data_size > 0:
            if kind in (int, bool):
                value = data_type(data)
            elif kind is str:
                value = create_string_buffer(data_count)
                value.value = data.encode()
            else:
                return 0
            return setter(device_handle, config_id, byref(value), data_size)

        if kind in (int, bool, float):
            value = data_type(data)
        elif kind in (list, bytearray):
            value = (data_type * data_count)(*data)
        elif kind is str:
            value = create_string_buffer(data_count)
            value.value = data.encode()
        elif 'WINFUNCTYPE' in str(kind) or 'CFUNCTYPE' in str(kind):
            # Function pointers are passed as-is, not by reference.
            return setter(device_handle, config_id, data, sizeof(data))
        elif isinstance(data, IntEnum):
            value = data_type(data.value)
        elif isinstance(data, Structure):
            value = data
        else:
            return TSI_ERROR_NOT_IMPLEMENTED
        return setter(device_handle, config_id, byref(value), sizeof(value))

    def TSIX_TS_GetPortConfigItem(self, device_handle, port_id, config_id, data_type, data_count=1):
        """Read a port-scoped configuration item through ``TSI2_TS_GetConfigItem``.

        Returns:
            * ``(rv,)`` when ``data_type`` is ``None`` (``data_type`` and
              ``data_count`` are forwarded as the buffer and size arguments);
            * ``(rv, values, rv / sizeof(data_type) as int)`` when
              ``data_count > 1``, where ``values`` is a ``bytearray`` for
              ``c_ubyte`` and a ``list`` otherwise;
            * ``(rv, value)`` for a single item, unwrapped with ``.value`` when
              ``data_type`` is listed in ``CTYPES_TYPE_LIST``.
        """
        getter = self._TSI2_TS_GetConfigItem

        if data_type is None:
            rv = getter(device_handle, port_id, config_id, data_type, data_count)
            return (rv,)

        if data_count > 1:
            buffer = (data_type * data_count)()
            rv = getter(device_handle, port_id, config_id, buffer, sizeof(data_type) * data_count)
            values = bytearray(buffer) if data_type == c_ubyte else list(buffer)
            return rv, values, int(rv / sizeof(data_type))

        item = data_type(0)
        rv = getter(device_handle, port_id, config_id, byref(item), sizeof(item))
        if data_type in CTYPES_TYPE_LIST:
            return rv, item.value
        return rv, item

    def TSIX_TS_GetConfigItem(self, device_handle, config_id, data_type, data_count=1):
        """Read a device-scoped configuration item through ``TSIX_TS_GetConfigItem``.

        Returns:
            * ``(rv,)`` when ``data_type`` is ``None`` (``data_type`` and
              ``data_count`` are forwarded as the buffer and size arguments);
            * ``(rv, ctypes_array, data_count)`` when ``data_count > 1``;
            * ``(rv, value)`` for a single item, unwrapped with ``.value`` when
              ``data_type`` is listed in ``CTYPES_TYPE_LIST``.
        """
        getter = self._TSIX_TS_GetConfigItem

        if data_type is None:
            rv = getter(device_handle, config_id, data_type, data_count)
            return (rv,)

        if data_count > 1:
            buffer = (data_type * data_count)()
            rv = getter(device_handle, config_id, buffer, sizeof(data_type) * data_count)
            return rv, buffer, data_count

        item = data_type(0)
        rv = getter(device_handle, config_id, byref(item), sizeof(item))
        if data_type in CTYPES_TYPE_LIST:
            return rv, item.value
        return rv, item

    def TSI_MISC_GetErrorDescription(self, error):
        """Return ``(result, description)`` for the error code ``error``."""
        buffer_size = 256
        text_buffer = create_string_buffer(buffer_size)
        result = self._TSI_MISC_GetErrorDescription(error, text_buffer, buffer_size)
        return result, from_cstr(text_buffer)

    def TSIX_PORT_Select(self, device: TSI_HANDLE, port):
        """Select ``port`` on ``device``; returns the native result."""
        return self._TSIX_PORT_Select(device, port)

    def TSIX_PORT_Deselect(self, device: TSI_HANDLE, port):
        """Deselect ``port`` on ``device``; returns the native result."""
        return self._TSIX_PORT_Deselect(device, port)

    def TSIX_TS_RunTest(self, device: TSI_HANDLE, test_id: int):
        """Run the test ``test_id`` on ``device``; returns the native result."""
        return self._TSIX_TS_RunTest(device, c_uint32(test_id))

    def TSIX_TS_AbortTests(self, device: TSI_HANDLE):
        """Abort tests on ``device``; returns the native result."""
        return self._TSIX_TS_AbortTests(device)

    def TSIX_PORT_GetTestCount(self, device: TSI_HANDLE, port: TSI_LOGICAL_PORT):
        """Return the result of the native ``TSIX_PORT_GetTestCount``."""
        return self._TSIX_PORT_GetTestCount(device, port)

    def TSIX_PORT_GetTestInfo(self, device: TSI_HANDLE, port: TSI_LOGICAL_PORT, test_index: int):
        """Return ``(test_id, test_flags, text)`` for the test at ``test_index``.

        The native result code of the call is not returned.
        """
        buffer_size = c_uint32(1024)
        test_id = c_uint32(0)
        text_buffer = create_string_buffer(buffer_size.value)
        test_flags = c_int32(0)
        self._TSIX_PORT_GetTestInfo(device, port, test_index, byref(test_id), byref(test_flags),
                                    text_buffer, buffer_size)
        return test_id.value, test_flags.value, from_cstr(text_buffer)

    def TSIX_STLOG_WaitMessage(self, device: TSI_HANDLE, max_wait: int):
        """Call the native ``TSIX_STLOG_WaitMessage`` with ``max_wait``; returns its result."""
        return self._TSIX_STLOG_WaitMessage(device, c_int32(max_wait))

    def TSIX_STLOG_GetMessageData(self, device: TSI_HANDLE):
        """Return ``(result, message, out_size)`` read from the status log of ``device``."""
        buffer_size = c_uint32(1024)
        out_size = c_uint32(0)
        text_buffer = create_string_buffer(buffer_size.value)
        result = self._TSIX_STLOG_GetMessageData(device, text_buffer, buffer_size, byref(out_size))
        return result, from_cstr(text_buffer), out_size.value

    def TSI_CreateEventParser(self, timestamp_res: int = 100):
        """Create a native event parser and return the value the library returns for it.

        The status written through the out-parameter is not returned.
        """
        status = c_int32(TSI_SUCCESS)
        return self._TSI_CreateEventParser(byref(status), c_uint32(timestamp_res))

    def TSI_RemoveEventParser(self, parser):
        """Destroy ``parser``; returns the native result."""
        return self._TSI_RemoveEventParser(parser)

    def TSI_GetParsedEventData(self, parser, data, to_dict=False):
        """Parse one event record with the native parser.

        Args:
            parser: Parser obtained from :meth:`TSI_CreateEventParser`.
            data: Raw event record; only ``data[12:]`` is handed to the parser.
            to_dict: When true, return the parsed fields as a dictionary.

        Returns:
            ``(result, fields_dict)`` when ``to_dict`` is true, otherwise
            ``(result, timestamp, type, brief, content, source, data[12:])``.
        """
        offset = self._EVENT_DATA_OFFSET
        payload_len = len(data) - offset
        array_type = c_ubyte * payload_len
        payload = data[offset:]
        event_data = array_type(*payload)

        type_size, brief_size, content_size, source_size = 128, 256, 4096, 64
        type_buf = create_string_buffer(type_size)
        brief_buf = create_string_buffer(brief_size)
        content_buf = create_string_buffer(content_size)
        source_buf = create_string_buffer(source_size)
        timestamp = c_uint64(0)

        result = self._TSI_GetParsedEventData(parser, event_data, c_int32(payload_len),
                                              byref(timestamp),
                                              type_buf, type_size,
                                              brief_buf, brief_size,
                                              content_buf, content_size,
                                              source_buf, source_size)

        if to_dict:
            return result, {
                'timestamp': timestamp.value,
                'type': from_cstr(type_buf),
                'brief': from_cstr(brief_buf),
                'content': from_cstr(content_buf),
                'source': from_cstr(source_buf),
                'data': payload,
            }
        return (result, timestamp.value, from_cstr(type_buf), from_cstr(brief_buf),
                from_cstr(content_buf), from_cstr(source_buf), payload)

    def TSI_DEV_SetSearchMask(self, required_caps, unallowed_caps):
        """Set the device search mask; returns the native result."""
        return self._TSI_DEV_SetSearchMask(required_caps, unallowed_caps)


class TSIEventParser:
    """Context manager owning a native event parser for the duration of a ``with`` block."""

    def __init__(self):
        self.__parser = None
        self.__parse_function = None

    def __enter__(self):
        """Create the native parser and return this object."""
        self.__parser = TSIWrapper().TSI_CreateEventParser()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Remove the native parser; exceptions from the block are not suppressed."""
        TSIWrapper().TSI_RemoveEventParser(self.__parser)

    def parse(self, data, to_dict=False):
        """Parse ``data`` with the owned parser (see ``TSIWrapper.TSI_GetParsedEventData``)."""
        return TSIWrapper().TSI_GetParsedEventData(self.__parser, data, to_dict)


class TSI_INIT_CONFIGURATION(Structure):
    """ctypes structure passed by reference to ``TSIX_Init``."""

    _fields_ = [
        ("size", c_uint),
        ("version", c_uint),
        ("function", TSI_USER_LOG_FUNCTION),
    ]

    def __init__(self, *args, **kwargs):
        """Build the structure, then reset every field to its zero/empty value."""
        super().__init__(*args, **kwargs)
        self.size = 0
        self.version = 0
        self.function = TSI_USER_LOG_FUNCTION()


def get_config_item_name(config_item) -> str:
    """Return the name of the first module global equal to ``config_item``.

    Raises:
        IndexError: If no module global compares equal to ``config_item``.
    """
    matches = [name for name, value in globals().items() if value == config_item]
    return matches[0]


def from_cstr(src_str):
    """Decode the ``.value`` of a ctypes string buffer to ``str``.

    cp1252 is tried first; if the bytes are not valid cp1252 the value is
    decoded as UTF-8 instead.
    """
    try:
        return src_str.value.decode('cp1252')
    except UnicodeDecodeError:
        return src_str.value.decode('utf-8')


def to_cstr(src_str):
    """Encode ``src_str`` as UTF-8 and wrap it in a ``c_char_p``."""
    return c_char_p(src_str.encode('utf-8'))


def show_error(error_code):
    """Build a description line for a failing result code.

    Returns:
        ``False`` when ``error_code >= TSI_SUCCESS``; otherwise a string of the
        form ``"Error <code>: <description>\\n"`` (with a placeholder when the
        description lookup itself fails).
    """
    if error_code >= TSI_SUCCESS:
        return False

    result = TSI_MISC_GetErrorDescription(error_code)
    if result[0] < TSI_SUCCESS:
        return f"Error {error_code!s}: (No description available)\n"
    return f"Error {error_code!s}: {result[1]!s}\n"


def create_c_array(data, _type, _size):
    """Create a ctypes array of ``_size`` elements of ``_type`` and assign ``data`` to its ``.value``."""
    buf = (_type * _size)()
    buf.value = data
    return buf


# ---------------------------------------------------------------------------
# Module-level convenience functions delegating to the TSIWrapper singleton.
# ---------------------------------------------------------------------------

def TSIX_Init():
    """Delegate to ``TSIWrapper().initialize()``."""
    return TSIWrapper().initialize()


def TSI_Clean():
    """Delegate to ``TSIWrapper().deinitialize()``."""
    return TSIWrapper().deinitialize()


def TSIX_DEV_GetDeviceCount():
    """Delegate to ``TSIWrapper().TSIX_DEV_GetDeviceCount()``."""
    return TSIWrapper().TSIX_DEV_GetDeviceCount()


def TSIX_DEV_GetDeviceName(device_id: int):
    """Delegate to ``TSIWrapper().TSIX_DEV_GetDeviceName``."""
    return TSIWrapper().TSIX_DEV_GetDeviceName(device_id)


def TSIX_DEV_OpenDevice(device_id: int):
    """Delegate to ``TSIWrapper().TSIX_DEV_OpenDevice``."""
    return TSIWrapper().TSIX_DEV_OpenDevice(device_id)


def TSIX_DEV_SelectRole(device, role_id: int):
    """Delegate to ``TSIWrapper().TSIX_DEV_SelectRole``."""
    return TSIWrapper().TSIX_DEV_SelectRole(device, role_id)


def TSIX_DEV_RescanDevices(search_options=1, required_caps=0, unallowed_caps=0):
    """Delegate to ``TSIWrapper().TSIX_DEV_RescanDevices``."""
    return TSIWrapper().TSIX_DEV_RescanDevices(search_options, required_caps, unallowed_caps)


def TSIX_DEV_GetDeviceRoleName(device, role_id: int):
    """Delegate to ``TSIWrapper().TSIX_DEV_GetDeviceRoleName``."""
    return TSIWrapper().TSIX_DEV_GetDeviceRoleName(device, role_id)


def TSIX_DEV_GetDeviceRoleCount(device):
    """Delegate to ``TSIWrapper().TSIX_DEV_GetDeviceRoleCount``."""
    return TSIWrapper().TSIX_DEV_GetDeviceRoleCount(device)


def TSIX_DEV_CloseDevice(device):
    """Delegate to ``TSIWrapper().TSIX_DEV_CloseDevice``."""
    return TSIWrapper().TSIX_DEV_CloseDevice(device)


def TSIX_TS_SetPortConfigItem(device_handle, port_id, config_id, data, data_type=c_uint32, data_count=1,
                              data_size=0):
    """Delegate to ``TSIWrapper().TSIX_TS_SetPortConfigItem``."""
    return TSIWrapper().TSIX_TS_SetPortConfigItem(device_handle, port_id, config_id, data, data_type,
                                                  data_count, data_size)


def TSIX_TS_SetConfigItem(device_handle, config_id, data, data_type=c_uint32, data_count=1, data_size=0):
    """Delegate to ``TSIWrapper().TSIX_TS_SetConfigItem``."""
    return TSIWrapper().TSIX_TS_SetConfigItem(device_handle, config_id, data, data_type, data_count,
                                              data_size)


def TSIX_TS_GetPortConfigItem(device_handle, port_id, config_id, data_type, data_count=1):
    """Delegate to ``TSIWrapper().TSIX_TS_GetPortConfigItem``."""
    return TSIWrapper().TSIX_TS_GetPortConfigItem(device_handle, port_id, config_id, data_type,
                                                  data_count)


def TSIX_TS_GetConfigItem(device_handle, config_id, data_type, data_count=1):
    """Delegate to ``TSIWrapper().TSIX_TS_GetConfigItem``."""
    return TSIWrapper().TSIX_TS_GetConfigItem(device_handle, config_id, data_type, data_count)


def TSI_MISC_GetErrorDescription(error):
    """Delegate to ``TSIWrapper().TSI_MISC_GetErrorDescription``."""
    return TSIWrapper().TSI_MISC_GetErrorDescription(error)


def TSIX_PORT_Select(device: TSI_HANDLE, port):
    """Delegate to ``TSIWrapper().TSIX_PORT_Select``."""
    return TSIWrapper().TSIX_PORT_Select(device, port)


def TSIX_PORT_Deselect(device, port):
    """Delegate to ``TSIWrapper().TSIX_PORT_Deselect``."""
    return TSIWrapper().TSIX_PORT_Deselect(device, port)


def TSIX_TS_RunTest(device: TSI_HANDLE, test_id: int):
    """Delegate to ``TSIWrapper().TSIX_TS_RunTest``."""
    return TSIWrapper().TSIX_TS_RunTest(device, test_id)


def TSIX_TS_AbortTests(device: TSI_HANDLE):
    """Delegate to ``TSIWrapper().TSIX_TS_AbortTests``."""
    return TSIWrapper().TSIX_TS_AbortTests(device)


def TSIX_PORT_GetTestCount(device: TSI_HANDLE, port: TSI_LOGICAL_PORT):
    """Delegate to ``TSIWrapper().TSIX_PORT_GetTestCount``."""
    return TSIWrapper().TSIX_PORT_GetTestCount(device, port)


def TSIX_PORT_GetTestInfo(device: TSI_HANDLE, port: TSI_LOGICAL_PORT, test_index: int):
    """Delegate to ``TSIWrapper().TSIX_PORT_GetTestInfo``."""
    return TSIWrapper().TSIX_PORT_GetTestInfo(device, port, test_index)


def TSIX_STLOG_WaitMessage(device: TSI_HANDLE, max_wait: int):
    """Delegate to ``TSIWrapper().TSIX_STLOG_WaitMessage``."""
    return TSIWrapper().TSIX_STLOG_WaitMessage(device, max_wait)


def TSIX_STLOG_GetMessageData(device: TSI_HANDLE):
    """Delegate to ``TSIWrapper().TSIX_STLOG_GetMessageData``."""
    return TSIWrapper().TSIX_STLOG_GetMessageData(device)


def TSI_CreateEventParser(timestamp_res: int = 100):
    """Delegate to ``TSIWrapper().TSI_CreateEventParser``."""
    return TSIWrapper().TSI_CreateEventParser(timestamp_res)


def TSI_RemoveEventParser(parser):
    """Delegate to ``TSIWrapper().TSI_RemoveEventParser``."""
    return TSIWrapper().TSI_RemoveEventParser(parser)


def TSI_GetParsedEventData(parser, data, to_dict=False):
    """Delegate to ``TSIWrapper().TSI_GetParsedEventData``."""
    return TSIWrapper().TSI_GetParsedEventData(parser, data, to_dict)


def TSI_DEV_SetSearchMask(required_caps, unallowed_caps):
    """Delegate to ``TSIWrapper().TSI_DEV_SetSearchMask``."""
    return TSIWrapper().TSI_DEV_SetSearchMask(required_caps, unallowed_caps)