Files

133 lines
4.7 KiB
Python
Raw Permalink Normal View History

2026-04-16 16:51:05 +08:00
import warnings
import time
from typing import Optional
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
from UniTAP.dev.modules.memory_manager import MemoryManager
from UniTAP.dev.ports.modules.capturer.bulk.bulk_types import (TriggerPosition, TriggerVarType, _get_trigger_value,
EncodingTypeEnum, LaneCountEnum)
from UniTAP.dev.ports.modules.capturer.bulk.result_bulk import ResultBulkObject
megabyte = 1024 * 1024
class BulkCapturer:
"""
Class `BulkCapturer` allows working with capturing Bulk data on Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
capturing `capture_result`.
"""
def __init__(self, capturer: Capturer, memory_manager: MemoryManager):
self.__capturer = capturer
self.__memory_manager = memory_manager
self.__trigger_position = TriggerPosition.TP_Start
self.__result = ResultBulkObject()
@property
def status(self):
"""
Returns current bulk capturer status.
Returns:
object of `VideoCaptureStatus` type
"""
return self.__capturer.bulk_capturer_status
@property
def capture_result(self) -> ResultBulkObject:
"""
Returns result of bulk capturing.
Returns:
object of `ResultBulkObject` type
"""
return self.__result
@property
def encoding_type(self) -> EncodingTypeEnum:
"""
Returns current encoding type of capturing.
Returns:
object of `EncodingTypeEnum` type
"""
return self.__capturer.read_encoding_type()
@property
def lane_count(self) -> LaneCountEnum:
"""
Returns current lane count for capturing.
Returns:
object of `LaneCountEnum` type
"""
return self.__capturer.read_lane_count()
def start(self, bulk_size: int = 1, trigger_position: TriggerPosition = TriggerPosition.TP_Start,
trigger_config: Optional[TriggerVarType] = None, assume_scrambler: bool = False, gpio: bool = False,
encoding_type: Optional[EncodingTypeEnum] = None, lane_count: Optional[LaneCountEnum] = None):
"""
Start capturing. All results can be obtained using the function `capture_result`.
Args:
bulk_size (int) - bulk data size in megabytes
trigger_position (`TriggerPosition`)
trigger_config (`TriggerVarType`|None)
assume_scrambler (bool)
gpio (bool)
encoding_type (`EncodingTypeEnum`|None)
lane_count (`LaneCountEnum`|None)
"""
self.__result.clear()
if trigger_config is None:
trigger_position = TriggerPosition.TP_Start
capture_config = CaptureConfig()
capture_config.video = True
capture_config.event = True
capture_config.audio = True
capture_config.type = CaptureConfig.Type.LIVE
self.__capturer.stop_capture(capture_config)
self.__capturer.stop_bulk_capture()
self.__capturer.clear_bulk_buffer()
max_bulk_block_bytes = self.__memory_manager.get_total_memory() - self.__memory_manager.RESERVED_MEMORY_BYTES
bulk_block_bytes = max_bulk_block_bytes if bulk_size * megabyte > max_bulk_block_bytes else bulk_size * megabyte
self.__memory_manager.set_memory_layout([bulk_block_bytes])
self.__capturer.write_bulk_size(bulk_block_bytes)
if encoding_type is None:
encoding_type = EncodingTypeEnum.Encoding_Auto
self.__capturer.write_encoding_type(encoding_type)
if lane_count is None:
lane_count = LaneCountEnum.Auto
self.__capturer.write_lane_count(lane_count)
if self.__capturer.read_bulk_capture_caps().bitGrabWidth:
self.__capturer.write_bulk_gpio(gpio)
self.__capturer.write_bulk_trigger_position(trigger_position.value)
if self.__capturer.read_bulk_capture_caps().triggers:
self.__capturer.write_bulk_trigger_settings(*_get_trigger_value(trigger_config,
self.__capturer.read_bulk_trigger_caps()))
else:
warnings.warn("Triggers are not supported.")
self.__result.assume_scrambler_disabled = assume_scrambler
self.__result.start_capture_time = time.time()
self.__capturer.start_bulk_capture()
self.__result.buffer.extend(self.__capturer.bulk_capture(bulk_block_bytes, trigger_config))
def stop(self):
"""
Stop capture video.
"""
self.__capturer.stop_bulk_capture()
self.__result.end_capture_time = time.time()