import time import warnings import copy from typing import Union, TypeVar, Type, List from UniTAP.libs.lib_tsi.tsi import TSIX_TS_SetPortConfigItem from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig from UniTAP.dev.modules.capturer.statuses import EventCaptureStatus from .result_event import ResultEventObject from .event_types import EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc, EventData from UniTAP.version import __version__ EventFilterType = TypeVar("EventFilterType", EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc ) class EventCapturer: """ Class `EventCapturer` allows working with capturing events on Sink (RX - receiver) side. You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing `capture_result`. """ def __init__(self, capturer: Capturer, port_id: int, event_filter: list): self.__device = capturer.device self.__capturer = capturer self.__status = EventCaptureStatus.Unknown device_name, serial_number, fw_version, front_end, memory_size = \ capturer.device.get_prepared_fw_info().split("|") self.__fw_info = {"bundle_version": capturer.device.get_bundle_version(), "app_version": __version__, "frontend_version": front_end, "memory_size": memory_size, "ucd_device": f"{device_name} [{serial_number.replace(' ','')}]", "ucd_fw_version": fw_version} self.__result = ResultEventObject(self.__fw_info) self.__event_filter = event_filter self.__port_id = port_id @property def status(self) -> EventCaptureStatus: """ Returns current event capturer status. Returns: object of `VideoCaptureStatus` type """ return self.__capturer.event_capturer_status @property def capture_result(self) -> ResultEventObject: """ Returns result of event capturing. Returns: object of `ResultEventObject` type """ return self.__result def event_filter(self, event_filter_type: Type[EventFilterType]) -> EventFilterType: """ Returns event filter for current `EventCapturer`. Returns: object of one of available [EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc] type """ for item in self.__event_filter: if isinstance(item, event_filter_type): return item available_variants = '\n'.join([str(type(e)) for e in self.__event_filter]) raise TypeError(f"Unsupported type of EventFilter: {event_filter_type}. Available variants:\n" f"{available_variants}") def configure_capturer(self, event_filter: Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc]): """ Configure Args: event_filter (Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc]) """ if not self.__check_event_filter_type(event_filter): available_variants = '\n'.join([str(type(e)) for e in self.__event_filter]) raise TypeError(f"Unsupported type of EventFilter: {type(event_filter)}. " f"Available variants:\n{available_variants}") for i, item in enumerate(self.__event_filter): if isinstance(item, type(event_filter)): self.__event_filter[i] = event_filter for item in event_filter.additional_filter: TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, item.ci_control, item.value, data_count=len(item.value) if isinstance(item.value, list) else 1) TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, event_filter._control_ci, event_filter.config) def clear_capturer_config(self): """ Clear event captuter configuration (filter). """ for item in self.__event_filter: item.clear() TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, item._control_ci, 0) def start(self, sec=0, n_elements=0): """ Start capturing. Possible some variants of capturing: \n - Capture with fixed event count (will be captured fixed event count and capturing will be stopped) - Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped). - Capture without parameters Live capturing (for getting events you need to use functions `pop_element` and `pop_all_elements`). Here you need to manually call the `stop` after capture. All results can be obtained using the function `capture_result`. Args: n_elements (int) sec (int) """ config = CaptureConfig() config.event = True config.type = CaptureConfig.Type.LIVE self.__capturer.start_event_capture() self.__capturer.start_capture(config) self.__result.clear() self.__result.start_capture_time = time.time() if n_elements > 0: self.__result.buffer.extend(self.__capturer.capture_n_events(n_elements)) self.stop() elif sec > 0: time.sleep(sec) self.stop() self.__result.buffer.extend(self.__capturer.read_all_events()) def stop(self): """ Stop capture events. """ config = CaptureConfig() config.event = True config.type = CaptureConfig.Type.LIVE self.__capturer.stop_event_capture() self.__capturer.stop_capture(config) self.__result.end_capture_time = time.time() def pop_element(self) -> EventData: """ Return first captured object of `EventData`. Returns: object of `EventData` type or `ResultEventObject` """ if self.status == EventCaptureStatus.Stop: warnings.warn("Event capture is not working now. Please, turn it on.") captured_events = self.__capturer.capture_n_events(1) self.__result.buffer.extend(copy.deepcopy(captured_events)) return captured_events[0] def __get_events_count(self) -> int: return self.__capturer.get_available_events_count() def __check_event_filter_type(self, event_filter: Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc]) -> bool: for item in self.__event_filter: if isinstance(item, type(event_filter)): return True return False def pop_all_elements(self) -> List[EventData]: """ Return all captured event frames(objects of `EventData`). Returns: object of list[`EventData`] type """ return self.__capturer.read_all_events() def pop_all_elements_as_result_object(self) -> ResultEventObject: """ Return all captured event frames(objects of `EventData`) as `ResultEventObject`. Returns: object of `ResultEventObject` type """ captured_events = self.__capturer.read_all_events() res = ResultEventObject(self.__fw_info) res.buffer.extend(captured_events) return res