225 lines
8.2 KiB
Python
225 lines
8.2 KiB
Python
|
|
import time
|
||
|
|
import warnings
|
||
|
|
import copy
|
||
|
|
|
||
|
|
from typing import Union, List
|
||
|
|
|
||
|
|
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
|
||
|
|
from UniTAP.dev.modules.capturer.statuses import VideoCaptureStatus
|
||
|
|
from UniTAP.common import VideoFrame, VideoFrameDSC
|
||
|
|
from .result_video import ResultVideoObject
|
||
|
|
|
||
|
|
|
||
|
|
class VideoCapturer:
    """
    Front-end for capturing video frames on the Sink (RX - receiver) side.

    Wraps a `Capturer` object and exposes the current `status`, the accumulated `capture_result`,
    the `max_stream_number` limit, `stop`, and the `pop_*` / `get_crc` helpers for pulling data.
    This class has no `start` method of its own; `VideoCapturerDP` and `VideoCapturerHDMI`
    provide it.
    """

    def __init__(self, capturer: Capturer, max_stream_number: int):
        self.__max_stream_number = max_stream_number
        self.__result = ResultVideoObject()
        self.__capturer = capturer

    @property
    def status(self) -> VideoCaptureStatus:
        """
        Current status, as reported by the wrapped capturer (`video_capturer_status`).

        Returns:
            object of `VideoCaptureStatus` type
        """
        return self.__capturer.video_capturer_status

    @property
    def capture_result(self) -> ResultVideoObject:
        """
        Result object that this capturer fills while capturing.

        Returns:
            object of `ResultVideoObject` type
        """
        return self.__result

    @property
    def max_stream_number(self) -> int:
        """
        Max stream number given to the constructor.

        Returns:
            object of `int` type
        """
        return self.__max_stream_number

    def stop(self):
        """
        Stop video capturing and store the current time as `end_capture_time` of the result.
        """
        live_video = CaptureConfig()
        live_video.video = True
        live_video.type = CaptureConfig.Type.LIVE

        self.__capturer.stop_capture(live_video)
        self.__result.end_capture_time = time.time()

    def pop_element(self) -> Union[VideoFrame, VideoFrameDSC]:
        """
        Request one frame from the capturer and return it.

        A warning is issued when the status is `Idle`; the frame is requested anyway.
        A deep copy of the captured frame is appended to the buffer of `capture_result`.

        Returns:
            object of `VideoFrame` or `VideoFrameDSC` type
        """
        if self.status == VideoCaptureStatus.Idle:
            warnings.warn("Video capture is not working now. Please, turn it on.")

        frames = self.__capturer.capture_video_by_n_frames(1)
        # The result buffer stores copies; the caller receives the captured object itself.
        stored_copy = copy.deepcopy(frames)
        self.__result.buffer.extend(stored_copy)
        return frames[0]

    def pop_element_as_result_object(self) -> ResultVideoObject:
        """
        Capture every currently available frame and return them in a new `ResultVideoObject`.

        Unlike `pop_element` and `pop_all_elements`, this neither checks the status nor touches
        `capture_result`.

        Returns:
            object of `ResultVideoObject` type
        """
        frames = self.__capturer.capture_video_by_n_frames(
            self.__capturer.get_available_video_frame_count()
        )

        wrapped = ResultVideoObject()
        wrapped.buffer.extend(frames)
        return wrapped

    def pop_all_elements(self) -> Union[List[VideoFrame], List[VideoFrameDSC]]:
        """
        Capture every currently available frame and return them as a list.

        A warning is issued when the status is `Idle`; the frames are requested anyway.
        The available frame count is printed, and deep copies of the captured frames are appended
        to the buffer of `capture_result`.

        Returns:
            object of list[`VideoFrame` or `VideoFrameDSC`] type
        """
        if self.status == VideoCaptureStatus.Idle:
            warnings.warn("Video capture is not working now. Please, turn it on.")

        available_frame_count = self.__capturer.get_available_video_frame_count()
        print(f"Available frames count {available_frame_count}")

        frames = self.__capturer.capture_video_by_n_frames(available_frame_count)
        # The result buffer stores copies; the caller receives the captured objects themselves.
        stored_copies = copy.deepcopy(frames)
        self.__result.buffer.extend(stored_copies)
        return frames

    def get_crc(self, crc_frame_count: int = 1) -> List[tuple[int, int, int]]:
        """
        Return CRC values obtained from the capturer (`capture_crc`).

        Args:
            crc_frame_count (int): forwarded unchanged to `capture_crc`
                (presumably the number of frames to get CRC for - confirm against `Capturer`)

        Returns:
            list[tuple[int, int, int]]
        """
        return self.__capturer.capture_crc(crc_frame_count)
