"""Parsers for test configuration files in TD (``.td``) and JSON (``.json``) form."""

import os
import json
import re
import warnings
from enum import IntEnum

from UniTAP.utils import tsi_logging as logging

from ..private_info import MergedTestGroups, test_group_to_merged_group
from ..test_group_params_types import DataType
from ..test_info import TestGroupId
from ..dut_default_params import *
from ..dut_default_params.crc_video_tests import CrcVideoTestBpp, BrokenFrameExportFormat


# Textual spellings that must be read as "false". Plain ``bool("0")`` is True,
# which is why flag fields read from files cannot be passed to ``bool`` directly.
_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})


def _to_bool(value) -> bool:
    """Convert a flag read from a configuration file to ``bool``.

    Strings are compared case-insensitively (after stripping whitespace)
    against ``_FALSE_STRINGS``; any other string is true. Non-string values
    use ordinary truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() not in _FALSE_STRINGS
    return bool(value)


class ParserFileBase:
    """Common base class of the file parsers.

    The attributes assigned here are name-mangled to ``_ParserFileBase__*``,
    so they are NOT the ones a subclass reaches through ``self.__...``.
    Each subclass therefore initialises its own private state as well.
    """

    def __init__(self):
        self.__file_data = ""
        self.__group_id = TestGroupId.UNKNOWN
        self.__test_id = 0


class ParserTd(ParserFileBase):
    """Parser for TD files.

    As read by this class, the first line of the file is a hexadecimal
    string whose first four characters are the test group ID and whose
    remainder is the test ID; the second line holds whitespace-separated
    parameter values; an optional third line holds a CRC list.
    """

    def __init__(self):
        super().__init__()
        # Own (``_ParserTd__*``) state, so that the properties below are usable
        # before ``set_file_name`` has been called.
        self.__file_data = ""
        self.__group_id = TestGroupId.UNKNOWN
        self.__test_id = 0

    @staticmethod
    def __open_file(file_path: str) -> str:
        """Return the complete text content of ``file_path``."""
        with open(file_path, "r") as file:
            return file.read()

    @property
    def group_id(self) -> TestGroupId:
        """Test group ID detected from the loaded file."""
        return self.__group_id

    @property
    def test_id(self) -> int:
        """Test ID detected from the loaded file."""
        return self.__test_id

    def __header_line(self) -> str:
        """Return the first line of the loaded file."""
        return self.__file_data.split("\n")[0]

    def __detect_group(self) -> TestGroupId:
        """Decode the group ID from the first four hex characters of the header.

        Raises:
            ValueError: if the value is not hexadecimal or is not a known
                ``TestGroupId``.
        """
        group_int = int(self.__header_line()[:4], 16)
        if not TestGroupId.has_value(group_int):
            raise ValueError(f"Incorrect test group with ID {group_int}.")
        return TestGroupId(group_int)

    def __detect_test(self) -> int:
        """Decode the test ID from the header characters after the fourth."""
        return int(self.__header_line()[4:], 16)

    def set_file_name(self, file_path: str):
        """Load ``file_path`` and detect its group and test IDs."""
        self.__file_data = self.__open_file(file_path)
        self.__group_id = self.__detect_group()
        self.__test_id = self.__detect_test()

    def __param_lines(self) -> list:
        """Return the file split into lines, checking a parameter line exists.

        Raises:
            ValueError: if the second line is missing or blank.
        """
        lines = self.__file_data.split("\n")
        if len(lines) < 2 or not lines[1].strip():
            raise ValueError("TD file does not contain a parameter line.")
        return lines

    @staticmethod
    def __parse_crc_video(params, lines: list):
        """Fill a ``CrcVideoTestParam`` from the parameter and CRC lines."""
        # ``split()`` without arguments ignores leading/trailing/repeated
        # whitespace, so a trailing space no longer damages the last value.
        (_params_count, timeout, frames_to_test, ref_frame_count, frame_mismatches, width, height, bpp,
         frame_rate, rate_tolerance, crc_r, crc_g, crc_b, motion_iteration, *_) = lines[1].split()

        # A blank third line (e.g. the file just ends with a newline) yields no
        # tokens and is treated the same as an absent CRC line.
        crc_fields = lines[2].split() if len(lines) > 2 else []
        crc_count = int(crc_fields[0]) if crc_fields else 0

        crc_values = []
        if crc_count > 0:
            # The words after the count are consumed in pairs; each pair yields
            # three 16-bit values: low half and high half of the first word,
            # then the low half of the second word.
            for i in range(0, crc_count, 2):
                first_word = int(crc_fields[i + 1])
                crc_values.append(first_word & 0xFFFF)
                crc_values.append((first_word >> 16) & 0xFFFF)
                crc_values.append(int(crc_fields[i + 2]) & 0xFFFF)
        else:
            # No CRC list: use the three values embedded in the parameter line.
            crc_values.extend([int(crc_r), int(crc_g), int(crc_b)])

        params.timeout = int(timeout)
        params.number_frames_to_test = int(frames_to_test)
        params.number_reference_frames = int(ref_frame_count)
        params.number_frames_mismatch = int(frame_mismatches)
        params.reference_width = int(width)
        params.reference_height = int(height)
        params.reference_color_depth = CrcVideoTestBpp(int(bpp))
        params.required_frame_rate = int(frame_rate)
        params.frame_rate_tolerance = int(rate_tolerance)
        params.motion_test_iteration = int(motion_iteration)
        params.reference_crc_values = crc_values
        return params

    @staticmethod
    def __parse_hdmi_sink_continuity(params, lines: list):
        """Fill a ``HdmiSinkContinuityDUTTestParam`` from the parameter line."""
        (_params_count, test_time, status_period, stop_flag, enable_scdc_version, enable_scdc_status,
         enable_scdc_error_count, scdc_error_count, *_) = lines[1].split()

        params.test_time = int(test_time)
        params.status_period = int(status_period)
        # The flags are text tokens; ``bool("0")`` would be True, hence _to_bool.
        params.stop_flag = _to_bool(stop_flag)
        params.enable_scdc_version = _to_bool(enable_scdc_version)
        params.enable_scdc_status = _to_bool(enable_scdc_status)
        params.enable_scdc_error_count = _to_bool(enable_scdc_error_count)
        params.scdc_error_count = int(scdc_error_count)
        return params

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Populate ``params`` from the loaded TD file and return it.

        Args:
            params: parameter object to fill; ``CrcVideoTestParam`` and
                ``HdmiSinkContinuityDUTTestParam`` are supported.

        Raises:
            NotImplementedError: if the type of ``params`` is not supported.
            ValueError: if the parameter line is missing, has too few values
                or contains a non-numeric value.
        """
        if isinstance(params, CrcVideoTestParam):
            return self.__parse_crc_video(params, self.__param_lines())
        if isinstance(params, HdmiSinkContinuityDUTTestParam):
            return self.__parse_hdmi_sink_continuity(params, self.__param_lines())
        # ``params`` is an instance, so the class name lives on its type.
        raise NotImplementedError(f"Group {type(params).__name__} is not currently supported.")


