import time import warnings import copy from typing import Union, List from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig from UniTAP.dev.modules.capturer.statuses import VideoCaptureStatus from UniTAP.common import VideoFrame, VideoFrameDSC from .result_video import ResultVideoObject class VideoCapturer: """ Class `VideoCapturer` allows working with capturing video frames on Sink (RX - receiver) side. You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing `capture_result`. """ def __init__(self, capturer: Capturer, max_stream_number: int): self.__capturer = capturer self.__result = ResultVideoObject() self.__max_stream_number = max_stream_number @property def status(self) -> VideoCaptureStatus: """ Returns current video capturer status. Returns: object of `VideoCaptureStatus` type """ return self.__capturer.video_capturer_status @property def capture_result(self) -> ResultVideoObject: """ Returns result of video capturing. Returns: object of `ResultVideoObject` type """ return self.__result @property def max_stream_number(self) -> int: """ Returns max stream number supported for capturing. Returns: object of `int` type """ return self.__max_stream_number def stop(self): """ Stop capture video. """ config = CaptureConfig() config.video = True config.type = CaptureConfig.Type.LIVE self.__capturer.stop_capture(config) self.__result.end_capture_time = time.time() def pop_element(self) -> Union[VideoFrame, VideoFrameDSC]: """ Return first object of `VideoFrame` or `VideoFrameDSC`. Returns: object of `VideoFrame` or `VideoFrameDSC` type """ if self.status == VideoCaptureStatus.Idle: warnings.warn("Video capture is not working now. Please, turn it on.") captured_video_frames = self.__capturer.capture_video_by_n_frames(1) self.__result.buffer.extend(copy.deepcopy(captured_video_frames)) return captured_video_frames[0] def pop_element_as_result_object(self) -> ResultVideoObject: """ Return captured video frame(objects of `VideoFrame` or VideoFrameDSC`) as `ResultVideoObject`. Returns: object of `ResultVideoObject` type """ available_frame_count = self.__capturer.get_available_video_frame_count() captured_video_frames = self.__capturer.capture_video_by_n_frames(available_frame_count) res = ResultVideoObject() res.buffer.extend(captured_video_frames) return res def pop_all_elements(self) -> Union[List[VideoFrame], List[VideoFrameDSC]]: """ Return all captured video frames(objects of `VideoFrame` or `VideoFrameDSC`). Returns: object of list[`VideoFrame` or `VideoFrameDSC`] type """ if self.status == VideoCaptureStatus.Idle: warnings.warn("Video capture is not working now. Please, turn it on.") available_frame_count = self.__capturer.get_available_video_frame_count() print(f"Available frames count {available_frame_count}") captured_video_frames = self.__capturer.capture_video_by_n_frames(available_frame_count) self.__result.buffer.extend(copy.deepcopy(captured_video_frames)) return captured_video_frames def get_crc(self, crc_frame_count: int = 1) -> List[tuple[int, int, int]]: """ Returns captured crc values. Returns: list[tuple[int, int, int]] """ return self.__capturer.capture_crc(crc_frame_count) class VideoCapturerDP(VideoCapturer): """ Class `VideoCapturerDP` inherited from class `VideoCapturer` and also allows working with capturing video frames on DP Sink (RX - receiver) side. You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing `capture_result`. """ def __init__(self, capturer: Capturer, max_stream_number: int): super().__init__(capturer, max_stream_number) self.__capturer = capturer def start(self, frames_count: int = 0, sec: int = 0, stream_number: int = 0, capture_type: CaptureConfig.Type = CaptureConfig.Type.LIVE): """ Start capturing. Possible some variants of capturing: - Capture with fixed frames count (will be captured fixed frames count and capturing will be stopped). - Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped). - Capture without parameters - Live capturing (for getting frames you need to use functions `pop_element` and `pop_all_elements`) All results can be obtained using the function `capture_result`. Args: frames_count (int) sec (int) stream_number (int) capture_type (CaptureConfig.Type) """ if stream_number >= self.max_stream_number: assert ValueError(f"Incorrect index of stream. Value must be from available options: " f"{', '.join([str(i) for i in range(self.max_stream_number)])}") self.__capturer.set_video_stream_number(stream_number) config = CaptureConfig() config.video = True config.type = capture_type self.__capturer.start_capture(config) self.capture_result.clear() self.capture_result.start_capture_time = time.time() if frames_count > 0: self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count, capture_type)) elif sec > 0: self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_sec(sec)) def get_buffer_capacity(self, stream_number: int = 0): return self.__capturer.get_buffer_capacity(stream_number) class VideoCapturerHDMI(VideoCapturer): """ Class `VideoCapturerHDMI` inherited from class `VideoCapturer` and also allows working with capturing video frames on HDMI Sink (RX - receiver) side. You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing `capture_result`. """ def __init__(self, capturer: Capturer, max_stream_number: int): super().__init__(capturer, max_stream_number) self.__capturer = capturer def start(self, frames_count: int = 0, sec: int = 0, stream_number: int = 0): """ Start capturing. Possible some variants of capturing: - Capture with fixed frames count (will be captured fixed frames count and capturing will be stopped). - Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped). - Capture without parameters - Live capturing (for getting frames you need to use functions `pop_element` and `pop_all_elements`) All results can be obtained using the function `capture_result`. Args: frames_count (int) sec (int) stream_number (int) """ if stream_number >= self.max_stream_number: assert ValueError(f"Incorrect index of stream. Value must be from available options: " f"{', '.join([str(i) for i in range(self.max_stream_number)])}") config = CaptureConfig() config.video = True config.type = CaptureConfig.Type.LIVE self.__capturer.start_capture(config) self.capture_result.clear() self.capture_result.start_capture_time = time.time() if frames_count > 0: if frames_count > 1: self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count)) else: self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count)) elif sec > 0: self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_sec(sec)) def get_buffer_capacity(self): return self.__capturer.get_buffer_capacity()