|
||
|
|
|
||
|
|
|
||
|
|
class VideoCapturerDP(VideoCapturer):
    """
    Class `VideoCapturerDP` inherited from class `VideoCapturer` and also allows working with capturing video frames
    on DP Sink (RX - receiver) side.
    You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
    capturing `capture_result`.
    """

    def __init__(self, capturer: Capturer, max_stream_number: int):
        super().__init__(capturer, max_stream_number)
        # Double-underscore names are mangled per class, so the reference stored by the base class
        # is not reachable as `self.__capturer` from here; keep a reference of our own.
        self.__capturer = capturer

    def start(self, frames_count: int = 0, sec: int = 0, stream_number: int = 0,
              capture_type: CaptureConfig.Type = CaptureConfig.Type.LIVE):
        """
        Start capturing. Possible some variants of capturing:
            - Capture with fixed frames count (`frames_count` > 0): the frames are collected with
              `capture_video_by_n_frames` and stored in the buffer of `capture_result`.
            - Capture with fixed time (`sec` > 0, used only when `frames_count` is not positive):
              the frames are collected with `capture_video_by_n_sec` and stored in the buffer of
              `capture_result`.
            - Capture without parameters - Live capturing (for getting frames you need to use functions
              `pop_element` and `pop_all_elements`)

        Before capturing, the stream number is applied to the capturer, `capture_result` is cleared
        and its `start_capture_time` is set to the current time.

        NOTE(review): this method does not call `stop` itself; whether a fixed-count or fixed-time
        capture is stopped afterwards depends on the `Capturer` implementation.

        All results can be obtained using the function `capture_result`.

        Args:
            frames_count (int)
            sec (int)
            stream_number (int): must be in `range(max_stream_number)`
            capture_type (CaptureConfig.Type)

        Raises:
            ValueError: if `stream_number` is negative or not less than `max_stream_number`.
        """

        # The original `assert ValueError(...)` asserted a truthy exception instance and therefore
        # never failed; the check has to raise to reject an invalid index.
        if not 0 <= stream_number < self.max_stream_number:
            available = ', '.join(str(i) for i in range(self.max_stream_number))
            raise ValueError(f"Incorrect index of stream. Value must be from available options: "
                             f"{available}")

        self.__capturer.set_video_stream_number(stream_number)

        config = CaptureConfig()
        config.video = True
        config.type = capture_type

        self.__capturer.start_capture(config)

        self.capture_result.clear()
        self.capture_result.start_capture_time = time.time()

        if frames_count > 0:
            self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count, capture_type))
        elif sec > 0:
            self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_sec(sec))

    def get_buffer_capacity(self, stream_number: int = 0):
        """
        Return the buffer capacity reported by the capturer for the given stream.

        Args:
            stream_number (int): forwarded unchanged to `Capturer.get_buffer_capacity`

        Returns:
            whatever `Capturer.get_buffer_capacity` returns (type and units are not visible here)
        """
        return self.__capturer.get_buffer_capacity(stream_number)
|
||
|
|
|
||
|
|
|
||
|
|
class VideoCapturerHDMI(VideoCapturer):
    """
    Class `VideoCapturerHDMI` inherited from class `VideoCapturer` and also allows working with capturing video frames
    on HDMI Sink (RX - receiver) side.
    You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
    capturing `capture_result`.
    """

    def __init__(self, capturer: Capturer, max_stream_number: int):
        super().__init__(capturer, max_stream_number)
        # Double-underscore names are mangled per class, so the reference stored by the base class
        # is not reachable as `self.__capturer` from here; keep a reference of our own.
        self.__capturer = capturer

    def start(self, frames_count: int = 0, sec: int = 0, stream_number: int = 0):
        """
        Start capturing. Possible some variants of capturing:
            - Capture with fixed frames count (`frames_count` > 0): the frames are collected with
              `capture_video_by_n_frames` and stored in the buffer of `capture_result`.
            - Capture with fixed time (`sec` > 0, used only when `frames_count` is not positive):
              the frames are collected with `capture_video_by_n_sec` and stored in the buffer of
              `capture_result`.
            - Capture without parameters - Live capturing (for getting frames you need to use functions
              `pop_element` and `pop_all_elements`)

        The capture is always started with the `CaptureConfig.Type.LIVE` type. Before frames are
        collected, `capture_result` is cleared and its `start_capture_time` is set to the current time.

        NOTE(review): this method does not call `stop` itself; whether a fixed-count or fixed-time
        capture is stopped afterwards depends on the `Capturer` implementation.

        All results can be obtained using the function `capture_result`.

        Args:
            frames_count (int)
            sec (int)
            stream_number (int): must be in `range(max_stream_number)`; it is only validated here
                and is not passed on to the capturer

        Raises:
            ValueError: if `stream_number` is negative or not less than `max_stream_number`.
        """

        # The original `assert ValueError(...)` asserted a truthy exception instance and therefore
        # never failed; the check has to raise to reject an invalid index.
        if not 0 <= stream_number < self.max_stream_number:
            available = ', '.join(str(i) for i in range(self.max_stream_number))
            raise ValueError(f"Incorrect index of stream. Value must be from available options: "
                             f"{available}")

        config = CaptureConfig()
        config.video = True
        config.type = CaptureConfig.Type.LIVE

        self.__capturer.start_capture(config)

        self.capture_result.clear()
        self.capture_result.start_capture_time = time.time()

        # The former `frames_count > 1` / `else` split made the same call in both branches.
        if frames_count > 0:
            self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count))
        elif sec > 0:
            self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_sec(sec))

    def get_buffer_capacity(self):
        """
        Return the buffer capacity reported by the capturer.

        Returns:
            whatever `Capturer.get_buffer_capacity` returns (type and units are not visible here)
        """
        return self.__capturer.get_buffer_capacity()
|