Files

1391 lines
59 KiB
Python
Raw Permalink Normal View History

2026-04-16 16:51:05 +08:00
import copy
import re
import threading
import time
import warnings
import UniTAP
from UniTAP.dev.modules.dut_tests.test_info import TestGroupId
from UniTAP.libs.lib_uicl.uicl import UICL_GeneratePattern, UICL_GeneratePattern_3Tap
from UniTAP.utils import tsi_logging as logging
from UniTAP.dev.ports.modules.link.dp.link_tx_types import LinkConfig
from UniTAP.common.audio_mode import AudioMode
from UniTAP.dev.ports.modules.vtg import *
from UniTAP.utils.uicl_api import *
from UniTAP.dev.ports.modules.edid import MainBlockType, AdditionalBlockType
from enum import IntEnum
from UniTAP.utils.function_wrapper import function_scheduler
from UniTAP.libs.lib_dscl.dscl import DSC_TOOLS_FOLDER, DSCL_ExtractPPSFromData, DSCL_Encode, DSCL_Decode
from UniTAP.libs.lib_dscl.dscl_utils import calculate_slice_size, dscl_image_to_dsc_vf
from UniTAP.libs.lib_pdl.pdl import generate_pattern_as_vf, PatternType
from UniTAP.libs.lib_tsi.tsi_types import TSI_MEMORY_WRITE_W, TSI_MEMORY_BLOCK_INDEX, TSI_MEMORY_LAYOUT, \
TSI_DP20_SINKCTS_SUPPORT_444CRC, TSI_DP14_SINKCTS_SUPPORT_444CRC
from UniTAP.libs.lib_tsi.tsi_private_types import TSI_DSC_MEMORY_BLOCK, TSI_DSC_DATA_SIZE, \
TSI_DSC_TX_CRC, TSI_DPRX_DSC_TEST_CRC, TSI_SELECT_SUITE
from UniTAP.libs.lib_tsi.tsi import TSIX_TS_SetConfigItem, TSIX_TS_GetConfigItem
from ctypes import c_uint64, c_uint32
from UniTAP.libs.lib_tsi import TSI_OPF_RETURN_CODE_ABORT, TSI_OPF_RETURN_CODE_PASS, \
TSI_OPF_RETURN_CODE_PROCEED, TSI_OPF_RETURN_CODE_FAIL, TSI_OPF_RETURN_CODE_AUTO_CLOSED
class OPFDialogAnswer(IntEnum):
    """Typed wrapper around the raw TSI operator-feedback (OPF) dialog return codes.

    Handlers return one of these instead of the bare TSI integer constants.
    """
    ABORT = TSI_OPF_RETURN_CODE_ABORT
    PASS = TSI_OPF_RETURN_CODE_PASS
    PROCEED = TSI_OPF_RETURN_CODE_PROCEED
    FAIL = TSI_OPF_RETURN_CODE_FAIL
    # Constant name suggests the dialog closed automatically without an
    # operator answer.
    NOTHING = TSI_OPF_RETURN_CODE_AUTO_CLOSED
class OPFFunctions:
    """Static handler callbacks for TSI operator-feedback (OPF) dialogs.

    Each ``opf_*_handler`` receives the transmitter (``dptx``) and receiver
    (``dprx``) device objects plus dialog-specific arguments, performs the
    required device configuration or measurement, and returns an
    :class:`OPFDialogAnswer`.
    """

    # UI color-format label -> uncompressed ColorInfo format.  Both spaced
    # ("YCbCr 4:2:2") and compact ("YCbCr422") spellings are accepted; the
    # "Simple 4:2:2" labels fold into plain 4:2:2 here.
    dict_color_formats = {"RGB": ColorInfo.ColorFormat.CF_RGB,
                          "YCbCr 4:2:2": ColorInfo.ColorFormat.CF_YCbCr_422,
                          "YCbCr 4:4:4": ColorInfo.ColorFormat.CF_YCbCr_444,
                          "YCbCr 4:2:0": ColorInfo.ColorFormat.CF_YCbCr_420,
                          "Simple 4:2:2": ColorInfo.ColorFormat.CF_YCbCr_422,
                          "YCbCr422": ColorInfo.ColorFormat.CF_YCbCr_422,
                          "YCbCr444": ColorInfo.ColorFormat.CF_YCbCr_444,
                          "YCbCr420": ColorInfo.ColorFormat.CF_YCbCr_420,
                          "Simple422": ColorInfo.ColorFormat.CF_YCbCr_422
                          }
    # UI color-format label -> DSC CompressionInfo format.  Unlike the map
    # above, "Simple 4:2:2" keeps its own CF_Simple_422 value here.
    dict_dsc_color_formats = {"RGB": CompressionInfo.DscColorFormat.CF_RGB,
                              "YCbCr 4:2:2": CompressionInfo.DscColorFormat.CF_YCbCr_422,
                              "YCbCr 4:4:4": CompressionInfo.DscColorFormat.CF_YCbCr_444,
                              "YCbCr 4:2:0": CompressionInfo.DscColorFormat.CF_YCbCr_420,
                              "Simple 4:2:2": CompressionInfo.DscColorFormat.CF_Simple_422,
                              "YCbCr422": CompressionInfo.DscColorFormat.CF_YCbCr_422,
                              "YCbCr444": CompressionInfo.DscColorFormat.CF_YCbCr_444,
                              "YCbCr420": CompressionInfo.DscColorFormat.CF_YCbCr_420,
                              "Simple422": CompressionInfo.DscColorFormat.CF_Simple_422
                              }
    # Colorimetry label -> ColorInfo value; None means default sRGB.
    dict_colorimetry = {None: ColorInfo.Colorimetry.CM_sRGB,
                        "ITU-601": ColorInfo.Colorimetry.CM_ITUR_BT601,
                        "ITU-709": ColorInfo.Colorimetry.CM_ITUR_BT709}
    # Reduced-blanking label -> Timing value; None means no reduced blanking.
    dict_reduce_blanking = {None: Timing.ReduceBlanking.RB_NONE,
                            'RB1': Timing.ReduceBlanking.RB1,
                            'RB2': Timing.ReduceBlanking.RB2,
                            'RB3': Timing.ReduceBlanking.RB3}
    # Dialog pattern label -> pattern-generator pattern.
    dict_patterns = {
        "Color Ramp": VideoPattern.ColorRamp,
        "Color Square": VideoPattern.ColorSquares
    }
    # DSC color format -> short token used in cached .dsc file names.
    dict_dsc_color_names = {
        CompressionInfo.DscColorFormat.CF_RGB: "RGB444",
        CompressionInfo.DscColorFormat.CF_YCbCr_444: "YUV444",
        CompressionInfo.DscColorFormat.CF_YCbCr_422: "YUV422",
        CompressionInfo.DscColorFormat.CF_YCbCr_420: "YUV420",
        CompressionInfo.DscColorFormat.CF_Simple_422: "SIMPL422"
    }
    # Source image used when generating DSC content.
    path_custom_image = os.path.join(DSC_TOOLS_FOLDER, "Default_16K.png")
    # Optional cache directory for pre-encoded .dsc images ("" = disabled).
    dsc_content_library_path = ""
    # Shared state between handlers:
    # g_vm — deep copy of the encoded frame written by opf_103_handler
    #        (despite the name it holds a frame, not a video mode).
    g_vm = None
    # g_vf — last encoded reference video frame (opf_18_20_support / opf_19).
    g_vf = None
    # Slice geometry remembered by opf_103_handler for opf_104_handler.
    g_h_slice_size = 0
    g_v_slice_size = 0
