from UniTAP.libs.lib_tsi.tsi import * from UniTAP.libs.lib_tsi.tsi_io import PortIO from .fec_shared import FECCounters from UniTAP.dev.ports.modules.dpcd.dpcd import DPCDRegisters class FecRx: """ Class `FecRx` allows working with FEC functionality from Sink (RX - receiver) side. You can: - Check capable FEC or not `is_capable`. - Check enabled FEC or not `is_enabled`. - Enable/Disable FEC `enable`. - Enable/Disable calculating sum of errors `aggregate_errors`. - Get error counters `get_error_counters`. - Clear all errors `clear`. """ def __init__(self, port_io: PortIO, dpcd: DPCDRegisters): self.__io = port_io self.__dpcd = dpcd self.__aggregate_error = 0 def is_enabled(self) -> bool: """ Returns status of FEC, is enabled or not. Returns: object of `bool` type. """ result = self.__io.get(TSI_DPRX_FEC_STATUS_R, c_int) status_fec = ((result[1] & 0x1) != 0) return bool(status_fec) def is_capable(self) -> bool: """ Returns status of FEC, is capable or not. Returns: object of `bool` type. """ result = self.__io.get(TSI_DPRX_FEC_CTRL, c_int) enabled_fec = (result[1] & 0x1) != 0 return enabled_fec def enable(self, enable: bool): """ Enable/Disable FEC. Args: enable (bool) - enable (True) or disable (False) """ val = 0x1 if enable else 0x0 self.__io.set(TSI_DPRX_FEC_CTRL, val) def aggregate_errors(self, enable: bool): """ Enable/Disable calculating sum of errors. Args: enable (bool) - enable (True) or disable (False) """ result = self.__io.get(TSI_DPRX_FEC_CONTROL, c_int) val = result[1] if enable: val |= 0x2 self.__aggregate_error = 1 else: val &= ~0x2 self.__aggregate_error = 0 self.__io.set(TSI_DPRX_FEC_CONTROL, val) def get_error_counters(self) -> FECCounters: """ Get current error counters. Returns: object of `FECCounters` type """ result = FECCounters() lane_count = self.__io.get(TSI_R_DPRX_LINK_LANE_COUNT, c_int)[1] if lane_count == 4: lane_count += 1 if self.__aggregate_error else 0 dpcd = self.__dpcd.read(0x120, 1).data[0] dpcdaggr = bool(dpcd & (1 << 6)) for i in range(lane_count): if i == 0 and dpcdaggr: dpcd &= ~(1 << 6) if i == 4 and dpcdaggr: dpcd |= 1 << 6 elif i == 4: continue if i < 4: dpcd &= ~(0x3 << 4) dpcd |= (i << 4) for j in range(1, 6): dpcd &= ~(0x7 << 1) v = dpcd | (j << 1) self.__dpcd.write(0x120, v) val = int.from_bytes(self.__dpcd.read(0x281, 2).data, 'little') if j == 1: result.uncorrectedBlockErrors[i] = val & 0x7FFF if val & 0x8000 else None elif j == 2: result.correctedBlockErrors[i] = val & 0x7FFF if val & 0x8000 else None elif j == 3: result.bitErrors[i] = val & 0x7FFF if val & 0x8000 else None elif j == 4: result.parityBlockErrors[i] = val & 0x7FFF if val & 0x8000 else None elif j == 5: result.parityBitErrors[i] = val & 0x7FFF if val & 0x8000 else None return result def clear(self): """ Clear all errors. """ address = 0x120 dpcd = self.__dpcd.read(address, 1).data[0] old_fec = dpcd fec_ready = dpcd & ~0xE dpcd = fec_ready self.__dpcd.write(address, dpcd) dpcd = old_fec self.__dpcd.write(address, dpcd)