import copy import re import threading import time import warnings import UniTAP from UniTAP.dev.modules.dut_tests.test_info import TestGroupId from UniTAP.libs.lib_uicl.uicl import UICL_GeneratePattern, UICL_GeneratePattern_3Tap from UniTAP.utils import tsi_logging as logging from UniTAP.dev.ports.modules.link.dp.link_tx_types import LinkConfig from UniTAP.common.audio_mode import AudioMode from UniTAP.dev.ports.modules.vtg import * from UniTAP.utils.uicl_api import * from UniTAP.dev.ports.modules.edid import MainBlockType, AdditionalBlockType from enum import IntEnum from UniTAP.utils.function_wrapper import function_scheduler from UniTAP.libs.lib_dscl.dscl import DSC_TOOLS_FOLDER, DSCL_ExtractPPSFromData, DSCL_Encode, DSCL_Decode from UniTAP.libs.lib_dscl.dscl_utils import calculate_slice_size, dscl_image_to_dsc_vf from UniTAP.libs.lib_pdl.pdl import generate_pattern_as_vf, PatternType from UniTAP.libs.lib_tsi.tsi_types import TSI_MEMORY_WRITE_W, TSI_MEMORY_BLOCK_INDEX, TSI_MEMORY_LAYOUT, \ TSI_DP20_SINKCTS_SUPPORT_444CRC, TSI_DP14_SINKCTS_SUPPORT_444CRC from UniTAP.libs.lib_tsi.tsi_private_types import TSI_DSC_MEMORY_BLOCK, TSI_DSC_DATA_SIZE, \ TSI_DSC_TX_CRC, TSI_DPRX_DSC_TEST_CRC, TSI_SELECT_SUITE from UniTAP.libs.lib_tsi.tsi import TSIX_TS_SetConfigItem, TSIX_TS_GetConfigItem from ctypes import c_uint64, c_uint32 from UniTAP.libs.lib_tsi import TSI_OPF_RETURN_CODE_ABORT, TSI_OPF_RETURN_CODE_PASS, \ TSI_OPF_RETURN_CODE_PROCEED, TSI_OPF_RETURN_CODE_FAIL, TSI_OPF_RETURN_CODE_AUTO_CLOSED class OPFDialogAnswer(IntEnum): ABORT = TSI_OPF_RETURN_CODE_ABORT PASS = TSI_OPF_RETURN_CODE_PASS PROCEED = TSI_OPF_RETURN_CODE_PROCEED FAIL = TSI_OPF_RETURN_CODE_FAIL NOTHING = TSI_OPF_RETURN_CODE_AUTO_CLOSED class OPFFunctions: dict_color_formats = {"RGB": ColorInfo.ColorFormat.CF_RGB, "YCbCr 4:2:2": ColorInfo.ColorFormat.CF_YCbCr_422, "YCbCr 4:4:4": ColorInfo.ColorFormat.CF_YCbCr_444, "YCbCr 4:2:0": ColorInfo.ColorFormat.CF_YCbCr_420, "Simple 4:2:2": ColorInfo.ColorFormat.CF_YCbCr_422, "YCbCr422": ColorInfo.ColorFormat.CF_YCbCr_422, "YCbCr444": ColorInfo.ColorFormat.CF_YCbCr_444, "YCbCr420": ColorInfo.ColorFormat.CF_YCbCr_420, "Simple422": ColorInfo.ColorFormat.CF_YCbCr_422 } dict_dsc_color_formats = {"RGB": CompressionInfo.DscColorFormat.CF_RGB, "YCbCr 4:2:2": CompressionInfo.DscColorFormat.CF_YCbCr_422, "YCbCr 4:4:4": CompressionInfo.DscColorFormat.CF_YCbCr_444, "YCbCr 4:2:0": CompressionInfo.DscColorFormat.CF_YCbCr_420, "Simple 4:2:2": CompressionInfo.DscColorFormat.CF_Simple_422, "YCbCr422": CompressionInfo.DscColorFormat.CF_YCbCr_422, "YCbCr444": CompressionInfo.DscColorFormat.CF_YCbCr_444, "YCbCr420": CompressionInfo.DscColorFormat.CF_YCbCr_420, "Simple422": CompressionInfo.DscColorFormat.CF_Simple_422 } dict_colorimetry = {None: ColorInfo.Colorimetry.CM_sRGB, "ITU-601": ColorInfo.Colorimetry.CM_ITUR_BT601, "ITU-709": ColorInfo.Colorimetry.CM_ITUR_BT709} dict_reduce_blanking = {None: Timing.ReduceBlanking.RB_NONE, 'RB1': Timing.ReduceBlanking.RB1, 'RB2': Timing.ReduceBlanking.RB2, 'RB3': Timing.ReduceBlanking.RB3} dict_patterns = { "Color Ramp": VideoPattern.ColorRamp, "Color Square": VideoPattern.ColorSquares } dict_dsc_color_names = { CompressionInfo.DscColorFormat.CF_RGB: "RGB444", CompressionInfo.DscColorFormat.CF_YCbCr_444: "YUV444", CompressionInfo.DscColorFormat.CF_YCbCr_422: "YUV422", CompressionInfo.DscColorFormat.CF_YCbCr_420: "YUV420", CompressionInfo.DscColorFormat.CF_Simple_422: "SIMPL422" } path_custom_image = os.path.join(DSC_TOOLS_FOLDER, "Default_16K.png") dsc_content_library_path = "" g_vm = None g_vf = None g_h_slice_size = 0 g_v_slice_size = 0 @staticmethod def opf_pass_handler(dptx, dprx, message: str) -> OPFDialogAnswer: logging.info(message) return OPFDialogAnswer.PASS @staticmethod def check_dptx(dptx) -> bool: return isinstance(dptx, (UniTAP.DPTX, UniTAP.DPTX4xx, UniTAP.DPTX5xx)) @staticmethod def check_dprx(dprx) -> bool: return isinstance(dprx, (UniTAP.DPRX, UniTAP.DPRX4xx, UniTAP.DPRX5xx)) @staticmethod def check_hdtx(hdtx) -> bool: return isinstance(hdtx, (UniTAP.HDTX, UniTAP.HDTX4xx)) @staticmethod def check_hdrx(hdrx) -> bool: return isinstance(hdrx, (UniTAP.HDRX, UniTAP.DPRX4xx)) @staticmethod def check_video(dev_rx) -> bool: def is_video_available(_dev_rx): return _dev_rx.link.status.stream(0).crc != [0, 0, 0] and \ _dev_rx.link.status.stream(0).video_mode.timing.hactive != 0 return function_scheduler(is_video_available, dev_rx, interval=2, timeout=10) @staticmethod def opf_1_after_handler(dptx, dprx, dp_lanes, dp_link_rate, encoding): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((dp_lanes, dp_link_rate, encoding)) if encoding is None: config = LinkConfig.DP8b10b() else: if encoding == '8b/10b': config = LinkConfig.DP8b10b() else: config = LinkConfig.DP128b132b() config.lane_count = dp_lanes config.bit_rate = dp_link_rate dptx.link.config.set(config) dptx.link.start_link_training() @staticmethod def opf_1_handler(dptx, dprx, dp_lanes, dp_link_rate, encoding): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_2_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, tim_std, tim_rb): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, tim_std)) timing_manager = dptx.pg.timing_manager if tim_std.find("VIC") != -1: tim_id = int(re.findall(r'\d+', tim_std)[0]) timing = timing_manager.get_cta(tim_id) elif tim_std.find("DMT") != -1: tim_id = int(tim_std.replace("h", "").replace("DMT ", ""), 16) timing = timing_manager.get_dmt(tim_id) elif tim_std.find("CVT") != -1: tim_id = int(tim_std.replace("h", "").replace("CVT ", ""), 16) timing = timing_manager.get_cvt(tim_id) else: timing = timing_manager.search(res_x, res_y, res_frate, OPFFunctions.dict_reduce_blanking.get(tim_rb)) if timing is None: timing = timing_manager.search(res_x, res_y, res_frate, OPFFunctions.dict_reduce_blanking.get(tim_rb)) color_mode = ColorInfo() color_mode.color_format = ColorInfo.ColorFormat.CF_RGB color_mode.bpc = res_bpc color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB video_mode = VideoMode(timing=timing, color_info=color_mode) dptx.pg.set_vm(video_mode) dptx.pg.set_pattern(pattern=VideoPattern.ColorBars) res_pg = dptx.pg.apply() res_app = dptx.pg.status().error if not res_pg: logging.info(f"[UniTAP] Stream {0} - Apply {res_app.__str__()}") return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_3_handler(dptx, dprx, message: str): logging.info(message) return OPFDialogAnswer.PROCEED @staticmethod def opf_4_handler(dptx, dprx, message: str): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) val = dptx.dpcd.read(0x00101, 1).data[0] & 0xFF val &= ~ 0x7 val |= 1 dptx.dpcd.write(0x00101, val) return OPFDialogAnswer.PROCEED @staticmethod def opf_5_handler(dptx, dprx, message: str): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT val = dptx.dpcd.read(0x00101, 1).data[0] & 0xFF val &= ~ 0x7 val |= 4 dptx.dpcd.write(0x00101, val) logging.info(message) return OPFDialogAnswer.PROCEED @staticmethod def opf_6_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, timing_standard, col_format, col_range, col_yc, pattern_name) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, res_bpc, timing_standard, col_format, col_range, col_yc, pattern_name)) if dptx.link.config.get(UniTAP.LinkConfig.DP8b10b).force_edid_timings_after_lt: timing = dptx.pg.get_stream_video_mode().timing else: timing = dptx.pg.timing_manager.search(res_x, res_y, res_frate, rb=OPFFunctions.dict_reduce_blanking.get(timing_standard)) color_mode = ColorInfo() color_mode.color_format = OPFFunctions.dict_color_formats.get(col_format) color_mode.bpc = res_bpc color_mode.colorimetry = OPFFunctions.dict_colorimetry.get(col_yc) color_mode.dynamic_range = ColorInfo.DynamicRange.DR_VESA if col_range == 'VESA' else ColorInfo.DynamicRange.DR_CTA video_mode = VideoMode(timing=timing, color_info=color_mode) dptx.pg.set_vm(video_mode) dptx.pg.set_pattern(pattern=OPFFunctions.dict_patterns.get(pattern_name)) res_pg = dptx.pg.apply() if not res_pg: return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_7_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) dptx.dpcd.write(0x600, 2) dptx.link.start_link_training() return OPFDialogAnswer.PROCEED @staticmethod def opf_8_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) dptx.dpcd.write(0x600, 1) dptx.link.start_link_training() return OPFDialogAnswer.PROCEED @staticmethod def opf_9_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, tim_std, pattern_name) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, res_bpc, tim_std)) if not dptx.link.config.get(UniTAP.LinkConfig.DP8b10b).force_edid_timings_after_lt: timing_manager = dptx.pg.timing_manager rb = OPFFunctions.dict_reduce_blanking.get(tim_std) timing = timing_manager.search(res_x, res_y, res_frate, rb=rb) else: timing = dptx.pg.get_stream_video_mode().timing color_mode = ColorInfo() color_mode.color_format = ColorInfo.ColorFormat.CF_RGB color_mode.bpc = res_bpc color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB video_mode = VideoMode(timing=timing, color_info=color_mode) dptx.pg.set_vm(video_mode) dptx.pg.set_pattern(pattern=OPFFunctions.dict_patterns.get(pattern_name)) res_pg = dptx.pg.apply() if not res_pg: return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_10_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, timing_standard, audio_pattern, ch_count, sample_freq, sample_size, ch_alloc, ch_info) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((audio_pattern, ch_count, sample_freq, sample_size)) audio_mode = AudioMode() audio_mode.channel_count = ch_count audio_mode.bits = sample_size audio_mode.sample_rate = sample_freq dptx.ag.setup(audio_mode=audio_mode, audio_pattern=UniTAP.AudioPattern.SignalSine, signal_frequency=2000, amplitude=60) dptx.ag.apply() if res_x != 0: timing_manager = dptx.pg.timing_manager rb = OPFFunctions.dict_reduce_blanking.get(timing_standard) for standard in [UniTAP.Timing.Standard.SD_CTA, UniTAP.Timing.Standard.SD_DMT, UniTAP.Timing.Standard.SD_CVT]: timing = timing_manager.search(res_x, res_y, res_frate, standard=standard, rb=rb) if timing is not None: break color_mode = ColorInfo() color_mode.color_format = ColorInfo.ColorFormat.CF_RGB color_mode.bpc = res_bpc color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB video_mode = VideoMode(timing=timing, color_info=color_mode) dptx.pg.set_vm(video_mode) dptx.pg.set_pattern(pattern=VideoPattern.ColorRamp) res_pg = dptx.pg.apply() if dptx.pg.status().error != PGStatus.PGError.OK and not res_pg: return OPFDialogAnswer.ABORT else: for i in range(dptx.pg.max_stream_count): dptx.pg[i].set_pattern(UniTAP.VideoPattern.Disabled) res_pg = dptx.pg.apply() if dptx.pg.status().error != PGStatus.PGError.OK and not res_pg: return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_11_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_12_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dprx(dprx): return OPFDialogAnswer.ABORT logging.info(message) def capture_frame(_dprx): try: dprx.video_capturer.start() _vf = dprx.video_capturer.pop_element() dprx.video_capturer.stop() return _vf except BaseException: return None iterations = 5 for i in range(iterations): vf = capture_frame(dprx) vf = video_frame_to_ci(vf, vf.color_info, OPFFunctions.g_vf.data_info) if vf is not None and len(vf.data) > 0: if OPFFunctions.g_vf is not None: if vf.color_info.color_format == UniTAP.ColorInfo.ColorFormat.CF_DSC: if OPFFunctions.g_vf.data[4:] in vf.data: return OPFDialogAnswer.PASS else: if OPFFunctions.g_vf.data in vf.data: return OPFDialogAnswer.PASS return OPFDialogAnswer.FAIL @staticmethod def opf_13_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_14_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) data = dptx.dpcd.read(0x120, 1).data data.append(1) dptx.dpcd.write(0x120, data) if OPFFunctions.check_dprx(dprx): data = dptx.dpcd.read(0x120, 1).data if len(data) > 0 and data[0] == 1: return OPFDialogAnswer.PROCEED else: return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_15_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) dptx.fec.enable(True) def is_fec_enabled(_dptx): return _dptx.fec.is_enabled() if not function_scheduler(is_fec_enabled, dptx, interval=2, timeout=10): return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_16_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) dptx.fec.enable(False) def is_fec_disable(_dptx): return not _dptx.fec.is_enabled() if not function_scheduler(is_fec_disable, dptx, interval=2, timeout=10): return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_17_handler(dptx, dprx, message: str) -> OPFDialogAnswer: logging.info(message) res_x = 1920 res_y = 1080 res_frate = 60000 res_bpc = 8 col_yc = None col_format = "RGB" timing_manager = dptx.pg.timing_manager standard = Timing.Standard.SD_CTA timing = timing_manager.search(res_x, res_y, res_frate, standard=standard) return OPFFunctions.opf_18_20_support(dptx, dprx, col_format, res_x, res_y, res_bpc, col_yc, timing, pattern_name=None, enable_dsc=True) @staticmethod def opf_18_20_support(dptx, dprx, col_format, res_x, res_y, res_bpc, col_yc, timing, pattern_name=None, enable_dsc=True): color_mode = ColorInfo() color_mode.color_format = OPFFunctions.dict_color_formats.get(col_format) color_mode.bpc = res_bpc color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB if col_yc is None else \ ColorInfo.Colorimetry.CM_ITUR_BT601 color_mode.dynamic_range = ColorInfo.DynamicRange.DR_VESA if col_yc is None else ColorInfo.DynamicRange.DR_CTA data_info = DataInfo() data_info.packing = DataInfo.Packing.P_PACKED if col_yc is None else DataInfo.Packing.P_PLANAR data_info.component_order = DataInfo.ComponentOrder.CO_RGB if col_yc is None else DataInfo.ComponentOrder.CO_YCbCr data_info.alignment = DataInfo.Alignment.A_LSB video_mode = VideoMode(timing=timing, color_info=color_mode) if enable_dsc: sink_dsc_caps = dptx.dpcd.read(0x60, 16) block_prediction = bool(sink_dsc_caps.data[6] & 0x1) dsc_minor = sink_dsc_caps.data[1] >> 4 & 0xf dsc_major = sink_dsc_caps.data[1] & 0xf h_slice_size = int(calculate_slice_size(res_x, 10, color_mode.color_format)) v_slice_size = int(calculate_slice_size(res_y, 10, color_mode.color_format)) dptx.pg.set_vm(video_mode) pixel_rate = timing.htotal * timing.vtotal * (timing.frame_rate / 1000) link_bandwidth = dptx.link.status.available_link_rate bpp = (link_bandwidth * 0.9 / pixel_rate) * 16 bpp = res_bpc * 16 if res_bpc * 16 < bpp else round(bpp) compression_info = CompressionInfo() compression_info.v_slice_size = v_slice_size compression_info.h_slice_size = h_slice_size compression_info.bpp = bpp compression_info.color_format = OPFFunctions.dict_dsc_color_formats.get(col_format) compression_info.buffer_bit_depth = res_bpc + 1 compression_info.is_block_prediction_enabled = block_prediction compression_info.version = (dsc_major, dsc_minor) if OPFFunctions.dsc_content_library_path != "": if not os.path.exists(OPFFunctions.dsc_content_library_path): os.makedirs(OPFFunctions.dsc_content_library_path) dsc_image_name = f"{res_x}x{res_y}_{OPFFunctions.dict_dsc_color_names.get(compression_info.color_format)}_" \ f"{'BPY' if block_prediction else 'NBP'}_bpc{res_bpc}_" \ f"bpp{compression_info.bpp}_{10}slicew_{10}sliceh_" \ f"{compression_info.buffer_bit_depth}lb_v{dsc_major}{dsc_minor}.dsc" full_path = os.path.join(OPFFunctions.dsc_content_library_path, dsc_image_name) if os.path.exists(full_path): # # Temporary message "Update DSC folder" # warnings.warn("Since version 3.5.X/3.6.X the usual file for generating DSC images has been updated." "Please update/regenerate your DSC folder/library with using new file.") with open(full_path, 'rb') as file: data = bytearray(file.read()) encoded_video_frame = dsc_video_frame_from_data(data) else: custom_vf = generate_pattern_as_vf(PatternType.Unigraf, res_x, res_y, color_mode, data_info) encoded_video_frame = encode_video_frame(custom_vf, compression_info) with open(full_path, 'wb') as file: file.write(encoded_video_frame.data) else: custom_vf = generate_pattern_as_vf(PatternType.Unigraf, res_x, res_y, color_mode, data_info) encoded_video_frame = encode_video_frame(custom_vf, compression_info) dptx.pg.set_pattern(pattern=encoded_video_frame) res = dptx.pg.apply() if not res and dptx.pg.status().error != PGStatus.PGError.OK: return OPFDialogAnswer.ABORT OPFFunctions.g_vf = encoded_video_frame elif pattern_name is not None: dptx.pg.set_vm(video_mode) dptx.pg.set_pattern(pattern=pattern_name) res = dptx.pg.apply() if not res and dptx.pg.status().error != PGStatus.PGError.OK: return OPFDialogAnswer.ABORT try: if not OPFFunctions.check_video(dprx): print('Video is not available.') return OPFDialogAnswer.ABORT except BaseException as e: print(e) return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_18_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std) \ -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std)) timing_manager = dptx.pg.timing_manager standard = Timing.Standard.SD_CTA if OPFFunctions.dict_reduce_blanking.get( tim_std) is None else Timing.Standard.SD_CVT timing = timing_manager.search(res_x, res_y, res_frate, standard=standard, rb=OPFFunctions.dict_reduce_blanking.get(tim_std)) return OPFFunctions.opf_18_20_support(dptx, dprx, col_format, res_x, res_y, res_bpc, col_yc, timing) @staticmethod def opf_19_handler(dptx, dprx, use_3tap) -> OPFDialogAnswer: if not OPFFunctions.check_dprx(dprx): return OPFDialogAnswer.ABORT # check DSC state if not dprx.link.capabilities.link_caps_status().dsc: return OPFDialogAnswer.ABORT incorrect_crc = [0xFFFF, 0xFFFF, 0xFFFF, 0x1111, 0x1111, 0x1111] def load_dsc(handle: int, values: list) -> OPFDialogAnswer: TSIX_TS_SetConfigItem(handle, TSI_DPRX_DSC_TEST_CRC, values, data_count=6) return OPFDialogAnswer.FAIL # TODO: Need to change on public interface. Temporary solution. device_handle = dprx.bulk_capturer.__getattribute__("_BulkCapturer__memory_manager"). \ __getattribute__('_MemoryManager__io'). \ __getattribute__('device_handle') iterations = 5 for i in range(iterations): # capture frame try: dprx.video_capturer.start(frames_count=1) vf = dprx.video_capturer.capture_result.buffer[-1] dprx.video_capturer.stop() except BaseException: dprx.video_capturer.stop() return load_dsc(device_handle, incorrect_crc) def create_reference_image(video_frame, crc_values: list): try: pps_info = DSCL_ExtractPPSFromData(video_frame.data) except BaseException: return sampling = UICL_Sampling.Sampling_444 if pps_info.is_yuv(): if pps_info.is_420(): sampling = UICL_Sampling.Sampling_420 elif pps_info.is_422(): sampling = UICL_Sampling.Sampling_422 elif pps_info.is_simple_422(): sampling = UICL_Sampling.Sampling_422 image = UICL_Image() parameters = UICL_ImageParameters() parameters.Width = pps_info.width() parameters.Height = pps_info.height() parameters.BitsPerColor = pps_info.bpc() parameters.Alignment = UICL_Alignment.Alignment_LSB parameters.IsFullRange = False if pps_info.is_yuv(): parameters.Colorspace = UICL_Colorspace.Colorspace_YCbCr parameters.Colorimetry = UICL_Colorimetry.Colorimetry_ITU_R_BT709 parameters.Sampling = sampling parameters.ComponentOrder = UICL_ComponentOrder.Order_YCbCr parameters.Packing = UICL_Packing.Packing_Planar image.Parameters = parameters try: image.DataSize = UICL_GetRequiredBufferSize(image) image.DataPtr = (c_uint8 * image.DataSize)() if use_3tap: res = UICL_GeneratePattern_3Tap(image, 1) else: res = UICL_GeneratePattern(image, 1) if res < UICL_SUCCESS: return except BaseException: return else: parameters.Colorspace = UICL_Colorspace.Colorspace_RGB parameters.Colorimetry = UICL_Colorimetry.Colorimetry_Unknown parameters.Sampling = sampling parameters.ComponentOrder = UICL_ComponentOrder.Order_RGB parameters.Packing = UICL_Packing.Packing_Packed image.Parameters = parameters try: image.DataSize = UICL_GetRequiredBufferSize(image) image.DataPtr = (c_uint8 * image.DataSize)() if use_3tap: res = UICL_GeneratePattern_3Tap(image, 0) else: res = UICL_GeneratePattern(image, 0) if res < UICL_SUCCESS: return except BaseException: return dsc_encoded_image = DSCL_Encode(image, pps_info) OPFFunctions.g_vf = dscl_image_to_dsc_vf(dsc_encoded_image) dsc_decoded_uicl_image = DSCL_Decode(dsc_encoded_image) reference_image_crc = UICL_CRC16() try: res = UICL_CalculateCRC16(dsc_decoded_uicl_image, reference_image_crc) if res < UICL_SUCCESS: crc_values.extend([0xFFFF, 0xFFFF, 0xFFFF]) else: crc_values.extend([reference_image_crc.R, reference_image_crc.G, reference_image_crc.B]) except BaseException: return def decompress_dsc(video_frame, crc_values: list): decoded_captured_vf = decode_video_frame(video_frame) uicl_decoded_image_2 = image_from_vf(decoded_captured_vf) decompressed_image_crc = UICL_CRC16() try: res = UICL_CalculateCRC16(uicl_decoded_image_2, decompressed_image_crc) if res < UICL_SUCCESS: crc_values.extend([0x1111, 0x1111, 0x1111]) else: crc_values.extend([decompressed_image_crc.R, decompressed_image_crc.G, decompressed_image_crc.B]) except BaseException: return reference_image_crc_values = [] decompressed_image_crc_values = [] thread_create_reference_image = threading.Thread(target=create_reference_image, args=(vf, reference_image_crc_values)) thread_decompressed_image = threading.Thread(target=decompress_dsc, args=(vf, decompressed_image_crc_values)) thread_create_reference_image.start() thread_decompressed_image.start() thread_create_reference_image.join() thread_decompressed_image.join() if len(reference_image_crc_values) > 0 and len(decompressed_image_crc_values) > 0 and \ ((reference_image_crc_values == decompressed_image_crc_values) or (reference_image_crc_values != decompressed_image_crc_values and (i == iterations - 1))): # Write crc to register load_dsc(device_handle, [decompressed_image_crc_values[0], decompressed_image_crc_values[1], decompressed_image_crc_values[2], reference_image_crc_values[0], reference_image_crc_values[1], reference_image_crc_values[2]]) return OPFDialogAnswer.PROCEED return load_dsc(device_handle, incorrect_crc) @staticmethod def opf_20_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std, pattern_name, enable_dsc) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std, pattern_name)) if pattern_name is not None: if pattern_name == 'No Video': pattern_name = VideoPattern.Disabled elif pattern_name == 'Color Ramp': pattern_name = VideoPattern.ColorRamp elif pattern_name == 'Color Square': pattern_name = VideoPattern.ColorSquares elif pattern_name == 'Black and Vertical lines': pattern_name = VideoPattern.WhiteVStrips elif pattern_name == "Any": if col_format == "RGB": pattern_name = UniTAP.VideoPattern.ColorRamp else: pattern_name = UniTAP.VideoPattern.ColorSquares else: pattern_name = VideoPattern.ColorBars timing_manager = dptx.pg.timing_manager for item in tim_std: if item.find("VIC") != -1: tim_id = int(re.findall(r'\d+', item)[0]) timing = timing_manager.get_cta(tim_id) if timing is None: continue break elif item.find("DMT") != -1: res = re.findall(r'\w+', item)[1][:-1] tim_id = int(res, 16) timing = timing_manager.get_dmt(tim_id) if timing is None: continue break elif item.find("CVT") != -1: res = re.findall(r'RB1|RB2|RB3', item) tim_id = res[0] if len(res) > 0 else None standard = Timing.Standard.SD_CVT timing = timing_manager.search(res_x, res_y, res_frate, standard=standard, rb=OPFFunctions.dict_reduce_blanking.get(tim_id)) if timing is None: continue break elif item.find("UGF") != -1: pass elif item.find("OVT") != -1: standard = Timing.Standard.SD_OVT timing = timing_manager.search(res_x, res_y, res_frate, standard=standard) if timing is None: continue break else: timing = timing_manager.search(res_x, res_y, res_frate) return OPFFunctions.opf_18_20_support(dptx, dprx, col_format, res_x, res_y, res_bpc, col_yc, timing, pattern_name, enable_dsc) @staticmethod def opf_21_handler(dptx, dprx, audio_pattern, ch_count, sample_freq, sample_size, ch_alloc, ch_info): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((audio_pattern, ch_count, sample_freq, sample_size)) audio_mode = AudioMode() audio_mode.channel_count = ch_count audio_mode.bits = sample_size audio_mode.sample_rate = sample_freq audio_pattern_value = UniTAP.AudioPattern.SignalSawtooth if audio_pattern.find("Sawtooth") != -1 else ( UniTAP.AudioPattern.SignalSine) dptx.ag.setup(audio_mode=audio_mode, audio_pattern=audio_pattern_value, signal_frequency=2000, amplitude=60) res = dptx.ag.apply() if not res: return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_101_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dprx(dprx): return OPFDialogAnswer.FAIL try: dprx.audio_capturer.start(10) dprx.audio_capturer.stop() return UniTAP.OPFDialogAnswer.PASS if len( dprx.audio_capturer.capture_result.buffer) > 0 else UniTAP.OPFDialogAnswer.FAIL except BaseException as e: print(f"Cannot capture audio. Error: {e}") return UniTAP.OPFDialogAnswer.FAIL @staticmethod def opf_102_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dprx(dprx): return OPFDialogAnswer.FAIL def is_crc_valid(rx, dsc_flag): return rx.link.status.stream(0).dsc_crc != [0, 0, 0] if dsc_flag else \ rx.link.status.stream(0).crc != [0, 0, 0] logging.info(message) if OPFFunctions.g_vm is not None: try: if UniTAP.utils.function_scheduler(is_crc_valid, dprx, True, interval=2, timeout=10): try: dprx.video_capturer.start(frames_count=5) vf_list = dprx.video_capturer.capture_result.buffer dprx.video_capturer.stop() except BaseException as e: logging.info(f'{e}\nCannot capture frames.') dprx.video_capturer.stop() return OPFDialogAnswer.FAIL if isinstance(vf_list, list) and len(vf_list) == 5: for vf in vf_list: delta = len(vf.data) - len(OPFFunctions.g_vm.data) + 4 if OPFFunctions.g_vm.data[4:] in vf.data[:-delta]: return OPFDialogAnswer.PASS logging.info("Incorrect sequence of frames.") return OPFDialogAnswer.FAIL else: logging.info("CRC is not valid.") return OPFDialogAnswer.FAIL except BaseException as e: logging.info(f"{e}\nCannot check CRC.") return OPFDialogAnswer.FAIL else: if UniTAP.utils.function_scheduler(is_crc_valid, dprx, False, interval=2, timeout=10): def capture_frame(_dprx): try: dprx.video_capturer.start() _vf = dprx.video_capturer.pop_element() dprx.video_capturer.stop() return _vf except BaseException: return None iterations = 5 for i in range(iterations): vf = capture_frame(dprx) if len(vf.data) > 0: return OPFDialogAnswer.PASS return OPFDialogAnswer.FAIL @staticmethod def opf_103_handler(dptx, dprx, width, height, rate, color_format, is_block_prediction_enabled, bpc, bpp, h_slice_number, buffer_bit_depth, v_slice_number, dsc_v_minor, dsc_v_major): # TODO: Need to resolve problem with writing TSI_VR_DSC_MEMORY_BLOCK and TSI_VR_DSC_DATA_SIZE if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((width, height, rate, color_format, is_block_prediction_enabled, bpc, bpp, h_slice_number, buffer_bit_depth, v_slice_number, dsc_v_major, dsc_v_minor)) # TODO: Need to change on public interface. Temporary solution. device_handle = dptx.pg.__getattribute__('_DpMstPatternGenerator__pg_list')[0]. \ __getattribute__("_PatternGenerator__memory_manager"). \ __getattribute__('_MemoryManager__io'). \ __getattribute__('device_handle') color_mode = ColorInfo() color_mode.color_format = OPFFunctions.dict_color_formats.get(color_format) col_rgb = color_mode.color_format == ColorInfo.ColorFormat.CF_RGB color_mode.bpc = bpc color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB if col_rgb else ColorInfo.Colorimetry.CM_ITUR_BT709 h_slice_size = int(calculate_slice_size(width, h_slice_number, color_mode.color_format)) v_slice_size = int(calculate_slice_size(height, v_slice_number, color_mode.color_format)) OPFFunctions.g_h_slice_size = h_slice_size OPFFunctions.g_v_slice_size = v_slice_size res, suite_id = TSIX_TS_GetConfigItem(device_handle, TSI_SELECT_SUITE, c_uint32) if res > 0: suite_id = TestGroupId(suite_id) else: suite_id = TestGroupId.UNKNOWN res, is_simple_as_444 = 0, False try: if suite_id in [TestGroupId.DP_TX_LL_CTS, TestGroupId.DP_TX_LL_CTS_DSC, TestGroupId.DP_TX_DISPLAYID, TestGroupId.DP_TX_ADAPTIVESYNC]: res, is_simple_as_444 = TSIX_TS_GetConfigItem(device_handle, TSI_DP14_SINKCTS_SUPPORT_444CRC, c_uint32) elif suite_id in [TestGroupId.DP_2_1_TX_LL_CTS, TestGroupId.DP_2_1_TX_DSC_CTS, TestGroupId.DP_2_1_TX_DISPAYID, TestGroupId.DP_2_1_TX_ADAPTIVESYNC]: res, is_simple_as_444 = TSIX_TS_GetConfigItem(device_handle, TSI_DP20_SINKCTS_SUPPORT_444CRC, c_uint32) except BaseException as e: pass compression_info = OPFFunctions.get_compression_info(v_slice_size, h_slice_size, bpp, color_format, buffer_bit_depth, is_block_prediction_enabled, dsc_v_major, dsc_v_minor, is_simple_as_444) data_info = DataInfo() data_info.packing = DataInfo.Packing.P_PACKED if col_rgb else DataInfo.Packing.P_PLANAR data_info.component_order = DataInfo.ComponentOrder.CO_RGB if col_rgb else DataInfo.ComponentOrder.CO_YCbCr data_info.alignment = DataInfo.Alignment.A_LSB try: encoded_video_frame = OPFFunctions.check_dsc_content_library(width, height, color_mode, is_block_prediction_enabled, bpc, bpp, h_slice_number, v_slice_number, compression_info, dsc_v_major, dsc_v_minor, data_info) except BaseException as e: logging.info(e) return OPFDialogAnswer.ABORT OPFFunctions.g_vm = copy.deepcopy(encoded_video_frame) TSIX_TS_SetConfigItem(device_handle, TSI_MEMORY_LAYOUT, len(encoded_video_frame.data), c_uint64) TSIX_TS_SetConfigItem(device_handle, TSI_MEMORY_BLOCK_INDEX, 0, data_size=4) TSIX_TS_SetConfigItem(device_handle, TSI_MEMORY_WRITE_W, bytearray(encoded_video_frame.data), data_type=c_uint8, data_count=len(encoded_video_frame.data)) TSIX_TS_SetConfigItem(device_handle, TSI_DSC_MEMORY_BLOCK, 0) TSIX_TS_SetConfigItem(device_handle, TSI_DSC_DATA_SIZE, len(encoded_video_frame.data)) decoded_vf = decode_video_frame(encoded_video_frame) crc0, crc1, crc2 = calculate_crc(decoded_vf) TSIX_TS_SetConfigItem(device_handle, TSI_DSC_TX_CRC, [crc0, crc1, crc2], data_count=3) return OPFDialogAnswer.PROCEED @staticmethod def opf_104_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dprx(dprx): return OPFDialogAnswer.FAIL logging.info(message) def capture_frame(_dprx): try: dprx.video_capturer.start() _vf = dprx.video_capturer.pop_element() dprx.video_capturer.stop() return _vf except BaseException: return None iterations = 5 for i in range(iterations): vf = capture_frame(dprx) if vf is not None and isinstance(vf, VideoFrameDSC): slice_size = OPFFunctions.g_v_slice_size * OPFFunctions.g_h_slice_size pps_size = 128 delta = len(vf.data) - len(OPFFunctions.g_vm.data) + 4 # Slice #1 if OPFFunctions.g_vm.data[4 + pps_size:slice_size] in vf.data[pps_size:slice_size]: # Slice #3 if OPFFunctions.g_vm.data[4 + pps_size + slice_size * 2:slice_size * 3] in \ vf.data[pps_size + slice_size * 2:slice_size * 3]: # Slice #4 if OPFFunctions.g_vm.data[4 + pps_size + slice_size * 3:slice_size * 4] in \ vf.data[pps_size + slice_size * 3:]: # Slice #2 if OPFFunctions.g_vm.data[4 + pps_size + slice_size:slice_size * 2] not in \ vf.data[pps_size + slice_size:slice_size * 2]: return OPFDialogAnswer.PASS else: print(f"Frame {i + 1}: Image looks good and distortion free for slice #2. It is wrong.") else: print( f"Frame {i + 1}: Image does not look good and distortion free for slice #4. It is wrong.") else: print(f"Frame {i + 1}: Image does not look good and distortion free for slice #3. It is wrong.") else: print(f"Frame {i + 1}: Image does not look good and distortion free for slice #1. It is wrong.") return OPFDialogAnswer.FAIL @staticmethod def opf_105_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_106_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_107_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_120_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, color_format) -> OPFDialogAnswer: if not OPFFunctions.check_dprx(dprx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, res_bpc, color_format)) stream_status = dprx.link.status.stream(0) if res_x == stream_status.video_mode.timing.hactive and res_y == stream_status.video_mode.timing.vactive and \ abs(res_frate - stream_status.video_mode.timing.frame_rate) <= 500 and \ res_bpc == stream_status.video_mode.color_info.bpc and \ OPFFunctions.dict_color_formats.get( color_format) == stream_status.video_mode.color_info.color_format: return OPFDialogAnswer.PASS else: return OPFDialogAnswer.FAIL @staticmethod def opf_121_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_122_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_123_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_140_handler(dptx, dprx, res_x, res_y, res_frate, tim_std, tim_std_num) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((res_x, res_y, res_frate, tim_std, tim_std_num)) if tim_std_num is not None and tim_std_num.find('h') != -1: tim_std_num = tim_std_num.replace('h', '') timing_manager = dptx.pg.timing_manager if tim_std == 'DMT': timing = timing_manager.get_dmt(int(tim_std_num, 16)) elif tim_std == 'VIC': timing = timing_manager.get_cta(int(tim_std_num)) else: timing = timing_manager.search(res_x, res_y, res_frate, rb=OPFFunctions.dict_reduce_blanking.get(tim_std)) color_mode = ColorInfo() color_mode.color_format = ColorInfo.ColorFormat.CF_RGB color_mode.bpc = 8 color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB video_mode = VideoMode(timing=timing, color_info=color_mode) dptx.pg.set_vm(video_mode) dptx.pg.set_pattern(pattern=VideoPattern.ColorBars) res_pg = dptx.pg.apply() if not res_pg: return OPFDialogAnswer.ABORT time.sleep(5) rx_vm = dprx.link.status.stream(0).video_mode logging.info((rx_vm.timing.hactive, video_mode.timing.hactive, " | ", rx_vm.timing.vactive, video_mode.timing.vactive)) if rx_vm.timing.hactive == video_mode.timing.hactive and rx_vm.timing.vactive == video_mode.timing.vactive: return OPFDialogAnswer.PROCEED else: return OPFDialogAnswer.ABORT @staticmethod def opf_141_handler(dptx, dprx, audio_format, channels, size, rate) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info((audio_format, channels, size, rate)) audio_mode = AudioMode() audio_mode.channel_count = channels audio_mode.bits = size audio_mode.sample_rate = rate dptx.ag.setup(audio_mode=audio_mode, audio_pattern=UniTAP.AudioPattern.SignalSine, signal_frequency=2000, amplitude=60) dptx.ag.apply() time.sleep(3) return OPFDialogAnswer.PROCEED @staticmethod def opf_142_handler(dptx, dprx, message: str): if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) if dptx.link.status.link_encoding == UniTAP.DpLinkEncoding.LE_8b10b: config = UniTAP.LinkConfig.DP8b10b() else: config = UniTAP.LinkConfig.DP128b132b() config.adaptive_sync_auto_enable = True dptx.link.config.set(config) dptx.link.start_link_training() time.sleep(1) res_app = dptx.pg.status().error def is_adaptive_sync_enabled(_dptx): status = _dptx.pg[0].adaptive_sync_status() return status if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30): return OPFDialogAnswer.ABORT if res_app == PGStatus.PGError.OK: return OPFDialogAnswer.PROCEED else: return OPFDialogAnswer.ABORT @staticmethod def opf_143_handler(dptx, dprx, rate: int) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(rate) dptx.pg.set_as_config(as_config=FixedASParams(refresh_rate=rate, divide_by_1_001=False, increase_lines=100, decrease_lines=100)) dptx.pg.apply() res_app = dptx.pg.status().error def is_adaptive_sync_enabled(_dptx): status = _dptx.pg[0].adaptive_sync_status() return status if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30): return OPFDialogAnswer.ABORT if res_app == PGStatus.PGError.OK: return OPFDialogAnswer.PROCEED else: return OPFDialogAnswer.ABORT @staticmethod def opf_144_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT logging.info(message) if message.find("Zigzag Sweep") != -1 or message.find("Square") != -1: increase_lines, decrease_lines, max_lanes, min_lanes = OPFFunctions.as_parse_edid(dptx) if message.find("Zigzag Sweep") != -1: dptx.pg.set_as_config(as_config=ZigzagASParams(min_lanes=min_lanes, max_lanes=max_lanes, increase_lines=increase_lines, decrease_lines=decrease_lines)) else: dptx.pg.set_as_config(as_config=SquareASParams(min_lanes=min_lanes, max_lanes=max_lanes, period_frames=20)) else: dptx.pg.set_as_config(as_config=ConstantASParams(lines=20)) dptx.pg.apply() res_app = dptx.pg.status().error def is_adaptive_sync_enabled(_dptx): status = _dptx.pg[0].adaptive_sync_status() return status if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30): return OPFDialogAnswer.ABORT if res_app == PGStatus.PGError.OK: return OPFDialogAnswer.PROCEED else: return OPFDialogAnswer.ABORT @staticmethod def opf_145_handler(dptx, dprx, message: str) -> OPFDialogAnswer: if not OPFFunctions.check_dptx(dptx): return OPFDialogAnswer.ABORT increase_lines, decrease_lines, max_lanes, min_lanes = OPFFunctions.as_parse_edid(dptx) dptx.pg.set_as_config(as_config=ZigzagASParams(min_lanes=min_lanes, max_lanes=max_lanes, increase_lines=increase_lines, decrease_lines=decrease_lines)) dptx.pg.apply() res_app = dptx.pg.status().error def is_adaptive_sync_enabled(_dptx): time.sleep(0.5) status = _dptx.pg[0].adaptive_sync_status() return status if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30): return OPFDialogAnswer.ABORT if res_app == PGStatus.PGError.OK: return OPFDialogAnswer.PROCEED else: return OPFDialogAnswer.ABORT @staticmethod def opf_150_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFFunctions.opf_pass_handler(dptx, dprx, message) @staticmethod def opf_151_handler(hdtx, hdrx, width, height, rate, color_format, is_block_prediction_enabled, bpc, bpp, h_slice_number, buffer_bit_depth, v_slice_number, dsc_v_minor, dsc_v_major): if not OPFFunctions.check_hdtx(hdtx): return OPFDialogAnswer.ABORT logging.info(width, height, rate, color_format, is_block_prediction_enabled, bpc, bpp, h_slice_number, buffer_bit_depth, v_slice_number, dsc_v_major, dsc_v_minor) color_mode = ColorInfo() color_mode.color_format = OPFFunctions.dict_color_formats.get(color_format) col_rgb = color_mode.color_format == ColorInfo.ColorFormat.CF_RGB color_mode.bpc = bpc color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB if col_rgb else ColorInfo.Colorimetry.CM_ITUR_BT709 h_slice_size = int(calculate_slice_size(width, h_slice_number)) v_slice_size = int(calculate_slice_size(height, v_slice_number)) OPFFunctions.g_h_slice_size = h_slice_size OPFFunctions.g_v_slice_size = v_slice_size compression_info = OPFFunctions.get_compression_info(v_slice_size, h_slice_size, bpp, color_format, buffer_bit_depth, is_block_prediction_enabled, dsc_v_major, dsc_v_minor, False) data_info = DataInfo() data_info.packing = DataInfo.Packing.P_PACKED if col_rgb else DataInfo.Packing.P_PLANAR data_info.component_order = DataInfo.ComponentOrder.CO_RGB if col_rgb else DataInfo.ComponentOrder.CO_YCbCr data_info.alignment = DataInfo.Alignment.A_LSB try: encoded_video_frame = OPFFunctions.check_dsc_content_library(width, height, color_mode, is_block_prediction_enabled, bpc, bpp, h_slice_number, v_slice_number, compression_info, dsc_v_major, dsc_v_minor, data_info) except BaseException as e: logging.info(e) return OPFDialogAnswer.ABORT OPFFunctions.g_vm = copy.deepcopy(encoded_video_frame) hdtx.pg.set_pattern(pattern=encoded_video_frame) res = hdtx.pg.apply() if not res and hdtx.pg.status().error != PGStatus.PGError.OK: return OPFDialogAnswer.ABORT return OPFDialogAnswer.PROCEED @staticmethod def opf_161_handler(dptx, dprx, message: str) -> OPFDialogAnswer: return OPFDialogAnswer.PROCEED @staticmethod def as_parse_edid(dptx): edid_data = dptx.edid.read_i2c() display_id_data = dptx.edid._parser.find_main_block(MainBlockType.DisplayID, edid_data) if len(display_id_data) == 0: return OPFDialogAnswer.ABORT as_data = dptx.edid._parser.find_additional_block(AdditionalBlockType.AdaptiveSync, display_id_data) if len(as_data) == 0: return OPFDialogAnswer.ABORT increase_lines = as_data[4] decrease_lines = as_data[8] min_ref_rate = round(as_data[5] / 1.001, 3) frame_rate = dptx.link.status.stream(0).video_mode.timing.frame_rate / 1000 v_total = dptx.link.status.stream(0).video_mode.timing.vtotal max_lanes = round(v_total * (frame_rate - min_ref_rate) / min_ref_rate) max_ref_rate = round((int.from_bytes(as_data[6: 8], "little") + 1) * 1.00035) min_lanes = round(v_total * (frame_rate - max_ref_rate) / max_ref_rate) return increase_lines, decrease_lines, max_lanes, min_lanes @staticmethod def check_dsc_content_library(*args): width, height, color_mode, is_block_prediction_enabled, bpc, bpp, h_slice_number, v_slice_number, \ compression_info, dsc_v_major, dsc_v_minor, data_info = args if OPFFunctions.dsc_content_library_path != "": if not os.path.exists(OPFFunctions.dsc_content_library_path): os.makedirs(OPFFunctions.dsc_content_library_path) dsc_image_name = f"{width}x{height}_{OPFFunctions.dict_dsc_color_names.get(compression_info.color_format)}_" \ f"{'BPY' if is_block_prediction_enabled else 'NBP'}_bpc{bpc}_" \ f"bpp{bpp}_{h_slice_number}slicew_{v_slice_number}sliceh_" \ f"{compression_info.buffer_bit_depth}lb_v{dsc_v_major}{dsc_v_minor}.dsc" full_path = os.path.join(OPFFunctions.dsc_content_library_path, dsc_image_name) if os.path.exists(full_path): # # Temporary message "Update DSC folder" # warnings.warn("Since version 3.5.X/3.6.X the usual file for generating DSC images has been updated." "Please update/regenerate your DSC folder/library with using new file.") with open(full_path, 'rb') as file: data = bytearray(file.read()) encoded_video_frame = dsc_video_frame_from_data(data) else: vf = generate_pattern_as_vf(PatternType.Unigraf, width, height, color_mode, data_info) encoded_video_frame = encode_video_frame(vf, compression_info) with open(full_path, 'wb') as file: file.write(encoded_video_frame.data) else: vf = generate_pattern_as_vf(PatternType.Unigraf, width, height, color_mode, data_info) encoded_video_frame = encode_video_frame(vf, compression_info) return encoded_video_frame @staticmethod def get_compression_info(*args): v_slice_size, h_slice_size, bpp, color_mode, buffer_bit_depth, is_block_prediction_enabled, dsc_v_major, \ dsc_v_minor, is_simple_as_444 = args compression_info = CompressionInfo() compression_info.v_slice_size = v_slice_size compression_info.h_slice_size = h_slice_size compression_info.bpp = bpp compression_info.color_format = OPFFunctions.dict_dsc_color_formats.get(color_mode) compression_info.buffer_bit_depth = buffer_bit_depth compression_info.is_block_prediction_enabled = is_block_prediction_enabled compression_info.version = (dsc_v_major, dsc_v_minor) compression_info.is_simple_as_444 = is_simple_as_444 return compression_info