@staticmethod
def opf_pass_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Generic informational handler: log *message* and report PASS.

    The device handles are accepted for signature compatibility but unused.
    """
    answer = OPFDialogAnswer.PASS
    logging.info(message)
    return answer
@staticmethod
def check_dptx(dptx) -> bool:
    """Return True when *dptx* is one of the supported DP transmitter classes."""
    supported_tx_types = (UniTAP.DPTX, UniTAP.DPTX4xx, UniTAP.DPTX5xx)
    return isinstance(dptx, supported_tx_types)
@staticmethod
def check_dprx(dprx) -> bool:
    """Return True when *dprx* is one of the supported DP receiver classes."""
    supported_rx_types = (UniTAP.DPRX, UniTAP.DPRX4xx, UniTAP.DPRX5xx)
    return isinstance(dprx, supported_rx_types)
@staticmethod
def check_hdtx(hdtx) -> bool:
    """Return True when *hdtx* is one of the supported HDMI transmitter classes."""
    supported_tx_types = (UniTAP.HDTX, UniTAP.HDTX4xx)
    return isinstance(hdtx, supported_tx_types)
@staticmethod
def check_hdrx(hdrx) -> bool:
    """Return True when *hdrx* is one of the supported HDMI receiver classes.

    Bug fix: the accepted types previously included ``UniTAP.DPRX4xx`` (a
    DisplayPort receiver) — an apparent copy/paste slip from
    :meth:`check_dprx`.  This now mirrors :meth:`check_hdtx`, which accepts
    the HDMI-side ``HDTX``/``HDTX4xx`` pair.
    """
    return isinstance(hdrx, (UniTAP.HDRX, UniTAP.HDRX4xx))
@staticmethod
def check_video(dev_rx) -> bool:
    """Poll *dev_rx* (every 2 s, up to 10 s) until stream 0 reports video.

    Video is considered present when the CRC triple is non-zero and the
    active horizontal resolution is non-zero.
    """
    def _has_active_video(rx):
        if rx.link.status.stream(0).crc == [0, 0, 0]:
            return False
        return rx.link.status.stream(0).video_mode.timing.hactive != 0
    return function_scheduler(_has_active_video, dev_rx, interval=2, timeout=10)
@staticmethod
def opf_1_after_handler(dptx, dprx, dp_lanes, dp_link_rate, encoding):
    """Apply the requested link configuration and restart link training.

    An unspecified encoding (None) and the explicit '8b/10b' label both
    select the 8b/10b link; anything else selects 128b/132b.  Returns
    ABORT when no supported transmitter is attached; otherwise returns
    None (the "after" handler's return value is not used as a dialog
    answer here — matches prior behavior).
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((dp_lanes, dp_link_rate, encoding))
    if encoding in (None, '8b/10b'):
        config = LinkConfig.DP8b10b()
    else:
        config = LinkConfig.DP128b132b()
    config.lane_count = dp_lanes
    config.bit_rate = dp_link_rate
    dptx.link.config.set(config)
    dptx.link.start_link_training()
@staticmethod
def opf_1_handler(dptx, dprx, dp_lanes, dp_link_rate, encoding):
    """Pre-check: PROCEED only when a supported DP transmitter is attached."""
    if OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.PROCEED
    return OPFDialogAnswer.ABORT
@staticmethod
def opf_2_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, tim_std, tim_rb):
    """Program the pattern generator with the timing requested by the dialog.

    ``tim_std`` may name an explicit timing ("VIC n", "DMT xxh", "CVT xxh");
    otherwise a resolution/frame-rate search is used.  Video is RGB/sRGB at
    ``res_bpc`` with a color-bars pattern.  Returns PROCEED on success,
    ABORT when the transmitter is unsupported or apply() fails.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((res_x, res_y, res_frate, tim_std))
    timing_manager = dptx.pg.timing_manager
    if tim_std.find("VIC") != -1:
        # "VIC n" -> CTA-861 timing by VIC number.
        tim_id = int(re.findall(r'\d+', tim_std)[0])
        timing = timing_manager.get_cta(tim_id)
    elif tim_std.find("DMT") != -1:
        # "DMT xxh" -> DMT timing by hexadecimal id.
        tim_id = int(tim_std.replace("h", "").replace("DMT ", ""), 16)
        timing = timing_manager.get_dmt(tim_id)
    elif tim_std.find("CVT") != -1:
        # "CVT xxh" -> CVT timing by hexadecimal id.
        tim_id = int(tim_std.replace("h", "").replace("CVT ", ""), 16)
        timing = timing_manager.get_cvt(tim_id)
    else:
        timing = timing_manager.search(res_x, res_y, res_frate, OPFFunctions.dict_reduce_blanking.get(tim_rb))
    if timing is None:
        # Fallback: generic search by resolution/rate.  NOTE(review): for the
        # plain-search branch above this repeats an identical call — redundant
        # unless search() is stateful; confirm intent.
        timing = timing_manager.search(res_x, res_y, res_frate, OPFFunctions.dict_reduce_blanking.get(tim_rb))
    color_mode = ColorInfo()
    color_mode.color_format = ColorInfo.ColorFormat.CF_RGB
    color_mode.bpc = res_bpc
    color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB
    video_mode = VideoMode(timing=timing, color_info=color_mode)
    dptx.pg.set_vm(video_mode)
    dptx.pg.set_pattern(pattern=VideoPattern.ColorBars)
    res_pg = dptx.pg.apply()
    res_app = dptx.pg.status().error
    if not res_pg:
        logging.info(f"[UniTAP] Stream {0} - Apply {res_app.__str__()}")
        return OPFDialogAnswer.ABORT
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_3_handler(dptx, dprx, message: str):
    """Informational step: log the dialog message and continue."""
    answer = OPFDialogAnswer.PROCEED
    logging.info(message)
    return answer
@staticmethod
def opf_4_handler(dptx, dprx, message: str):
    """Set the low 3 bits of DPCD 0x00101 to 1 via read-modify-write.

    0x00101 is presumably LANE_COUNT_SET — confirm against the DPCD spec.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info(message)
    current = dptx.dpcd.read(0x00101, 1).data[0] & 0xFF
    # Clear bits [2:0], then set the field to 1.
    dptx.dpcd.write(0x00101, (current & ~0x7) | 1)
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_5_handler(dptx, dprx, message: str):
    """Set the low 3 bits of DPCD 0x00101 to 4 via read-modify-write.

    0x00101 is presumably LANE_COUNT_SET — confirm against the DPCD spec.
    The message is logged after the write, mirroring the original order.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    current = dptx.dpcd.read(0x00101, 1).data[0] & 0xFF
    # Clear bits [2:0], then set the field to 4.
    dptx.dpcd.write(0x00101, (current & ~0x7) | 4)
    logging.info(message)
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_6_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, timing_standard, col_format,
                  col_range, col_yc,
                  pattern_name) -> OPFDialogAnswer:
    """Program the pattern generator with the dialog's color/timing request.

    When link training forced EDID timings, the active stream timing is
    reused; otherwise the timing tables are searched.  Returns PROCEED on
    success, ABORT when the transmitter is unsupported or apply() fails.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((res_x, res_y, res_frate, res_bpc, timing_standard, col_format, col_range, col_yc, pattern_name))
    if dptx.link.config.get(UniTAP.LinkConfig.DP8b10b).force_edid_timings_after_lt:
        # Keep whatever timing the link-training flow already established.
        timing = dptx.pg.get_stream_video_mode().timing
    else:
        timing = dptx.pg.timing_manager.search(res_x, res_y, res_frate,
                                               rb=OPFFunctions.dict_reduce_blanking.get(timing_standard))
    color_mode = ColorInfo()
    color_mode.color_format = OPFFunctions.dict_color_formats.get(col_format)
    color_mode.bpc = res_bpc
    color_mode.colorimetry = OPFFunctions.dict_colorimetry.get(col_yc)
    color_mode.dynamic_range = ColorInfo.DynamicRange.DR_VESA if col_range == 'VESA' else ColorInfo.DynamicRange.DR_CTA
    video_mode = VideoMode(timing=timing, color_info=color_mode)
    dptx.pg.set_vm(video_mode)
    dptx.pg.set_pattern(pattern=OPFFunctions.dict_patterns.get(pattern_name))
    res_pg = dptx.pg.apply()
    if not res_pg:
        return OPFDialogAnswer.ABORT
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_7_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Write 2 to DPCD 0x600 and retrain the link.

    0x600 is presumably SET_POWER (value 2 requesting a non-D0 power
    state) — confirm against the DisplayPort DPCD specification.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info(message)
    dptx.dpcd.write(0x600, 2)
    dptx.link.start_link_training()
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_8_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Write 1 to DPCD 0x600 and retrain the link.

    Counterpart to :meth:`opf_7_handler`; 0x600 is presumably SET_POWER
    (value 1 requesting normal operation) — confirm against the DPCD spec.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info(message)
    dptx.dpcd.write(0x600, 1)
    dptx.link.start_link_training()
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_9_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, tim_std,
                  pattern_name) -> OPFDialogAnswer:
    """Program an RGB/sRGB pattern at the requested timing.

    Reuses the active stream timing when link training forced EDID timings;
    otherwise searches the timing tables with the requested reduced-blanking
    variant.  PROCEED on success, ABORT otherwise.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((res_x, res_y, res_frate, res_bpc, tim_std))
    if dptx.link.config.get(UniTAP.LinkConfig.DP8b10b).force_edid_timings_after_lt:
        timing = dptx.pg.get_stream_video_mode().timing
    else:
        timing = dptx.pg.timing_manager.search(
            res_x, res_y, res_frate,
            rb=OPFFunctions.dict_reduce_blanking.get(tim_std))
    color_mode = ColorInfo()
    color_mode.color_format = ColorInfo.ColorFormat.CF_RGB
    color_mode.bpc = res_bpc
    color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB
    dptx.pg.set_vm(VideoMode(timing=timing, color_info=color_mode))
    dptx.pg.set_pattern(pattern=OPFFunctions.dict_patterns.get(pattern_name))
    if dptx.pg.apply():
        return OPFDialogAnswer.PROCEED
    return OPFDialogAnswer.ABORT
@staticmethod
def opf_10_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, timing_standard, audio_pattern,
                   ch_count, sample_freq,
                   sample_size, ch_alloc, ch_info) -> OPFDialogAnswer:
    """Start a fixed 2 kHz sine audio stream and optionally a video pattern.

    ``res_x == 0`` means "audio only": all video streams are disabled.
    Note the ``audio_pattern``/``ch_alloc``/``ch_info`` arguments are only
    logged (partially) — the generator always uses a sine signal.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((audio_pattern, ch_count, sample_freq, sample_size))
    audio_mode = AudioMode()
    audio_mode.channel_count = ch_count
    audio_mode.bits = sample_size
    audio_mode.sample_rate = sample_freq
    dptx.ag.setup(audio_mode=audio_mode,
                  audio_pattern=UniTAP.AudioPattern.SignalSine,
                  signal_frequency=2000,
                  amplitude=60)
    dptx.ag.apply()
    if res_x != 0:
        timing_manager = dptx.pg.timing_manager
        rb = OPFFunctions.dict_reduce_blanking.get(timing_standard)
        # First matching standard wins: CTA, then DMT, then CVT.
        for standard in [UniTAP.Timing.Standard.SD_CTA, UniTAP.Timing.Standard.SD_DMT,
                         UniTAP.Timing.Standard.SD_CVT]:
            timing = timing_manager.search(res_x, res_y, res_frate, standard=standard, rb=rb)
            if timing is not None:
                break
        # NOTE(review): timing may still be None here if no standard matched;
        # VideoMode then receives timing=None — confirm downstream handling.
        color_mode = ColorInfo()
        color_mode.color_format = ColorInfo.ColorFormat.CF_RGB
        color_mode.bpc = res_bpc
        color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB
        video_mode = VideoMode(timing=timing, color_info=color_mode)
        dptx.pg.set_vm(video_mode)
        dptx.pg.set_pattern(pattern=VideoPattern.ColorRamp)
        res_pg = dptx.pg.apply()
        if dptx.pg.status().error != PGStatus.PGError.OK and not res_pg:
            return OPFDialogAnswer.ABORT
    else:
        # Audio-only request: blank every video stream.
        for i in range(dptx.pg.max_stream_count):
            dptx.pg[i].set_pattern(UniTAP.VideoPattern.Disabled)
        res_pg = dptx.pg.apply()
        if dptx.pg.status().error != PGStatus.PGError.OK and not res_pg:
            return OPFDialogAnswer.ABORT
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_11_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Informational step: log *message* and report PASS."""
    logging.info(message)
    return OPFDialogAnswer.PASS
@staticmethod
def opf_12_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Capture sink frames and verify the reference frame data appears in one.

    Up to 5 frames are captured; for each, the frame is normalized to the
    reference frame's data layout and the reference payload (skipping its
    4-byte header for DSC frames) is searched inside the captured data.

    Bug fixes: a failed capture returns ``None`` and was previously
    dereferenced (``vf.color_info``) before the ``None`` check; likewise
    ``OPFFunctions.g_vf`` was dereferenced (``g_vf.data_info``) before its
    own ``None`` check.  Both are now guarded first.
    """
    if not OPFFunctions.check_dprx(dprx):
        return OPFDialogAnswer.ABORT
    logging.info(message)

    def capture_frame(_dprx):
        # Best effort: any capture failure yields None for this iteration.
        try:
            _dprx.video_capturer.start()
            _vf = _dprx.video_capturer.pop_element()
            _dprx.video_capturer.stop()
            return _vf
        except BaseException:
            return None

    iterations = 5
    for _ in range(iterations):
        vf = capture_frame(dprx)
        if vf is None or OPFFunctions.g_vf is None:
            continue
        # Normalize the captured frame to the reference frame's data layout
        # before searching for the reference bytes inside it.
        vf = video_frame_to_ci(vf, vf.color_info, OPFFunctions.g_vf.data_info)
        if vf is not None and len(vf.data) > 0:
            if vf.color_info.color_format == UniTAP.ColorInfo.ColorFormat.CF_DSC:
                # DSC frames: skip the 4-byte header of the reference data.
                if OPFFunctions.g_vf.data[4:] in vf.data:
                    return OPFDialogAnswer.PASS
            else:
                if OPFFunctions.g_vf.data in vf.data:
                    return OPFDialogAnswer.PASS
    return OPFDialogAnswer.FAIL
