# Files
# xinzhu.yin c157e774e5 1.1.0版本 (version 1.1.0)
# 2026-04-16 16:51:05 +08:00
#
# 316 lines
# 13 KiB
# Python

import os
import json
import re
import warnings
from UniTAP.utils import tsi_logging as logging
from ..private_info import MergedTestGroups, test_group_to_merged_group
from ..test_group_params_types import DataType
from ..test_info import TestGroupId
from ..dut_default_params import *
from ..dut_default_params.crc_video_tests import CrcVideoTestBpp, BrokenFrameExportFormat
from enum import IntEnum
class ParserFileBase:
    """Base class for the configuration-file parsers in this module.

    It only establishes the initial state: no file content, an unknown test
    group and test ID ``0``.
    """

    def __init__(self):
        # NOTE: names starting with two underscores are mangled per class
        # (``_ParserFileBase__test_id`` etc.), so a subclass that writes
        # ``self.__test_id`` refers to its *own* attribute, not to this one.
        self.__test_id = 0
        self.__group_id = TestGroupId.UNKNOWN
        self.__file_data = ""
class ParserTd(ParserFileBase):
    """Parser for plain-text ``.td`` test configuration files.

    Layout as consumed by this class:

    * line 0 - hexadecimal string; its first four characters are the test
      group ID and the remaining characters are the test ID;
    * line 1 - whitespace-separated parameter values;
    * line 2 - optional (CRC video tests only): a count followed by packed
      CRC words.
    """

    def __init__(self):
        super().__init__()
        # Double-underscore attributes are name-mangled per class, so the
        # values assigned by ParserFileBase.__init__ are not visible under
        # these names. Initialise this class's own copies so the properties
        # work even before set_file_name() is called.
        self.__file_data = ""
        self.__group_id = TestGroupId.UNKNOWN
        self.__test_id = 0

    @staticmethod
    def __open_file(file_path: str) -> str:
        """Return the whole content of ``file_path`` as text."""
        with open(file_path, "r") as file:
            return file.read()

    @property
    def group_id(self) -> TestGroupId:
        """Test group ID detected by the last :meth:`set_file_name` call."""
        return self.__group_id

    @property
    def test_id(self) -> int:
        """Test ID detected by the last :meth:`set_file_name` call."""
        return self.__test_id

    def __detect_group(self) -> TestGroupId:
        """Decode the group ID from the first four hex characters of line 0.

        Raises:
            ValueError: the decoded number is not a known ``TestGroupId``
                (or the characters are not valid hexadecimal).
        """
        header = self.__file_data.split("\n")[0]
        group_int = int(header[:4], 16)
        if not TestGroupId.has_value(group_int):
            raise ValueError(f"Incorrect test group with ID {group_int}.")
        return TestGroupId(group_int)

    def __detect_test(self) -> int:
        """Decode the test ID from line 0, after the four group characters."""
        header = self.__file_data.split("\n")[0]
        return int(header[4:], 16)

    def set_file_name(self, file_path: str):
        """Load ``file_path`` and detect its test group and test IDs."""
        self.__file_data = self.__open_file(file_path)
        self.__group_id = self.__detect_group()
        self.__test_id = self.__detect_test()

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Fill ``params`` from the loaded file and return it.

        Only ``CrcVideoTestParam`` and ``HdmiSinkContinuityDUTTestParam``
        are supported.

        Raises:
            NotImplementedError: ``params`` is of any other type.
            IndexError / ValueError: the file has no parameter line, too few
                values, or non-numeric values.
        """
        if isinstance(params, CrcVideoTestParam):
            return self.__parse_crc_video(params)
        if isinstance(params, HdmiSinkContinuityDUTTestParam):
            return self.__parse_hdmi_sink_continuity(params)
        # ``params`` is an instance, so the class name must be taken from its
        # type; ``params.__name__`` would raise AttributeError instead.
        raise NotImplementedError(
            f"Group {type(params).__name__} is not currently supported.")

    def __parse_crc_video(self, params):
        """Populate a CRC video test parameter object from lines 1 and 2."""
        lines = self.__file_data.split("\n")
        # split() without arguments ignores leading/trailing/repeated
        # whitespace, so a trailing space no longer costs a real character.
        (_params_count, timeout, frames_to_test, ref_frame_count,
         frame_mismatches, width, height, bpp, frame_rate, rate_tolerance,
         crc_r, crc_g, crc_b, motion_iteration, *_) = lines[1].split()

        # An absent or blank third line (e.g. the file simply ends with a
        # newline) means "no CRC list".
        crc_tokens = lines[2].split() if len(lines) > 2 else []
        crc_count = int(crc_tokens[0]) if crc_tokens else 0

        crc_values = []
        if crc_count > 0:
            # Every pair of words yields three 16-bit values: low and high
            # halves of the first word, then the low half of the second
            # (presumably R, G, B - TODO confirm against the file format).
            for i in range(0, crc_count, 2):
                first_word = int(crc_tokens[i + 1])
                crc_values.append(first_word & 0xFFFF)
                crc_values.append((first_word >> 16) & 0xFFFF)
                crc_values.append(int(crc_tokens[i + 2]) & 0xFFFF)
        else:
            # No CRC list: fall back to the single triple on line 1.
            crc_values.extend([int(crc_r), int(crc_g), int(crc_b)])

        params.timeout = int(timeout)
        params.number_frames_to_test = int(frames_to_test)
        params.number_reference_frames = int(ref_frame_count)
        params.number_frames_mismatch = int(frame_mismatches)
        params.reference_width = int(width)
        params.reference_height = int(height)
        params.reference_color_depth = CrcVideoTestBpp(int(bpp))
        params.required_frame_rate = int(frame_rate)
        params.frame_rate_tolerance = int(rate_tolerance)
        params.motion_test_iteration = int(motion_iteration)
        params.reference_crc_values = crc_values
        return params

    def __parse_hdmi_sink_continuity(self, params):
        """Populate an HDMI sink continuity parameter object from line 1."""
        lines = self.__file_data.split("\n")
        (_params_count, test_time, status_period, stop_flag,
         enable_scdc_version, enable_scdc_status, enable_scdc_error_count,
         scdc_error_count, *_) = lines[1].split()

        params.test_time = int(test_time)
        params.status_period = int(status_period)
        # Flags arrive as text; bool("0") is True, so convert via int first.
        params.stop_flag = bool(int(stop_flag))
        params.enable_scdc_version = bool(int(enable_scdc_version))
        params.enable_scdc_status = bool(int(enable_scdc_status))
        params.enable_scdc_error_count = bool(int(enable_scdc_error_count))
        params.scdc_error_count = int(scdc_error_count)
        return params
class ParserJson(ParserFileBase):
    """Parser for JSON test configuration files.

    Two layouts are recognised by their top-level keys:

    * ``EXPORT`` - has a ``TestId`` key; parameter values live under ``Data``;
    * ``CDF`` - has a ``Header_Type`` key; parameter values live at the top
      level.
    """

    class JsonFormat(IntEnum):
        UNKNOWN = -1
        EXPORT = 0
        CDF = 1

    # Parameter names for which parse() retries the lookup with "DSC_"
    # removed from the key when the file belongs to one of the DSC-related
    # groups checked there.
    __DSC_VMT_CI_LIST = [
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_30",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_60",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_120",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_144",
        "TSI_DP20_SRCCTS_DSC_VMT_1920_1080_240",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_30",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_60",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_120",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_144",
        "TSI_DP20_SRCCTS_DSC_VMT_3840_2160_240",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_30",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_60",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_120",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_144",
        "TSI_DP20_SRCCTS_DSC_VMT_5120_2160_240",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_30",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_60",
        "TSI_DP20_SRCCTS_DSC_VMT_7680_4320_120",
        "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_30",
        "TSI_DP20_SRCCTS_DSC_VMT_10240_4320_60",
    ]

    def __init__(self):
        super().__init__()
        self.__type = self.JsonFormat.UNKNOWN
        self.__merged_group_id = MergedTestGroups.Unknown
        self.__test_id = -1
        self.__group_id = TestGroupId.UNKNOWN

    def __open_file(self, file_path: str) -> dict:
        """Load ``file_path`` as JSON and record which layout it uses."""
        with open(file_path, encoding='UTF-8') as cfg_json:
            data = json.load(cfg_json)
        if data.get('TestId') is not None:
            self.__type = self.JsonFormat.EXPORT
        elif data.get('Header_Type') is not None:
            self.__type = self.JsonFormat.CDF
        else:
            # Do not keep the layout detected for a previously loaded file.
            self.__type = self.JsonFormat.UNKNOWN
        return data

    @property
    def group_id(self) -> TestGroupId:
        """Test group ID (set for EXPORT files only)."""
        return self.__group_id

    @property
    def merged_group_id(self) -> MergedTestGroups:
        """Merged test group ID (set for CDF files only)."""
        return self.__merged_group_id

    @property
    def test_id(self) -> int:
        """Test ID (set for EXPORT files only, ``-1`` otherwise)."""
        return self.__test_id

    @property
    def json_type(self) -> JsonFormat:
        """Layout detected for the currently loaded file."""
        return self.__type

    def __detect_group(self) -> TestGroupId:
        """Decode the group ID from the first four hex characters of TestId.

        A group value of 0 is remapped to 0x3E8 when the ``Data`` section
        contains ``TSI_REF1_FRAME_DATA``.

        Raises:
            ValueError: the resulting number is not a known ``TestGroupId``.
        """
        group_int = int(self.__file_data.get('TestId')[:4], 16)
        # Tolerate a missing "Data" section instead of failing on None.get().
        data_section = self.__file_data.get('Data') or {}
        if group_int == 0x0 and data_section.get("TSI_REF1_FRAME_DATA") is not None:
            group_int = 0x3E8
        if not TestGroupId.has_value(group_int):
            raise ValueError(f"Incorrect test group with ID {group_int}.")
        return TestGroupId(group_int)

    def __detect_merged_group(self) -> MergedTestGroups:
        """Decode the merged group ID from the hex token in ``Header_Type``.

        Raises:
            ValueError: no ``0x..._`` token is present, or its value is not a
                known ``MergedTestGroups`` member.
        """
        header_type = self.__file_data.get("Header_Type")
        matches = re.findall(r"0x\d+\w+_", header_type)
        if not matches:
            raise ValueError(f"Cannot detect merged test group from Header_Type {header_type!r}.")
        # Drop the trailing underscore captured by the pattern.
        return MergedTestGroups(int(matches[0][:-1], 16))

    def __detect_test(self) -> int:
        """Decode the test ID from TestId, after the four group characters."""
        return int(self.__file_data.get('TestId')[4:], 16)

    def set_file_name(self, file_path: str):
        """Load ``file_path`` and detect the IDs its layout provides.

        Raises:
            ValueError: the file is neither an EXPORT nor a CDF JSON file.
        """
        # Forget everything detected for a previously loaded file so stale
        # IDs cannot leak into parse() or the properties.
        self.__type = self.JsonFormat.UNKNOWN
        self.__group_id = TestGroupId.UNKNOWN
        self.__test_id = -1
        self.__merged_group_id = MergedTestGroups.Unknown

        self.__file_data = self.__open_file(file_path)
        if self.json_type == self.JsonFormat.EXPORT:
            self.__group_id = self.__detect_group()
            self.__test_id = self.__detect_test()
        elif self.json_type == self.JsonFormat.CDF:
            self.__merged_group_id = self.__detect_merged_group()
        else:
            raise ValueError(f"Incorrect json file type {self.__type.name}.")

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Copy values from the loaded JSON into ``params`` and return it.

        For every parameter returned by ``get_param_list(params)`` the value
        stored under its ``config_id_name`` becomes the parameter's default.
        Missing keys are logged and skipped; for the DSC-related groups a
        lookup with "DSC_" removed from the key is attempted first.

        Raises:
            ValueError: any error occurred while reading or converting values.
        """
        try:
            param_list = get_param_list(params)
            if self.json_type == self.JsonFormat.EXPORT:
                json_data = self.__file_data.get('Data')
            else:
                json_data = self.__file_data

            dsc_fallback_allowed = (
                self.__merged_group_id == MergedTestGroups.DPRX_128b132b_LL_CTS
                or self.__group_id == TestGroupId.DP_2_1_RX_DSC_CTS
            )

            for param in param_list:
                key = param.config_id_name
                if key in json_data:
                    value = self.__json_to_param_value(param, json_data.get(key))
                    if key == "TSI_REF1_ELEMENT_FORMAT":
                        param.direct_set_def_value(value)
                    else:
                        param.default_value = value
                    continue

                if dsc_fallback_allowed and key in self.__DSC_VMT_CI_LIST:
                    fallback_value = json_data.get(key.replace("DSC_", ""))
                    # Only convert when the fallback key really exists;
                    # converting None would abort the whole parse.
                    if fallback_value is not None:
                        param.default_value = self.__json_to_param_value(param, fallback_value)
                logging.warn(f"Required CDF key {key} is missing in provided CDF file. It will be skipped.")
            return params
        except Exception as e:
            # Exception (not BaseException) so KeyboardInterrupt/SystemExit
            # are not turned into a parse error.
            raise ValueError(f"Group {str(params)} cannot be parsed. Error {e}") from e

    @staticmethod
    def __json_to_param_value(param, json_value):
        """Convert a raw JSON value to the Python type ``param`` expects.

        * array types - whitespace-separated integers in a string;
        * integer-like types - ``int``;
        * ``Bool`` - textual "0"/"false"/"" mean False, other values follow
          normal truthiness;
        * ``Double`` - ``float``;
        * ``ArrayFromFilePath`` - ``str``;
        * anything else is returned unchanged.
        """
        if param.value_type in [DataType.ArrayU8, DataType.ArrayU16, DataType.ArrayU32]:
            # split() ignores a trailing space without discarding the last
            # element when the string has none.
            return [int(token) for token in json_value.split()]
        elif param.value_type in [DataType.Int, DataType.Uint32, DataType.BitList]:
            return int(json_value)
        elif param.value_type == DataType.Bool:
            if isinstance(json_value, str):
                # bool("0") and bool("false") are True; interpret the text.
                return json_value.strip().lower() not in ("", "0", "false")
            return bool(json_value)
        elif param.value_type == DataType.Double:
            return float(json_value)
        elif param.value_type == DataType.ArrayFromFilePath:
            return str(json_value)
        else:
            return json_value
