Files

206 lines
7.7 KiB
Python
Raw Permalink Normal View History

2026-04-16 16:51:05 +08:00
import time
import warnings
import copy
from typing import Union, TypeVar, Type, List
from UniTAP.libs.lib_tsi.tsi import TSIX_TS_SetPortConfigItem
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
from UniTAP.dev.modules.capturer.statuses import EventCaptureStatus
from .result_event import ResultEventObject
from .event_types import EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc, EventData
from UniTAP.version import __version__
# Constrained type variable covering every supported event-filter class.
# It lets `EventCapturer.event_filter` declare that it returns an instance of
# exactly the filter class it was asked for.
EventFilterType = TypeVar(
    "EventFilterType",
    EventFilterDpRx,
    EventFilterDpTx,
    EventFilterHdRx,
    EventFilterHdTx,
    EventFilterUsbc,
)
class EventCapturer:
    """
    Class `EventCapturer` allows working with capturing events on one port of a device.

    You can `start` capturing in several modes, `stop` capturing, get the current `status` and the result of
    capturing via `capture_result`. The set of events to capture is controlled with `configure_capturer` and
    `clear_capturer_config`.
    """

    def __init__(self, capturer: Capturer, port_id: int, event_filter: list):
        """
        Args:
            capturer (Capturer) - low-level capturer; its `device` is used for configuration writes
            port_id (int) - port identifier passed to every port configuration write
            event_filter (list) - event filter objects available for this capturer
        """
        self.__device = capturer.device
        self.__capturer = capturer
        self.__status = EventCaptureStatus.Unknown

        # The prepared firmware description is a single string of five "|"-separated fields.
        device_name, serial_number, fw_version, front_end, memory_size = \
            capturer.device.get_prepared_fw_info().split("|")
        self.__fw_info = {"bundle_version": capturer.device.get_bundle_version(),
                          "app_version": __version__,
                          "frontend_version": front_end,
                          "memory_size": memory_size,
                          "ucd_device": f"{device_name} [{serial_number.replace(' ','')}]",
                          "ucd_fw_version": fw_version}

        self.__result = ResultEventObject(self.__fw_info)
        self.__event_filter = event_filter
        self.__port_id = port_id

    @property
    def status(self) -> EventCaptureStatus:
        """
        Returns current event capturer status (as reported by the underlying capturer).

        Returns:
            object of `EventCaptureStatus` type
        """
        return self.__capturer.event_capturer_status

    @property
    def capture_result(self) -> ResultEventObject:
        """
        Returns result of event capturing.

        Returns:
            object of `ResultEventObject` type
        """
        return self.__result

    def event_filter(self, event_filter_type: Type[EventFilterType]) -> EventFilterType:
        """
        Returns event filter of the requested type for current `EventCapturer`.

        Args:
            event_filter_type - one of the available filter classes [EventFilterDpRx, EventFilterDpTx,
                EventFilterHdRx, EventFilterHdTx, EventFilterUsbc]

        Returns:
            the first stored filter that is an instance of `event_filter_type`

        Raises:
            TypeError: if no stored filter is an instance of `event_filter_type`
        """
        for item in self.__event_filter:
            if isinstance(item, event_filter_type):
                return item
        raise self.__unsupported_filter_error(event_filter_type)

    def configure_capturer(self, event_filter: Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx,
                                                     EventFilterHdTx, EventFilterUsbc]):
        """
        Store the given event filter and write its configuration to the port.

        Args:
            event_filter (Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc])

        Raises:
            TypeError: if this capturer holds no filter of the same type as `event_filter`
        """
        if not self.__check_event_filter_type(event_filter):
            raise self.__unsupported_filter_error(type(event_filter))

        # Replace every stored filter of the same type with the new one.
        for i, item in enumerate(self.__event_filter):
            if isinstance(item, type(event_filter)):
                self.__event_filter[i] = event_filter

        # Additional filter items are written first, the main filter control item last.
        for item in event_filter.additional_filter:
            TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, item.ci_control, item.value,
                                      data_count=len(item.value) if isinstance(item.value, list) else 1)

        TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, event_filter._control_ci,
                                  event_filter.config)

    def clear_capturer_config(self):
        """
        Clear event capturer configuration (filter).

        Every stored filter is cleared and its control item on the port is written with 0.
        """
        for item in self.__event_filter:
            item.clear()
            TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, item._control_ci, 0)

    def start(self, sec=0, n_elements=0):
        """
        Start capturing. Possible some variants of capturing: \n
        - Capture with fixed event count (will be captured fixed event count and capturing will be stopped)
        - Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped).
        - Capture without parameters Live capturing (for getting events you need to use functions `pop_element` and
        `pop_all_elements`). Here you need to manually call the `stop` after capture.

        If both arguments are positive, `n_elements` takes precedence.
        All results can be obtained using the function `capture_result`.

        Args:
            n_elements (int)
            sec (int)
        """
        config = self.__live_event_config()

        self.__capturer.start_event_capture()
        self.__capturer.start_capture(config)
        self.__result.clear()
        self.__result.start_capture_time = time.time()

        if n_elements > 0:
            # Stop in `finally` so a failed read does not leave the capture running.
            try:
                self.__result.buffer.extend(self.__capturer.capture_n_events(n_elements))
            finally:
                self.stop()
        elif sec > 0:
            # Same guarantee when the wait is interrupted (e.g. KeyboardInterrupt).
            try:
                time.sleep(sec)
            finally:
                self.stop()
            self.__result.buffer.extend(self.__capturer.read_all_events())

    def stop(self):
        """
        Stop capture events and record the capture end time in the result object.
        """
        config = self.__live_event_config()

        self.__capturer.stop_event_capture()
        self.__capturer.stop_capture(config)
        self.__result.end_capture_time = time.time()

    def pop_element(self) -> EventData:
        """
        Return first captured object of `EventData`.

        A deep copy of the captured event is also appended to the buffer of `capture_result`.
        A warning is issued if the capturer status is `EventCaptureStatus.Stop`.

        Returns:
            object of `EventData` type

        Raises:
            IndexError: if no event was captured
        """
        if self.status == EventCaptureStatus.Stop:
            # stacklevel=2 attributes the warning to the caller instead of this method.
            warnings.warn("Event capture is not working now. Please, turn it on.", stacklevel=2)

        captured_events = self.__capturer.capture_n_events(1)
        if not captured_events:
            raise IndexError("No event was captured.")

        self.__result.buffer.extend(copy.deepcopy(captured_events))
        return captured_events[0]

    def pop_all_elements(self) -> List[EventData]:
        """
        Return all captured event frames(objects of `EventData`).

        The returned events are not added to the buffer of `capture_result`.

        Returns:
            object of list[`EventData`] type
        """
        return self.__capturer.read_all_events()

    def pop_all_elements_as_result_object(self) -> ResultEventObject:
        """
        Return all captured event frames(objects of `EventData`) as `ResultEventObject`.

        A new `ResultEventObject` is created on every call; the one returned by `capture_result` is not modified.

        Returns:
            object of `ResultEventObject` type
        """
        captured_events = self.__capturer.read_all_events()
        res = ResultEventObject(self.__fw_info)
        res.buffer.extend(captured_events)
        return res

    @staticmethod
    def __live_event_config() -> CaptureConfig:
        # Build the LIVE event capture configuration shared by `start` and `stop`.
        config = CaptureConfig()
        config.event = True
        config.type = CaptureConfig.Type.LIVE
        return config

    def __get_events_count(self) -> int:
        # Number of events currently available in the underlying capturer.
        return self.__capturer.get_available_events_count()

    def __check_event_filter_type(self, event_filter: Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx,
                                                            EventFilterHdTx, EventFilterUsbc]) -> bool:
        # True when at least one stored filter is an instance of the given filter's type.
        return any(isinstance(item, type(event_filter)) for item in self.__event_filter)

    def __unsupported_filter_error(self, requested_type) -> TypeError:
        # Build the TypeError raised for a filter type this capturer does not hold.
        available_variants = '\n'.join(str(type(e)) for e in self.__event_filter)
        return TypeError(f"Unsupported type of EventFilter: {requested_type}. "
                         f"Available variants:\n{available_variants}")