import warnings import time from typing import Optional from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig from UniTAP.dev.modules.memory_manager import MemoryManager from UniTAP.dev.ports.modules.capturer.bulk.bulk_types import (TriggerPosition, TriggerVarType, _get_trigger_value, EncodingTypeEnum, LaneCountEnum) from UniTAP.dev.ports.modules.capturer.bulk.result_bulk import ResultBulkObject megabyte = 1024 * 1024 class BulkCapturer: """ Class `BulkCapturer` allows working with capturing Bulk data on Sink (RX - receiver) side. You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing `capture_result`. """ def __init__(self, capturer: Capturer, memory_manager: MemoryManager): self.__capturer = capturer self.__memory_manager = memory_manager self.__trigger_position = TriggerPosition.TP_Start self.__result = ResultBulkObject() @property def status(self): """ Returns current bulk capturer status. Returns: object of `VideoCaptureStatus` type """ return self.__capturer.bulk_capturer_status @property def capture_result(self) -> ResultBulkObject: """ Returns result of bulk capturing. Returns: object of `ResultBulkObject` type """ return self.__result @property def encoding_type(self) -> EncodingTypeEnum: """ Returns current encoding type of capturing. Returns: object of `EncodingTypeEnum` type """ return self.__capturer.read_encoding_type() @property def lane_count(self) -> LaneCountEnum: """ Returns current lane count for capturing. Returns: object of `LaneCountEnum` type """ return self.__capturer.read_lane_count() def start(self, bulk_size: int = 1, trigger_position: TriggerPosition = TriggerPosition.TP_Start, trigger_config: Optional[TriggerVarType] = None, assume_scrambler: bool = False, gpio: bool = False, encoding_type: Optional[EncodingTypeEnum] = None, lane_count: Optional[LaneCountEnum] = None): """ Start capturing. All results can be obtained using the function `capture_result`. Args: bulk_size (int) - bulk data size in megabytes trigger_position (`TriggerPosition`) trigger_config (`TriggerVarType`|None) assume_scrambler (bool) gpio (bool) encoding_type (`EncodingTypeEnum`|None) lane_count (`LaneCountEnum`|None) """ self.__result.clear() if trigger_config is None: trigger_position = TriggerPosition.TP_Start capture_config = CaptureConfig() capture_config.video = True capture_config.event = True capture_config.audio = True capture_config.type = CaptureConfig.Type.LIVE self.__capturer.stop_capture(capture_config) self.__capturer.stop_bulk_capture() self.__capturer.clear_bulk_buffer() max_bulk_block_bytes = self.__memory_manager.get_total_memory() - self.__memory_manager.RESERVED_MEMORY_BYTES bulk_block_bytes = max_bulk_block_bytes if bulk_size * megabyte > max_bulk_block_bytes else bulk_size * megabyte self.__memory_manager.set_memory_layout([bulk_block_bytes]) self.__capturer.write_bulk_size(bulk_block_bytes) if encoding_type is None: encoding_type = EncodingTypeEnum.Encoding_Auto self.__capturer.write_encoding_type(encoding_type) if lane_count is None: lane_count = LaneCountEnum.Auto self.__capturer.write_lane_count(lane_count) if self.__capturer.read_bulk_capture_caps().bitGrabWidth: self.__capturer.write_bulk_gpio(gpio) self.__capturer.write_bulk_trigger_position(trigger_position.value) if self.__capturer.read_bulk_capture_caps().triggers: self.__capturer.write_bulk_trigger_settings(*_get_trigger_value(trigger_config, self.__capturer.read_bulk_trigger_caps())) else: warnings.warn("Triggers are not supported.") self.__result.assume_scrambler_disabled = assume_scrambler self.__result.start_capture_time = time.time() self.__capturer.start_bulk_capture() self.__result.buffer.extend(self.__capturer.bulk_capture(bulk_block_bytes, trigger_config)) def stop(self): """ Stop capture video. """ self.__capturer.stop_bulk_capture() self.__result.end_capture_time = time.time()