from UniTAP.libs.lib_tsi.tsi import *
from UniTAP.libs.lib_tsi.tsi_io import PortIO
from .fec_shared import FECCounters, FECErrorType8b10b, FECErrorType128b132b
from UniTAP.dev.ports.modules.dpcd.dpcd import DPCDRegisters
from typing import Union

# DPCD addresses used by this module.
# NOTE(review): 0x120 / 0x281 look like FEC_CONFIGURATION / FEC_ERROR_COUNT of
# the DisplayPort DPCD map - confirm against the specification.
_DPCD_FEC_CONFIGURATION = 0x120
_DPCD_FEC_ERROR_COUNT = 0x281

# Counter selector value (written to bits 3:1 of DPCD 0x120) -> name of the
# `FECCounters` attribute that receives the value read back for that selector.
_COUNTER_ATTRIBUTES = (
    (1, "uncorrectedBlockErrors"),
    (2, "correctedBlockErrors"),
    (3, "bitErrors"),
    (4, "parityBlockErrors"),
    (5, "parityBitErrors"),
)


class FecTx:
    """
    Class `FecTx` allows working with FEC functionality from the Source (TX - transmitter) side.
    You can:
    - Check whether FEC is enabled: `is_enabled`.
    - Check whether FEC is preferred after link training: `is_prefer_after_lt`.
    - Enable/Disable FEC: `enable`.
    - Enable/Disable FEC intent: `enable_intent`.
    - Enable/Disable calculating the sum of errors: `aggregate_errors`.
    - Generate FEC errors: `generate_errors`.
    - Get error counters: `get_error_counters`.
    - Clear all errors: `clear`.
    """

    def __init__(self, port_io: PortIO, dpcd: DPCDRegisters):
        self.__io = port_io
        self.__dpcd = dpcd
        # 1 while error aggregation was requested through `aggregate_errors`, else 0.
        self.__aggregate_error = 0

    def is_enabled(self) -> bool:
        """
        Returns status of FEC, is it enabled or not.

        Returns:
            object of `bool` type - True when bit 0 of TSI_DPTX_FEC_STATUS_R is set.
        """
        result = self.__io.get(TSI_DPTX_FEC_STATUS_R, c_int)
        return (result[1] & 0x1) != 0

    def is_prefer_after_lt(self) -> bool:
        """
        Check whether FEC is preferred after link training.

        Returns:
            object of `bool` type - True when bit 1 of TSI_DPTX_FEC_CTRL is set.
        """
        result = self.__io.get(TSI_DPTX_FEC_CTRL, c_int)
        return (result[1] & 0x2) != 0

    def enable(self, enable: bool):
        """
        Enable/Disable FEC.

        Args:
            enable (bool) - enable (True) or disable (False)
        """
        result = self.__io.get(TSI_DPTX_FEC_CTRL, c_int)
        val = result[1]
        # Separate bits request "enable" (0x8) and "disable" (0x10); the
        # opposite bit is intentionally not cleared here.
        # NOTE(review): presumably self-clearing command bits - confirm.
        val |= 0x8 if enable else 0x10
        self.__io.set(TSI_DPTX_FEC_CTRL, val)

    def enable_intent(self, enable: bool):
        """
        Enable/Disable FEC intent.

        Args:
            enable (bool) - enable (True) or disable (False)
        """
        result = self.__io.get(TSI_DPTX_FEC_CTRL, c_int)
        val = result[1]
        # Same scheme as `enable`: bit 0 requests "set intent", bit 2 "clear intent".
        val |= 1 if enable else 4
        self.__io.set(TSI_DPTX_FEC_CTRL, val)

    def aggregate_errors(self, enable: bool):
        """
        Enable/Disable calculating the sum of errors.

        The choice is also remembered locally so that `get_error_counters`
        knows whether to read the additional aggregated counter slot.

        Args:
            enable (bool) - enable (True) or disable (False)
        """
        # NOTE(review): this method uses TSI_DPTX_FEC_CONTROL while the other
        # methods use TSI_DPTX_FEC_CTRL - confirm that this is intended.
        result = self.__io.get(TSI_DPTX_FEC_CONTROL, c_int)
        val = result[1]
        if enable:
            val |= 0x40
            self.__aggregate_error = 1
        else:
            val &= ~0x40
            self.__aggregate_error = 0
        self.__io.set(TSI_DPTX_FEC_CONTROL, val)

    def generate_errors(self, error_type: Union[FECErrorType8b10b, FECErrorType128b132b], lane: list,
                        ms: int = 100):
        """
        Generate FEC errors.

        Args:
            error_type (Union[`FECErrorType8b10b`, `FECErrorType128b132b`]) - kind of error to inject;
                must match the current link mode
            lane (list) - four per-lane replace counts (index 0..3); a zero count leaves the
                lane disabled
            ms (int) - time in milliseconds

        Raises:
            IndexError: `lane` holds fewer than four entries.
            AssertionError: `error_type` does not match the current link mode.
        """
        # Validate before any register is touched so that a bad argument
        # cannot leave the error generator half-programmed.
        if len(lane) < 4:
            raise IndexError("lane must contain four entries (one per lane)")
        lane_counts = lane[:4]

        _, status = self.__io.get(TSI_DPTX_LINK_MODE_R, c_int)
        _, hw_caps, _ = self.__io.get(TSI_DPTX_HW_CAPS_R, c_uint32, 4)

        # NOTE(review): link mode 0 is treated as 8b/10b and 1 as 128b/132b,
        # judging by which error type each one rejects - confirm.
        mismatch = None
        if status == 0 and isinstance(error_type, FECErrorType128b132b):
            mismatch = "128b/132b"
        elif status == 1 and isinstance(error_type, FECErrorType8b10b):
            mismatch = "8b/10b"
        if mismatch is not None:
            # Raised explicitly (instead of `assert False`) so the check is
            # not stripped when Python runs with -O.
            if (hw_caps[1] & 0x7) == 0:
                raise AssertionError(f"This device doesn't support {mismatch}")
            raise AssertionError("Change link mode!")

        # Select counter 1 (bit 1 of DPCD 0x120) on the sink side.
        dpcd = self.__dpcd.read(_DPCD_FEC_CONFIGURATION, 1).data[0]
        dpcd |= 2
        self.__dpcd.write(_DPCD_FEC_CONFIGURATION, dpcd)

        # NOTE(review): presumably the delay counter ticks in 10 us units - confirm.
        delay = ms * 100

        # Stop the generator before reprogramming it.
        self.__io.set(TSI_MLEG_CONTROL, 0)
        nl = self.__io.get(TSI_R_DPTX_LINK_STATUS_LANE_COUNT, c_uint32)[1]

        self.__io.set(TSI_MLEG_SYMBOL_REPLACE_A, 0x00010000)
        self.__io.set(TSI_MLEG_SYMBOL_REPLACE_MASK_A, 0x00010001)
        self.__io.set(TSI_MLEG_SYMBOL_REPLACE_B, 0x00000001)
        self.__io.set(TSI_MLEG_SYMBOL_REPLACE_MASK_B, 0x00010001)
        self.__io.set(TSI_MLEG_DELAY_COUNTER, delay)

        counter_registers = (
            TSI_MLEG_LANE0_REPLACE_COUNTERS,
            TSI_MLEG_LANE1_REPLACE_COUNTERS,
            TSI_MLEG_LANE2_REPLACE_COUNTERS,
            TSI_MLEG_LANE3_REPLACE_COUNTERS,
        )
        for register, count in zip(counter_registers, lane_counts):
            self.__io.set(register, count << 16)

        # NOTE(review): the meaning of the individual TSI_MLEG_CONTROL bits is
        # not visible in this file - confirm against the TSI register reference.
        ctrl = error_type.value << 16
        if nl == 1:
            ctrl |= (1 << 13)
        ctrl |= (1 << 12)
        ctrl |= (0xF << 4)
        # Bits 3:0 enable the lanes that have a non-zero replace count.
        for index, count in enumerate(lane_counts):
            if count:
                ctrl |= (1 << index)
        self.__io.set(TSI_MLEG_CONTROL, ctrl)

    def get_error_counters(self) -> FECCounters:
        """
        Get current error counters.

        For every active lane (plus one aggregated slot at index 4 when the
        link has 4 lanes, aggregation was requested via `aggregate_errors` and
        bit 6 of DPCD 0x120 is set) each of the five counters is selected
        through DPCD 0x120 and read from the two bytes at DPCD 0x281. A counter
        whose bit 15 is not set is reported as None.

        Returns:
            object of `FECCounters` type
        """
        result = FECCounters()
        lane_count = self.__io.get(TSI_R_DPTX_LINK_STATUS_LANE_COUNT, c_uint32)[1]
        if lane_count == 4 and self.__aggregate_error:
            # One extra iteration (index 4) for the aggregated counters.
            lane_count += 1

        dpcd = self.__dpcd.read(_DPCD_FEC_CONFIGURATION, 1).data[0]
        dpcdaggr = bool(dpcd & (1 << 6))

        for i in range(lane_count):
            if i == 0 and dpcdaggr:
                # Per-lane reads come first: drop the aggregate bit for them.
                dpcd &= ~(1 << 6)
            if i == 4 and dpcdaggr:
                # Restore the aggregate bit for the aggregated slot.
                dpcd |= 1 << 6
            elif i == 4:
                continue
            if i < 4:
                # Bits 5:4 select the lane.
                dpcd &= ~(0x3 << 4)
                dpcd |= (i << 4)

            # Bits 3:1 select the counter; clear them once, then OR in each selector.
            dpcd &= ~(0x7 << 1)
            for selector, attribute in _COUNTER_ATTRIBUTES:
                self.__dpcd.write(_DPCD_FEC_CONFIGURATION, dpcd | (selector << 1))
                val = int.from_bytes(self.__dpcd.read(_DPCD_FEC_ERROR_COUNT, 2).data, 'little')
                # Bit 15 marks the 15-bit count as usable.
                getattr(result, attribute)[i] = val & 0x7FFF if val & 0x8000 else None

        return result

    def clear(self):
        """
        Clear all errors.

        Rewrites DPCD 0x120 with the counter-select bits (3:1) cleared and all
        other bits preserved.
        """
        dpcd = self.__dpcd.read(_DPCD_FEC_CONFIGURATION, 1).data[0]
        self.__dpcd.write(_DPCD_FEC_CONFIGURATION, dpcd & ~0xE)