145 lines
4.7 KiB
Python
145 lines
4.7 KiB
Python
|
|
import copy
|
||
|
|
|
||
|
|
from .pdc_io import PDCPortIO
|
||
|
|
from .pdc_utils import load_pdo, save_pdo
|
||
|
|
from .pdo_types import FixedPdoSink, BatteryPdo, VariablePdo, PdoTypeEnum, PdoSide
|
||
|
|
from .pdo import Pdo
|
||
|
|
from .pdo_utils import get_pdo_value
|
||
|
|
from typing import List
|
||
|
|
|
||
|
|
|
||
|
|
class PowerSink:
    """
    Class `PowerSink` contains information about PDO's on Sink side.

    The PDO list is cached inside the object: it is read from the device once
    on construction and re-read only when a getter is called with
    `read_from_device=True`.

    - Get PDO count `pdo_count`, type `int`.
    - Set and get PDO list, `set_pdo_list`, `get_pdo_list`, type `list` with `Pdo`.
    - Set and get PDO by index, `set_pdo_by_index`, `get_pdo_by_index`, type `Pdo`.
    - Save information about PDO's to file, `save_pdo`.
    - Load information about PDO's from file, `load_pdo`.
    """

    def __init__(self, pdc_io: PDCPortIO):
        """
        Create the object and fill the PDO cache from the device.

        Args:
            pdc_io (`PDCPortIO`): I/O object used for every device access.
        """
        self.__io = pdc_io
        # PDO amount as reported by the hardware capabilities.
        self.__pdo_count = self.__io.read_hw_caps().pdo_count
        self.__pdo_list = self.__read_pdo()

    def __read_pdo(self) -> List[Pdo]:
        """
        Read local SPR Sink PDO's from the device and wrap them in `Pdo`.

        Only Fixed, Battery and Variable entries are converted; an entry of
        any other type is skipped, so the result may hold fewer than
        `pdo_count` items.

        Returns:
            object of `list` type with `Pdo`.
        """
        pdo_list = []
        values = self.__io.read_local_spr_sink_pdo()
        for i in range(self.__pdo_count):
            value = values[i]
            # Convert the raw type once instead of once per branch.
            pdo_type = PdoTypeEnum(value.brief_info.pdo_type)
            if pdo_type == PdoTypeEnum.Fixed:
                data = value.data
                if data >= 0:
                    # A Fixed PDO whose raw value is 0 is kept as a disabled entry.
                    fixed = FixedPdoSink(value.fixed_pdo_sink, disable=(data == 0))
                    pdo_list.append(Pdo(pdo=fixed, side=PdoSide.Sink))
            elif pdo_type == PdoTypeEnum.Battery:
                pdo_list.append(Pdo(pdo=BatteryPdo(value.battery_pdo), side=PdoSide.Sink))
            elif pdo_type == PdoTypeEnum.Variable:
                pdo_list.append(Pdo(pdo=VariablePdo(value.variable_pdo), side=PdoSide.Sink))
        return pdo_list

    def __write_pdo(self) -> List[int]:
        """
        Convert the cached PDO list to the values passed to the device.

        Returns:
            object of `list` type, one value per cached PDO.
        """
        pdo_list = []
        for index, item in enumerate(self.__pdo_list):
            if index == 0:
                # The first PDO is always written as a Fixed Sink PDO.
                pdo_list.append(get_pdo_value(item.get_pdo_as_selected_type(FixedPdoSink)))
            elif item.pdo_type != PdoTypeEnum.Disabled:
                pdo_list.append(get_pdo_value(item.pdo_object))
            else:
                # Disabled entries are written as 0.
                pdo_list.append(0)
        return pdo_list

    def __check_index(self, index: int):
        """
        Check that `index` addresses a PDO that is supported and stored.

        Args:
            index (`int`)

        Raises:
            ValueError: if `index` is outside the supported PDO amount or
                outside the cached PDO list.
        """
        if not 0 <= index < self.__pdo_count:
            raise ValueError(f"Incorrect PDO index. Supported PDO amount: {self.__pdo_count}")
        # The cache may hold fewer entries than `pdo_count` (after `load_pdo`,
        # `set_pdo_list`, or a device read that skipped an entry). Report that
        # as ValueError too instead of leaking an IndexError.
        stored = len(self.__pdo_list)
        if index >= stored:
            raise ValueError(f"Incorrect PDO index. Stored PDO amount: {stored}")

    @property
    def pdo_count(self) -> int:
        """
        Returns current pdo count.

        Returns:
            object of `int` type
        """
        return self.__pdo_count

    def set_pdo_list(self, pdo_list: List[Pdo]):
        """
        Set new Pdo list and write it to the device.

        A deep copy of `pdo_list` is stored, so later changes to the caller's
        objects do not affect this object.

        Args:
            pdo_list (`list` with 'Pdo')
        """
        self.__pdo_list = copy.deepcopy(pdo_list)
        self.__io.write_local_spr_sink_pdo(self.__write_pdo())

    def set_pdo_by_index(self, pdo_object, index: int):
        """
        Set new Pdo by index and write the whole list to the device.

        Args:
            pdo_object ('Pdo')
            index (`int`)

        Raises:
            ValueError: if `index` does not address a stored PDO.
        """
        self.__check_index(index)
        self.__pdo_list[index] = copy.deepcopy(pdo_object)
        self.__io.write_local_spr_sink_pdo(self.__write_pdo())

    def get_pdo_list(self, read_from_device: bool = False) -> List[Pdo]:
        """
        Returns current pdo list.

        Args:
            read_from_device (`bool`): refresh the cached list from the
                device before returning it.

        Returns:
            object of `list` type with `Pdo` (a deep copy of the cached list).
        """
        if read_from_device:
            self.__pdo_list = self.__read_pdo()
        return copy.deepcopy(self.__pdo_list)

    def get_pdo_by_index(self, index: int, read_from_device: bool = False) -> Pdo:
        """
        Returns current pdo by index.

        Args:
            index (`int`)
            read_from_device (`bool`): refresh the cached list from the
                device before the lookup.

        Returns:
            object of `Pdo` type (a deep copy of the cached entry).

        Raises:
            ValueError: if `index` does not address a stored PDO.
        """
        if read_from_device:
            self.__pdo_list = self.__read_pdo()
        self.__check_index(index)
        return copy.deepcopy(self.__pdo_list[index])

    def save_pdo(self, path: str):
        """
        Save information about PDO's to file.
        Supported formats:
        - txt
        - json

        Args:
            path (`str`)
        """
        # Resolves to the module-level `save_pdo` helper, not to this method.
        save_pdo(path, self.__pdo_list, False)

    def load_pdo(self, path: str):
        """
        Load information about PDO's from file.
        Supported formats:
        - txt
        - json

        Only the cached list is replaced; this method itself does not write
        anything to the device.

        Args:
            path (`str`)
        """
        # Resolves to the module-level `load_pdo` helper, not to this method.
        self.__pdo_list = load_pdo(path, False)

    def __str__(self) -> str:
        """
        Returns a text report with one `PDO <index>` section per cached PDO.
        """
        pdo_status = "".join(
            f"PDO {index}\n{item.pdo_object!s}\n"
            for index, item in enumerate(self.__pdo_list)
        )
        return f"Power Sink\n{pdo_status}\n"
|