Files
pqAutomationApp/UniTAP/dev/ports/modules/fec/fec_tx.py

205 lines
6.6 KiB
Python
Raw Normal View History

2026-04-16 16:51:05 +08:00
from UniTAP.libs.lib_tsi.tsi import *
from UniTAP.libs.lib_tsi.tsi_io import PortIO
from .fec_shared import FECCounters, FECErrorType8b10b, FECErrorType128b132b
from UniTAP.dev.ports.modules.dpcd.dpcd import DPCDRegisters
from typing import Union
class FecTx:
"""
Class `FecTx` allows working with FEC functionality from Source (TX - transmitter) side. You can:
- Check enabled FEC or not `is_enabled`.
- Check state that FEC is prefers after link training `is_prefer_after_lt`.
- Enable/Disable FEC `enable`.
- Enable/Disable intent FEC `enable_intent`.
- Enable/Disable calculating sum of errors `aggregate_errors`.
- Get error counters `get_error_counters`.
- Clear all errors `clear`.
"""
def __init__(self, port_io: PortIO, dpcd: DPCDRegisters):
self.__io = port_io
self.__dpcd = dpcd
self.__aggregate_error = 0
def is_enabled(self) -> bool:
"""
Returns status of FEC, is enabled or not.
Returns:
object of `bool` type.
"""
result = self.__io.get(TSI_DPTX_FEC_STATUS_R, c_int)
status_fec = ((result[1] & 0x1) != 0)
return bool(status_fec)
def is_prefer_after_lt(self) -> bool:
"""
Check state that FEC is prefers after link training.
Returns:
object of `bool` type.
"""
result = self.__io.get(TSI_DPTX_FEC_CTRL, c_int)
enabled_fec = (result[1] & 0x2) != 0
return enabled_fec
def enable(self, enable: bool):
"""
Enable/Disable FEC.
Args:
enable (bool) - enable (True) or disable (False)
"""
result = self.__io.get(TSI_DPTX_FEC_CTRL, c_int)
val = result[1]
val |= 0x8 if enable else 0x10
self.__io.set(TSI_DPTX_FEC_CTRL, val)
def enable_intent(self, enable: bool):
"""
Enable/Disable intent FEC.
Args:
enable (bool) - enable (True) or disable (False)
"""
result = self.__io.get(TSI_DPTX_FEC_CTRL, c_int)
val = result[1]
val |= 1 if enable else 4
self.__io.set(TSI_DPTX_FEC_CTRL, val)
def aggregate_errors(self, enable: bool):
"""
Enable/Disable calculating sum of errors.
Args:
enable (bool) - enable (True) or disable (False)
"""
result = self.__io.get(TSI_DPTX_FEC_CONTROL, c_int)
val = result[1]
if enable:
val |= 0x40
self.__aggregate_error = 1
else:
val &= ~0x40
self.__aggregate_error = 0
self.__io.set(TSI_DPTX_FEC_CONTROL, val)
def generate_errors(self, error_type: Union[FECErrorType8b10b, FECErrorType128b132b], lane: list, ms: int = 100):
"""
Generate FEC errors.
Args:
error_type (Union[`FECErrorType8b10b`, `FECErrorType128b132b`])
lane (list)
ms (int) - time in m seconds
"""
result, status = self.__io.get(TSI_DPTX_LINK_MODE_R, c_int)
resut, hw_caps, size = self.__io.get(TSI_DPTX_HW_CAPS_R, c_uint32, 4)
if status == 0:
if isinstance(error_type, FECErrorType128b132b):
assert False, "This device doesn't support 128b/132b" if (hw_caps[1] & 0x7) == 0 else "Change link mode!"
if status == 1:
if isinstance(error_type, FECErrorType8b10b):
assert False, "This device doesn't support 8b/10b" if (hw_caps[1] & 0x7) == 0 else "Change link mode!"
dpcd = self.__dpcd.read(0x120, 1).data[0]
dpcd |= 2
self.__dpcd.write(0x120, dpcd)
delay = ms * 100
self.__io.set(TSI_MLEG_CONTROL, 0)
nl = self.__io.get(TSI_R_DPTX_LINK_STATUS_LANE_COUNT, c_uint32)[1]
self.__io.set(TSI_MLEG_SYMBOL_REPLACE_A, 0x00010000)
self.__io.set(TSI_MLEG_SYMBOL_REPLACE_MASK_A, 0x00010001)
self.__io.set(TSI_MLEG_SYMBOL_REPLACE_B, 0x00000001)
self.__io.set(TSI_MLEG_SYMBOL_REPLACE_MASK_B, 0x00010001)
self.__io.set(TSI_MLEG_DELAY_COUNTER, delay)
n = lane[0] << 16
self.__io.set(TSI_MLEG_LANE0_REPLACE_COUNTERS, n)
n = lane[1] << 16
self.__io.set(TSI_MLEG_LANE1_REPLACE_COUNTERS, n)
n = lane[2] << 16
self.__io.set(TSI_MLEG_LANE2_REPLACE_COUNTERS, n)
n = lane[3] << 16
self.__io.set(TSI_MLEG_LANE3_REPLACE_COUNTERS, n)
ctrl = 0
ctrl |= (error_type.value << 16)
if nl == 1:
ctrl |= (1 << 13)
ctrl |= (1 << 12)
ctrl |= (0xF << 4)
if lane[0]:
ctrl |= (1 << 0)
if lane[1]:
ctrl |= (1 << 1)
if lane[2]:
ctrl |= (1 << 2)
if lane[3]:
ctrl |= (1 << 3)
self.__io.set(TSI_MLEG_CONTROL, ctrl)
def get_error_counters(self) -> FECCounters:
"""
Get current error counters.
Returns:
object of `FECCounters` type
"""
result = FECCounters()
lane_count = self.__io.get(TSI_R_DPTX_LINK_STATUS_LANE_COUNT, c_uint32)[1]
if lane_count == 4:
lane_count += 1 if self.__aggregate_error else 0
dpcd = self.__dpcd.read(0x120, 1).data[0]
dpcdaggr = bool(dpcd & (1 << 6))
for i in range(lane_count):
if i == 0 and dpcdaggr:
dpcd &= ~(1 << 6)
if i == 4 and dpcdaggr:
dpcd |= 1 << 6
elif i == 4:
continue
if i < 4:
dpcd &= ~(0x3 << 4)
dpcd |= (i << 4)
for j in range(1, 6):
dpcd &= ~(0x7 << 1)
v = dpcd | (j << 1)
self.__dpcd.write(0x120, v)
val = int.from_bytes(self.__dpcd.read(0x281, 2).data, 'little')
if j == 1:
result.uncorrectedBlockErrors[i] = val & 0x7FFF if val & 0x8000 else None
elif j == 2:
result.correctedBlockErrors[i] = val & 0x7FFF if val & 0x8000 else None
elif j == 3:
result.bitErrors[i] = val & 0x7FFF if val & 0x8000 else None
elif j == 4:
result.parityBlockErrors[i] = val & 0x7FFF if val & 0x8000 else None
elif j == 5:
result.parityBitErrors[i] = val & 0x7FFF if val & 0x8000 else None
return result
def clear(self):
"""
Clear all errors.
"""
address = 0x120
dpcd = self.__dpcd.read(address, 1).data[0]
fec_ready = dpcd & ~0xE
dpcd = fec_ready
self.__dpcd.write(address, dpcd)