import weakref from ctypes import c_uint32, c_char from typing import Callable, Tuple, List from enum import IntEnum from .tsi import (TSI_HANDLE, TSI_LOGICAL_PORT, TSI_SUCCESS, \ TSIX_TS_SetConfigItem, TSIX_TS_GetConfigItem, TSIX_TS_SetPortConfigItem, \ TSIX_TS_GetPortConfigItem, TSIX_Init, TSI_Clean, TSIX_DEV_RescanDevices, \ TSIX_DEV_OpenDevice, TSIX_DEV_GetDeviceName, TSIX_DEV_GetDeviceCount, TSIX_DEV_CloseDevice, \ TSIX_DEV_GetDeviceRoleCount, TSIX_DEV_SelectRole, TSIX_DEV_GetDeviceRoleName, TSIX_PORT_Select, \ get_config_item_name, TSIX_TS_RunTest, TSIX_STLOG_WaitMessage, TSIX_STLOG_GetMessageData, \ TSIX_PORT_GetTestCount, TSIX_PORT_GetTestInfo, TSI_TS_OF_MODE, \ TSI_OFMODE_RUN_CALL_STRUCT_PROCEDURE, TSI_TS_OF_REQ_ID, TSI_TS_OF_CALLBACK, TSI_FW_VERSION_TEXT, Structure, TSI_TEST_STATUS, TSI_DEVICE_INFO_R, TSI_BUNDLE_VERSION_TEXT, \ TSI_OFMODE_RUN_CALL_STRUCT_PROCEDURE_AND_RESULT, TSI_DEV_SetSearchMask, DeviceMaskInternal, \ TSI_TEST_LOG_CONFIGURATION, TSI_TEST_ERROR_CODE_R, TSI_TEST_PASSED, TSI_TEST_FAILED, TSI_TEST_NOT_STARTED, TSI_TEST_ABORTED, TSIX_TS_AbortTests, TSI_TEST_DLG_CONFIG) from UniTAP.utils import tsi_logging as logging class PortProtocol(IntEnum): DisplayPort = 0 HDMI = 1 DisplayPortThrowUSBC = 2 class TestStatus(Structure): _fields_ = [ ("status", c_uint32, 8), ("result", c_uint32, 7), ("oper_rq", c_uint32, 1), ("error", c_uint32, 7), ("log_ovf", c_uint32, 1), ("progress", c_uint32, 8), ] class TestDialogConfig(Structure): _fields_ = [ ("disable_auto_precessing", c_uint32, 1), ("reserved", c_uint32, 31) ] def __init__(self, disable_auto_precessing: bool): super().__init__() self.disable_auto_precessing = disable_auto_precessing class PortIO: def __init__(self, device: TSI_HANDLE, index: TSI_LOGICAL_PORT, protocol: PortProtocol): logging.info(f"[UniTAP] PortIO.init: {self}") self.__device = device self.__index = index self.__protocol = protocol self.active() def __del__(self): logging.info(f"[UniTAP] PortIO.del: {self}") def set(self, config_id, data, data_type=c_uint32, data_count=1, data_size=0): result = TSIX_TS_SetPortConfigItem(self.__device, self.__index, config_id, data, data_type, data_count, data_size) if isinstance(data, bytearray): logging.debug(f"[UniTAP] PortIO.set {self} {get_config_item_name(config_id)} " f"with data: {data[:(10 if len(data) > 10 else len(data))]}[{len(data)}] " f"({type(data)}); result: {result}") else: logging.debug(f"[UniTAP] PortIO.set {self} {get_config_item_name(config_id)} " f"with data: {data} ({type(data)}); result: {result}") return result def get(self, config_id, data_type=c_uint32, data_count=1): result = TSIX_TS_GetPortConfigItem(self.__device, self.__index, config_id, data_type, data_count) logging.debug( f"PortIO.get {self} {get_config_item_name(config_id)} " f"with type: {data_type}[{data_count}]; result: {result}") return result def active(self): TSIX_PORT_Select(self.__device, self.__index) def get_test_list(self) -> List[Tuple[int, int, str]]: test_list = [] for test_id in range(self.__get_test_count()): test_list.append(self.__get_test_info(test_id)) return test_list def protocol(self) -> PortProtocol: return self.__protocol def index(self): return self.__index def __get_test_count(self) -> int: return TSIX_PORT_GetTestCount(self.__device, self.__index) def __get_test_info(self, test_id: int): return TSIX_PORT_GetTestInfo(self.__device, self.__index, test_id) class DeviceIO: MAX_FW_VERSION_STR_SIZE = 65535 def __init__(self, index: int): logging.info(f"[UniTAP] DeviceIO.init: {self}") self.__device = TSIX_DEV_OpenDevice(index) self.__port_io_list = [] # type: list[PortIO] self.__current_role = None def __del__(self): self.__port_io_list.clear() TSIX_DEV_CloseDevice(self.__device) logging.info(f"[UniTAP] DeviceIO.del: {self}") @property def device_handle(self): return self.__device def set(self, config_id, data, data_type=c_uint32, data_count=1, data_size=0): return TSIX_TS_SetConfigItem(self.__device, config_id, data, data_type, data_count, data_size) def get(self, config_id, data_type=c_uint32, data_count=1): return TSIX_TS_GetConfigItem(self.__device, config_id, data_type, data_count) def get_device_role_count(self) -> int: return TSIX_DEV_GetDeviceRoleCount(self.__device) def get_device_role_name(self, index) -> str: return TSIX_DEV_GetDeviceRoleName(self.__device, index)[1] def select_role(self, index): logging.info( f"[UniTAP] DeviceIO.select_role: {index}") return TSIX_DEV_SelectRole(self.__device, index) def create_port_io(self, index, protocol: PortProtocol) -> PortIO: self.__port_io_list.append(PortIO(self.__device, index, protocol)) return weakref.proxy(self.__port_io_list[-1]) def set_opf_callback(self, callback: Callable): TSIX_TS_SetConfigItem(self.__device, TSI_TS_OF_MODE, TSI_OFMODE_RUN_CALL_STRUCT_PROCEDURE_AND_RESULT) TSIX_TS_SetConfigItem(self.__device, TSI_TS_OF_REQ_ID, -1) TSIX_TS_SetConfigItem(self.__device, TSI_TS_OF_CALLBACK, callback) def set_opf_config(self, config: TestDialogConfig): self.set(TSI_TEST_DLG_CONFIG, config) def get_test_list(self) -> List[Tuple[int, int, str]]: test_list = [] for port in self.__port_io_list: test_list.extend(port.get_test_list()) return test_list def run_test(self, test_id: int): logging.info( f"[UniTAP] DeviceIO.run_test with id: {hex(test_id)} " f"(group={test_id >> 16 & 0xffff}; test={test_id & 0xffff})") result = TSIX_TS_RunTest(self.__device, test_id) if result not in [TSI_TEST_PASSED, TSI_TEST_FAILED, TSI_TEST_NOT_STARTED, TSI_TEST_ABORTED]: result = TSI_TEST_FAILED return result def abort_test(self): logging.info(f"[UniTAP] DeviceIO.abort_test") return TSIX_TS_AbortTests(self.__device) def set_test_config(self, config: int): if config > 0xffffffff or config < 0: raise ValueError("Test config value must be more than 0 and less than 0xffffffff") TSIX_TS_SetConfigItem(self.__device, TSI_TEST_LOG_CONFIGURATION, config) def is_log_message_available(self, max_wait_timeout=5000) -> int: return int(TSIX_STLOG_WaitMessage(self.__device, max_wait_timeout)) def get_test_log_message(self) -> str: return TSIX_STLOG_GetMessageData(self.__device) def get_test_status(self) -> TestStatus: return TSIX_TS_GetConfigItem(self.__device, TSI_TEST_STATUS, TestStatus, 1)[1] def get_test_error_code(self) -> int: return TSIX_TS_GetConfigItem(self.__device, TSI_TEST_ERROR_CODE_R, c_uint32, 1)[1] def get_fw_version_string(self): return TSIX_TS_GetConfigItem(self.__device, TSI_FW_VERSION_TEXT, c_char, self.MAX_FW_VERSION_STR_SIZE)[1].value.decode('utf-8') def get_prepared_fw_info(self): return TSIX_TS_GetConfigItem(self.__device, TSI_DEVICE_INFO_R, c_char, self.MAX_FW_VERSION_STR_SIZE)[1].value.decode('utf-8') def get_bundle_version(self): ver = TSIX_TS_GetConfigItem(self.__device, TSI_BUNDLE_VERSION_TEXT, c_char, self.MAX_FW_VERSION_STR_SIZE)[1].value.decode('utf-8') if ver == '': ver = 'not specified (NOT FOR SHARING)' return ver def set_role(self, role): self.__current_role = role def get_role(self): if self.__current_role is not None: return weakref.proxy(self.__current_role) return None class BaseIO: def __init__(self): self.__ref_count = TSIX_Init() logging.info(f"[UniTAP] BaseIO.init: {self}; Ref count: {self.__ref_count}") TSIX_DEV_RescanDevices() def cleanup(self): self.__ref_count = TSI_Clean() logging.info(f"[UniTAP] BaseIO.del: {self}; Ref count: {self.__ref_count}") def get_device_count(self) -> int: if self.__ref_count > 0: return TSIX_DEV_GetDeviceCount() logging.error("[UniTAP] BaseIO.get_device_count call without TSI_Init") def get_device_name(self, index) -> str: if self.__ref_count > 0: return TSIX_DEV_GetDeviceName(index)[1] logging.error("[UniTAP] BaseIO.get_device_name call without TSI_Init") def get_devices_by_mask(self, require_caps: DeviceMaskInternal, unallowed_caps: DeviceMaskInternal): if self.__ref_count > 0: return TSI_DEV_SetSearchMask(require_caps.value, unallowed_caps.value) logging.error("[UniTAP] BaseIO.get_devices_by_mask call without TSI_Init") def create_device_io(self, index: int) -> DeviceIO: if self.__ref_count > 0: return DeviceIO(index) logging.error("[UniTAP] BaseIO.create_device_io call without TSI_Init")