class Parser:
    """Facade that picks the TD or JSON parser from the file extension."""

    class ConfigFileFormat(IntEnum):
        UNKNOWN = -1
        TD = 0
        JSON = 1

    def __init__(self):
        # Format of the currently selected file; UNKNOWN until
        # set_file_name() succeeds.
        self.__extension = Parser.ConfigFileFormat.UNKNOWN
        self.__parser_td = ParserTd()
        self.__parser_json = ParserJson()

    @staticmethod
    def __detect_file_format(file_path: str) -> ConfigFileFormat:
        """Map the (case-insensitive) file extension to a ConfigFileFormat."""
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        if file_extension == '.json':
            return Parser.ConfigFileFormat.JSON
        elif file_extension == '.td':
            return Parser.ConfigFileFormat.TD
        else:
            return Parser.ConfigFileFormat.UNKNOWN

    def set_file_name(self, file_path: str):
        """Select and load ``file_path`` with the matching parser.

        Raises:
            ValueError: the extension is neither ``.td`` nor ``.json``.
        """
        self.__extension = self.__detect_file_format(file_path)
        if self.__extension == Parser.ConfigFileFormat.JSON:
            self.__parser_json.set_file_name(file_path)
        elif self.__extension == Parser.ConfigFileFormat.TD:
            self.__parser_td.set_file_name(file_path)
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def detect_group(self) -> TestGroupId:
        """Return the test group ID of the loaded file.

        Raises:
            ValueError: the file is a CDF-type JSON (which carries no group
                ID) or no supported file is loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            if self.__parser_json.json_type == ParserJson.JsonFormat.EXPORT:
                return self.__parser_json.group_id
            else:
                raise ValueError("Cannot get Test Group ID from JSON (CDF type) file")
        elif self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td.group_id
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def detect_merged_group(self) -> MergedTestGroups:
        """Return the merged test group of the loaded file.

        CDF JSON files carry it directly; for EXPORT JSON and TD files it is
        derived from the group and test IDs via
        ``test_group_to_merged_group``.

        Raises:
            ValueError: no supported file is loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            if self.__parser_json.json_type == ParserJson.JsonFormat.CDF:
                return self.__parser_json.merged_group_id
            else:
                return test_group_to_merged_group(self.__parser_json.group_id, self.__parser_json.test_id)
        elif self.__extension == Parser.ConfigFileFormat.TD:
            # The IDs of a TD file are held by the TD parser; the JSON parser
            # has not loaded anything in this branch.
            return test_group_to_merged_group(self.__parser_td.group_id, self.__parser_td.test_id)
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def detect_test(self) -> int:
        """Return the test ID of the loaded file.

        Raises:
            ValueError: the file is a CDF-type JSON (which carries no test
                ID) or no supported file is loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            if self.__parser_json.json_type == ParserJson.JsonFormat.EXPORT:
                return self.__parser_json.test_id
            else:
                raise ValueError("Cannot get Test ID from JSON (CDF type) file")
        elif self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td.test_id
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")

    def parse(self, params: DUTTestParameters) -> DUTTestParameters:
        """Fill ``params`` from the loaded file using the matching parser.

        Raises:
            ValueError: no supported file is loaded.
        """
        if self.__extension == Parser.ConfigFileFormat.JSON:
            return self.__parser_json.parse(params)
        elif self.__extension == Parser.ConfigFileFormat.TD:
            return self.__parser_td.parse(params)
        else:
            raise ValueError("Incorrect file format. Must be: TD or JSON.")