import time
from UniTAP.libs.lib_tsi.tsi_io import PortIO
from .types import HdmiModeRx, FrlMode, _update_error_counters_rx
from UniTAP.libs.lib_tsi.tsi_types import (
    TSI_HDRX_BEHAVIOR,
    TSI_HDRX_LANES_ERR_COUNTERS_R,
    TSI_HDRX_HPD_STATUS_R,
    TSI_HDRX_FRL_CAPABILITY,
    TSI_HDRX_LINK_STATUS_R,
    TSI_HDRX_HPD_CONTROL_W,
    TSI_HDRX_VIDEO_MODE_R,
    TSI_VIDCAP_SIGNAL_CRC_R,
    TSI_SUCCESS,
)
from ctypes import c_uint8, c_uint32, c_uint64
from UniTAP.utils.function_wrapper import function_scheduler
from .types import _hdmi_vm_info_to_video_mode, HdmiVideoModeInfo
from .private_types import HdmiCrc, HdmiVideoMode
from ..dp.link_status_common import StreamStatus


class StatusRx:
    """
    Class `StatusRx` exposes the HDMI link status as seen on the Sink (RX - receiver) side.

    Every accessor reads from (or writes to) the device through the `PortIO` object given to the constructor.
    Available information:
    - HDMI mode `hdmi_mode` (readable and writable).
    - Link error counters `error_counters`.
    - Per-lane channel lock `channel_lock`.
    - HPD status `hpd_status` and HPD assert/deassert control `set_assert_state`.
    - Stream info `stream`.
    """

    def __init__(self, port_io: PortIO):
        self.__io = port_io

    def __frl_status(self) -> FrlMode:
        # Only the low four bits of the FRL capability value are mapped onto FrlMode.
        raw_capability = self.__io.get(TSI_HDRX_FRL_CAPABILITY, c_uint32)[1]
        return FrlMode(raw_capability & 0xF)

    @property
    def hdmi_mode(self) -> HdmiModeRx:
        """
        Read the current HDMI mode (`TSI_HDRX_BEHAVIOR`).

        Returns:
            object of `HdmiModeRx` type
        """
        behavior = self.__io.get(TSI_HDRX_BEHAVIOR, c_uint32)[1]
        return HdmiModeRx(behavior)

    @hdmi_mode.setter
    def hdmi_mode(self, hdmi_mode: HdmiModeRx):
        """
        Write a new HDMI mode (`TSI_HDRX_BEHAVIOR`).

        Args:
            hdmi_mode (HdmiModeRx) - mode to apply; its `value` is written as a 32-bit unsigned integer
        """
        self.__io.set(TSI_HDRX_BEHAVIOR, hdmi_mode.value, c_uint32)

    @property
    def error_counters(self) -> list:
        """
        Read the current link error counters (`TSI_HDRX_LANES_ERR_COUNTERS_R`).

        Returns:
            object of list type, as produced by `_update_error_counters_rx`
        """
        raw_counters = self.__io.get(TSI_HDRX_LANES_ERR_COUNTERS_R, c_uint64)[1]
        return _update_error_counters_rx(raw_counters)

    def set_assert_state(self, asserted: bool = True):
        """
        Assert or deassert HPD (`TSI_HDRX_HPD_CONTROL_W`).

        Args:
            asserted (bool) - written to the device as `int(asserted)`
        """
        self.__io.set(TSI_HDRX_HPD_CONTROL_W, int(asserted), c_uint32)

    @property
    def channel_lock(self) -> list:
        """
        Read the channel lock state of every lane of the current link.

        Returns:
            object of list type - four bool values, one per lane (lane 0 first)
        """
        # The link status is read before the HDMI mode, the mode selects which bits are decoded.
        link_status = self.__io.get(TSI_HDRX_LINK_STATUS_R, c_uint32)[1]
        current_mode = self.hdmi_mode
        return self.__update_channel_lock(link_status, current_mode)

    @staticmethod
    def __update_channel_lock(value: int, mode: HdmiModeRx) -> list:
        # Any mode other than HDMI 2.1: bits 4..6 carry lanes 0..2, the fourth lane is always reported False.
        if mode != HdmiModeRx.HDMI_2_1:
            return [(value & (1 << bit)) != 0 for bit in (4, 5, 6)] + [False]
        # HDMI 2.1: bits 13..16 carry lanes 0..3.
        return [(value & (1 << bit)) != 0 for bit in (13, 14, 15, 16)]

    @property
    def hpd_status(self) -> bool:
        """
        Read the HPD status (`TSI_HDRX_HPD_STATUS_R`).

        Returns:
            object of bool type - True when bit 0 of the status value is set, False otherwise
        """
        hpd_register = self.__io.get(TSI_HDRX_HPD_STATUS_R, c_uint32)[1]
        return (hpd_register & 0x1) != 0

    def __check_video(self) -> bool:
        # Probe handed to function_scheduler: reports True when the video mode read returns a positive result.
        def has_video_mode(io):
            status, _mode_info, _count = io.get(TSI_HDRX_VIDEO_MODE_R, HdmiVideoMode, 4)
            return status > 0

        # NOTE(review): interval/timeout semantics and units are defined by function_scheduler - confirm there.
        return function_scheduler(has_video_mode, self.__io, interval=5, timeout=10)

    def stream(self, stream_index: int = 0) -> StreamStatus:
        """
        Read the status of the selected stream.

        Args:
            stream_index (int) - index of the selected stream

        Returns:
            object of `StreamStatus` type

        Raises:
            ValueError - if video is not available or if `stream_index` is outside the available streams
        """
        status = StreamStatus()

        if not self.__check_video():
            raise ValueError("Video is not available.")
        time.sleep(1)

        _, video_modes, _ = self.__io.get(TSI_HDRX_VIDEO_MODE_R, HdmiVideoMode, 4)
        # TODO - Size temporary will be equal 1
        stream_count = 1

        if not 0 <= stream_index < stream_count:
            raise ValueError(f"Selected stream {stream_index} is not available. "
                             f"Available stream number: {stream_count}")

        mode_info = HdmiVideoModeInfo(video_modes[stream_index])
        status.video_mode = _hdmi_vm_info_to_video_mode(mode_info)

        # CRC values are filled in only when the read succeeds; otherwise `status` keeps its defaults.
        crc_reply = self.__io.get(TSI_VIDCAP_SIGNAL_CRC_R, data_type=c_uint8, data_count=6)
        if crc_reply[0] >= TSI_SUCCESS:
            crc = HdmiCrc.from_buffer(bytearray(crc_reply[1]))
            status.crc = [crc.r, crc.g, crc.b]
            status.dsc_crc = [0, 0, 0]

        return status

    def __str__(self):
        # Read in the same order as the text is laid out: mode, channel lock, error counters.
        mode_name = self.hdmi_mode.name
        locks = self.channel_lock
        counters = self.error_counters
        return (f"HDMI Mode: {mode_name}\n"
                f"Channel Lock:\n{locks}\n"
                f"Error Counters:\n{str(counters)}\n")