@staticmethod
def opf_13_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Informational step: log *message* and report PASS."""
    logging.info(message)
    return OPFDialogAnswer.PASS
@staticmethod
def opf_14_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Write to DPCD 0x120 and optionally verify the value via read-back.

    The read-back verification only runs when a supported receiver is
    attached; otherwise PROCEED is returned after the write.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info(message)
    data = dptx.dpcd.read(0x120, 1).data
    # NOTE(review): this appends 1 to the byte just read and writes both
    # bytes back starting at 0x120 — if the intent was to set 0x120 to the
    # value 1, this writes two bytes instead of one; confirm.
    data.append(1)
    dptx.dpcd.write(0x120, data)
    if OPFFunctions.check_dprx(dprx):
        # Read back over AUX via the source to verify the write took effect.
        data = dptx.dpcd.read(0x120, 1).data
        if len(data) > 0 and data[0] == 1:
            return OPFDialogAnswer.PROCEED
        else:
            return OPFDialogAnswer.ABORT
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_15_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Enable FEC on the source and wait (2 s poll, 10 s cap) until it reports enabled."""
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info(message)
    dptx.fec.enable(True)
    enabled = function_scheduler(lambda tx: tx.fec.is_enabled(), dptx,
                                 interval=2, timeout=10)
    return OPFDialogAnswer.PROCEED if enabled else OPFDialogAnswer.ABORT
@staticmethod
def opf_16_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Disable FEC on the source and wait (2 s poll, 10 s cap) until it reports disabled."""
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info(message)
    dptx.fec.enable(False)
    disabled = function_scheduler(lambda tx: not tx.fec.is_enabled(), dptx,
                                  interval=2, timeout=10)
    return OPFDialogAnswer.PROCEED if disabled else OPFDialogAnswer.ABORT
@staticmethod
def opf_17_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Drive a fixed 1920x1080@60 (CTA) 8-bpc RGB pattern with DSC enabled.

    The frame rate is given in millihertz (60000 = 60 Hz).  Delegates the
    actual configuration to :meth:`opf_18_20_support`.
    """
    logging.info(message)
    timing = dptx.pg.timing_manager.search(1920, 1080, 60000,
                                           standard=Timing.Standard.SD_CTA)
    return OPFFunctions.opf_18_20_support(dptx, dprx, "RGB", 1920, 1080, 8,
                                          None, timing, pattern_name=None,
                                          enable_dsc=True)
