import time from typing import Union from UniTAP.libs.lib_tsi.tsi_io import DeviceIO from UniTAP.libs.lib_tsi.tsi import * from UniTAP.dev.modules.capturer.statuses import CaptureStatus, AudioCaptureStatus, EventCaptureStatus, \ VideoCaptureStatus, BulkCaptureStatus from UniTAP.common import AudioFrameData, VideoFrameDSC, create_from_pps from UniTAP.dev.ports.modules.capturer.event.event_types import EventData from threading import Lock from UniTAP.utils import function_scheduler from UniTAP.dev.ports.modules.capturer.bulk.private_bulk_types import * from UniTAP.dev.ports.modules.capturer.bulk.bulk_types import * from .types import * TIMEOUT = 10 class CaptureConfig: class Type(IntEnum): NONE = -1 LIVE = 0 BUFFERED = 1 def __init__(self): self.audio = False self.video = False self.event = False self.type = CaptureConfig.Type.NONE self.frame_count = 0 def from_int(self, value: int): self.video = bool(value & 1 << 3) self.audio = bool(value & 1 << 4) self.event = bool(value & 1 << 5) self.type = CaptureConfig.Type.LIVE if value & (1 << 2) else CaptureConfig.Type.BUFFERED self.frame_count = value >> 8 def to_int(self) -> int: value = 0 if self.video: value |= (1 << 3) if self.audio: value |= (1 << 4) if self.event: value |= (1 << 5) if self.video or self.audio: if self.type == CaptureConfig.Type.LIVE: value |= (1 << 2) else: value |= (self.frame_count << 8) return value class Capturer: def __init__(self, port_io: DeviceIO): self.__io = port_io self.__config = CaptureConfig() self.__status = CaptureStatus.Unknown self.__mutex = Lock() @property def config(self): return self.__config @property def status(self): return self.__status @status.setter def status(self, value: CaptureStatus): self.__status = value @property def device(self): return self.__io @property def video_capturer_status(self) -> VideoCaptureStatus: return VideoCaptureStatus(self.__io.get(TSI_VIDCAP_CAPTURE_STATUS_R, c_int)[1]) @property def audio_capturer_status(self) -> AudioCaptureStatus: return AudioCaptureStatus(self.__io.get(TSI_R_AUDCAP_STATUS, c_int)[1]) @property def event_capturer_status(self) -> EventCaptureStatus: return EventCaptureStatus(self.__io.get(TSI_EVCAP_CTRL, c_uint32)[1] & 0xFF) @property def bulk_capturer_status(self) -> BulkCaptureStatus: return BulkCaptureStatus(self.__io.get(TSI_BULK_CAPTURE_STATUS_R, c_uint32)[1] & 0x3) def start_capture(self, config: CaptureConfig): self.__mutex.acquire() self.__config.from_int(self.__current_config().to_int() | config.to_int()) self.__io.set(TSI_CAP_CONFIG, self.__config.to_int()) if self.__io.set(TSI_W_CAP_COMMAND, 1) == 0: self.__status = CaptureStatus.Running else: self.__status = CaptureStatus.Unknown self.__mutex.release() def stop_capture(self, config: CaptureConfig): self.__mutex.acquire() self.__config.from_int(self.__current_config().to_int() & ~config.to_int()) self.__io.set(TSI_CAP_CONFIG, self.__config.to_int()) if self.__io.set(TSI_W_CAP_COMMAND, 2) == 0: self.__status = CaptureStatus.Stop else: self.__status = CaptureStatus.Unknown self.__mutex.release() def __capture_events(self, event_count=0): event = EventData() if event_count > 0: event_size = self.__io.get(TSI_R_EVCAP_DATA, None, 0)[0] if event_size > 0: event.data = bytearray(self.__io.get(TSI_R_EVCAP_DATA, c_ubyte, event_size)[1]) return event def get_buffer_capacity(self, stream_number: int = None): if stream_number is not None: self.__io.set(TSI_DPRX_STREAM_SELECT, stream_number, c_uint32) self.__io.set(TSI_DPRX_MSA_COMMAND_W, 2, c_uint32) width = self.__io.get(TSI_R_INPUT_WIDTH , c_uint32)[1] height = self.__io.get(TSI_R_INPUT_HEIGHT , c_uint32)[1] total_memory_bytes = self.__io.get(TSI_MEMORY_SIZE_R, c_uint64)[1] bpc = self.__io.get(TSI_INPUT_COLOR_DEPTH_R , c_uint32)[1] color_format = self.__io.get(TSI_INPUT_COLOR_MODE_R , c_uint32)[1] bpp = self._get_bits_per_pixel(bpc, color_format) line_alignment = 1023 pixel_size = self._get_pixel_size(color_format, bpp) line_size = width * pixel_size line_pitch = self._align(line_size, line_alignment) one_frame_size_bytes = (height + 1) * line_pitch if one_frame_size_bytes == 0: return 0 else: return int(total_memory_bytes / one_frame_size_bytes) @staticmethod def _get_bits_per_pixel(bpc, color_format) -> int: if color_format in [0, 1, 6]: return 0 elif color_format in [2, 4, 9]: return 3 * bpc elif color_format == 3: return 2 * bpc elif color_format == 5: return 3 * bpc / 2 elif color_format == 7: return bpc elif color_format == 8: return bpc @staticmethod def _get_pixel_size(color_format, bpp): if color_format == 5: return 4 if bpp >= 18 else 2 elif color_format in [7, 8]: return 6 if bpp > 10 else 4 else: return 6 if bpp > 32 else 4 @staticmethod def _align(value, alignment): return (value + alignment) & ~alignment def get_available_events_count(self) -> int: return self.__io.get(TSI_R_EVCAP_COUNT, c_uint32)[1] def capture_n_events(self, events_count: int): if events_count <= 0: raise ValueError(f"Events count must be more than 0.") buffer = [] def is_enough_events(): return self.get_available_events_count() >= events_count function_scheduler(is_enough_events, interval=1, timeout=TIMEOUT) for i in range(events_count): buffer.append(self.__capture_events(self.get_available_events_count())) return buffer def read_all_events(self): buffer = [] while self.get_available_events_count() > 0: buffer.append(self.__capture_events(self.get_available_events_count())) return buffer def __capture_audio(self, m_sec=1000): audio_frame = AudioFrameData() audio_frame.channel_count = self.__io.get(TSI_R_AUDCAP_CHANNEL_COUNT, c_int)[1] audio_frame.sample_size = self.__io.get(TSI_R_AUDCAP_SAMPLE_SIZE, c_int)[1] * 8 audio_frame.sample_rate = self.__io.get(TSI_R_AUDCAP_SAMPLE_RATE, c_int)[1] audio_frame.timestamp = self.__io.get(TSI_R_AUDCAP_TIMESTAMP, c_uint64)[1] audio_frame.samples = self.__io.get(TSI_R_AUDCAP_SAMPLE_COUNT, c_int)[1] audio_frame.frame_counter = self.__io.get(TSI_R_AUDCAP_FRAME_COUNTER, c_int)[1] audio_frame.sample_format = self.__io.get(TSI_R_AUDCAP_SAMPLE_FORMAT, c_int)[1] min_buff_size = self.__io.get(TSI_R_AUDCAP_MIN_BUFFER_SIZE, c_uint32)[1] audio_frame.data = self.__io.get(TSI_R_AUDCAP_SAMPLE_DATA, c_uint8, min_buff_size)[1] m_sec -= 1000 * len(audio_frame.data) / 2 / audio_frame.channel_count / audio_frame.sample_rate return audio_frame, m_sec def capture_audio_by_n_frames(self, frames_count: int, timeout: int = None): if frames_count <= 0: raise ValueError(f"Frames count must be more than 0.") buffer = [] time_break = False timeout = timeout if timeout is not None else TIMEOUT while not time_break and len(buffer) < frames_count: captured = 0 start_time = time.time() while captured < 10: status = self.audio_capturer_status current_time = time.time() if current_time - start_time > timeout: time_break = True break if status == AudioCaptureStatus.Stop: continue audio_frame, m_sec = self.__capture_audio() if len(audio_frame.data) > 0: captured += 1 buffer.append(audio_frame) return buffer def capture_audio_by_m_sec(self, m_sec: int): if m_sec <= 0: raise ValueError(f"Seconds count must be more than 0.") buffer = [] time_break = False while m_sec > 0 and not time_break: captured = 0 start_time = time.time() while captured < 10 and m_sec > 0: status = self.audio_capturer_status current_time = time.time() if current_time - start_time > TIMEOUT: time_break = True break if status == AudioCaptureStatus.Stop: continue audio_frame, m_sec = self.__capture_audio(m_sec=m_sec) if len(audio_frame.data) > 0: captured += 1 buffer.append(audio_frame) return buffer def get_available_video_frame_count(self): return self.__io.get(TSI_VIDCAP_AVAILABLE_FRAME_COUNT, c_int)[1] def __check_available_video(self, timeout) -> bool: def is_video_available(capturer): return capturer.video_capturer_status == VideoCaptureStatus.LiveModeActive return function_scheduler(is_video_available, self, interval=1, timeout=timeout) def __check_available_buffered_video(self, timeout) -> bool: def is_video_available(capturer): return capturer.video_capturer_status == VideoCaptureStatus.Transferring return function_scheduler(is_video_available, self, interval=1, timeout=timeout) def __check_available_bulk_data(self, timeout) -> bool: def is_bulk_available(capturer): return capturer.bulk_capturer_status == BulkCaptureStatus.Transferring return function_scheduler(is_bulk_available, self, interval=1, timeout=timeout) def __capture_video(self, timeout=TIMEOUT, capture_type=CaptureConfig.Type.LIVE) -> Union[VideoFrame, VideoFrameDSC]: if timeout <= 0: raise ValueError(f"Timeout must be more than 0.") if capture_type == CaptureConfig.Type.BUFFERED: if not self.__check_available_buffered_video(timeout): raise BufferedCaptureError( f"Cannot get frames from buffer. " f"Current buffered capture status is {self.video_capturer_status.name}" ) else: if not self.__check_available_video(timeout): raise CaptureError( f"Cannot start to capture video. Current video capture status {self.video_capturer_status.name}") try: result = self.__io.set(TSI_VIDCAP_CAPTURE_NEXT_W, 0) if result == TSI_ERROR_DATA_PROTECTION_ENABLED: raise CaptureError("Video data is HDCP protected. Capturing is not available.") except AssertionError as e: raise CaptureError(f"Error: {e}") try: min_buffer_size = self.__io.get(TSI_R_VIDCAP_MIN_BUFFER_SIZE, c_int)[1] if min_buffer_size <= 0: raise ValueError("Minimum buffer size must be more than 0") except AssertionError as e: raise CaptureError(f"Error: {e}") try: frame_data = bytearray(self.__io.get(TSI_R_VIDCAP_FRAME_DATA, c_uint8, min_buffer_size)[1]) if len(frame_data) <= 0: raise ValueError("Minimum length of captured data must be more than 0") except AssertionError as e: raise CaptureError(f"Error: {e}") frame_attributes = self.__io.get(TSI_VIDCAP_FRAME_HEADER_R, VideoFrameHeader)[1] if frame_attributes.is_dsc() and len(frame_data) > 128: vf = VideoFrameDSC() vf.compression_info = create_from_pps(frame_data[:128]) else: vf = VideoFrame() vf.width = frame_attributes.width vf.height = frame_attributes.height vf.color_info.bpc = frame_attributes.bpc vf.color_info.dynamic_range = frame_attributes.dynamic_range vf.color_info.color_format = frame_attributes.color_format vf.color_info.colorimetry = frame_attributes.colorimetry vf.data_info.component_order = DataInfo.ComponentOrder.CO_UCDRX vf.data_info.alignment = DataInfo.Alignment.A_MSB vf.data_info.packing = DataInfo.Packing.P_PACKED vf.timestamp = frame_attributes.timestamp vf.data = frame_data return vf def capture_video_by_n_frames(self, frames_count: int, capture_type: CaptureConfig.Type = CaptureConfig.Type.LIVE): if frames_count <= 0: raise ValueError(f"Frames count must be more than 0.") buffer = [] if capture_type == CaptureConfig.Type.BUFFERED: timeout = max(10, round(0.006 * frames_count)) try: for i in range(frames_count): buffer.append(self.__capture_video(timeout=timeout, capture_type=capture_type)) except BufferedCaptureError as e: return buffer else: for i in range(frames_count): buffer.append(self.__capture_video()) return buffer def capture_video_by_n_sec(self, sec: int): if sec <= 0: raise ValueError(f"Seconds count must be more than 0.") buffer = [] time_start = time.time() while time.time() - time_start < sec: buffer.append(self.__capture_video()) return buffer def set_video_stream_number(self, number: int): self.__mutex.acquire() self.__io.set(TSI_DPRX_STREAM_SELECT, number, c_uint32) self.__mutex.release() def __current_config(self) -> CaptureConfig: config = CaptureConfig() config.from_int(self.__io.get(TSI_CAP_CONFIG, c_uint)[1]) return config # Capture CRC def capture_crc(self, crc_frame_count: int = 1) -> List[tuple[int, int, int]]: if crc_frame_count <= 0: raise ValueError(f"Incorrect crc frame count: {crc_frame_count}") crc_values = self.__io.get(TSI_VIDCAP_SIGNAL_CRC_R, CrcStruct, crc_frame_count)[1] crc_list = [] if crc_frame_count == 1: crc_list = [(crc_values.r, crc_values.g, crc_values.b)] else: [crc_list.append((crc.r, crc.g, crc.b)) for crc in crc_values] return crc_list # Bulk Capturer def read_bulk_capture_caps(self) -> CaptureCaps: return self.__io.get(TSI_BULK_CAPTURE_CAPS_R, CaptureCaps)[1] def read_bulk_trigger_caps(self) -> int: return self.__io.get(TSI_BULK_TRIGGER_CAPS_R, c_uint32)[1] def write_bulk_trigger_settings(self, trigger_mask: int, trigger_config: list, trigger_config_ext: list): self.__io.set(TSI_BULK_TRIGGER_MASK_W, trigger_mask, c_uint32) self.__io.set(TSI_BULK_TRIGGER_CONFIGURATION_W, trigger_config, c_uint32, data_count=len(trigger_config)) self.__io.set(TSI_BULK_TRIGGER_CONFIGURATION_EXT_W, trigger_config_ext, c_uint32, data_count=len(trigger_config_ext)) def write_bulk_size(self, size: int): data = SBlock() data.BLOCK = 0 data.OFFSET = 0 data.SIZE = size self.__io.set(TSI_BULK_CAPTURE_BLOCK, data, SBlock) def write_encoding_type(self, value: EncodingTypeEnum): self.__io.set(TSI_BULK_CAPTURE_TYPE, value.value, c_uint32) def read_encoding_type(self) -> EncodingTypeEnum: return EncodingTypeEnum(self.__io.get(TSI_BULK_CAPTURE_TYPE, c_uint32)[1]) def write_lane_count(self, value: LaneCountEnum): self.__io.set(TSI_BULK_CAPTURE_LANE_COUNT, value.value, c_uint32) def read_lane_count(self) -> LaneCountEnum: return LaneCountEnum(self.__io.get(TSI_BULK_CAPTURE_LANE_COUNT, c_uint32)[1]) def write_bulk_gpio(self, gpio: bool): self.__io.set(TSI_BULK_CAPTURE_GPIO_W, TSI_BULK_CAPTURE_GPIO_5BIT if gpio else TSI_BULK_CAPTURE_GPIO_OFF, c_uint32) def write_bulk_trigger_position(self, position: int): self.__io.set(TSI_BULK_TRIGGER_POS, position) def start_bulk_capture(self): self.__mutex.acquire() self.__io.set(TSI_EVCAP_CTRL, 1) self.__io.set(TSI_BULK_CAPTURE_CONTROL_W, TSI_BULK_CAPTURE_START) self.__mutex.release() def stop_bulk_capture(self): self.__mutex.acquire() self.__io.set(TSI_EVCAP_CTRL, 0) self.__io.set(TSI_BULK_CAPTURE_CONTROL_W, TSI_BULK_CAPTURE_STOP) self.__mutex.release() def start_event_capture(self): self.__mutex.acquire() self.__io.set(TSI_EVCAP_CTRL, 1) self.__io.set(TSI_EVCAP_EVENT_SRC_EN, UCD_ALL_EVENTS) self.__mutex.release() def stop_event_capture(self): self.__mutex.acquire() self.__io.set(TSI_EVCAP_CTRL, 0) self.__mutex.release() def clear_bulk_buffer(self): max_time_waiting = 5 value = -1 time_waited = time.time() while value != TSI_BULK_STATUS_IDLE and time.time() - time_waited < max_time_waiting: self.__io.set(TSI_BULK_CAPTURE_CLEAR_W, 0) value = self.__io.get(TSI_BULK_CAPTURE_STATUS_R)[1] def bulk_capture(self, all_size: int, trigger_enabled: Optional[TriggerVarType]) -> list: buffer = [] iterations = int(all_size / (1024 * 1024)) prev_status = 0 last_bulk_capture_time = time.time() for i in range(iterations): event_count = self.__io.get(TSI_EVCAP_COUNT_R, c_uint32)[1] if event_count > 0: event_number = 0 prev_timestamp = 0 events_captured = 0 while event_count > 0 and events_captured < 500: event_size = self.__io.get(TSI_R_EVCAP_DATA, None, 0)[0] if event_size > 0: cap_data = CapturedData() cap_data.data = bytearray(self.__io.get(TSI_R_EVCAP_DATA, c_ubyte, event_size)[1]) cap_data.data = cap_data.data[3:] cap_data.type = CapturedDataType.Event cap_data.timestamp = int.from_bytes(bytes=cap_data.data[:8], byteorder='big') if cap_data.timestamp == prev_timestamp: event_number += 1 else: event_number = 0 buffer.append(cap_data) prev_timestamp = cap_data.timestamp events_captured += 1 event_count -= 1 bulk_status = self.__io.get(TSI_BULK_CAPTURE_STATUS_R, c_int32)[1] if bulk_status == TSI_BULK_STATUS_IDLE and prev_status == TSI_BULK_STATUS_TRANSFERRING: return buffer prev_status = bulk_status now = time.time() if trigger_enabled is not None and now - last_bulk_capture_time >= TIMEOUT: return buffer if not self.__check_available_bulk_data(TIMEOUT): return buffer self.__io.set(TSI_EVCAP_CTRL, 0) cap_data = CapturedData() cap_data.type = CapturedDataType.Bulk result, data, _ = self.__io.get(TSI_BULK_CAPTURE_DATA_R, c_ubyte, 1024 * 1024) cap_data.data = bytearray(data) if result >= TSI_SUCCESS and len(cap_data.data) > 0: buffer.append(cap_data) last_bulk_capture_time = time.time() return buffer