import concurrent.futures
import copy
import os
import json
import re
import time
from typing import TypeVar, Generic, Optional, Type, Tuple, List
from UniTAP.utils import tsi_logging as logging
from UniTAP.dev.modules.dut_tests.test_info import TestResultObject, TestGroupId
from UniTAP.dev.modules.dut_tests.private_info import MergedTestGroups, group_params_dict, SubTestResultObject, \
MERGED_GROUP_PARAMS_TYPE, test_group_to_merged_group
from .private_info import TestResultObject, TestGroupId, MergedTestGroups, group_params_dict, SubTestResultObject, \
MERGED_GROUP_PARAMS_TYPE, test_group_to_merged_group
from UniTAP.dev.modules.dut_tests.test_group_params_types import dict_types, read_image_file, uicl_image_preparation
from UniTAP.libs.lib_tsi.tsi_io import DeviceIO, TestDialogConfig
from UniTAP.version import __version__
from .dut_default_params import *
from .test_utils import is_need_to_update_test_configuration, TestReportConfigType, convert_to_cdf_json_base64
from .report import update_configuration, dp21_source_timing_table, detect_warning_message
from .report import generate_html_report, TestAdditionalInfo, TestResult
from UniTAP.utils import tsi_logging as logging
from .test_info import TestStatusEnum
from .parser import Parser
class Test:
"""
Class `TestGroup` describes template of usual test.
Contains info of:
- Test name `name`.
- Test ID `id`
"""
def __init__(self, name: str, test_id: int):
self.__name = name
self.__id = test_id
@property
def name(self):
return self.__name
@property
def id(self):
return self.__id
def __str__(self):
return f"Test ID: {self.id} - {self.name}\n"
class TestGroup(Generic[DUTTestParameters]):
"""
Class `TestGroup` describes template of usual test group.
Contains info of:
- Test count `test_count`.
- Tests `tests`.
- Name `name`.
- ID `id`.
"""
def __init__(self, _id: TestGroupId, name: str):
self.__name = name
self.__id = _id
self.__tests = list()
@property
def test_count(self) -> int:
"""
Test count of test group.
Returns:
object of int type
"""
return len(self.__tests)
@property
def tests(self):
"""
Returns tests.
Returns:
object of `` type
"""
return self.__tests
@property
def name(self) -> str:
"""
Returns test group name.
Returns:
object of str type
"""
return self.__name
@property
def id(self) -> TestGroupId:
"""
Returns ID of test group.
Returns:
object of id type
"""
return self.__id
def _add_new_test(self, new_test: Test):
if not self.__test_presence_check(new_test):
self.__tests.append(new_test)
def __test_presence_check(self, test_to_check: Test) -> bool:
for test in self.__tests:
if test.id == test_to_check.id:
return True
return False
def __search_test_by_id(self, test_id: int) -> Optional[Test]:
for test in self.__tests:
if test.id == test_id:
return test
return None
def __str__(self):
tests_info = ""
for test in self.tests:
tests_info += test.__str__()
return f"Test group name: {self.name}. ID Group: {self.id}\n" \
f"Number of tests in a group: {self.test_count}\n" \
f"{tests_info}"
class DUTTests:
"""
Class `DUTTests` allows working with available tests on the device.
- Run `run`.
- Get default parameters for selected test group `get_default_parameters`.
- Get available test count in selected group `number_tests_in_group`.
- Get all test results `get_all_tests_results`.
- Clear all test results `clear_results`.
- Get information of available test groups `info_of_available_test_groups`.
- Make report after testing `make_report`.
- Run test from file `run_from_file` - Not implemented. Will be added later.
"""
def __init__(self, dev_io: DeviceIO):
self.__io = dev_io
self.__opf_device = None
self.__results = []
self.__default_parameters = {}
self.__test_groups = {}
self.__parser = Parser()
self.__io.set_test_config(0)
cfp_path = os.path.join(os.path.dirname(__file__), 'cfg')
files = os.listdir(cfp_path)
for file in files:
try:
if not os.path.isfile(os.path.join(cfp_path, file)):
continue
with open(os.path.join(cfp_path, file), encoding='UTF-8') as cfg_json:
json_obj = json.load(cfg_json)
item_group = MergedTestGroups(int(json_obj.get('id'), 16))
group_type = MERGED_GROUP_PARAMS_TYPE.get(item_group, None)
if group_type is None:
continue
group_parameters_dict = {}
for parameter in json_obj.get('descriptions'):
group_parameters_dict[parameter["id"]] = parameter
self.__default_parameters[group_type] = group_type(group_parameters_dict)
except ValueError:
logging.info(
f"[UniTAP] Failed to load {os.path.join(cfp_path, file)}.\n Value {int(json_obj.get('id'), 16)} is missing in MergedTestGroups.")
test_list = self.__io.get_test_list()
for test in test_list:
test_id = test[0] & 0xFFFF
group_id = TestGroupId(test[0] >> 16)
name = test[2]
if test[2].find(' / ') != -1:
name = test[2].split(" / ")
else:
name = [name]
if test[2].find("Validate audio") != -1:
name.insert(0, "Audio Test")
else:
name.insert(0, "Pixel Level Video Test")
group_id = TestGroupId.PIXEL_VIDEO_TEST
group_name = name[0]
test_name = name[1]
if self.__test_groups.get(group_id, None) is None:
self.__test_groups[group_id] = TestGroup(_id=group_id, name=group_name)
self.__test_groups[group_id]._add_new_test(Test(name=test_name, test_id=test_id))
def run(self, group_id: TestGroupId, test_id: int, params: Optional[DUTTestParameters] = None,
print_fw_logs: bool = True, test_delay: int = 0) -> SubTestResultObject:
"""
Run selected test of test group.
Args:
group_id (TestGroupId) - test group id
test_id (int)
params (DUTTestParameters) - one of the variants of params
print_fw_logs (bool) - print FW logs (enable/disable)
test_delay (int) - delay between tests (in seconds)
"""
if self.__test_groups.get(group_id, None) is None:
raise ValueError(f"Group {group_id} is not available.")
if group_id in [TestGroupId.AUDIO_TEST, TestGroupId.PIXEL_VIDEO_TEST]:
if group_id == TestGroupId.AUDIO_TEST and test_id != 3 or \
group_id == TestGroupId.PIXEL_VIDEO_TEST and test_id != 2:
raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.")
group = self.__test_groups.get(group_id, None)
test_name = self.__test_groups.get(group_id).tests[0].name
group_id = TestGroupId.AUDIO_TEST
else:
if test_id > self.__test_groups.get(group_id).test_count:
raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.")
group = self.__test_groups.get(group_id, None)
test_name = self.__test_groups.get(group_id).tests[test_id].name
merged_group_id = test_group_to_merged_group(group_id, test_id)
if params is not None:
params_list = get_param_list(params)
if isinstance(params, Dp21SourceDUTTestParam) and not params.dsc_video_modes._Dp21AvailableDscVideoModes__is_config_changed:
logging.warning("Detected that used didn't change new DSC config. Copy setting from video config.")
dsc_dp2_ci_remapper = {
"TSI_DP20_SRCCTS_DSC_VMT_1920_1080_30": "TSI_DP20_SRCCTS_VMT_1920_1080_30",
"TSI_DP20_SRCCTS_DSC_VMT_1920_1080_60": "TSI_DP20_SRCCTS_VMT_1920_1080_60",
"TSI_DP20_SRCCTS_DSC_VMT_1920_1080_120": "TSI_DP20_SRCCTS_VMT_1920_1080_120",
"TSI_DP20_SRCCTS_DSC_VMT_1920_1080_144": "TSI_DP20_SRCCTS_VMT_1920_1080_144",
"TSI_DP20_SRCCTS_DSC_VMT_1920_1080_240": "TSI_DP20_SRCCTS_VMT_1920_1080_240",
"TSI_DP20_SRCCTS_DSC_VMT_3840_2160_30": "TSI_DP20_SRCCTS_VMT_3840_2160_30",
"TSI_DP20_SRCCTS_DSC_VMT_3840_2160_60": "TSI_DP20_SRCCTS_VMT_3840_2160_60",
"TSI_DP20_SRCCTS_DSC_VMT_3840_2160_120": "TSI_DP20_SRCCTS_VMT_3840_2160_120",
"TSI_DP20_SRCCTS_DSC_VMT_3840_2160_144": "TSI_DP20_SRCCTS_VMT_3840_2160_144",
"TSI_DP20_SRCCTS_DSC_VMT_3840_2160_240": "TSI_DP20_SRCCTS_VMT_3840_2160_240",
"TSI_DP20_SRCCTS_DSC_VMT_5120_2160_30": "TSI_DP20_SRCCTS_VMT_5120_2160_30",
"TSI_DP20_SRCCTS_DSC_VMT_5120_2160_60": "TSI_DP20_SRCCTS_VMT_5120_2160_60",
"TSI_DP20_SRCCTS_DSC_VMT_5120_2160_120": "TSI_DP20_SRCCTS_VMT_5120_2160_120",
"TSI_DP20_SRCCTS_DSC_VMT_5120_2160_144": "TSI_DP20_SRCCTS_VMT_5120_2160_144",
"TSI_DP20_SRCCTS_DSC_VMT_5120_2160_240": "TSI_DP20_SRCCTS_VMT_5120_2160_240",
"TSI_DP20_SRCCTS_DSC_VMT_7680_4320_30": "TSI_DP20_SRCCTS_VMT_7680_4320_30",
"TSI_DP20_SRCCTS_DSC_VMT_7680_4320_60": "TSI_DP20_SRCCTS_VMT_7680_4320_60",
"TSI_DP20_SRCCTS_DSC_VMT_7680_4320_120": "TSI_DP20_SRCCTS_VMT_7680_4320_120",
"TSI_DP20_SRCCTS_DSC_VMT_10240_4320_30": "TSI_DP20_SRCCTS_VMT_10240_4320_30",
"TSI_DP20_SRCCTS_DSC_VMT_10240_4320_60": "TSI_DP20_SRCCTS_VMT_10240_4320_60"
}
for key in dsc_dp2_ci_remapper.keys():
dst = next((obj for obj in params_list if obj.config_id_name == key), None)
src = next((obj for obj in params_list if obj.config_id_name == dsc_dp2_ci_remapper[key]), None)
if dst.default_value != src.default_value:
logging.warn(f"DSC Config missmatch detected. Apply workaround: {key}:{hex(dst.default_value)}; {dsc_dp2_ci_remapper[key]}:{hex(src.default_value)}")
dst.default_value = src.default_value
else:
logging.warn(f"DSC Config modification detected. Skip workaround.")
else:
params_list = self.__default_parameters[merged_group_id]
for item in params_list:
if group_id == TestGroupId.AUDIO_TEST and item.config_id_name == "TSI_REF1_FRAME_DATA":
if isinstance(item.default_value, str) and os.path.exists(item.default_value):
image_data = read_image_file(item.default_value)
if len(image_data) > 0:
item.default_value = uicl_image_preparation(image_data, params)
elif isinstance(item, bytearray):
# TODO: add 'bytearray' support
item.default_value = uicl_image_preparation(item.default_value, params)
self.__io.set(item.config_id, item.default_value, dict_types.get(item.value_type),
item.data_length)
report_type = is_need_to_update_test_configuration(params_list)
result_obj = SubTestResultObject(group.name, detect_warning_message(report_type, params_list))
result_obj.test_name = test_name
result_obj.json_config_info = convert_to_cdf_json_base64(params_list, merged_group_id)
if report_type != TestReportConfigType.Another:
result_obj.config_info = update_configuration(report_type, params_list)
else:
for item in params_list:
result_obj.config_info += "{} = {}
".format(item.name, item.__str__())
run_id = (group_id << 16) | test_id
self.__io.set_opf_config(TestDialogConfig(disable_auto_precessing=True))
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.__io.run_test, run_id)
try:
state = True
while future.running() or state:
state = self.__read_logs(result_obj, print_fw_logs)
except KeyboardInterrupt as e:
self.__io.abort_test()
raise
result_obj.test_result = TestResult(future.result())
result_obj.error_code = self.__io.get_test_error_code()
result_obj.test_delay = test_delay
self.__results.append(result_obj)
time.sleep(test_delay)
return copy.deepcopy(self.__results[-1])
def get_params_from_file(self, path: str) -> Tuple[TestGroupId, int, DUTTestParameters]:
"""
Get test parameters from transferred file: td or json (not cdf).
Args:
path ('str') - full path to config file
"""
self.__parser.set_file_name(path)
group_id = self.__parser.detect_group()
test_id = self.__parser.detect_test()
group_type = group_params_dict.get(group_id)
params = self.__parser.parse(self.get_default_parameters(group_type))
return group_id, test_id, params
# TODO - add support parsing '.td' (other test groups) file extension
def get_params_from_cdf_file(self, path: str) -> DUTTestParameters:
"""
Get test parameters from transferred file: json (cdf).
Args:
path ('str') - full path to config file
"""
self.__parser.set_file_name(path)
group_id = self.__parser.detect_merged_group()
group_type = MERGED_GROUP_PARAMS_TYPE.get(group_id)
params = self.__parser.parse(self.get_default_parameters(group_type))
return params
def get_default_parameters(self, group_type: Type[DUTTestParameters]) -> DUTTestParameters:
"""
Get predefined (default) parameters of test group.
Args:
group_type (`DUTTestParameters`) - test group id
Returns:
object of `DUTTestParameters` type
"""
for group_id in self.__default_parameters.keys():
if isinstance(self.__default_parameters.get(group_id), group_type):
return copy.deepcopy(self.__default_parameters.get(group_id))
raise ValueError(f"Group {group_type} is not available.")
def number_tests_in_group(self, group_id: TestGroupId) -> int:
"""
Returns all count of available tests of selected test group.
Args:
group_id (`TestGroupId`) - test group id
Returns:
object of int type
"""
return self.__search_group_by_id(group_id).test_count
def get_test_name(self, group_id: TestGroupId, test_id: int) -> str:
"""
Returns test name by test id in selected test group.
Args:
group_id (`TestGroupId`) - test group id
test_id (`int`) - test id
Returns:
object of str type
"""
if group_id in [TestGroupId.AUDIO_TEST, TestGroupId.PIXEL_VIDEO_TEST]:
if group_id == TestGroupId.AUDIO_TEST and test_id != 3 or \
group_id == TestGroupId.PIXEL_VIDEO_TEST and test_id != 2:
raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.")
group_id = TestGroupId.AUDIO_TEST
return self.__test_groups.get(group_id).tests[0 if test_id == 3 else 1].name
else:
if test_id > self.__test_groups.get(group_id).test_count:
raise ValueError(f"Test with id {test_id} is not available in {self.__test_groups.get(group_id).name}.")
return self.__test_groups.get(group_id).tests[test_id].name
def get_test_id_by_name(self, group_id: TestGroupId, name: str) -> int:
"""
Returns test ID by test name id in selected test group.
Args:
group_id (`TestGroupId`) - test group id
name (`str`) - test name or part name of the test
Returns:
object of int type
"""
if group_id == TestGroupId.AUDIO_TEST:
return 3
elif group_id == TestGroupId.PIXEL_VIDEO_TEST:
return 2
else:
for test in self.__test_groups.get(group_id).tests:
if test.name.find(name) != -1:
return test.id
raise ValueError(f"Incorrect test name: {name}")
def get_test_id_list_by_name(self, group_id: TestGroupId, name: str) -> List[int]:
"""
Returns list of test ID by test name id in selected test group.
Args:
group_id (`TestGroupId`) - test group id
name (`str`) - test name or part name of the test
Returns:
object of list[int] type
"""
if group_id == TestGroupId.AUDIO_TEST:
return [3]
elif group_id == TestGroupId.PIXEL_VIDEO_TEST:
return [2]
else:
id_list = []
for test in self.__test_groups.get(group_id).tests:
if test.name.find(name) != -1:
id_list.append(test.id)
return id_list
@staticmethod
def print_test_params(params: DUTTestParameters):
"""
Print all test parameters in readable format.
Args:
params (`DUTTestParameters`) - type of params
"""
params_list = get_param_list(params)
report_type = is_need_to_update_test_configuration(params_list)
if report_type != TestReportConfigType.Another:
info = update_configuration(report_type, params_list).replace("
", '\n')
if report_type == TestReportConfigType.Dp2_1_Source:
info = info[: info.find("