import concurrent.futures import copy import os import json import re import time from typing import TypeVar, Generic, Optional, Type, Tuple, List from UniTAP.utils import tsi_logging as logging from UniTAP.dev.modules.dut_tests.test_info import TestResultObject, TestGroupId from UniTAP.dev.modules.dut_tests.private_info import MergedTestGroups, group_params_dict, SubTestResultObject, \ MERGED_GROUP_PARAMS_TYPE, test_group_to_merged_group from .private_info import TestResultObject, TestGroupId, MergedTestGroups, group_params_dict, SubTestResultObject, \ MERGED_GROUP_PARAMS_TYPE, test_group_to_merged_group from UniTAP.dev.modules.dut_tests.test_group_params_types import dict_types, read_image_file, uicl_image_preparation from UniTAP.libs.lib_tsi.tsi_io import DeviceIO, TestDialogConfig from UniTAP.version import __version__ from .dut_default_params import * from .test_utils import is_need_to_update_test_configuration, TestReportConfigType, convert_to_cdf_json_base64 from .report import update_configuration, dp21_source_timing_table, detect_warning_message from .report import generate_html_report, TestAdditionalInfo, TestResult from UniTAP.utils import tsi_logging as logging from .test_info import TestStatusEnum from .parser import Parser class Test: """ Class `TestGroup` describes template of usual test. Contains info of: - Test name `name`. - Test ID `id` """ def __init__(self, name: str, test_id: int): self.__name = name self.__id = test_id @property def name(self): return self.__name @property def id(self): return self.__id def __str__(self): return f"Test ID: {self.id} - {self.name}\n" class TestGroup(Generic[DUTTestParameters]): """ Class `TestGroup` describes template of usual test group. Contains info of: - Test count `test_count`. - Tests `tests`. - Name `name`. - ID `id`. """ def __init__(self, _id: TestGroupId, name: str): self.__name = name self.__id = _id self.__tests = list() @property def test_count(self) -> int: """ Test count of test group. Returns: object of int type """ return len(self.__tests) @property def tests(self): """ Returns tests. Returns: object of `` type """ return self.__tests @property def name(self) -> str: """ Returns test group name. Returns: object of str type """ return self.__name @property def id(self) -> TestGroupId: """ Returns ID of test group. Returns: object of id type """ return self.__id def _add_new_test(self, new_test: Test): if not self.__test_presence_check(new_test): self.__tests.append(new_test) def __test_presence_check(self, test_to_check: Test) -> bool: for test in self.__tests: if test.id == test_to_check.id: return True return False def __search_test_by_id(self, test_id: int) -> Optional[Test]: for test in self.__tests: if test.id == test_id: return test return None def __str__(self): tests_info = "" for test in self.tests: tests_info += test.__str__() return f"Test group name: {self.name}. ID Group: {self.id}\n" \ f"Number of tests in a group: {self.test_count}\n" \ f"{tests_info}" class DUTTests: """ Class `DUTTests` allows working with available tests on the device. - Run `run`. - Get default parameters for selected test group `get_default_parameters`. - Get available test count in selected group `number_tests_in_group`. - Get all test results `get_all_tests_results`. - Clear all test results `clear_results`. - Get information of available test groups `info_of_available_test_groups`. - Make report after testing `make_report`. - Run test from file `run_from_file` - Not implemented. Will be added later. """ def __init__(self, dev_io: DeviceIO): self.__io = dev_io self.__opf_device = None self.__results = [] self.__default_parameters = {} self.__test_groups = {} self.__parser = Parser() self.__io.set_test_config(0) cfp_path = os.path.join(os.path.dirname(__file__), 'cfg') files = os.listdir(cfp_path) for file in files: try: if not os.path.isfile(os.path.join(cfp_path, file)): continue with open(os.path.join(cfp_path, file), encoding='UTF-8') as cfg_json: json_obj = json.load(cfg_json) item_group = MergedTestGroups(int(json_obj.get('id'), 16)) group_type = MERGED_GROUP_PARAMS_TYPE.get(item_group, None) if group_type is None: continue group_parameters_dict = {} for parameter in json_obj.get('descriptions'): group_parameters_dict[parameter["id"]] = parameter self.__default_parameters[group_type] = group_type(group_parameters_dict) except ValueError: logging.info( f"[UniTAP] Failed to load {os.path.join(cfp_path, file)}.\n Value {int(json_obj.get('id'), 16)} is missing in MergedTestGroups.") test_list = self.__io.get_test_list() for test in test_list: test_id = test[0] & 0xFFFF group_id = TestGroupId(test[0] >> 16) name = test[2] if test[2].find(' / ') != -1: name = test[2].split(" / ") else: name = [name] if test[2].find("Validate audio") != -1: name.insert(0, "Audio Test") else: name.insert(0, "Pixel Level Video Test") group_id = TestGroupId.PIXEL_VIDEO_TEST group_name = name[0] test_name = name[1] if self.__test_groups.get(group_id, None) is None: self.__test_groups[group_id] = TestGroup(_id=group_id, name=group_name) self.__test_groups[group_id]._add_new_test(Test(name=test_name, test_id=test_id)) def run(self, group_id: TestGroupId, test_id: int, params: Optional[DUTTestParameters] = None, print_fw_logs: bool = True, test_delay: int = 0) -> SubTestResultObject: """ Run selected test of test group. Args: group_id (TestGroupId) - test group id test_id (int) params (DUTTestParameters) - one of the variants of params print_fw_logs (bool) - print FW logs (enable/disable) test_delay (int) - delay between tests (in seconds) """ if self.__test_groups.get(group_id, None) is None: raise ValueError(f"Group {group_id} is not available.") if group_id in [TestGroupId.AUDIO_TEST, TestGroupId.PIXEL_VIDEO_TEST]: if group_id == TestGroupId.AUDIO_TEST and test_id != 3 or \ group_id == TestGroupId.PIXEL_VIDEO_TEST and test_id != 2: raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.") group = self.__test_groups.get(group_id, None) test_name = self.__test_groups.get(group_id).tests[0].name group_id = TestGroupId.AUDIO_TEST else: if test_id > self.__test_groups.get(group_id).test_count: raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.") group = self.__test_groups.get(group_id, None) test_name = self.__test_groups.get(group_id).tests[test_id].name merged_group_id = test_group_to_merged_group(group_id, test_id) if params is not None: params_list = get_param_list(params) if isinstance(params, Dp21SourceDUTTestParam) and not params.dsc_video_modes._Dp21AvailableDscVideoModes__is_config_changed: logging.warning("Detected that used didn't change new DSC config. Copy setting from video config.") dsc_dp2_ci_remapper = { "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_30": "TSI_DP20_SRCCTS_VMT_1920_1080_30", "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_60": "TSI_DP20_SRCCTS_VMT_1920_1080_60", "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_120": "TSI_DP20_SRCCTS_VMT_1920_1080_120", "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_144": "TSI_DP20_SRCCTS_VMT_1920_1080_144", "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_240": "TSI_DP20_SRCCTS_VMT_1920_1080_240", "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_30": "TSI_DP20_SRCCTS_VMT_3840_2160_30", "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_60": "TSI_DP20_SRCCTS_VMT_3840_2160_60", "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_120": "TSI_DP20_SRCCTS_VMT_3840_2160_120", "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_144": "TSI_DP20_SRCCTS_VMT_3840_2160_144", "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_240": "TSI_DP20_SRCCTS_VMT_3840_2160_240", "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_30": "TSI_DP20_SRCCTS_VMT_5120_2160_30", "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_60": "TSI_DP20_SRCCTS_VMT_5120_2160_60", "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_120": "TSI_DP20_SRCCTS_VMT_5120_2160_120", "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_144": "TSI_DP20_SRCCTS_VMT_5120_2160_144", "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_240": "TSI_DP20_SRCCTS_VMT_5120_2160_240", "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_30": "TSI_DP20_SRCCTS_VMT_7680_4320_30", "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_60": "TSI_DP20_SRCCTS_VMT_7680_4320_60", "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_120": "TSI_DP20_SRCCTS_VMT_7680_4320_120", "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_30": "TSI_DP20_SRCCTS_VMT_10240_4320_30", "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_60": "TSI_DP20_SRCCTS_VMT_10240_4320_60" } for key in dsc_dp2_ci_remapper.keys(): dst = next((obj for obj in params_list if obj.config_id_name == key), None) src = next((obj for obj in params_list if obj.config_id_name == dsc_dp2_ci_remapper[key]), None) if dst.default_value != src.default_value: logging.warn(f"DSC Config missmatch detected. Apply workaround: {key}:{hex(dst.default_value)}; {dsc_dp2_ci_remapper[key]}:{hex(src.default_value)}") dst.default_value = src.default_value else: logging.warn(f"DSC Config modification detected. Skip workaround.") else: params_list = self.__default_parameters[merged_group_id] for item in params_list: if group_id == TestGroupId.AUDIO_TEST and item.config_id_name == "TSI_REF1_FRAME_DATA": if isinstance(item.default_value, str) and os.path.exists(item.default_value): image_data = read_image_file(item.default_value) if len(image_data) > 0: item.default_value = uicl_image_preparation(image_data, params) elif isinstance(item, bytearray): # TODO: add 'bytearray' support item.default_value = uicl_image_preparation(item.default_value, params) self.__io.set(item.config_id, item.default_value, dict_types.get(item.value_type), item.data_length) report_type = is_need_to_update_test_configuration(params_list) result_obj = SubTestResultObject(group.name, detect_warning_message(report_type, params_list)) result_obj.test_name = test_name result_obj.json_config_info = convert_to_cdf_json_base64(params_list, merged_group_id) if report_type != TestReportConfigType.Another: result_obj.config_info = update_configuration(report_type, params_list) else: for item in params_list: result_obj.config_info += "{} = {}
".format(item.name, item.__str__()) run_id = (group_id << 16) | test_id self.__io.set_opf_config(TestDialogConfig(disable_auto_precessing=True)) with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(self.__io.run_test, run_id) try: state = True while future.running() or state: state = self.__read_logs(result_obj, print_fw_logs) except KeyboardInterrupt as e: self.__io.abort_test() raise result_obj.test_result = TestResult(future.result()) result_obj.error_code = self.__io.get_test_error_code() result_obj.test_delay = test_delay self.__results.append(result_obj) time.sleep(test_delay) return copy.deepcopy(self.__results[-1]) def get_params_from_file(self, path: str) -> Tuple[TestGroupId, int, DUTTestParameters]: """ Get test parameters from transferred file: td or json (not cdf). Args: path ('str') - full path to config file """ self.__parser.set_file_name(path) group_id = self.__parser.detect_group() test_id = self.__parser.detect_test() group_type = group_params_dict.get(group_id) params = self.__parser.parse(self.get_default_parameters(group_type)) return group_id, test_id, params # TODO - add support parsing '.td' (other test groups) file extension def get_params_from_cdf_file(self, path: str) -> DUTTestParameters: """ Get test parameters from transferred file: json (cdf). Args: path ('str') - full path to config file """ self.__parser.set_file_name(path) group_id = self.__parser.detect_merged_group() group_type = MERGED_GROUP_PARAMS_TYPE.get(group_id) params = self.__parser.parse(self.get_default_parameters(group_type)) return params def get_default_parameters(self, group_type: Type[DUTTestParameters]) -> DUTTestParameters: """ Get predefined (default) parameters of test group. Args: group_type (`DUTTestParameters`) - test group id Returns: object of `DUTTestParameters` type """ for group_id in self.__default_parameters.keys(): if isinstance(self.__default_parameters.get(group_id), group_type): return copy.deepcopy(self.__default_parameters.get(group_id)) raise ValueError(f"Group {group_type} is not available.") def number_tests_in_group(self, group_id: TestGroupId) -> int: """ Returns all count of available tests of selected test group. Args: group_id (`TestGroupId`) - test group id Returns: object of int type """ return self.__search_group_by_id(group_id).test_count def get_test_name(self, group_id: TestGroupId, test_id: int) -> str: """ Returns test name by test id in selected test group. Args: group_id (`TestGroupId`) - test group id test_id (`int`) - test id Returns: object of str type """ if group_id in [TestGroupId.AUDIO_TEST, TestGroupId.PIXEL_VIDEO_TEST]: if group_id == TestGroupId.AUDIO_TEST and test_id != 3 or \ group_id == TestGroupId.PIXEL_VIDEO_TEST and test_id != 2: raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.") group_id = TestGroupId.AUDIO_TEST return self.__test_groups.get(group_id).tests[0 if test_id == 3 else 1].name else: if test_id > self.__test_groups.get(group_id).test_count: raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.") return self.__test_groups.get(group_id).tests[test_id].name def get_test_id_by_name(self, group_id: TestGroupId, name: str) -> int: """ Returns test ID by test name id in selected test group. Args: group_id (`TestGroupId`) - test group id name (`str`) - test name or part name of the test Returns: object of int type """ if group_id == TestGroupId.AUDIO_TEST: return 3 elif group_id == TestGroupId.PIXEL_VIDEO_TEST: return 2 else: for test in self.__test_groups.get(group_id).tests: if test.name.find(name) != -1: return test.id raise ValueError(f"Incorrect test name: {name}") def get_test_id_list_by_name(self, group_id: TestGroupId, name: str) -> List[int]: """ Returns list of test ID by test name id in selected test group. Args: group_id (`TestGroupId`) - test group id name (`str`) - test name or part name of the test Returns: object of list[int] type """ if group_id == TestGroupId.AUDIO_TEST: return [3] elif group_id == TestGroupId.PIXEL_VIDEO_TEST: return [2] else: id_list = [] for test in self.__test_groups.get(group_id).tests: if test.name.find(name) != -1: id_list.append(test.id) return id_list @staticmethod def print_test_params(params: DUTTestParameters): """ Print all test parameters in readable format. Args: params (`DUTTestParameters`) - type of params """ params_list = get_param_list(params) report_type = is_need_to_update_test_configuration(params_list) if report_type != TestReportConfigType.Another: info = update_configuration(report_type, params_list).replace("
", '\n') if report_type == TestReportConfigType.Dp2_1_Source: info = info[: info.find(" TestResultObject: """ Returns all test result. Combined in one `TestResultObject` object. Returns: object of TestResultObject type """ return TestResultObject(self.__results) def clear_results(self): """ Clear all results. """ self.__results.clear() @property def info_of_available_test_groups(self) -> str: """ Returns all info in string format of test groups. Returns: object of str type """ test_groups_info = "" for group_key in self.__test_groups.keys(): test_groups_info += self.__test_groups.get(group_key).__str__() + '\n' return test_groups_info def __search_group_by_id(self, group_id: TestGroupId) -> TestGroup: for group in self.__test_groups: if group == group_id: return self.__test_groups[group] return TestGroup(TestGroupId.UNKNOWN, 'Unknown') def make_report(self, path: str = "Report.html", tested_by="", dut_driver_version="", dut_firmware_version="", dut_model_name="", dut_revision="", dut_serial_number="", remarks="", results=None): """ Make report after testing. Args: path (str) - path to save report tested_by (str) - who tested dut_driver_version (str) - DUT driver version dut_firmware_version (str) - DUT FW version dut_model_name (str) - DUT model name dut_revision (str) - DUT revision dut_serial_number (str) - DUT serial number remarks (int) - additional remarks of testing results (list|None) - results of testing (if list is empty or None, usually use internal list of test results, which was filled during testing) """ test_additional_info = TestAdditionalInfo() device_name, serial_number, fw_version, front_end, memory_size = self.__io.get_prepared_fw_info().split("|") test_additional_info.device_name = f"{device_name} [{serial_number.replace(' ', '')}]: " \ f"{self.__io.get_role().name}" test_additional_info.device_detailed_data = fw_version test_additional_info.device_frontend_version = front_end test_additional_info.memory_size = memory_size test_additional_info.python_version = __version__ test_additional_info.bundle_version = self.__io.get_bundle_version() test_additional_info.tested_by = tested_by test_additional_info.dut_driver_version = dut_driver_version test_additional_info.dut_firmware_version = dut_firmware_version test_additional_info.dut_model_name = dut_model_name test_additional_info.dut_revision = dut_revision test_additional_info.dut_serial_number = dut_serial_number test_additional_info.remarks = remarks if results is None: generate_html_report(path, test_additional_info, self.__results) else: generate_html_report(path, test_additional_info, results) def __read_logs(self, test_result: SubTestResultObject, print_logs: bool) -> bool: try: msg_count = self.__io.is_log_message_available(250) except ReferenceError as e: return False if msg_count <= 0: return False for _ in range(msg_count): result, message, length = self.__io.get_test_log_message() if result < 0: continue if print_logs: print(message) try: test_result.fw_logs += message + "\n" if str(message).find("Error:") != -1: try: new_str = message.split("Error:")[1] error_code = int(re.findall(r'\d+', new_str)[0]) test_result.error_logs = new_str.replace(f"{error_code}:", "") except BaseException as e: pass except BaseException as e: pass return True