@staticmethod
def opf_18_20_support(dptx, dprx, col_format, res_x, res_y, res_bpc, col_yc, timing, pattern_name=None,
                      enable_dsc=True):
    """Shared body for opf_17/18/20: program a (possibly DSC-compressed) pattern.

    With ``enable_dsc`` a Unigraf test pattern is generated and DSC-encoded
    (reusing a cached ``.dsc`` file when a content library is configured) and
    set as the pattern; otherwise ``pattern_name`` is used directly.  Finally
    the sink is polled for active video.  Returns PROCEED on success, ABORT
    on apply failure or missing video.
    """
    color_mode = ColorInfo()
    color_mode.color_format = OPFFunctions.dict_color_formats.get(col_format)
    color_mode.bpc = res_bpc
    # col_yc None (RGB content) -> sRGB/VESA packed RGB; otherwise
    # BT.601/CTA planar YCbCr.
    color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB if col_yc is None else \
        ColorInfo.Colorimetry.CM_ITUR_BT601
    color_mode.dynamic_range = ColorInfo.DynamicRange.DR_VESA if col_yc is None else ColorInfo.DynamicRange.DR_CTA
    data_info = DataInfo()
    data_info.packing = DataInfo.Packing.P_PACKED if col_yc is None else DataInfo.Packing.P_PLANAR
    data_info.component_order = DataInfo.ComponentOrder.CO_RGB if col_yc is None else DataInfo.ComponentOrder.CO_YCbCr
    data_info.alignment = DataInfo.Alignment.A_LSB
    video_mode = VideoMode(timing=timing, color_info=color_mode)
    if enable_dsc:
        # Sink DSC capability block: DPCD 0x60, 16 bytes.
        sink_dsc_caps = dptx.dpcd.read(0x60, 16)
        block_prediction = bool(sink_dsc_caps.data[6] & 0x1)
        dsc_minor = sink_dsc_caps.data[1] >> 4 & 0xf
        dsc_major = sink_dsc_caps.data[1] & 0xf
        # Fixed 10x10 slice grid for this path.
        h_slice_size = int(calculate_slice_size(res_x, 10, color_mode.color_format))
        v_slice_size = int(calculate_slice_size(res_y, 10, color_mode.color_format))
        dptx.pg.set_vm(video_mode)
        # frame_rate appears to be in millihertz (divided by 1000 here).
        pixel_rate = timing.htotal * timing.vtotal * (timing.frame_rate / 1000)
        link_bandwidth = dptx.link.status.available_link_rate
        # Target bpp in 1/16-bpp units: 90% of the available link bandwidth,
        # capped at the uncompressed depth.
        bpp = (link_bandwidth * 0.9 / pixel_rate) * 16
        bpp = res_bpc * 16 if res_bpc * 16 < bpp else round(bpp)
        compression_info = CompressionInfo()
        compression_info.v_slice_size = v_slice_size
        compression_info.h_slice_size = h_slice_size
        compression_info.bpp = bpp
        compression_info.color_format = OPFFunctions.dict_dsc_color_formats.get(col_format)
        compression_info.buffer_bit_depth = res_bpc + 1
        compression_info.is_block_prediction_enabled = block_prediction
        compression_info.version = (dsc_major, dsc_minor)
        if OPFFunctions.dsc_content_library_path != "":
            # Cached-content path: look up (or create) a pre-encoded .dsc
            # file keyed by the full encoding configuration.
            if not os.path.exists(OPFFunctions.dsc_content_library_path):
                os.makedirs(OPFFunctions.dsc_content_library_path)
            dsc_image_name = f"{res_x}x{res_y}_{OPFFunctions.dict_dsc_color_names.get(compression_info.color_format)}_" \
                             f"{'BPY' if block_prediction else 'NBP'}_bpc{res_bpc}_" \
                             f"bpp{compression_info.bpp}_{10}slicew_{10}sliceh_" \
                             f"{compression_info.buffer_bit_depth}lb_v{dsc_major}{dsc_minor}.dsc"
            full_path = os.path.join(OPFFunctions.dsc_content_library_path, dsc_image_name)
            if os.path.exists(full_path):
                # Temporary message: cached files generated before 3.5.X/3.6.X
                # were built from the old source image and may be stale.
                warnings.warn("Since version 3.5.X/3.6.X the usual file for generating DSC images has been updated."
                              "Please update/regenerate your DSC folder/library with using new file.")
                with open(full_path, 'rb') as file:
                    data = bytearray(file.read())
                encoded_video_frame = dsc_video_frame_from_data(data)
            else:
                # Cache miss: generate, encode and store for next time.
                custom_vf = generate_pattern_as_vf(PatternType.Unigraf, res_x, res_y, color_mode, data_info)
                encoded_video_frame = encode_video_frame(custom_vf, compression_info)
                with open(full_path, 'wb') as file:
                    file.write(encoded_video_frame.data)
        else:
            # No content library configured: always generate and encode.
            custom_vf = generate_pattern_as_vf(PatternType.Unigraf, res_x, res_y, color_mode, data_info)
            encoded_video_frame = encode_video_frame(custom_vf, compression_info)
        dptx.pg.set_pattern(pattern=encoded_video_frame)
        res = dptx.pg.apply()
        if not res and dptx.pg.status().error != PGStatus.PGError.OK:
            return OPFDialogAnswer.ABORT
        # Remember the reference frame for later comparison (e.g. opf_12).
        OPFFunctions.g_vf = encoded_video_frame
    elif pattern_name is not None:
        dptx.pg.set_vm(video_mode)
        dptx.pg.set_pattern(pattern=pattern_name)
        res = dptx.pg.apply()
        if not res and dptx.pg.status().error != PGStatus.PGError.OK:
            return OPFDialogAnswer.ABORT
    try:
        if not OPFFunctions.check_video(dprx):
            print('Video is not available.')
            return OPFDialogAnswer.ABORT
    except BaseException as e:
        print(e)
        return OPFDialogAnswer.ABORT
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_18_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, col_format,
                   col_range, col_yc, tim_std) -> OPFDialogAnswer:
    """Resolve the requested timing and delegate to :meth:`opf_18_20_support`.

    Reduced-blanking requests are looked up as CVT timings, everything
    else as CTA.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std))
    rb = OPFFunctions.dict_reduce_blanking.get(tim_std)
    standard = Timing.Standard.SD_CVT if rb is not None else Timing.Standard.SD_CTA
    timing = dptx.pg.timing_manager.search(res_x, res_y, res_frate,
                                           standard=standard, rb=rb)
    return OPFFunctions.opf_18_20_support(dptx, dprx, col_format, res_x, res_y,
                                          res_bpc, col_yc, timing)
@staticmethod
def opf_19_handler(dptx, dprx, use_3tap) -> OPFDialogAnswer:
    """Verify DSC decompression on the receiver via CRC comparison.

    Per attempt (up to 5): capture one compressed frame, then in parallel
    (a) rebuild the reference pattern from the frame's PPS, DSC-encode and
    decode it, and (b) decode the captured frame; compute a CRC16 triple
    for each and publish both triples to the test set via
    ``TSI_DPRX_DSC_TEST_CRC``.  On any unrecoverable error a sentinel CRC
    set is published and FAIL returned.
    """
    if not OPFFunctions.check_dprx(dprx):
        return OPFDialogAnswer.ABORT
    # check DSC state
    if not dprx.link.capabilities.link_caps_status().dsc:
        return OPFDialogAnswer.ABORT
    # Sentinel values published when the comparison could not be made.
    incorrect_crc = [0xFFFF, 0xFFFF, 0xFFFF, 0x1111, 0x1111, 0x1111]

    def load_dsc(handle: int, values: list) -> OPFDialogAnswer:
        # Publish the six CRC values (captured triple + reference triple).
        TSIX_TS_SetConfigItem(handle, TSI_DPRX_DSC_TEST_CRC, values, data_count=6)
        return OPFDialogAnswer.FAIL

    # TODO: Need to change on public interface. Temporary solution.
    # Reaches through name-mangled private attributes to get the raw handle.
    device_handle = dprx.bulk_capturer.__getattribute__("_BulkCapturer__memory_manager"). \
        __getattribute__('_MemoryManager__io'). \
        __getattribute__('device_handle')
    iterations = 5
    for i in range(iterations):
        # capture frame
        try:
            dprx.video_capturer.start(frames_count=1)
            vf = dprx.video_capturer.capture_result.buffer[-1]
            dprx.video_capturer.stop()
        except BaseException:
            dprx.video_capturer.stop()
            return load_dsc(device_handle, incorrect_crc)

        def create_reference_image(video_frame, crc_values: list):
            # Thread target: build the reference pattern described by the
            # captured frame's PPS and append its CRC triple to crc_values.
            # An early return leaves crc_values empty, which the caller
            # treats as failure.
            try:
                pps_info = DSCL_ExtractPPSFromData(video_frame.data)
            except BaseException:
                return
            sampling = UICL_Sampling.Sampling_444
            if pps_info.is_yuv():
                if pps_info.is_420():
                    sampling = UICL_Sampling.Sampling_420
                elif pps_info.is_422():
                    sampling = UICL_Sampling.Sampling_422
                elif pps_info.is_simple_422():
                    sampling = UICL_Sampling.Sampling_422
            image = UICL_Image()
            parameters = UICL_ImageParameters()
            parameters.Width = pps_info.width()
            parameters.Height = pps_info.height()
            parameters.BitsPerColor = pps_info.bpc()
            parameters.Alignment = UICL_Alignment.Alignment_LSB
            parameters.IsFullRange = False
            if pps_info.is_yuv():
                parameters.Colorspace = UICL_Colorspace.Colorspace_YCbCr
                parameters.Colorimetry = UICL_Colorimetry.Colorimetry_ITU_R_BT709
                parameters.Sampling = sampling
                parameters.ComponentOrder = UICL_ComponentOrder.Order_YCbCr
                parameters.Packing = UICL_Packing.Packing_Planar
                image.Parameters = parameters
                try:
                    image.DataSize = UICL_GetRequiredBufferSize(image)
                    image.DataPtr = (c_uint8 * image.DataSize)()
                    if use_3tap:
                        res = UICL_GeneratePattern_3Tap(image, 1)
                    else:
                        res = UICL_GeneratePattern(image, 1)
                    if res < UICL_SUCCESS:
                        return
                except BaseException:
                    return
            else:
                parameters.Colorspace = UICL_Colorspace.Colorspace_RGB
                parameters.Colorimetry = UICL_Colorimetry.Colorimetry_Unknown
                parameters.Sampling = sampling
                parameters.ComponentOrder = UICL_ComponentOrder.Order_RGB
                parameters.Packing = UICL_Packing.Packing_Packed
                image.Parameters = parameters
                try:
                    image.DataSize = UICL_GetRequiredBufferSize(image)
                    image.DataPtr = (c_uint8 * image.DataSize)()
                    if use_3tap:
                        res = UICL_GeneratePattern_3Tap(image, 0)
                    else:
                        res = UICL_GeneratePattern(image, 0)
                    if res < UICL_SUCCESS:
                        return
                except BaseException:
                    return
            # Encode/decode round trip so the reference CRC is computed on
            # data that went through the same DSC pipeline.
            dsc_encoded_image = DSCL_Encode(image, pps_info)
            OPFFunctions.g_vf = dscl_image_to_dsc_vf(dsc_encoded_image)
            dsc_decoded_uicl_image = DSCL_Decode(dsc_encoded_image)
            reference_image_crc = UICL_CRC16()
            try:
                res = UICL_CalculateCRC16(dsc_decoded_uicl_image, reference_image_crc)
                if res < UICL_SUCCESS:
                    crc_values.extend([0xFFFF, 0xFFFF, 0xFFFF])
                else:
                    crc_values.extend([reference_image_crc.R, reference_image_crc.G, reference_image_crc.B])
            except BaseException:
                return

        def decompress_dsc(video_frame, crc_values: list):
            # Thread target: decode the captured frame and append its CRC
            # triple to crc_values (empty on failure).
            decoded_captured_vf = decode_video_frame(video_frame)
            uicl_decoded_image_2 = image_from_vf(decoded_captured_vf)
            decompressed_image_crc = UICL_CRC16()
            try:
                res = UICL_CalculateCRC16(uicl_decoded_image_2, decompressed_image_crc)
                if res < UICL_SUCCESS:
                    crc_values.extend([0x1111, 0x1111, 0x1111])
                else:
                    crc_values.extend([decompressed_image_crc.R, decompressed_image_crc.G,
                                       decompressed_image_crc.B])
            except BaseException:
                return

        reference_image_crc_values = []
        decompressed_image_crc_values = []
        thread_create_reference_image = threading.Thread(target=create_reference_image,
                                                         args=(vf, reference_image_crc_values))
        thread_decompressed_image = threading.Thread(target=decompress_dsc,
                                                     args=(vf, decompressed_image_crc_values))
        thread_create_reference_image.start()
        thread_decompressed_image.start()
        thread_create_reference_image.join()
        thread_decompressed_image.join()
        # Publish when both triples exist and either they match, or this is
        # the final attempt (mismatching CRCs are then published as-is and
        # the comparison verdict is left to the test set).
        if len(reference_image_crc_values) > 0 and len(decompressed_image_crc_values) > 0 and \
                ((reference_image_crc_values == decompressed_image_crc_values) or
                 (reference_image_crc_values != decompressed_image_crc_values and (i == iterations - 1))):
            # Write crc to register
            load_dsc(device_handle, [decompressed_image_crc_values[0],
                                     decompressed_image_crc_values[1],
                                     decompressed_image_crc_values[2],
                                     reference_image_crc_values[0],
                                     reference_image_crc_values[1],
                                     reference_image_crc_values[2]])
            return OPFDialogAnswer.PROCEED
    return load_dsc(device_handle, incorrect_crc)
@staticmethod
def opf_20_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std,
                   pattern_name, enable_dsc) -> OPFDialogAnswer:
    """Resolve the pattern and timing from the dialog arguments, then delegate
    to :meth:`opf_18_20_support`.

    ``tim_std`` is a list of timing-standard candidates ("VIC n", "DMT xxh",
    "CVT ... RBx", "UGF", "OVT", or free-form); the first candidate that
    resolves to a timing wins.

    Bug fix: ``timing`` is now initialised to ``None`` before the candidate
    loop.  Previously an empty ``tim_std`` list, or one containing only
    unmatched "UGF" entries, left ``timing`` unbound and raised ``NameError``
    at the delegation call.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((res_x, res_y, res_frate, res_bpc, col_format, col_range, col_yc, tim_std, pattern_name))
    # Map the dialog's pattern label onto a VideoPattern; unknown labels
    # fall back to color bars, and a None pattern_name passes through.
    if pattern_name is not None:
        if pattern_name == 'No Video':
            pattern_name = VideoPattern.Disabled
        elif pattern_name == 'Color Ramp':
            pattern_name = VideoPattern.ColorRamp
        elif pattern_name == 'Color Square':
            pattern_name = VideoPattern.ColorSquares
        elif pattern_name == 'Black and Vertical lines':
            pattern_name = VideoPattern.WhiteVStrips
        elif pattern_name == "Any":
            # "Any": ramp for RGB, squares for subsampled formats.
            if col_format == "RGB":
                pattern_name = UniTAP.VideoPattern.ColorRamp
            else:
                pattern_name = UniTAP.VideoPattern.ColorSquares
        else:
            pattern_name = VideoPattern.ColorBars
    timing_manager = dptx.pg.timing_manager
    timing = None  # bug fix: ensure timing is bound even when nothing matches
    for item in tim_std:
        if item.find("VIC") != -1:
            # "VIC n" -> CTA timing by VIC number.
            tim_id = int(re.findall(r'\d+', item)[0])
            timing = timing_manager.get_cta(tim_id)
            if timing is None:
                continue
            break
        elif item.find("DMT") != -1:
            # "DMT xxh" -> DMT timing by hexadecimal id.
            res = re.findall(r'\w+', item)[1][:-1]
            tim_id = int(res, 16)
            timing = timing_manager.get_dmt(tim_id)
            if timing is None:
                continue
            break
        elif item.find("CVT") != -1:
            # CVT search with the reduced-blanking variant named in the item.
            res = re.findall(r'RB1|RB2|RB3', item)
            tim_id = res[0] if len(res) > 0 else None
            standard = Timing.Standard.SD_CVT
            timing = timing_manager.search(res_x, res_y, res_frate,
                                           standard=standard,
                                           rb=OPFFunctions.dict_reduce_blanking.get(tim_id))
            if timing is None:
                continue
            break
        elif item.find("UGF") != -1:
            # UGF candidates are not handled; try the next candidate.
            pass
        elif item.find("OVT") != -1:
            standard = Timing.Standard.SD_OVT
            timing = timing_manager.search(res_x, res_y, res_frate,
                                           standard=standard)
            if timing is None:
                continue
            break
        else:
            # Free-form candidate: generic resolution/rate search.
            timing = timing_manager.search(res_x, res_y, res_frate)
    return OPFFunctions.opf_18_20_support(dptx, dprx, col_format, res_x, res_y, res_bpc, col_yc, timing,
                                          pattern_name, enable_dsc)
