import warnings from typing import Optional from UniTAP.utils import function_scheduler from .pdc_io import PDCPortIO, PDCControlCommand, HWControlCommand from .pdc_types import CableControlOrientation, DifferentialPair class PdcControlsBase: """ Class `PdcControlsBase` allows controlling some PDC commands: - Send PR Swap `send_pr_swap`. - Send DR Swap `send_dr_swap`. - Send FR Swap `send_fr_swap`. - Send VCONN Swap `send_vconn_swap`. - Change cable control orientation `orientation`. - Attach/DeAttach `attach`. - Enable/Disable auto negotiate power contract `enable_auto_negotiate_power_contract`. - Enable/Disable communication capable as PD Sink `communication_capable_as_pd_sink`. - Enable/Disable communication capable as PD Source `communication_capable_as_pd_source`. """ def __init__(self, pdc_io: PDCPortIO): self.__io = pdc_io self.__caps = self.__io.read_pdc_caps() def send_pr_swap(self): """ Send Power Role swap. It will be necessary to manually check that the changes have been made. """ self.__io.write_pdc_command(PDCControlCommand.TogglePowerRole) def send_dr_swap(self): """ Send Dual Role swap. It will be necessary to manually check that the changes have been made. """ self.__io.write_pdc_command(PDCControlCommand.ToggleDataRole) def send_fr_swap(self): """ Send Fast Role swap. It will be necessary to manually check that the changes have been made. """ if self.__caps.fr_swap: self.__io.write_pdc_command(PDCControlCommand.FastRoleSwap) else: warnings.warn("Do not support Fast Role Swap.") def send_vconn_swap(self): """ Send VCONN Role swap. It will be necessary to manually check that the changes have been made. """ self.__io.write_pdc_command(PDCControlCommand.SwapVconn) @property def orientation(self) -> Optional[CableControlOrientation]: if self.__io.read_hw_status().et_cable: if self.__io.read_hw_status().cc1_closed: return CableControlOrientation.CC1 elif self.__io.read_hw_status().cc2_closed: return CableControlOrientation.CC2 else: return None else: return None @orientation.setter def orientation(self, orientation: CableControlOrientation): """ Change cable control orientation. Args: orientation (`CableControlOrientation`) """ if self.__io.read_hw_status().et_cable: self.__io.write_hw_command(HWControlCommand.SetCableFlipToOn if orientation == CableControlOrientation.CC2 else HWControlCommand.SetCableFlipToOff) else: warnings.warn("Unigraf ET cable is not connected. Cannot switch cable control orientation.") def enable_auto_negotiate_power_contract(self, enable: bool): """ Enable/Disable auto negotiate power contract. Args: enable (`bool`) """ caps = self.__io.read_pdc_contract_control() caps.auto_negotate = enable self.__io.write_pdc_contract_control(caps) def communication_capable_as_pd_sink(self, capable: bool): """ Enable/Disable communication capable as PD Sink. Args: capable (`bool`) """ self.enable_auto_negotiate_power_contract(capable) pdo = self.__io.read_local_spr_source_pdo() vdo = self.__io.read_tx_id_vdo() if isinstance(vdo, list): if capable: vdo[0] |= 1 << 30 else: vdo[0] &= ~(1 << 30) self.__io.write_tx_id_vdo(vdo) if isinstance(pdo, list): if capable: pdo[0].data |= 1 << 26 else: pdo[0].data &= ~(1 << 26) self.__io.write_local_spr_source_pdo([v.data for v in pdo]) def communication_capable_as_pd_source(self, capable: bool): """ Enable/Disable communication capable as PD Source. Args: capable (`bool`) """ pdo = self.__io.read_local_spr_source_pdo() vdo = self.__io.read_tx_id_vdo() if isinstance(vdo, list): if capable: vdo[0] |= 1 << 31 else: vdo[0] &= ~(1 << 31) self.__io.write_tx_id_vdo(vdo) if isinstance(pdo, list): if capable: pdo[0].data |= 1 << 26 else: pdo[0].data &= ~(1 << 26) self.__io.write_local_spr_source_pdo([v.data for v in pdo]) class PdcControls340(PdcControlsBase): """ Class `PdcControls340` inherited from class `PdcControlsBase`. Class `PdcControls340` allows controlling additional commands. Also has all the `PdcControlsBase` functionality. - Reconnect device `reconnect`. - Reset device `reset`. - Set ET cable different pairs `et_cable_diff_pairs`. """ def __init__(self, pdc_io: PDCPortIO): super().__init__(pdc_io) self.__io = pdc_io def reset(self): """ Reset device. """ self.__io.write_hw_command(HWControlCommand.ResetPDController) def attach(self, attach: bool): """ Attach/DeAttach device. """ self.__io.write_hw_command(HWControlCommand.ConnectCCLines if attach else HWControlCommand.DisconnectCCLines) def reconnect(self): """ Reconnect device. Returns: result of reconnection, type `bool`. """ self.__io.write_hw_command(HWControlCommand.RePlugCable) def is_reconnect_success(__io): return self.__io.read_pdc_status().cable_plug != 0 return function_scheduler(is_reconnect_success, self.__io, interval=1, timeout=10) def et_cable_diff_pairs(self, differential_pair: DifferentialPair): """ Set ET cable different pairs. Args: differential_pair (`DifferentialPair`) """ self.__io.write_hw_command(HWControlCommand.SetNewETCableMode if differential_pair.OnePair else HWControlCommand.SetOldETCableMode) class PdcControls424(PdcControlsBase): """ Class `PdcControls424` inherited from class `PdcControlsBase`. Class `PdcControls424` allows controlling additional commands. Also has all the `PdcControlsBase` functionality. - Reconnect device `reconnect`. - Reset device `reset`. """ def __init__(self, pdc_io: PDCPortIO): super().__init__(pdc_io) self.__io = pdc_io def reset(self): """ Reset device. """ self.__io.write_hw_command(HWControlCommand.ResetPDController) def reconnect(self) -> bool: """ Reconnect device. Returns: result of reconnection, type `bool`. """ self.__io.write_pdc_command(PDCControlCommand.SimulateReConnect) def is_reconnect_success(__io): return self.__io.read_pdc_status().cable_plug != 0 return function_scheduler(is_reconnect_success, self.__io, interval=1, timeout=10) class PdcControls500(PdcControlsBase): """ Class `PdcControls500` inherited from class `PdcControlsBase`. Class `PdcControls500` allows controlling additional commands. Also has all the `PdcControlsBase` functionality. - Reconnect device `reconnect`. - Enable/Disable internal resistance 10 Ohm `enable_internal_load_10_ohm`. """ def __init__(self, pdc_io: PDCPortIO): super().__init__(pdc_io) self.__io = pdc_io def reconnect(self) -> bool: """ Reconnect device. Returns: result of reconnection, type `bool`. """ self.__io.write_hw_command(HWControlCommand.RePlugCable) def is_reconnect_success(__io): return self.__io.read_pdc_status().cable_plug != 0 return function_scheduler(is_reconnect_success, self.__io, interval=1, timeout=10) def attach(self, attach: bool): """ Attach/DeAttach device. """ self.__io.write_hw_command(HWControlCommand.ConnectCCLines if attach else HWControlCommand.DisconnectCCLines) def enable_internal_load_10_ohm(self, enable: bool): """ Enable/Disable internal resistance 10 Ohm. Args: enable (`bool`) """ self.__io.write_hw_command(HWControlCommand.EnableIntLoad_10_ohm if enable else HWControlCommand.DisableIntLoad_10_ohm)