Files
xinzhu.yin c157e774e5 1.1.0版本
2026-04-16 16:51:05 +08:00

551 lines
20 KiB
Python

import time
from typing import Union
from UniTAP.libs.lib_tsi.tsi_io import DeviceIO
from UniTAP.libs.lib_tsi.tsi import *
from UniTAP.dev.modules.capturer.statuses import CaptureStatus, AudioCaptureStatus, EventCaptureStatus, \
VideoCaptureStatus, BulkCaptureStatus
from UniTAP.common import AudioFrameData, VideoFrameDSC, create_from_pps
from UniTAP.dev.ports.modules.capturer.event.event_types import EventData
from threading import Lock
from UniTAP.utils import function_scheduler
from UniTAP.dev.ports.modules.capturer.bulk.private_bulk_types import *
from UniTAP.dev.ports.modules.capturer.bulk.bulk_types import *
from .types import *
TIMEOUT = 10
class CaptureConfig:
class Type(IntEnum):
NONE = -1
LIVE = 0
BUFFERED = 1
def __init__(self):
self.audio = False
self.video = False
self.event = False
self.type = CaptureConfig.Type.NONE
self.frame_count = 0
def from_int(self, value: int):
self.video = bool(value & 1 << 3)
self.audio = bool(value & 1 << 4)
self.event = bool(value & 1 << 5)
self.type = CaptureConfig.Type.LIVE if value & (1 << 2) else CaptureConfig.Type.BUFFERED
self.frame_count = value >> 8
def to_int(self) -> int:
value = 0
if self.video:
value |= (1 << 3)
if self.audio:
value |= (1 << 4)
if self.event:
value |= (1 << 5)
if self.video or self.audio:
if self.type == CaptureConfig.Type.LIVE:
value |= (1 << 2)
else:
value |= (self.frame_count << 8)
return value
class Capturer:
def __init__(self, port_io: DeviceIO):
self.__io = port_io
self.__config = CaptureConfig()
self.__status = CaptureStatus.Unknown
self.__mutex = Lock()
@property
def config(self):
return self.__config
@property
def status(self):
return self.__status
@status.setter
def status(self, value: CaptureStatus):
self.__status = value
@property
def device(self):
return self.__io
@property
def video_capturer_status(self) -> VideoCaptureStatus:
return VideoCaptureStatus(self.__io.get(TSI_VIDCAP_CAPTURE_STATUS_R, c_int)[1])
@property
def audio_capturer_status(self) -> AudioCaptureStatus:
return AudioCaptureStatus(self.__io.get(TSI_R_AUDCAP_STATUS, c_int)[1])
@property
def event_capturer_status(self) -> EventCaptureStatus:
return EventCaptureStatus(self.__io.get(TSI_EVCAP_CTRL, c_uint32)[1] & 0xFF)
@property
def bulk_capturer_status(self) -> BulkCaptureStatus:
return BulkCaptureStatus(self.__io.get(TSI_BULK_CAPTURE_STATUS_R, c_uint32)[1] & 0x3)
def start_capture(self, config: CaptureConfig):
self.__mutex.acquire()
self.__config.from_int(self.__current_config().to_int() | config.to_int())
self.__io.set(TSI_CAP_CONFIG, self.__config.to_int())
if self.__io.set(TSI_W_CAP_COMMAND, 1) == 0:
self.__status = CaptureStatus.Running
else:
self.__status = CaptureStatus.Unknown
self.__mutex.release()
def stop_capture(self, config: CaptureConfig):
self.__mutex.acquire()
self.__config.from_int(self.__current_config().to_int() & ~config.to_int())
self.__io.set(TSI_CAP_CONFIG, self.__config.to_int())
if self.__io.set(TSI_W_CAP_COMMAND, 2) == 0:
self.__status = CaptureStatus.Stop
else:
self.__status = CaptureStatus.Unknown
self.__mutex.release()
def __capture_events(self, event_count=0):
event = EventData()
if event_count > 0:
event_size = self.__io.get(TSI_R_EVCAP_DATA, None, 0)[0]
if event_size > 0:
event.data = bytearray(self.__io.get(TSI_R_EVCAP_DATA, c_ubyte, event_size)[1])
return event
def get_buffer_capacity(self, stream_number: int = None):
if stream_number is not None:
self.__io.set(TSI_DPRX_STREAM_SELECT, stream_number, c_uint32)
self.__io.set(TSI_DPRX_MSA_COMMAND_W, 2, c_uint32)
width = self.__io.get(TSI_R_INPUT_WIDTH , c_uint32)[1]
height = self.__io.get(TSI_R_INPUT_HEIGHT , c_uint32)[1]
total_memory_bytes = self.__io.get(TSI_MEMORY_SIZE_R, c_uint64)[1]
bpc = self.__io.get(TSI_INPUT_COLOR_DEPTH_R , c_uint32)[1]
color_format = self.__io.get(TSI_INPUT_COLOR_MODE_R , c_uint32)[1]
bpp = self._get_bits_per_pixel(bpc, color_format)
line_alignment = 1023
pixel_size = self._get_pixel_size(color_format, bpp)
line_size = width * pixel_size
line_pitch = self._align(line_size, line_alignment)
one_frame_size_bytes = (height + 1) * line_pitch
if one_frame_size_bytes == 0:
return 0
else:
return int(total_memory_bytes / one_frame_size_bytes)
@staticmethod
def _get_bits_per_pixel(bpc, color_format) -> int:
if color_format in [0, 1, 6]:
return 0
elif color_format in [2, 4, 9]:
return 3 * bpc
elif color_format == 3:
return 2 * bpc
elif color_format == 5:
return 3 * bpc / 2
elif color_format == 7:
return bpc
elif color_format == 8:
return bpc
@staticmethod
def _get_pixel_size(color_format, bpp):
if color_format == 5:
return 4 if bpp >= 18 else 2
elif color_format in [7, 8]:
return 6 if bpp > 10 else 4
else:
return 6 if bpp > 32 else 4
@staticmethod
def _align(value, alignment):
return (value + alignment) & ~alignment
def get_available_events_count(self) -> int:
return self.__io.get(TSI_R_EVCAP_COUNT, c_uint32)[1]
def capture_n_events(self, events_count: int):
if events_count <= 0:
raise ValueError(f"Events count must be more than 0.")
buffer = []
def is_enough_events():
return self.get_available_events_count() >= events_count
function_scheduler(is_enough_events, interval=1, timeout=TIMEOUT)
for i in range(events_count):
buffer.append(self.__capture_events(self.get_available_events_count()))
return buffer
def read_all_events(self):
buffer = []
while self.get_available_events_count() > 0:
buffer.append(self.__capture_events(self.get_available_events_count()))
return buffer
def __capture_audio(self, m_sec=1000):
audio_frame = AudioFrameData()
audio_frame.channel_count = self.__io.get(TSI_R_AUDCAP_CHANNEL_COUNT, c_int)[1]
audio_frame.sample_size = self.__io.get(TSI_R_AUDCAP_SAMPLE_SIZE, c_int)[1] * 8
audio_frame.sample_rate = self.__io.get(TSI_R_AUDCAP_SAMPLE_RATE, c_int)[1]
audio_frame.timestamp = self.__io.get(TSI_R_AUDCAP_TIMESTAMP, c_uint64)[1]
audio_frame.samples = self.__io.get(TSI_R_AUDCAP_SAMPLE_COUNT, c_int)[1]
audio_frame.frame_counter = self.__io.get(TSI_R_AUDCAP_FRAME_COUNTER, c_int)[1]
audio_frame.sample_format = self.__io.get(TSI_R_AUDCAP_SAMPLE_FORMAT, c_int)[1]
min_buff_size = self.__io.get(TSI_R_AUDCAP_MIN_BUFFER_SIZE, c_uint32)[1]
audio_frame.data = self.__io.get(TSI_R_AUDCAP_SAMPLE_DATA, c_uint8, min_buff_size)[1]
m_sec -= 1000 * len(audio_frame.data) / 2 / audio_frame.channel_count / audio_frame.sample_rate
return audio_frame, m_sec
def capture_audio_by_n_frames(self, frames_count: int, timeout: int = None):
if frames_count <= 0:
raise ValueError(f"Frames count must be more than 0.")
buffer = []
time_break = False
timeout = timeout if timeout is not None else TIMEOUT
while not time_break and len(buffer) < frames_count:
captured = 0
start_time = time.time()
while captured < 10:
status = self.audio_capturer_status
current_time = time.time()
if current_time - start_time > timeout:
time_break = True
break
if status == AudioCaptureStatus.Stop:
continue
audio_frame, m_sec = self.__capture_audio()
if len(audio_frame.data) > 0:
captured += 1
buffer.append(audio_frame)
return buffer
def capture_audio_by_m_sec(self, m_sec: int):
if m_sec <= 0:
raise ValueError(f"Seconds count must be more than 0.")
buffer = []
time_break = False
while m_sec > 0 and not time_break:
captured = 0
start_time = time.time()
while captured < 10 and m_sec > 0:
status = self.audio_capturer_status
current_time = time.time()
if current_time - start_time > TIMEOUT:
time_break = True
break
if status == AudioCaptureStatus.Stop:
continue
audio_frame, m_sec = self.__capture_audio(m_sec=m_sec)
if len(audio_frame.data) > 0:
captured += 1
buffer.append(audio_frame)
return buffer
def get_available_video_frame_count(self):
return self.__io.get(TSI_VIDCAP_AVAILABLE_FRAME_COUNT, c_int)[1]
def __check_available_video(self, timeout) -> bool:
def is_video_available(capturer):
return capturer.video_capturer_status == VideoCaptureStatus.LiveModeActive
return function_scheduler(is_video_available, self, interval=1, timeout=timeout)
def __check_available_buffered_video(self, timeout) -> bool:
def is_video_available(capturer):
return capturer.video_capturer_status == VideoCaptureStatus.Transferring
return function_scheduler(is_video_available, self, interval=1, timeout=timeout)
def __check_available_bulk_data(self, timeout) -> bool:
def is_bulk_available(capturer):
return capturer.bulk_capturer_status == BulkCaptureStatus.Transferring
return function_scheduler(is_bulk_available, self, interval=1, timeout=timeout)
def __capture_video(self, timeout=TIMEOUT,
capture_type=CaptureConfig.Type.LIVE) -> Union[VideoFrame, VideoFrameDSC]:
if timeout <= 0:
raise ValueError(f"Timeout must be more than 0.")
if capture_type == CaptureConfig.Type.BUFFERED:
if not self.__check_available_buffered_video(timeout):
raise BufferedCaptureError(
f"Cannot get frames from buffer. "
f"Current buffered capture status is {self.video_capturer_status.name}"
)
else:
if not self.__check_available_video(timeout):
raise CaptureError(
f"Cannot start to capture video. Current video capture status {self.video_capturer_status.name}")
try:
result = self.__io.set(TSI_VIDCAP_CAPTURE_NEXT_W, 0)
if result == TSI_ERROR_DATA_PROTECTION_ENABLED:
raise CaptureError("Video data is HDCP protected. Capturing is not available.")
except AssertionError as e:
raise CaptureError(f"Error: {e}")
try:
min_buffer_size = self.__io.get(TSI_R_VIDCAP_MIN_BUFFER_SIZE, c_int)[1]
if min_buffer_size <= 0:
raise ValueError("Minimum buffer size must be more than 0")
except AssertionError as e:
raise CaptureError(f"Error: {e}")
try:
frame_data = bytearray(self.__io.get(TSI_R_VIDCAP_FRAME_DATA, c_uint8, min_buffer_size)[1])
if len(frame_data) <= 0:
raise ValueError("Minimum length of captured data must be more than 0")
except AssertionError as e:
raise CaptureError(f"Error: {e}")
frame_attributes = self.__io.get(TSI_VIDCAP_FRAME_HEADER_R, VideoFrameHeader)[1]
if frame_attributes.is_dsc() and len(frame_data) > 128:
vf = VideoFrameDSC()
vf.compression_info = create_from_pps(frame_data[:128])
else:
vf = VideoFrame()
vf.width = frame_attributes.width
vf.height = frame_attributes.height
vf.color_info.bpc = frame_attributes.bpc
vf.color_info.dynamic_range = frame_attributes.dynamic_range
vf.color_info.color_format = frame_attributes.color_format
vf.color_info.colorimetry = frame_attributes.colorimetry
vf.data_info.component_order = DataInfo.ComponentOrder.CO_UCDRX
vf.data_info.alignment = DataInfo.Alignment.A_MSB
vf.data_info.packing = DataInfo.Packing.P_PACKED
vf.timestamp = frame_attributes.timestamp
vf.data = frame_data
return vf
def capture_video_by_n_frames(self, frames_count: int, capture_type: CaptureConfig.Type = CaptureConfig.Type.LIVE):
if frames_count <= 0:
raise ValueError(f"Frames count must be more than 0.")
buffer = []
if capture_type == CaptureConfig.Type.BUFFERED:
timeout = max(10, round(0.006 * frames_count))
try:
for i in range(frames_count):
buffer.append(self.__capture_video(timeout=timeout, capture_type=capture_type))
except BufferedCaptureError as e:
return buffer
else:
for i in range(frames_count):
buffer.append(self.__capture_video())
return buffer
def capture_video_by_n_sec(self, sec: int):
if sec <= 0:
raise ValueError(f"Seconds count must be more than 0.")
buffer = []
time_start = time.time()
while time.time() - time_start < sec:
buffer.append(self.__capture_video())
return buffer
def set_video_stream_number(self, number: int):
self.__mutex.acquire()
self.__io.set(TSI_DPRX_STREAM_SELECT, number, c_uint32)
self.__mutex.release()
def __current_config(self) -> CaptureConfig:
config = CaptureConfig()
config.from_int(self.__io.get(TSI_CAP_CONFIG, c_uint)[1])
return config
# Capture CRC
def capture_crc(self, crc_frame_count: int = 1) -> List[tuple[int, int, int]]:
if crc_frame_count <= 0:
raise ValueError(f"Incorrect crc frame count: {crc_frame_count}")
crc_values = self.__io.get(TSI_VIDCAP_SIGNAL_CRC_R, CrcStruct, crc_frame_count)[1]
crc_list = []
if crc_frame_count == 1:
crc_list = [(crc_values.r, crc_values.g, crc_values.b)]
else:
[crc_list.append((crc.r, crc.g, crc.b)) for crc in crc_values]
return crc_list
# Bulk Capturer
def read_bulk_capture_caps(self) -> CaptureCaps:
return self.__io.get(TSI_BULK_CAPTURE_CAPS_R, CaptureCaps)[1]
def read_bulk_trigger_caps(self) -> int:
return self.__io.get(TSI_BULK_TRIGGER_CAPS_R, c_uint32)[1]
def write_bulk_trigger_settings(self, trigger_mask: int, trigger_config: list, trigger_config_ext: list):
self.__io.set(TSI_BULK_TRIGGER_MASK_W, trigger_mask, c_uint32)
self.__io.set(TSI_BULK_TRIGGER_CONFIGURATION_W, trigger_config, c_uint32, data_count=len(trigger_config))
self.__io.set(TSI_BULK_TRIGGER_CONFIGURATION_EXT_W, trigger_config_ext, c_uint32,
data_count=len(trigger_config_ext))
def write_bulk_size(self, size: int):
data = SBlock()
data.BLOCK = 0
data.OFFSET = 0
data.SIZE = size
self.__io.set(TSI_BULK_CAPTURE_BLOCK, data, SBlock)
def write_encoding_type(self, value: EncodingTypeEnum):
self.__io.set(TSI_BULK_CAPTURE_TYPE, value.value, c_uint32)
def read_encoding_type(self) -> EncodingTypeEnum:
return EncodingTypeEnum(self.__io.get(TSI_BULK_CAPTURE_TYPE, c_uint32)[1])
def write_lane_count(self, value: LaneCountEnum):
self.__io.set(TSI_BULK_CAPTURE_LANE_COUNT, value.value, c_uint32)
def read_lane_count(self) -> LaneCountEnum:
return LaneCountEnum(self.__io.get(TSI_BULK_CAPTURE_LANE_COUNT, c_uint32)[1])
def write_bulk_gpio(self, gpio: bool):
self.__io.set(TSI_BULK_CAPTURE_GPIO_W, TSI_BULK_CAPTURE_GPIO_5BIT if gpio else TSI_BULK_CAPTURE_GPIO_OFF,
c_uint32)
def write_bulk_trigger_position(self, position: int):
self.__io.set(TSI_BULK_TRIGGER_POS, position)
def start_bulk_capture(self):
self.__mutex.acquire()
self.__io.set(TSI_EVCAP_CTRL, 1)
self.__io.set(TSI_BULK_CAPTURE_CONTROL_W, TSI_BULK_CAPTURE_START)
self.__mutex.release()
def stop_bulk_capture(self):
self.__mutex.acquire()
self.__io.set(TSI_EVCAP_CTRL, 0)
self.__io.set(TSI_BULK_CAPTURE_CONTROL_W, TSI_BULK_CAPTURE_STOP)
self.__mutex.release()
def start_event_capture(self):
self.__mutex.acquire()
self.__io.set(TSI_EVCAP_CTRL, 1)
self.__io.set(TSI_EVCAP_EVENT_SRC_EN, UCD_ALL_EVENTS)
self.__mutex.release()
def stop_event_capture(self):
self.__mutex.acquire()
self.__io.set(TSI_EVCAP_CTRL, 0)
self.__mutex.release()
def clear_bulk_buffer(self):
max_time_waiting = 5
value = -1
time_waited = time.time()
while value != TSI_BULK_STATUS_IDLE and time.time() - time_waited < max_time_waiting:
self.__io.set(TSI_BULK_CAPTURE_CLEAR_W, 0)
value = self.__io.get(TSI_BULK_CAPTURE_STATUS_R)[1]
def bulk_capture(self, all_size: int, trigger_enabled: Optional[TriggerVarType]) -> list:
buffer = []
iterations = int(all_size / (1024 * 1024))
prev_status = 0
last_bulk_capture_time = time.time()
for i in range(iterations):
event_count = self.__io.get(TSI_EVCAP_COUNT_R, c_uint32)[1]
if event_count > 0:
event_number = 0
prev_timestamp = 0
events_captured = 0
while event_count > 0 and events_captured < 500:
event_size = self.__io.get(TSI_R_EVCAP_DATA, None, 0)[0]
if event_size > 0:
cap_data = CapturedData()
cap_data.data = bytearray(self.__io.get(TSI_R_EVCAP_DATA, c_ubyte, event_size)[1])
cap_data.data = cap_data.data[3:]
cap_data.type = CapturedDataType.Event
cap_data.timestamp = int.from_bytes(bytes=cap_data.data[:8], byteorder='big')
if cap_data.timestamp == prev_timestamp:
event_number += 1
else:
event_number = 0
buffer.append(cap_data)
prev_timestamp = cap_data.timestamp
events_captured += 1
event_count -= 1
bulk_status = self.__io.get(TSI_BULK_CAPTURE_STATUS_R, c_int32)[1]
if bulk_status == TSI_BULK_STATUS_IDLE and prev_status == TSI_BULK_STATUS_TRANSFERRING:
return buffer
prev_status = bulk_status
now = time.time()
if trigger_enabled is not None and now - last_bulk_capture_time >= TIMEOUT:
return buffer
if not self.__check_available_bulk_data(TIMEOUT):
return buffer
self.__io.set(TSI_EVCAP_CTRL, 0)
cap_data = CapturedData()
cap_data.type = CapturedDataType.Bulk
result, data, _ = self.__io.get(TSI_BULK_CAPTURE_DATA_R, c_ubyte, 1024 * 1024)
cap_data.data = bytearray(data)
if result >= TSI_SUCCESS and len(cap_data.data) > 0:
buffer.append(cap_data)
last_bulk_capture_time = time.time()
return buffer