@staticmethod
def opf_21_handler(dptx, dprx, audio_pattern, ch_count, sample_freq, sample_size, ch_alloc, ch_info):
    """Configure and start the audio generator per the dialog arguments.

    A pattern label containing "Sawtooth" selects the sawtooth signal;
    anything else selects sine.  PROCEED on successful apply, ABORT
    otherwise.
    """
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((audio_pattern, ch_count, sample_freq, sample_size))
    audio_mode = AudioMode()
    audio_mode.channel_count = ch_count
    audio_mode.bits = sample_size
    audio_mode.sample_rate = sample_freq
    if audio_pattern.find("Sawtooth") != -1:
        selected_pattern = UniTAP.AudioPattern.SignalSawtooth
    else:
        selected_pattern = UniTAP.AudioPattern.SignalSine
    dptx.ag.setup(audio_mode=audio_mode,
                  audio_pattern=selected_pattern,
                  signal_frequency=2000,
                  amplitude=60)
    if dptx.ag.apply():
        return OPFDialogAnswer.PROCEED
    return OPFDialogAnswer.ABORT
@staticmethod
def opf_101_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Capture audio on the sink for 10 units and PASS when samples arrived.

    Consistency fix: the success/failure paths now return the module-local
    :class:`OPFDialogAnswer` members instead of ``UniTAP.OPFDialogAnswer``
    (the early-exit path already used the local enum).

    NOTE(review): *message* is accepted but never used — confirm whether it
    should be logged like the other handlers.
    """
    if not OPFFunctions.check_dprx(dprx):
        return OPFDialogAnswer.FAIL
    try:
        dprx.audio_capturer.start(10)
        dprx.audio_capturer.stop()
        if len(dprx.audio_capturer.capture_result.buffer) > 0:
            return OPFDialogAnswer.PASS
        return OPFDialogAnswer.FAIL
    except BaseException as e:
        print(f"Cannot capture audio. Error: {e}")
        return OPFDialogAnswer.FAIL
@staticmethod
def opf_102_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Verify the receiver sees valid video (DSC or uncompressed path).

    When a DSC reference frame was published (``g_vm`` set by
    ``opf_103_handler``), five captured frames are searched for the
    reference payload; otherwise any non-empty captured frame passes.

    Bug fix: in the non-DSC branch a failed capture returned ``None`` and
    was dereferenced (``len(vf.data)``); the frame is now checked for
    ``None`` first.
    """
    if not OPFFunctions.check_dprx(dprx):
        return OPFDialogAnswer.FAIL

    def is_crc_valid(rx, dsc_flag):
        # A non-zero CRC triple on stream 0 signals live video.
        return rx.link.status.stream(0).dsc_crc != [0, 0, 0] if dsc_flag else \
            rx.link.status.stream(0).crc != [0, 0, 0]

    logging.info(message)
    if OPFFunctions.g_vm is not None:
        try:
            if UniTAP.utils.function_scheduler(is_crc_valid, dprx, True, interval=2, timeout=10):
                try:
                    dprx.video_capturer.start(frames_count=5)
                    vf_list = dprx.video_capturer.capture_result.buffer
                    dprx.video_capturer.stop()
                except BaseException as e:
                    logging.info(f'{e}\nCannot capture frames.')
                    dprx.video_capturer.stop()
                    return OPFDialogAnswer.FAIL
                if isinstance(vf_list, list) and len(vf_list) == 5:
                    for vf in vf_list:
                        # Trim the capture tail so only the reference-sized
                        # payload (minus its 4-byte header) is searched.
                        delta = len(vf.data) - len(OPFFunctions.g_vm.data) + 4
                        if OPFFunctions.g_vm.data[4:] in vf.data[:-delta]:
                            return OPFDialogAnswer.PASS
                    logging.info("Incorrect sequence of frames.")
                    return OPFDialogAnswer.FAIL
            else:
                logging.info("CRC is not valid.")
                return OPFDialogAnswer.FAIL
        except BaseException as e:
            logging.info(f"{e}\nCannot check CRC.")
            return OPFDialogAnswer.FAIL
    else:
        if UniTAP.utils.function_scheduler(is_crc_valid, dprx, False, interval=2, timeout=10):
            def capture_frame(_dprx):
                # Best effort: any capture failure yields None.
                try:
                    _dprx.video_capturer.start()
                    _vf = _dprx.video_capturer.pop_element()
                    _dprx.video_capturer.stop()
                    return _vf
                except BaseException:
                    return None

            iterations = 5
            for _ in range(iterations):
                vf = capture_frame(dprx)
                # Bug fix: guard against a failed capture (None frame).
                if vf is not None and len(vf.data) > 0:
                    return OPFDialogAnswer.PASS
        return OPFDialogAnswer.FAIL