class ParserJson(ParserFileBase):
    """Parser for JSON configuration files.

    Two layouts are recognised: ``EXPORT`` (top-level ``TestId`` key, values
    under ``Data``) and ``CDF`` (top-level ``Header_Type`` key, values at the
    top level).
    """

    class JsonFormat(IntEnum):
        UNKNOWN = -1
        EXPORT = 0
        CDF = 1

    # Keys for which ``parse`` may fall back to the same key without "DSC_".
    __DSC_VMT_CI_LIST = [
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_30",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_60",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_120",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_144",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_240",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_30",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_60",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_120",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_144",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_240",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_30",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_60",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_120",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_144",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_240",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_30",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_60",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_120",
        "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_30",
        "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_60"
    ]

    def __init__(self):
        super().__init__()
        self.__file_data = {}
        self.__type = self.JsonFormat.UNKNOWN
        self.__merged_group_id = MergedTestGroups.Unknown
        self.__test_id = -1
        self.__group_id = TestGroupId.UNKNOWN

    def __open_file(self, file_path: str) -> dict:
        """Load the JSON document and record which layout it uses."""
        with open(file_path, encoding='UTF-8') as cfg_json:
            data = json.load(cfg_json)
        if data.get('TestId') is not None:
            self.__type = self.JsonFormat.EXPORT
        elif data.get('Header_Type') is not None:
            self.__type = self.JsonFormat.CDF
        else:
            # Do not keep the layout of a previously loaded file.
            self.__type = self.JsonFormat.UNKNOWN
        return data

    @property
    def group_id(self) -> TestGroupId:
        """Test group ID (set for EXPORT files)."""
        return self.__group_id

    @property
    def merged_group_id(self) -> MergedTestGroups:
        """Merged test group (set for CDF files)."""
        return self.__merged_group_id

    @property
    def test_id(self) -> int:
        """Test ID (set for EXPORT files, ``-1`` otherwise)."""
        return self.__test_id

    @property
    def json_type(self) -> JsonFormat:
        """Layout detected for the loaded file."""
        return self.__type

    def __detect_group(self) -> TestGroupId:
        """Decode the group ID from the first four hex characters of ``TestId``.

        A group value of 0 is remapped to 0x3E8 when the ``Data`` object
        contains the key ``TSI_REF1_FRAME_DATA``.

        Raises:
            ValueError: if the resulting value is not a known ``TestGroupId``.
        """
        group_int = int(self.__file_data.get('TestId')[:4], 16)
        payload = self.__file_data.get('Data') or {}
        if group_int == 0x0 and payload.get("TSI_REF1_FRAME_DATA") is not None:
            group_int = 0x3E8
        if not TestGroupId.has_value(group_int):
            raise ValueError(f"Incorrect test group with ID {group_int}.")
        return TestGroupId(group_int)

    def __detect_merged_group(self) -> MergedTestGroups:
        """Extract the hexadecimal merged-group ID embedded in ``Header_Type``."""
        # First regex match, minus its trailing underscore, parsed as base 16.
        token = re.findall(r"0x\d+\w+_", self.__file_data.get("Header_Type"))[0]
        return MergedTestGroups(int(token[:-1], 16))

    def __detect_test(self) -> int:
        """Decode the test ID from the ``TestId`` characters after the fourth."""
        return int(self.__file_data.get('TestId')[4:], 16)

    def set_file_name(self, file_path: str):
        """Load ``file_path`` and detect the IDs its layout provides.

        Raises:
            ValueError: if the file is neither an EXPORT nor a CDF document.
        """
        self.__file_data = self.__open_file(file_path)
        # Forget IDs detected from a previously loaded file.
        self.__group_id = TestGroupId.UNKNOWN
        self.__test_id = -1
        self.__merged_group_id = MergedTestGroups.Unknown

        if self.json_type == self.JsonFormat.EXPORT:
            self.__group_id = self.__detect_group()
            self.__test_id = self.__detect_test()
        elif self.json_type == self.JsonFormat.CDF:
            self.__merged_group_id = self.__detect_merged_group()
        else:
            raise ValueError(f"Incorrect json file type {self.__type.name}.")

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Populate the default values of ``params`` from the loaded JSON file.

        Every parameter returned by ``get_param_list(params)`` is looked up by
        its ``config_id_name``. A missing key is logged; for the DSC groups a
        key from the DSC VMT list is first looked up without its "DSC_" part.

        Raises:
            ValueError: if any error occurs while parsing (original error is
                chained as the cause).
        """
        try:
            param_list = get_param_list(params)
            if self.json_type == self.JsonFormat.EXPORT:
                json_data = self.__file_data.get('Data')
            else:
                json_data = self.__file_data

            dsc_fallback_allowed = (self.__merged_group_id == MergedTestGroups.DPRX_128b132b_LL_CTS or
                                    self.__group_id == TestGroupId.DP_2_1_RX_DSC_CTS)

            for param in param_list:
                key = param.config_id_name
                if key in json_data:
                    value = self.__json_to_param_value(param, json_data.get(key))
                    if key == "TSI_REF1_ELEMENT_FORMAT":
                        param.direct_set_def_value(value)
                    else:
                        param.default_value = value
                    continue

                if dsc_fallback_allowed and key in self.__DSC_VMT_CI_LIST:
                    param.default_value = self.__json_to_param_value(
                        param, json_data.get(key.replace("DSC_", "")))
                logging.warn(f"Required CDF key {param.config_id_name} is missing in provided CDF file. It will be skipped.")
            return params
        except Exception as e:
            # ``Exception`` rather than ``BaseException`` so that
            # KeyboardInterrupt/SystemExit are not turned into ValueError.
            raise ValueError(f"Group {str(params)} cannot be parsed. Error {e}") from e

    @staticmethod
    def __json_to_param_value(param, json_value):
        """Convert a raw JSON value to the Python type given by ``param.value_type``."""
        if param.value_type in [DataType.ArrayU8, DataType.ArrayU16, DataType.ArrayU32]:
            # Whitespace-separated integers; ``split()`` copes with or without
            # a trailing separator, so no real element is ever dropped.
            return [int(item) for item in json_value.split()]
        elif param.value_type in [DataType.Int, DataType.Uint32, DataType.BitList]:
            return int(json_value)
        elif param.value_type == DataType.Bool:
            return _to_bool(json_value)
        elif param.value_type == DataType.Double:
            return float(json_value)
        elif param.value_type == DataType.ArrayFromFilePath:
            return str(json_value)
        else:
            return json_value


class Parser:
    """Front end that picks the TD or JSON parser from the file extension."""

    class ConfigFileFormat(IntEnum):
        UNKNOWN = -1
        TD = 0
        JSON = 1

    def __init__(self):
        self.__extension = Parser.ConfigFileFormat.UNKNOWN
        self.__parser_td = ParserTd()
        self.__parser_json = ParserJson()

    @staticmethod
    def __detect_file_format(file_path: str) -> ConfigFileFormat:
        """Map the (case-insensitive) file extension to a ``ConfigFileFormat``."""
        _, file_extension = os.path.splitext(file_path)
        suffix = file_extension.lower()
        if suffix == '.json':
            return Parser.ConfigFileFormat.JSON
        elif suffix == '.td':
            return Parser.ConfigFileFormat.TD
        else:
            return Parser.ConfigFileFormat.UNKNOWN

    def set_file_name(self, file_path: str):
        """Load ``file_path`` with the parser matching its extension.

        Raises:
            ValueError: if the extension is neither ``.td`` nor ``.json``.
        """
        self.__extension = self.__detect_file_format(file_path)
        if self.__extension == Parser.ConfigFileFormat.JSON:
            self.__parser_json.set_file_name(file_path)
        elif self.__extension == Parser.ConfigFileFormat.TD:
            self.__parser_td.set_file_name(file_path)
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def detect_group(self) -> TestGroupId:
        """Return the test group ID of the loaded file.

        Raises:
            ValueError: for a CDF-type JSON file or when no supported file is
                loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            if self.__parser_json.json_type == ParserJson.JsonFormat.EXPORT:
                return self.__parser_json.group_id
            else:
                raise ValueError("Cannot get Test Group ID from JSON (CDF type) file")
        elif self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td.group_id
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def detect_merged_group(self) -> MergedTestGroups:
        """Return the merged test group of the loaded file.

        Raises:
            ValueError: when no supported file is loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            if self.__parser_json.json_type == ParserJson.JsonFormat.CDF:
                return self.__parser_json.merged_group_id
            else:
                return test_group_to_merged_group(self.__parser_json.group_id, self.__parser_json.test_id)
        elif self.__extension == Parser.ConfigFileFormat.TD:
            # The IDs of a TD file live in the TD parser, not the JSON parser.
            return test_group_to_merged_group(self.__parser_td.group_id, self.__parser_td.test_id)
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def detect_test(self) -> int:
        """Return the test ID of the loaded file.

        Raises:
            ValueError: for a CDF-type JSON file or when no supported file is
                loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            if self.__parser_json.json_type == ParserJson.JsonFormat.EXPORT:
                return self.__parser_json.test_id
            else:
                raise ValueError("Cannot get Test ID from JSON (CDF type) file")
        elif self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td.test_id
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Populate ``params`` using the parser selected by ``set_file_name``.

        Raises:
            ValueError: when no supported file is loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            return self.__parser_json.parse(params)
        elif self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td.parse(params)
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")