316 lines
13 KiB
Python
316 lines
13 KiB
Python
|
|
import os
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
import warnings
|
||
|
|
|
||
|
|
from UniTAP.utils import tsi_logging as logging
|
||
|
|
from ..private_info import MergedTestGroups, test_group_to_merged_group
|
||
|
|
from ..test_group_params_types import DataType
|
||
|
|
from ..test_info import TestGroupId
|
||
|
|
from ..dut_default_params import *
|
||
|
|
from ..dut_default_params.crc_video_tests import CrcVideoTestBpp, BrokenFrameExportFormat
|
||
|
|
from enum import IntEnum
|
||
|
|
|
||
|
|
|
||
|
|
class ParserFileBase:
    """Hold the state shared by the configuration-file parsers.

    The attributes deliberately use a single leading underscore. Names with
    two leading underscores are mangled per class, so a subclass writing
    ``self.__group_id`` would create its own, separate attribute instead of
    using the one initialised here.
    """

    def __init__(self):
        # Raw content of the loaded file; filled in by the subclasses.
        self._file_data = ""
        self._group_id = TestGroupId.UNKNOWN
        self._test_id = 0


class ParserTd(ParserFileBase):
    """Parser for text-based ``.td`` configuration files.

    As read by this class, the first line is a hexadecimal identifier (first
    four characters: test group, remainder: test number), the second line
    holds space-separated parameter values and an optional third line holds
    packed CRC values.
    """

    def __init__(self):
        super().__init__()

    @staticmethod
    def __open_file(file_path: str) -> str:
        """Return the whole content of ``file_path`` as text."""
        with open(file_path, "r") as file:
            return file.read()

    @property
    def group_id(self) -> "TestGroupId":
        """Test group of the loaded file (``UNKNOWN`` until a file is loaded)."""
        return self._group_id

    @property
    def test_id(self) -> int:
        """Test number of the loaded file (``0`` until a file is loaded)."""
        return self._test_id

    def __header_line(self) -> str:
        """Return the first line of the loaded file."""
        return self._file_data.split("\n", 1)[0]

    def __detect_group(self) -> "TestGroupId":
        """Decode the test group from the first four header characters.

        Raises:
            ValueError: the header is not hexadecimal or the decoded value is
                not a known ``TestGroupId``.
        """
        group_int = int(self.__header_line()[:4], 16)
        if not TestGroupId.has_value(group_int):
            raise ValueError(f"Incorrect test group with ID {group_int}.")
        return TestGroupId(group_int)

    def __detect_test(self) -> int:
        """Decode the test number from the header characters after the fourth."""
        return int(self.__header_line()[4:], 16)

    def set_file_name(self, file_path: str):
        """Load ``file_path`` and detect its test group and test number."""
        self._file_data = self.__open_file(file_path)
        self._group_id = self.__detect_group()
        self._test_id = self.__detect_test()

    def __parameter_lines(self) -> list:
        """Split the loaded file into lines and require a parameter line.

        Raises:
            ValueError: the file has fewer than two lines.
        """
        lines = self._file_data.split("\n")
        if len(lines) < 2:
            raise ValueError("TD file does not contain a parameter line.")
        return lines

    @staticmethod
    def __split_fields(line: str) -> list:
        """Split a parameter line on single spaces, ignoring trailing whitespace.

        Only the whitespace is stripped; the previous ``[:-2]`` slice also
        removed the last character of the final value.
        """
        return line.rstrip().split(" ")

    @staticmethod
    def __to_flag(token: str) -> bool:
        """Convert a numeric token to ``bool``.

        ``bool("0")`` is ``True`` because any non-empty string is truthy, so
        the token is converted to ``int`` first.
        """
        return bool(int(token))

    @staticmethod
    def __decode_crc_line(line: str) -> list:
        """Unpack the CRC line into a flat list of 16-bit values.

        The first token is a count. For every pair of following tokens three
        values are produced: the low and the high 16 bits of the first token
        and the low 16 bits of the second one. A count of zero or less yields
        an empty list.
        """
        tokens = line.split(" ")
        crc_count = int(tokens[0])
        values = []
        for index in range(0, crc_count, 2):
            first_word = int(tokens[index + 1])
            second_word = int(tokens[index + 2])
            values.append(first_word & 0xFFFF)
            values.append((first_word >> 16) & 0xFFFF)
            values.append(second_word & 0xFFFF)
        return values

    def __parse_crc_video(self, params):
        """Fill a ``CrcVideoTestParam`` from the loaded file and return it."""
        lines = self.__parameter_lines()
        (_params_count, timeout, frames_to_test, ref_frame_count, frame_mismatches, width, height, bpp,
         frame_rate, rate_tolerance, crc_r, crc_g, crc_b, motion_iteration, *_) = self.__split_fields(lines[1])

        crc_values = []
        # A file that ends with a newline produces an empty third line; treat
        # it like a missing CRC line instead of failing on int("").
        if len(lines) > 2 and lines[2].strip():
            crc_values = self.__decode_crc_line(lines[2])
        if not crc_values:
            # No usable CRC line: use the three CRC values of the parameter line.
            crc_values = [int(crc_r), int(crc_g), int(crc_b)]

        params.timeout = int(timeout)
        params.number_frames_to_test = int(frames_to_test)
        params.number_reference_frames = int(ref_frame_count)
        params.number_frames_mismatch = int(frame_mismatches)
        params.reference_width = int(width)
        params.reference_height = int(height)
        params.reference_color_depth = CrcVideoTestBpp(int(bpp))
        params.required_frame_rate = int(frame_rate)
        params.frame_rate_tolerance = int(rate_tolerance)
        params.motion_test_iteration = int(motion_iteration)
        params.reference_crc_values = crc_values
        return params

    def __parse_hdmi_sink_continuity(self, params):
        """Fill a ``HdmiSinkContinuityDUTTestParam`` from the loaded file and return it."""
        lines = self.__parameter_lines()
        (_params_count, test_time, status_period, stop_flag, enable_scdc_version, enable_scdc_status,
         enable_scdc_error_count, scdc_error_count, *_) = self.__split_fields(lines[1])

        params.test_time = int(test_time)
        params.status_period = int(status_period)
        params.stop_flag = self.__to_flag(stop_flag)
        params.enable_scdc_version = self.__to_flag(enable_scdc_version)
        params.enable_scdc_status = self.__to_flag(enable_scdc_status)
        params.enable_scdc_error_count = self.__to_flag(enable_scdc_error_count)
        params.scdc_error_count = int(scdc_error_count)
        return params

    def parse(self, params: "DUTTestParameters") -> "DUTTestParameters":
        """Copy the values of the loaded TD file into ``params``.

        Args:
            params: parameter object to fill; ``CrcVideoTestParam`` and
                ``HdmiSinkContinuityDUTTestParam`` are supported.

        Returns:
            The same ``params`` object with its fields updated.

        Raises:
            NotImplementedError: ``params`` is of an unsupported type.
            ValueError: the file has no parameter line, too few values or a
                value that is not a valid integer.
        """
        if isinstance(params, CrcVideoTestParam):
            return self.__parse_crc_video(params)
        if isinstance(params, HdmiSinkContinuityDUTTestParam):
            return self.__parse_hdmi_sink_continuity(params)
        # Instances have no __name__; report the name of their class.
        raise NotImplementedError(f"Group {type(params).__name__} is not currently supported.")