@staticmethod
def opf_103_handler(dptx, dprx, width, height, rate, color_format, is_block_prediction_enabled, bpc,
                    bpp,
                    h_slice_number, buffer_bit_depth, v_slice_number, dsc_v_minor, dsc_v_major):
    """Build the DSC reference frame and publish it to the test-set memory.

    Resolves the compression configuration (honoring the suite-specific
    "simple-as-444 CRC" option), obtains an encoded frame from the content
    library, stores a deep copy in ``OPFFunctions.g_vm`` and writes the
    frame data, size and CRC triple through the TSI config interface.
    """
    # TODO: Need to resolve problem with writing TSI_VR_DSC_MEMORY_BLOCK and TSI_VR_DSC_DATA_SIZE
    if not OPFFunctions.check_dptx(dptx):
        return OPFDialogAnswer.ABORT
    logging.info((width, height, rate, color_format, is_block_prediction_enabled, bpc, bpp,
                  h_slice_number,
                  buffer_bit_depth, v_slice_number, dsc_v_major, dsc_v_minor))
    # TODO: Need to change on public interface. Temporary solution.
    # Reaches through name-mangled private attributes to get the raw handle.
    device_handle = dptx.pg.__getattribute__('_DpMstPatternGenerator__pg_list')[0]. \
        __getattribute__("_PatternGenerator__memory_manager"). \
        __getattribute__('_MemoryManager__io'). \
        __getattribute__('device_handle')
    color_mode = ColorInfo()
    color_mode.color_format = OPFFunctions.dict_color_formats.get(color_format)
    col_rgb = color_mode.color_format == ColorInfo.ColorFormat.CF_RGB
    color_mode.bpc = bpc
    color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB if col_rgb else ColorInfo.Colorimetry.CM_ITUR_BT709
    h_slice_size = int(calculate_slice_size(width, h_slice_number, color_mode.color_format))
    v_slice_size = int(calculate_slice_size(height, v_slice_number, color_mode.color_format))
    # Remember the slice geometry for the opf_104 slice checks.
    OPFFunctions.g_h_slice_size = h_slice_size
    OPFFunctions.g_v_slice_size = v_slice_size
    res, suite_id = TSIX_TS_GetConfigItem(device_handle, TSI_SELECT_SUITE, c_uint32)
    if res > 0:
        suite_id = TestGroupId(suite_id)
    else:
        suite_id = TestGroupId.UNKNOWN
    res, is_simple_as_444 = 0, False
    try:
        # DP1.4 and DP2.x suites read different config items for the
        # "treat Simple 4:2:2 as 4:4:4 for CRC" option.
        if suite_id in [TestGroupId.DP_TX_LL_CTS, TestGroupId.DP_TX_LL_CTS_DSC,
                        TestGroupId.DP_TX_DISPLAYID, TestGroupId.DP_TX_ADAPTIVESYNC]:
            res, is_simple_as_444 = TSIX_TS_GetConfigItem(device_handle, TSI_DP14_SINKCTS_SUPPORT_444CRC, c_uint32)
        elif suite_id in [TestGroupId.DP_2_1_TX_LL_CTS, TestGroupId.DP_2_1_TX_DSC_CTS,
                          TestGroupId.DP_2_1_TX_DISPAYID, TestGroupId.DP_2_1_TX_ADAPTIVESYNC]:
            res, is_simple_as_444 = TSIX_TS_GetConfigItem(device_handle, TSI_DP20_SINKCTS_SUPPORT_444CRC, c_uint32)
    except BaseException as e:
        # Best effort: fall back to the defaults initialised above.
        pass
    compression_info = OPFFunctions.get_compression_info(v_slice_size, h_slice_size, bpp, color_format,
                                                         buffer_bit_depth, is_block_prediction_enabled,
                                                         dsc_v_major, dsc_v_minor, is_simple_as_444)
    data_info = DataInfo()
    data_info.packing = DataInfo.Packing.P_PACKED if col_rgb else DataInfo.Packing.P_PLANAR
    data_info.component_order = DataInfo.ComponentOrder.CO_RGB if col_rgb else DataInfo.ComponentOrder.CO_YCbCr
    data_info.alignment = DataInfo.Alignment.A_LSB
    try:
        encoded_video_frame = OPFFunctions.check_dsc_content_library(width, height, color_mode,
                                                                     is_block_prediction_enabled, bpc, bpp,
                                                                     h_slice_number, v_slice_number,
                                                                     compression_info, dsc_v_major,
                                                                     dsc_v_minor, data_info)
    except BaseException as e:
        logging.info(e)
        return OPFDialogAnswer.ABORT
    # Keep a private copy so later captures can be compared against it.
    OPFFunctions.g_vm = copy.deepcopy(encoded_video_frame)
    # Publish the frame to the test-set memory, then its DSC metadata.
    TSIX_TS_SetConfigItem(device_handle, TSI_MEMORY_LAYOUT, len(encoded_video_frame.data), c_uint64)
    TSIX_TS_SetConfigItem(device_handle, TSI_MEMORY_BLOCK_INDEX, 0, data_size=4)
    TSIX_TS_SetConfigItem(device_handle, TSI_MEMORY_WRITE_W, bytearray(encoded_video_frame.data),
                          data_type=c_uint8,
                          data_count=len(encoded_video_frame.data))
    TSIX_TS_SetConfigItem(device_handle, TSI_DSC_MEMORY_BLOCK, 0)
    TSIX_TS_SetConfigItem(device_handle, TSI_DSC_DATA_SIZE, len(encoded_video_frame.data))
    # CRC triple of the decoded reference frame.
    decoded_vf = decode_video_frame(encoded_video_frame)
    crc0, crc1, crc2 = calculate_crc(decoded_vf)
    TSIX_TS_SetConfigItem(device_handle, TSI_DSC_TX_CRC, [crc0, crc1, crc2], data_count=3)
    return OPFDialogAnswer.PROCEED
