Files
xinzhu.yin c157e774e5 1.1.0版本
2026-04-16 16:51:05 +08:00

132 lines
3.9 KiB
Python

from UniTAP.libs.lib_tsi.tsi import *
from UniTAP.libs.lib_tsi.tsi_io import PortIO
from .fec_shared import FECCounters
from UniTAP.dev.ports.modules.dpcd.dpcd import DPCDRegisters
class FecRx:
    """
    Sink-side (RX, receiver) access to the FEC functionality of a port.

    Supported operations:
    - `is_capable`: report whether bit 0 of the FEC control item is set.
    - `is_enabled`: report whether bit 0 of the FEC status item is set.
    - `enable`: write 1 or 0 to the FEC control item.
    - `aggregate_errors`: turn the summing of errors on or off.
    - `get_error_counters`: collect the error counters lane by lane.
    - `clear`: clear all errors.
    """

    def __init__(self, port_io: PortIO, dpcd: DPCDRegisters) -> None:
        """
        Keep the handles needed to talk to the device.

        Args:
            port_io (PortIO) - object used to get/set TSI configuration items
            dpcd (DPCDRegisters) - object used to read/write DPCD registers
        """
        self.__io = port_io
        self.__dpcd = dpcd
        # Mirrors the last value requested through `aggregate_errors`
        # (1 - aggregation requested, 0 - not requested).
        self.__aggregate_error = 0

    def is_enabled(self) -> bool:
        """
        Tell whether FEC is currently enabled.

        Returns:
            object of `bool` type: True when bit 0 of `TSI_DPRX_FEC_STATUS_R` is set.
        """
        status = self.__io.get(TSI_DPRX_FEC_STATUS_R, c_int)[1]
        return bool((status & 0x1) != 0)

    def is_capable(self) -> bool:
        """
        Tell whether FEC capability is reported.

        NOTE(review): this reads the same bit of `TSI_DPRX_FEC_CTRL` that
        `enable` writes - confirm that "capable" is the intended meaning.

        Returns:
            object of `bool` type: True when bit 0 of `TSI_DPRX_FEC_CTRL` is set.
        """
        control = self.__io.get(TSI_DPRX_FEC_CTRL, c_int)[1]
        return (control & 0x1) != 0

    def enable(self, enable: bool) -> None:
        """
        Enable/Disable FEC.

        The whole `TSI_DPRX_FEC_CTRL` item is overwritten with 1 or 0.

        Args:
            enable (bool) - enable (True) or disable (False)
        """
        self.__io.set(TSI_DPRX_FEC_CTRL, 0x1 if enable else 0x0)

    def aggregate_errors(self, enable: bool) -> None:
        """
        Enable/Disable calculating sum of errors.

        Performs a read-modify-write of bit 1 of `TSI_DPRX_FEC_CONTROL` and
        remembers the request so that `get_error_counters` can use it.

        Args:
            enable (bool) - enable (True) or disable (False)
        """
        aggregate_mask = 0x2
        control = self.__io.get(TSI_DPRX_FEC_CONTROL, c_int)[1]
        control = (control | aggregate_mask) if enable else (control & ~aggregate_mask)
        self.__aggregate_error = 1 if enable else 0
        self.__io.set(TSI_DPRX_FEC_CONTROL, control)

    def get_error_counters(self) -> FECCounters:
        """
        Get current error counters.

        For every lane (plus one extra "aggregate" slot with index 4 when the
        link has 4 lanes, aggregation was requested via `aggregate_errors` and
        bit 6 of DPCD 0x120 is set) each of the five counter selectors is
        written to DPCD 0x120 and the 16-bit value at DPCD 0x281 is read back.
        A counter is stored as its low 15 bits when bit 15 of the value read is
        set, otherwise `None` is stored.

        The last value written to DPCD 0x120 is left in place.
        NOTE(review): if bit 6 was set on entry and the aggregate slot is not
        reached, the values written keep bit 6 cleared - confirm this is
        intended.

        Returns:
            object of `FECCounters` type
        """
        # Presumably FEC_CONFIGURATION / FEC_ERROR_COUNT - confirm against the DP spec.
        config_address = 0x120
        count_address = 0x281
        aggregate_bit = 1 << 6
        # Counter selector (written to bits 1..3) -> `FECCounters` field to fill.
        selectors = (
            (1, "uncorrectedBlockErrors"),
            (2, "correctedBlockErrors"),
            (3, "bitErrors"),
            (4, "parityBlockErrors"),
            (5, "parityBitErrors"),
        )

        counters = FECCounters()
        lanes = self.__io.get(TSI_R_DPRX_LINK_LANE_COUNT, c_int)[1]
        if lanes == 4 and self.__aggregate_error:
            # One more iteration (index 4) for the aggregated counters.
            lanes += 1

        config = self.__dpcd.read(config_address, 1).data[0]
        aggregate_in_dpcd = bool(config & aggregate_bit)

        for lane in range(lanes):
            if lane == 0 and aggregate_in_dpcd:
                # Per-lane reads are done with the aggregate bit switched off.
                config &= ~aggregate_bit
            if lane == 4:
                if not aggregate_in_dpcd:
                    continue
                config |= aggregate_bit
            elif lane < 4:
                # Put the lane index into bits 4..5.
                config = (config & ~(0x3 << 4)) | (lane << 4)

            for selector, field in selectors:
                config &= ~(0x7 << 1)
                self.__dpcd.write(config_address, config | (selector << 1))
                raw = int.from_bytes(self.__dpcd.read(count_address, 2).data, 'little')
                value = raw & 0x7FFF if raw & 0x8000 else None
                getattr(counters, field)[lane] = value

        return counters

    def clear(self) -> None:
        """
        Clear all errors.

        Writes DPCD 0x120 once with bits 1..3 zeroed and then writes the
        original value back (presumably deselecting the counter is what resets
        it - confirm against the DP spec).
        """
        address = 0x120
        saved = self.__dpcd.read(address, 1).data[0]
        self.__dpcd.write(address, saved & ~0xE)
        self.__dpcd.write(address, saved)