|
||
|
|
|
||
|
|
|
||
|
|
class ParserJson(ParserFileBase):
    """Parser for JSON configuration files.

    Two layouts are recognised: files with a top-level ``TestId`` key
    (``JsonFormat.EXPORT``, values under ``Data``) and files with a top-level
    ``Header_Type`` key (``JsonFormat.CDF``, values at the top level).
    """

    class JsonFormat(IntEnum):
        UNKNOWN = -1
        EXPORT = 0
        CDF = 1

    # Configuration IDs that may fall back to the key without the "DSC_" part.
    __DSC_VMT_CI_LIST = [
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_30",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_60",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_120",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_144",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_240",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_30",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_60",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_120",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_144",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_240",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_30",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_60",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_120",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_144",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_240",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_30",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_60",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_120",
        "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_30",
        "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_60",
    ]

    def __init__(self):
        super().__init__()
        # Double-underscore names are mangled per class, so every attribute
        # used below has to be initialised here rather than in the base class.
        self.__file_data = {}
        self.__type = self.JsonFormat.UNKNOWN
        self.__merged_group_id = MergedTestGroups.Unknown
        self.__test_id = -1
        self.__group_id = TestGroupId.UNKNOWN

    def __open_file(self, file_path: str) -> dict:
        """Load the JSON document and record which layout it uses."""
        with open(file_path, encoding='UTF-8') as cfg_json:
            data = json.load(cfg_json)

        if data.get('TestId') is not None:
            self.__type = self.JsonFormat.EXPORT
        elif data.get('Header_Type') is not None:
            self.__type = self.JsonFormat.CDF
        else:
            # Do not keep the layout of a previously loaded file.
            self.__type = self.JsonFormat.UNKNOWN

        return data

    @property
    def group_id(self) -> TestGroupId:
        """Test group of the loaded EXPORT file, ``UNKNOWN`` otherwise."""
        return self.__group_id

    @property
    def merged_group_id(self) -> MergedTestGroups:
        """Merged test group of the loaded CDF file, ``Unknown`` otherwise."""
        return self.__merged_group_id

    @property
    def test_id(self) -> int:
        """Test number of the loaded EXPORT file, ``-1`` otherwise."""
        return self.__test_id

    @property
    def json_type(self) -> JsonFormat:
        """Layout detected for the most recently loaded file."""
        return self.__type

    def __detect_group(self) -> TestGroupId:
        """Decode the test group from the first four characters of ``TestId``.

        A decoded value of 0 is replaced by 0x3E8 when the ``Data`` section
        contains ``TSI_REF1_FRAME_DATA``.

        Raises:
            ValueError: the decoded value is not a known ``TestGroupId``.
        """
        group_int = int(self.__file_data.get('TestId')[:4], 16)
        # Tolerate a missing "Data" section instead of failing on None.get().
        data_section = self.__file_data.get('Data') or {}
        if group_int == 0x0 and data_section.get("TSI_REF1_FRAME_DATA") is not None:
            group_int = 0x3E8
        if not TestGroupId.has_value(group_int):
            raise ValueError(f"Incorrect test group with ID {group_int}.")
        return TestGroupId(group_int)

    def __detect_merged_group(self) -> MergedTestGroups:
        """Decode the merged test group from the hexadecimal token in ``Header_Type``."""
        matches = re.findall(r"0x\d+\w+_", self.__file_data.get("Header_Type"))
        # The pattern includes the trailing underscore; drop it before decoding.
        return MergedTestGroups(int(matches[0][:-1], 16))

    def __detect_test(self) -> int:
        """Decode the test number from the ``TestId`` characters after the fourth."""
        return int(self.__file_data.get('TestId')[4:], 16)

    def set_file_name(self, file_path: str):
        """Load ``file_path`` and detect the identifiers its layout provides.

        Raises:
            ValueError: the file is neither an EXPORT nor a CDF document.
        """
        self.__file_data = self.__open_file(file_path)

        # Forget identifiers of a previously loaded file so they cannot leak
        # into the one loaded now.
        self.__group_id = TestGroupId.UNKNOWN
        self.__test_id = -1
        self.__merged_group_id = MergedTestGroups.Unknown

        if self.json_type == self.JsonFormat.EXPORT:
            self.__group_id = self.__detect_group()
            self.__test_id = self.__detect_test()
        elif self.json_type == self.JsonFormat.CDF:
            self.__merged_group_id = self.__detect_merged_group()
        else:
            raise ValueError(f"Incorrect json file type {self.__type.name}.")

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Copy the values of the loaded JSON file into ``params``.

        Every parameter returned by ``get_param_list(params)`` is looked up by
        its ``config_id_name``. Parameters missing from the file are reported
        with a warning and left unchanged, except for the DSC VMT IDs which
        may fall back to the key without ``DSC_``.

        Returns:
            The same ``params`` object.

        Raises:
            ValueError: any error occurred while reading or converting values.
        """
        try:
            param_list = get_param_list(params)

            if self.json_type == self.JsonFormat.EXPORT:
                json_data = self.__file_data.get('Data')
            else:
                json_data = self.__file_data

            allow_dsc_fallback = (self.__merged_group_id == MergedTestGroups.DPRX_128b132b_LL_CTS
                                  or self.__group_id == TestGroupId.DP_2_1_RX_DSC_CTS)

            for param in param_list:
                key = param.config_id_name

                if key in json_data:
                    value = self.__json_to_param_value(param, json_data.get(key))
                    if key == "TSI_REF1_ELEMENT_FORMAT":
                        param.direct_set_def_value(value)
                    else:
                        param.default_value = value
                    continue

                if allow_dsc_fallback and key in self.__DSC_VMT_CI_LIST:
                    fallback_key = key.replace("DSC_", "")
                    # Only convert when the fallback key really exists;
                    # otherwise None would be passed to the converter.
                    if fallback_key in json_data:
                        param.default_value = self.__json_to_param_value(param, json_data.get(fallback_key))
                        continue

                logging.warn(f"Required CDF key {key} is missing in provided CDF file. It will be skipped.")

            return params
        except Exception as e:
            # Exception (not BaseException) so KeyboardInterrupt/SystemExit propagate.
            raise ValueError(f"Group {str(params)} cannot be parsed. Error {e}") from e

    @staticmethod
    def __to_bool(json_value) -> bool:
        """Convert a JSON value to ``bool``, understanding textual flags.

        Strings such as ``"0"`` or ``"false"`` are non-empty and therefore
        truthy, so they are interpreted by content instead of by ``bool()``.
        """
        if not isinstance(json_value, str):
            return bool(json_value)
        text = json_value.strip().lower()
        if text in ("true", "false"):
            return text == "true"
        try:
            return bool(int(text))
        except ValueError:
            # Not numeric: keep the plain truthiness of the text.
            return bool(text)

    @staticmethod
    def __json_to_param_value(param, json_value):
        """Convert ``json_value`` to the Python type matching ``param.value_type``."""
        if param.value_type in [DataType.ArrayU8, DataType.ArrayU16, DataType.ArrayU32]:
            # split() ignores a trailing separator without dropping the last
            # element when the separator is absent.
            return [int(token) for token in json_value.split()]
        elif param.value_type in [DataType.Int, DataType.Uint32, DataType.BitList]:
            return int(json_value)
        elif param.value_type == DataType.Bool:
            return ParserJson.__to_bool(json_value)
        elif param.value_type == DataType.Double:
            return float(json_value)
        elif param.value_type == DataType.ArrayFromFilePath:
            return str(json_value)
        else:
            return json_value
|
||
|
|
|
||
|
|
|
||
|
|
class Parser:
    """Front end that delegates to the TD or JSON parser by file extension."""

    class ConfigFileFormat(IntEnum):
        UNKNOWN = -1
        TD = 0
        JSON = 1

    _FORMAT_ERROR = "Incorrect file format. Must be: TD or JSON."

    def __init__(self):
        self.__extension = Parser.ConfigFileFormat.UNKNOWN
        self.__parser_td = ParserTd()
        self.__parser_json = ParserJson()

    @staticmethod
    def __detect_file_format(file_path: str) -> ConfigFileFormat:
        """Map the extension of ``file_path`` (case-insensitive) to a format."""
        suffix = os.path.splitext(file_path)[1].lower()
        if suffix == '.json':
            return Parser.ConfigFileFormat.JSON
        if suffix == '.td':
            return Parser.ConfigFileFormat.TD
        return Parser.ConfigFileFormat.UNKNOWN

    def __active_parser(self):
        """Return the parser matching the format of the current file.

        Raises:
            ValueError: no file with a supported extension has been set.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            return self.__parser_json
        if self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td
        raise ValueError(Parser._FORMAT_ERROR)

    def set_file_name(self, file_path: str):
        """Select the parser for ``file_path`` and let it load the file.

        Raises:
            ValueError: the extension is neither ``.td`` nor ``.json``.
        """
        self.__extension = self.__detect_file_format(file_path)
        self.__active_parser().set_file_name(file_path)

    def detect_group(self) -> "TestGroupId":
        """Return the test group of the loaded file.

        Raises:
            ValueError: the file is a JSON file that is not of EXPORT type, or
                no supported file has been set.
        """
        parser = self.__active_parser()
        if parser is self.__parser_json and parser.json_type != ParserJson.JsonFormat.EXPORT:
            raise ValueError("Cannot get Test Group ID from JSON (CDF type) file")
        return parser.group_id

    def detect_merged_group(self) -> "MergedTestGroups":
        """Return the merged test group of the loaded file.

        CDF JSON files carry the merged group directly; for every other file
        it is derived from the test group and test number of the parser that
        actually loaded the file.

        Raises:
            ValueError: no supported file has been set.
        """
        parser = self.__active_parser()
        if parser is self.__parser_json and parser.json_type == ParserJson.JsonFormat.CDF:
            return parser.merged_group_id
        # For TD files this must read the TD parser, not the JSON parser.
        return test_group_to_merged_group(parser.group_id, parser.test_id)

    def detect_test(self) -> int:
        """Return the test number of the loaded file.

        Raises:
            ValueError: the file is a JSON file that is not of EXPORT type, or
                no supported file has been set.
        """
        parser = self.__active_parser()
        if parser is self.__parser_json and parser.json_type != ParserJson.JsonFormat.EXPORT:
            raise ValueError("Cannot get Test ID from JSON (CDF type) file")
        return parser.test_id

    def parse(self, params: "DUTTestParameters") -> "DUTTestParameters":
        """Fill ``params`` from the loaded file using the matching parser.

        Raises:
            ValueError: no supported file has been set.
        """
        return self.__active_parser().parse(params)
|