@staticmethod
def opf_104_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
    """Check a captured DSC frame slice-by-slice against the reference frame.

    PASS requires slices #1, #3 and #4 to match the reference written by
    ``opf_103_handler`` while slice #2 must NOT match (the test corrupts
    that slice on purpose).  Up to 5 frames are tried.
    """
    if not OPFFunctions.check_dprx(dprx):
        return OPFDialogAnswer.FAIL
    logging.info(message)

    def capture_frame(_dprx):
        # Best effort: any capture failure yields None for this iteration.
        try:
            dprx.video_capturer.start()
            _vf = dprx.video_capturer.pop_element()
            dprx.video_capturer.stop()
            return _vf
        except BaseException:
            return None

    iterations = 5
    for i in range(iterations):
        vf = capture_frame(dprx)
        if vf is not None and isinstance(vf, VideoFrameDSC):
            slice_size = OPFFunctions.g_v_slice_size * OPFFunctions.g_h_slice_size
            pps_size = 128  # PPS header length in bytes
            # NOTE(review): delta is computed but never used below — confirm
            # whether the slice bounds were meant to use it.
            delta = len(vf.data) - len(OPFFunctions.g_vm.data) + 4
            # NOTE(review): the slice expressions below use absolute stop
            # offsets with shifted start offsets; when start >= stop the
            # slice is empty and the membership test is vacuously True —
            # confirm the intended bounds.
            # Slice #1
            if OPFFunctions.g_vm.data[4 + pps_size:slice_size] in vf.data[pps_size:slice_size]:
                # Slice #3
                if OPFFunctions.g_vm.data[4 + pps_size + slice_size * 2:slice_size * 3] in \
                        vf.data[pps_size + slice_size * 2:slice_size * 3]:
                    # Slice #4
                    if OPFFunctions.g_vm.data[4 + pps_size + slice_size * 3:slice_size * 4] in \
                            vf.data[pps_size + slice_size * 3:]:
                        # Slice #2 must differ from the reference.
                        if OPFFunctions.g_vm.data[4 + pps_size + slice_size:slice_size * 2] not in \
                                vf.data[pps_size + slice_size:slice_size * 2]:
                            return OPFDialogAnswer.PASS
                        else:
                            print(f"Frame {i + 1}: Image looks good and distortion free for slice #2. It is wrong.")
                    else:
                        print(
                            f"Frame {i + 1}: Image does not look good and distortion free for slice #4. It is wrong.")
                else:
                    print(f"Frame {i + 1}: Image does not look good and distortion free for slice #3. It is wrong.")
            else:
                print(f"Frame {i + 1}: Image does not look good and distortion free for slice #1. It is wrong.")
    return OPFDialogAnswer.FAIL
@staticmethod
def opf_105_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_106_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_107_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_120_handler(dptx, dprx, res_x, res_y, res_frate, res_bpc, color_format) -> OPFDialogAnswer:
if not OPFFunctions.check_dprx(dprx):
return OPFDialogAnswer.ABORT
logging.info((res_x, res_y, res_frate, res_bpc, color_format))
stream_status = dprx.link.status.stream(0)
if res_x == stream_status.video_mode.timing.hactive and res_y == stream_status.video_mode.timing.vactive and \
abs(res_frate - stream_status.video_mode.timing.frame_rate) <= 500 and \
res_bpc == stream_status.video_mode.color_info.bpc and \
OPFFunctions.dict_color_formats.get(
color_format) == stream_status.video_mode.color_info.color_format:
return OPFDialogAnswer.PASS
else:
return OPFDialogAnswer.FAIL
@staticmethod
def opf_121_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_122_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_123_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_140_handler(dptx, dprx, res_x, res_y, res_frate, tim_std, tim_std_num) -> OPFDialogAnswer:
if not OPFFunctions.check_dptx(dptx):
return OPFDialogAnswer.ABORT
logging.info((res_x, res_y, res_frate, tim_std, tim_std_num))
if tim_std_num is not None and tim_std_num.find('h') != -1:
tim_std_num = tim_std_num.replace('h', '')
timing_manager = dptx.pg.timing_manager
if tim_std == 'DMT':
timing = timing_manager.get_dmt(int(tim_std_num, 16))
elif tim_std == 'VIC':
timing = timing_manager.get_cta(int(tim_std_num))
else:
timing = timing_manager.search(res_x, res_y, res_frate,
rb=OPFFunctions.dict_reduce_blanking.get(tim_std))
color_mode = ColorInfo()
color_mode.color_format = ColorInfo.ColorFormat.CF_RGB
color_mode.bpc = 8
color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB
video_mode = VideoMode(timing=timing, color_info=color_mode)
dptx.pg.set_vm(video_mode)
dptx.pg.set_pattern(pattern=VideoPattern.ColorBars)
res_pg = dptx.pg.apply()
if not res_pg:
return OPFDialogAnswer.ABORT
time.sleep(5)
rx_vm = dprx.link.status.stream(0).video_mode
logging.info((rx_vm.timing.hactive, video_mode.timing.hactive, " | ", rx_vm.timing.vactive,
video_mode.timing.vactive))
if rx_vm.timing.hactive == video_mode.timing.hactive and rx_vm.timing.vactive == video_mode.timing.vactive:
return OPFDialogAnswer.PROCEED
else:
return OPFDialogAnswer.ABORT
@staticmethod
def opf_141_handler(dptx, dprx, audio_format, channels, size, rate) -> OPFDialogAnswer:
if not OPFFunctions.check_dptx(dptx):
return OPFDialogAnswer.ABORT
logging.info((audio_format, channels, size, rate))
audio_mode = AudioMode()
audio_mode.channel_count = channels
audio_mode.bits = size
audio_mode.sample_rate = rate
dptx.ag.setup(audio_mode=audio_mode,
audio_pattern=UniTAP.AudioPattern.SignalSine,
signal_frequency=2000,
amplitude=60)
dptx.ag.apply()
time.sleep(3)
return OPFDialogAnswer.PROCEED
@staticmethod
def opf_142_handler(dptx, dprx, message: str):
if not OPFFunctions.check_dptx(dptx):
return OPFDialogAnswer.ABORT
logging.info(message)
if dptx.link.status.link_encoding == UniTAP.DpLinkEncoding.LE_8b10b:
config = UniTAP.LinkConfig.DP8b10b()
else:
config = UniTAP.LinkConfig.DP128b132b()
config.adaptive_sync_auto_enable = True
dptx.link.config.set(config)
dptx.link.start_link_training()
time.sleep(1)
res_app = dptx.pg.status().error
def is_adaptive_sync_enabled(_dptx):
status = _dptx.pg[0].adaptive_sync_status()
return status
if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30):
return OPFDialogAnswer.ABORT
if res_app == PGStatus.PGError.OK:
return OPFDialogAnswer.PROCEED
else:
return OPFDialogAnswer.ABORT
@staticmethod
def opf_143_handler(dptx, dprx, rate: int) -> OPFDialogAnswer:
if not OPFFunctions.check_dptx(dptx):
return OPFDialogAnswer.ABORT
logging.info(rate)
dptx.pg.set_as_config(as_config=FixedASParams(refresh_rate=rate,
divide_by_1_001=False,
increase_lines=100,
decrease_lines=100))
dptx.pg.apply()
res_app = dptx.pg.status().error
def is_adaptive_sync_enabled(_dptx):
status = _dptx.pg[0].adaptive_sync_status()
return status
if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30):
return OPFDialogAnswer.ABORT
if res_app == PGStatus.PGError.OK:
return OPFDialogAnswer.PROCEED
else:
return OPFDialogAnswer.ABORT
@staticmethod
def opf_144_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
if not OPFFunctions.check_dptx(dptx):
return OPFDialogAnswer.ABORT
logging.info(message)
if message.find("Zigzag Sweep") != -1 or message.find("Square") != -1:
increase_lines, decrease_lines, max_lanes, min_lanes = OPFFunctions.as_parse_edid(dptx)
if message.find("Zigzag Sweep") != -1:
dptx.pg.set_as_config(as_config=ZigzagASParams(min_lanes=min_lanes,
max_lanes=max_lanes,
increase_lines=increase_lines,
decrease_lines=decrease_lines))
else:
dptx.pg.set_as_config(as_config=SquareASParams(min_lanes=min_lanes,
max_lanes=max_lanes,
period_frames=20))
else:
dptx.pg.set_as_config(as_config=ConstantASParams(lines=20))
dptx.pg.apply()
res_app = dptx.pg.status().error
def is_adaptive_sync_enabled(_dptx):
status = _dptx.pg[0].adaptive_sync_status()
return status
if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30):
return OPFDialogAnswer.ABORT
if res_app == PGStatus.PGError.OK:
return OPFDialogAnswer.PROCEED
else:
return OPFDialogAnswer.ABORT
@staticmethod
def opf_145_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
if not OPFFunctions.check_dptx(dptx):
return OPFDialogAnswer.ABORT
increase_lines, decrease_lines, max_lanes, min_lanes = OPFFunctions.as_parse_edid(dptx)
dptx.pg.set_as_config(as_config=ZigzagASParams(min_lanes=min_lanes,
max_lanes=max_lanes,
increase_lines=increase_lines,
decrease_lines=decrease_lines))
dptx.pg.apply()
res_app = dptx.pg.status().error
def is_adaptive_sync_enabled(_dptx):
time.sleep(0.5)
status = _dptx.pg[0].adaptive_sync_status()
return status
if not function_scheduler(is_adaptive_sync_enabled, dptx, interval=3, timeout=30):
return OPFDialogAnswer.ABORT
if res_app == PGStatus.PGError.OK:
return OPFDialogAnswer.PROCEED
else:
return OPFDialogAnswer.ABORT
@staticmethod
def opf_150_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
return OPFFunctions.opf_pass_handler(dptx, dprx, message)
@staticmethod
def opf_151_handler(hdtx, hdrx, width, height, rate, color_format, is_block_prediction_enabled, bpc,
bpp, h_slice_number, buffer_bit_depth, v_slice_number, dsc_v_minor, dsc_v_major):
if not OPFFunctions.check_hdtx(hdtx):
return OPFDialogAnswer.ABORT
logging.info(width, height, rate, color_format, is_block_prediction_enabled, bpc, bpp,
h_slice_number,
buffer_bit_depth, v_slice_number, dsc_v_major, dsc_v_minor)
color_mode = ColorInfo()
color_mode.color_format = OPFFunctions.dict_color_formats.get(color_format)
col_rgb = color_mode.color_format == ColorInfo.ColorFormat.CF_RGB
color_mode.bpc = bpc
color_mode.colorimetry = ColorInfo.Colorimetry.CM_sRGB if col_rgb else ColorInfo.Colorimetry.CM_ITUR_BT709
h_slice_size = int(calculate_slice_size(width, h_slice_number))
v_slice_size = int(calculate_slice_size(height, v_slice_number))
OPFFunctions.g_h_slice_size = h_slice_size
OPFFunctions.g_v_slice_size = v_slice_size
compression_info = OPFFunctions.get_compression_info(v_slice_size, h_slice_size, bpp, color_format,
buffer_bit_depth, is_block_prediction_enabled,
dsc_v_major, dsc_v_minor, False)
data_info = DataInfo()
data_info.packing = DataInfo.Packing.P_PACKED if col_rgb else DataInfo.Packing.P_PLANAR
data_info.component_order = DataInfo.ComponentOrder.CO_RGB if col_rgb else DataInfo.ComponentOrder.CO_YCbCr
data_info.alignment = DataInfo.Alignment.A_LSB
try:
encoded_video_frame = OPFFunctions.check_dsc_content_library(width, height, color_mode,
is_block_prediction_enabled, bpc, bpp,
h_slice_number, v_slice_number,
compression_info, dsc_v_major,
dsc_v_minor, data_info)
except BaseException as e:
logging.info(e)
return OPFDialogAnswer.ABORT
OPFFunctions.g_vm = copy.deepcopy(encoded_video_frame)
hdtx.pg.set_pattern(pattern=encoded_video_frame)
res = hdtx.pg.apply()
if not res and hdtx.pg.status().error != PGStatus.PGError.OK:
return OPFDialogAnswer.ABORT
return OPFDialogAnswer.PROCEED
    @staticmethod
    def opf_161_handler(dptx, dprx, message: str) -> OPFDialogAnswer:
        """OPF 161: no automated check is implemented — always proceed."""
        return OPFDialogAnswer.PROCEED
@staticmethod
def as_parse_edid(dptx):
edid_data = dptx.edid.read_i2c()
display_id_data = dptx.edid._parser.find_main_block(MainBlockType.DisplayID, edid_data)
if len(display_id_data) == 0:
return OPFDialogAnswer.ABORT
as_data = dptx.edid._parser.find_additional_block(AdditionalBlockType.AdaptiveSync, display_id_data)
if len(as_data) == 0:
return OPFDialogAnswer.ABORT
increase_lines = as_data[4]
decrease_lines = as_data[8]
min_ref_rate = round(as_data[5] / 1.001, 3)
frame_rate = dptx.link.status.stream(0).video_mode.timing.frame_rate / 1000
v_total = dptx.link.status.stream(0).video_mode.timing.vtotal
max_lanes = round(v_total * (frame_rate - min_ref_rate) / min_ref_rate)
max_ref_rate = round((int.from_bytes(as_data[6: 8], "little") + 1) * 1.00035)
min_lanes = round(v_total * (frame_rate - max_ref_rate) / max_ref_rate)
return increase_lines, decrease_lines, max_lanes, min_lanes
@staticmethod
def check_dsc_content_library(*args):
width, height, color_mode, is_block_prediction_enabled, bpc, bpp, h_slice_number, v_slice_number, \
compression_info, dsc_v_major, dsc_v_minor, data_info = args
if OPFFunctions.dsc_content_library_path != "":
if not os.path.exists(OPFFunctions.dsc_content_library_path):
os.makedirs(OPFFunctions.dsc_content_library_path)
dsc_image_name = f"{width}x{height}_{OPFFunctions.dict_dsc_color_names.get(compression_info.color_format)}_" \
f"{'BPY' if is_block_prediction_enabled else 'NBP'}_bpc{bpc}_" \
f"bpp{bpp}_{h_slice_number}slicew_{v_slice_number}sliceh_" \
f"{compression_info.buffer_bit_depth}lb_v{dsc_v_major}{dsc_v_minor}.dsc"
full_path = os.path.join(OPFFunctions.dsc_content_library_path, dsc_image_name)
if os.path.exists(full_path):
#
# Temporary message "Update DSC folder"
#
warnings.warn("Since version 3.5.X/3.6.X the usual file for generating DSC images has been updated."
"Please update/regenerate your DSC folder/library with using new file.")
with open(full_path, 'rb') as file:
data = bytearray(file.read())
encoded_video_frame = dsc_video_frame_from_data(data)
else:
vf = generate_pattern_as_vf(PatternType.Unigraf, width, height, color_mode, data_info)
encoded_video_frame = encode_video_frame(vf, compression_info)
with open(full_path, 'wb') as file:
file.write(encoded_video_frame.data)
else:
vf = generate_pattern_as_vf(PatternType.Unigraf, width, height, color_mode, data_info)
encoded_video_frame = encode_video_frame(vf, compression_info)
return encoded_video_frame
@staticmethod
def get_compression_info(*args):
v_slice_size, h_slice_size, bpp, color_mode, buffer_bit_depth, is_block_prediction_enabled, dsc_v_major, \
dsc_v_minor, is_simple_as_444 = args
compression_info = CompressionInfo()
compression_info.v_slice_size = v_slice_size
compression_info.h_slice_size = h_slice_size
compression_info.bpp = bpp
compression_info.color_format = OPFFunctions.dict_dsc_color_formats.get(color_mode)
compression_info.buffer_bit_depth = buffer_bit_depth
compression_info.is_block_prediction_enabled = is_block_prediction_enabled
compression_info.version = (dsc_v_major, dsc_v_minor)
compression_info.is_simple_as_444 = is_simple_as_444
return compression_info