1.1.0版本

This commit is contained in:
xinzhu.yin
2026-04-16 16:51:05 +08:00
commit c157e774e5
333 changed files with 70759 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
from .event import *
from .video import *

View File

@@ -0,0 +1,117 @@
import time
import copy
import warnings
from typing import List
from UniTAP.common.audio_mode import AudioFrameData
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
from UniTAP.dev.modules.capturer.statuses import AudioCaptureStatus
from .result_audio import ResultAudioObject
class AudioCapturer:
"""
Class `AudioCapturer` allows working with capturing audio frames on Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing
`capture_result`.
"""
def __init__(self, capturer: Capturer):
self.__capturer = capturer
self.__result = ResultAudioObject()
@property
def status(self) -> AudioCaptureStatus:
"""
Returns current audio capturer status.
Returns:
object of `AudioCaptureStatus` type
"""
return self.__capturer.audio_capturer_status
@property
def capture_result(self) -> ResultAudioObject:
"""
Returns result of audio capturing.
Returns:
object of `ResultAudioObject` type
"""
return self.__result
def start(self, frames_count=0, m_sec=0, timeout=None):
"""
Start capturing. Possible some variants of capturing:
- Capture with fixed frames count (will be captured fixed frames count and capturing will be stopped).
- Capture with fixed audio duration (captures audio for the specified duration in milliseconds).
- Capture without parameters - Live capturing (for getting frames you need to use functions `pop_element`)
- Capture with timeout (maximum duration for capturing operations before stopping, in seconds).
All results can be obtained using the function `capture_result`.
Args:
frames_count (int)
m_sec (int)
"""
config = CaptureConfig()
config.audio = True
config.type = CaptureConfig.Type.LIVE
config.video = True
self.__capturer.start_capture(config)
self.__result.clear()
self.__result.start_capture_time = time.time()
if frames_count > 0:
self.__result.buffer.extend(self.__capturer.capture_audio_by_n_frames(frames_count, timeout))
self.__fill_audio_mode()
elif m_sec > 0:
self.__result.buffer.extend(self.__capturer.capture_audio_by_m_sec(m_sec))
self.__fill_audio_mode()
def __fill_audio_mode(self):
if len(self.__result.buffer) > 0:
self.__result.audio_mode.channel_count = self.__result.buffer[0].channel_count
self.__result.audio_mode.bits = self.__result.buffer[0].sample_size
self.__result.audio_mode.sample_rate = self.__result.buffer[0].sample_rate
def stop(self):
"""
Stop capture audio.
"""
config = CaptureConfig()
config.audio = True
config.type = CaptureConfig.Type.LIVE
config.video = True
self.__capturer.stop_capture(config)
self.__result.end_capture_time = time.time()
def pop_element(self) -> List[AudioFrameData]:
"""
Return first object of `AudioFrameData`.
Returns:
object of `AudioFrameData` type
"""
if self.status == AudioCaptureStatus.Stop:
warnings.warn("Audio capture is not working now. Please, turn it on.")
captured_audio_frames = self.__capturer.capture_audio_by_n_frames(1)
self.__result.buffer.extend(copy.deepcopy(captured_audio_frames))
self.__fill_audio_mode()
return captured_audio_frames[0]
def pop_element_as_result_object(self) -> ResultAudioObject:
"""
Return captured audio frame(objects of `AudioFrameData`) as `ResultAudioObject`.
Returns:
object of `ResultAudioObject` type
"""
captured_audio_frames = self.__capturer.capture_audio_by_n_frames(1)
res = ResultAudioObject()
res.buffer.extend(captured_audio_frames)
return res

View File

@@ -0,0 +1,60 @@
from UniTAP.dev.modules.capturer.result_object import ResultObject
from UniTAP.common.audio_mode import AudioMode, AudioFileFormat
from UniTAP.dev.ports.modules.ag.ag_utils import save_to_bin_file, save_to_wave_file
import warnings
class ResultAudioObject(ResultObject):
"""
Class `ResultAudioObject` inherited from class `ResultObject`.
Class `ResultAudioObject` allows saving captured frames to image `save_image_to_file`.
Also has all the `ResultObject` functionality.
"""
def __init__(self):
super().__init__()
self.__audio_mode = AudioMode()
@property
def audio_mode(self) -> AudioMode:
"""
Returns current audio mode for captured audio frames.
Returns:
object of `AudioMode` type
"""
return self.__audio_mode
def save_to_file(self, file_format: AudioFileFormat, path: str):
"""
Saving audio frames to file. Supported file formats describe in `AudioFileFormat`.
Args:
file_format (`AudioFileFormat`) - file format
path (str) - path to save
"""
if len(self.buffer) > 0:
if file_format == AudioFileFormat.BIN:
save_to_bin_file(path=path, data=self.__convert_buffer())
elif file_format == AudioFileFormat.WAV:
save_to_wave_file(path=path, audio_mode=self.audio_mode, data=self.__convert_buffer())
else:
raise ValueError(f"Incorrect audio format. Available formats: "
f"{AudioFileFormat.BIN.name}, {AudioFileFormat.WAV.name}.\n"
f"Transferred audio format: {file_format.name}")
else:
warnings.warn("Buffer size is equal 0.")
def __convert_buffer(self):
data = []
for audio_frame in self.buffer:
data.append(audio_frame.data)
return bytearray().join(data)
def __str__(self):
return f"Start capture time: {self.start_capture_time}\n" \
f"End capture time: {self.end_capture_time}\n" \
f"Timestamp: {self.timestamp.__str__()}\n" \
f"Audio mode:\n{self.audio_mode.__str__()}\n"

View File

@@ -0,0 +1 @@
from .bulk_types import TriggerType, TriggerPosition, TriggerTypeEnum

View File

@@ -0,0 +1,132 @@
import warnings
import time
from typing import Optional
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
from UniTAP.dev.modules.memory_manager import MemoryManager
from UniTAP.dev.ports.modules.capturer.bulk.bulk_types import (TriggerPosition, TriggerVarType, _get_trigger_value,
EncodingTypeEnum, LaneCountEnum)
from UniTAP.dev.ports.modules.capturer.bulk.result_bulk import ResultBulkObject
megabyte = 1024 * 1024
class BulkCapturer:
"""
Class `BulkCapturer` allows working with capturing Bulk data on Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
capturing `capture_result`.
"""
def __init__(self, capturer: Capturer, memory_manager: MemoryManager):
self.__capturer = capturer
self.__memory_manager = memory_manager
self.__trigger_position = TriggerPosition.TP_Start
self.__result = ResultBulkObject()
@property
def status(self):
"""
Returns current bulk capturer status.
Returns:
object of `VideoCaptureStatus` type
"""
return self.__capturer.bulk_capturer_status
@property
def capture_result(self) -> ResultBulkObject:
"""
Returns result of bulk capturing.
Returns:
object of `ResultBulkObject` type
"""
return self.__result
@property
def encoding_type(self) -> EncodingTypeEnum:
"""
Returns current encoding type of capturing.
Returns:
object of `EncodingTypeEnum` type
"""
return self.__capturer.read_encoding_type()
@property
def lane_count(self) -> LaneCountEnum:
"""
Returns current lane count for capturing.
Returns:
object of `LaneCountEnum` type
"""
return self.__capturer.read_lane_count()
def start(self, bulk_size: int = 1, trigger_position: TriggerPosition = TriggerPosition.TP_Start,
trigger_config: Optional[TriggerVarType] = None, assume_scrambler: bool = False, gpio: bool = False,
encoding_type: Optional[EncodingTypeEnum] = None, lane_count: Optional[LaneCountEnum] = None):
"""
Start capturing. All results can be obtained using the function `capture_result`.
Args:
bulk_size (int) - bulk data size in megabytes
trigger_position (`TriggerPosition`)
trigger_config (`TriggerVarType`|None)
assume_scrambler (bool)
gpio (bool)
encoding_type (`EncodingTypeEnum`|None)
lane_count (`LaneCountEnum`|None)
"""
self.__result.clear()
if trigger_config is None:
trigger_position = TriggerPosition.TP_Start
capture_config = CaptureConfig()
capture_config.video = True
capture_config.event = True
capture_config.audio = True
capture_config.type = CaptureConfig.Type.LIVE
self.__capturer.stop_capture(capture_config)
self.__capturer.stop_bulk_capture()
self.__capturer.clear_bulk_buffer()
max_bulk_block_bytes = self.__memory_manager.get_total_memory() - self.__memory_manager.RESERVED_MEMORY_BYTES
bulk_block_bytes = max_bulk_block_bytes if bulk_size * megabyte > max_bulk_block_bytes else bulk_size * megabyte
self.__memory_manager.set_memory_layout([bulk_block_bytes])
self.__capturer.write_bulk_size(bulk_block_bytes)
if encoding_type is None:
encoding_type = EncodingTypeEnum.Encoding_Auto
self.__capturer.write_encoding_type(encoding_type)
if lane_count is None:
lane_count = LaneCountEnum.Auto
self.__capturer.write_lane_count(lane_count)
if self.__capturer.read_bulk_capture_caps().bitGrabWidth:
self.__capturer.write_bulk_gpio(gpio)
self.__capturer.write_bulk_trigger_position(trigger_position.value)
if self.__capturer.read_bulk_capture_caps().triggers:
self.__capturer.write_bulk_trigger_settings(*_get_trigger_value(trigger_config,
self.__capturer.read_bulk_trigger_caps()))
else:
warnings.warn("Triggers are not supported.")
self.__result.assume_scrambler_disabled = assume_scrambler
self.__result.start_capture_time = time.time()
self.__capturer.start_bulk_capture()
self.__result.buffer.extend(self.__capturer.bulk_capture(bulk_block_bytes, trigger_config))
def stop(self):
"""
Stop capture video.
"""
self.__capturer.stop_bulk_capture()
self.__result.end_capture_time = time.time()

View File

@@ -0,0 +1,979 @@
from enum import IntEnum
from typing import TypeVar, Optional
class EncodingTypeEnum(IntEnum):
Encoding_Auto = 0,
Encoding_10Bit = 1,
Encoding_32Bit = 2
class LaneCountEnum(IntEnum):
Auto = 0,
Lane_1 = 1,
Lane_2 = 2,
Lane_4 = 4
class TriggerPosition(IntEnum):
"""
Trigger position relative to the start of capture, in percent of total capture size.
- TP_Start 0%
- TP_25 25%
- TP_50 50%
- TP_75 75%
- TP_End - 100%
"""
TP_Start = 0
TP_25 = 1
TP_50 = 2
TP_75 = 3
TP_End = 4
class TriggerTypeEnum:
"""
Class `TriggerTypeEnum` contains all necessary enum types for describing values.
"""
class SourceType(IntEnum):
TPS1 = 0
TPS2 = 1
TPS3 = 2
TPS4 = 3
class SourceTypePosition(IntEnum):
InitialLT = 0
AfterALPM = 1
InitialLTORAfterALPM = 2
class SourceMLPHY(IntEnum):
Standby = 0
Sleep = 1
class SourceVBIDWithMask(IntEnum):
AnyVB_IDChange = 0
VB_IDMatchWithMask = 1
ChangeAnyBitSetInMask = 2
class SourceVBID(IntEnum):
BS = 0
SR = 1
CPBS = 2
CPSR = 3
class SDPTypeReceived(IntEnum):
MatchHB0 = 1
MatchHB1 = 2
MatchHB0AndHB1 = 3
class MSA(IntEnum):
AnyMSAChange = 0
ChangeMSAAttribute = 1
MatchMSAAttribute = 2
class Error8b_10b(IntEnum):
CodeError = 0
DisparityError = 0
Both = 0
class TypeAUX(IntEnum):
NativeWrite = 0x8
NativeRead = 0x9
class TriggerType:
"""
Main class `TriggerType` defines possible variants of trigger types.
- `U1` - Start of TPS1/TPS2/TPS3/TPS4 - initial LT, after ALPM exit, both.
- `U2` - Exit of TPS1/TPS2/TPS3/TPS4 - initial LT, after ALPM exit, both.
- `U3` - Start of ML_PHY_STANDBY or ML_PHY_SLEEP.
- `U4` - Exit of ML_PHY_STANDBY or ML_PHY_SLEEP.
- `U5` - Start of EIEOS - initial LT, after ALPM exit, both.
- `U6` - Exit of EIEOS - initial LT, after ALPM exit, both.
- `U7` - VB-ID with the MASK - any change, match, selected bit transition.
- `U8` - VB-ID on TYPE - BS/SR/CPBS/CPSR.
- `U9` - Up to 8 control or data symbols, 8b/10b encoded.
- `U10` - SDP Type received HB0 and/or HB1 match.
- `U11` - MSA any change, change by mask, match by mask.
- `U12` - 8b/10b error code error, disparity error, both.
- `U13` - Any AUX transaction - initial LT, after ALPM exit, both.
- `U17` - AUX read or write of specific address.
"""
class U1:
"""
Class `U1` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source type `source_type` - `TriggerTypeEnum.SourceType`.
- Set and get position `position` - `TriggerTypeEnum.SourceTypePosition.
"""
def __init__(self):
self.__trigger_mask = 1
self.__source_type = TriggerTypeEnum.SourceType.TPS1
self.__position = TriggerTypeEnum.SourceTypePosition.InitialLT
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source_type(self) -> TriggerTypeEnum.SourceType:
"""
Returns source type.
Returns:
object of `TriggerTypeEnum.SourceType` value
"""
return self.__source_type
@property
def position(self) -> TriggerTypeEnum.SourceTypePosition:
"""
Returns position.
Returns:
object of `TriggerTypeEnum.SourceTypePosition` value
"""
return self.__position
@source_type.setter
def source_type(self, value: TriggerTypeEnum.SourceType):
self.__source_type = value
@position.setter
def position(self, value: TriggerTypeEnum.SourceTypePosition):
self.__position = value
class U2:
"""
Class `U2` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source type `source_type` - `TriggerTypeEnum.SourceType`.
- Set and get position `position` - `TriggerTypeEnum.SourceTypePosition.
"""
def __init__(self):
self.__trigger_mask = 1 << 1
self.__source_type = TriggerTypeEnum.SourceType.TPS1
self.__position = TriggerTypeEnum.SourceTypePosition.InitialLT
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source_type(self) -> TriggerTypeEnum.SourceType:
"""
Returns source type.
Returns:
object of `TriggerTypeEnum.SourceType` value
"""
return self.__source_type
@property
def position(self) -> TriggerTypeEnum.SourceTypePosition:
"""
Returns position.
Returns:
object of `TriggerTypeEnum.SourceTypePosition` value
"""
return self.__position
@source_type.setter
def source_type(self, value: TriggerTypeEnum.SourceType):
self.__source_type = value
@position.setter
def position(self, value: TriggerTypeEnum.SourceTypePosition):
self.__position = value
class U3:
"""
Class `U3` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source `source` - `TriggerTypeEnum.SourceMLPHY`.
"""
def __init__(self):
self.__trigger_mask = 1 << 2
self.__source = TriggerTypeEnum.SourceMLPHY.Standby
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source(self) -> TriggerTypeEnum.SourceMLPHY:
"""
Returns source.
Returns:
object of `TriggerTypeEnum.SourceMLPHY` value
"""
return self.__source
@source.setter
def source(self, value: TriggerTypeEnum.SourceMLPHY):
self.__source = value
class U4:
"""
Class `U4` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source `source` - `TriggerTypeEnum.SourceMLPHY`.
"""
def __init__(self):
self.__trigger_mask = 1 << 3
self.__source = TriggerTypeEnum.SourceMLPHY.Standby
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source(self) -> TriggerTypeEnum.SourceMLPHY:
"""
Returns source.
Returns:
object of `TriggerTypeEnum.SourceMLPHY` value
"""
return self.__source
@source.setter
def source(self, value: TriggerTypeEnum.SourceMLPHY):
self.__source = value
class U5:
"""
Class `U5` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get position `position` - `TriggerTypeEnum.SourceTypePosition`.
"""
def __init__(self):
self.__trigger_mask = 1 << 4
self.__position = TriggerTypeEnum.SourceTypePosition.InitialLT
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def position(self) -> TriggerTypeEnum.SourceTypePosition:
"""
Returns position.
Returns:
object of `TriggerTypeEnum.SourceTypePosition` value
"""
return self.__position
@position.setter
def position(self, value: TriggerTypeEnum.SourceTypePosition):
self.__position = value
class U6:
"""
Class `U6` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get position `position` - `TriggerTypeEnum.SourceTypePosition`.
"""
def __init__(self):
self.__trigger_mask = 1 << 5
self.__position = TriggerTypeEnum.SourceTypePosition.InitialLT
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def position(self) -> TriggerTypeEnum.SourceTypePosition:
"""
Returns position.
Returns:
object of `TriggerTypeEnum.SourceTypePosition` value
"""
return self.__position
@position.setter
def position(self, value: TriggerTypeEnum.SourceTypePosition):
self.__position = value
class U7:
"""
Class `U7` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get mask `mask`.
- Set and get source `source` - `TriggerTypeEnum.SourceVBIDWithMask`.
"""
def __init__(self):
self.__trigger_mask = 1 << 6
self.__source = TriggerTypeEnum.SourceVBIDWithMask.AnyVB_IDChange
self.__mask = 0
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source(self) -> TriggerTypeEnum.SourceVBIDWithMask:
"""
Returns source.
Returns:
object of `TriggerTypeEnum.SourceVBIDWithMask` value
"""
return self.__source
@property
def mask(self) -> int:
"""
Returns mask.
Returns:
object of int value
"""
return self.__mask
@source.setter
def source(self, value: TriggerTypeEnum.SourceVBIDWithMask):
self.__source = value
@mask.setter
def mask(self, value: int):
if value < 0:
raise ValueError("Mask must be more than 0.")
self.__mask = value
class U8:
"""
Class `U8` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source `source` - `TriggerTypeEnum.SourceVBID`.
"""
def __init__(self):
self.__trigger_mask = 1 << 7
self.__source = TriggerTypeEnum.SourceVBID.BS
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source(self) -> TriggerTypeEnum.SourceVBID:
"""
Returns source.
Returns:
object of `TriggerTypeEnum.SourceVBID` value
"""
return self.__source
@source.setter
def source(self, value: TriggerTypeEnum.SourceVBID):
self.__source = value
class U9:
"""
Class `U9` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get value for count of symbols `count`.
- Set and get value for symbol 0 `symbol_0`.
- Set and get value for symbol 1 `symbol_1`.
- Set and get value for symbol 2 `symbol_2`.
- Set and get value for symbol 3 `symbol_3`.
- Set and get value for symbol 4 `symbol_4`.
- Set and get value for symbol 5 `symbol_5`.
- Set and get value for symbol 6 `symbol_6`.
- Set and get value for symbol 7 `symbol_7`.
"""
def __init__(self):
self.__trigger_mask = 1 << 8
self.__symbol_0 = 0
self.__symbol_1 = 0
self.__symbol_2 = 0
self.__symbol_3 = 0
self.__symbol_4 = 0
self.__symbol_5 = 0
self.__symbol_6 = 0
self.__symbol_7 = 0
self.__count = 0
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def count(self) -> int:
"""
Returns count of symbols.
Returns:
object of int value
"""
return self.__count
@count.setter
def count(self, value: int):
if value < 0:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__count = value
@property
def symbol_0(self) -> int:
"""
Returns value of symbol 0.
Returns:
object of int value
"""
return self.__symbol_0
@symbol_0.setter
def symbol_0(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_0 = value
@property
def symbol_1(self) -> int:
"""
Returns value of symbol 1.
Returns:
object of int value
"""
return self.__symbol_1
@symbol_1.setter
def symbol_1(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_1 = value
@property
def symbol_2(self) -> int:
"""
Returns value of symbol 2.
Returns:
object of int value
"""
return self.__symbol_2
@symbol_2.setter
def symbol_2(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_2 = value
@property
def symbol_3(self) -> int:
"""
Returns value of symbol 3.
Returns:
object of int value
"""
return self.__symbol_3
@symbol_3.setter
def symbol_3(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_3 = value
@property
def symbol_4(self) -> int:
"""
Returns value of symbol 4.
Returns:
object of int value
"""
return self.__symbol_4
@symbol_4.setter
def symbol_4(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_4 = value
@property
def symbol_5(self) -> int:
"""
Returns value of symbol 5.
Returns:
object of int value
"""
return self.__symbol_5
@symbol_5.setter
def symbol_5(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_5 = value
@property
def symbol_6(self) -> int:
"""
Returns value of symbol 6.
Returns:
object of int value
"""
return self.__symbol_6
@symbol_6.setter
def symbol_6(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_6 = value
@property
def symbol_7(self) -> int:
"""
Returns value of symbol 7.
Returns:
object of int value
"""
return self.__symbol_7
@symbol_7.setter
def symbol_7(self, value: int):
if value < 0 or len(str(value)) >= 5:
raise ValueError("Mask must be more than 0 and length must be less than 5")
self.__symbol_7 = value
class U10:
"""
Class `U10` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get SDP type `sdp_type` - `TriggerTypeEnum.SDPTypeReceived`.
- Set and get value for HB 0 `hb0`.
- Set and get value for HB 1 `hb1`.
"""
def __init__(self):
self.__trigger_mask = 1 << 9
self.__sdp_type = TriggerTypeEnum.SDPTypeReceived.MatchHB0
self.__hb0 = 0
self.__hb1 = 0
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def sdp_type(self) -> TriggerTypeEnum.SDPTypeReceived:
"""
Returns SDP type.
Returns:
object of `TriggerTypeEnum.SDPTypeReceived` value
"""
return self.__sdp_type
@sdp_type.setter
def sdp_type(self, value: TriggerTypeEnum.SDPTypeReceived):
self.__sdp_type = value
@property
def hb0(self) -> int:
"""
Returns value of HB0.
Returns:
object of int value
"""
return self.__hb0
@hb0.setter
def hb0(self, value: int):
if value < 0:
raise ValueError("Mask must be more than 0.")
self.__hb0 = value
@property
def hb1(self) -> int:
"""
Returns value of HB1.
Returns:
object of int value
"""
return self.__hb1
@hb1.setter
def hb1(self, value: int):
if value < 0:
raise ValueError("Mask must be more than 0.")
self.__hb1 = value
class U11:
"""
Class `U11` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source `source` - `TriggerTypeEnum.MSA`.
- Set and get MSa attributes (mvid, nvid, hactive, vactive and so on.)
"""
def __init__(self):
self.__trigger_mask = 1 << 10
self.__source = TriggerTypeEnum.MSA.AnyMSAChange
self.mvid_flag = False
self.nvid_flag = False
self.hactive_flag = False
self.vactive_flag = False
self.htotal_flag = False
self.vtotal_flag = False
self.hsyncw_flag = False
self.vsyncw_flag = False
self.hsyncp_flag = False
self.vsyncp_flag = False
self.hsyncs_flag = False
self.vsyncs_flag = False
self.misc0_flag = False
self.misc1_flag = False
self.mvid = 0
self.nvid = 0
self.hactive = 0
self.vactive = 0
self.htotal = 0
self.vtotal = 0
self.hsyncw = 0
self.vsyncw = 0
self.hsyncs = 0
self.vsyncs = 0
self.misc0 = 0
self.misc1 = 0
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source(self) -> TriggerTypeEnum.MSA:
"""
Returns source.
Returns:
object of `TriggerTypeEnum.MSA` value
"""
return self.__source
@source.setter
def source(self, value: TriggerTypeEnum.MSA):
self.__source = value
class U12:
"""
Class `U12` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get source `source` - `TriggerTypeEnum.Error8b_10b`.
"""
def __init__(self):
self.__trigger_mask = 1 << 11
self.__source = TriggerTypeEnum.Error8b_10b.CodeError
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def source(self) -> TriggerTypeEnum.Error8b_10b:
"""
Returns source.
Returns:
object of `TriggerTypeEnum.Error8b_10b` value
"""
return self.__source
@source.setter
def source(self, value: TriggerTypeEnum.Error8b_10b):
self.__source = value
class U13:
"""
Class `U13` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get position `position` - `TriggerTypeEnum.SourceTypePosition`.
"""
def __init__(self):
self.__trigger_mask = 1 << 12
self.__position = TriggerTypeEnum.SourceTypePosition.InitialLT
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def position(self) -> TriggerTypeEnum.SourceTypePosition:
"""
Returns position.
Returns:
object of `TriggerTypeEnum.SourceTypePosition` value
"""
return self.__position
@position.setter
def position(self, value: TriggerTypeEnum.SourceTypePosition):
self.__position = value
class U17:
"""
Class `U17` describes one of the possible trigger type. Allows:
- Get trigger mask `trigger_mask`.
- Set and get address `address`.
- Set and get type `type` - `TriggerTypeEnum.TypeAUX`.
"""
def __init__(self):
self.__trigger_mask = 1 << 16
self.__address = 0
self.__type = TriggerTypeEnum.TypeAUX.NativeWrite
@property
def trigger_mask(self) -> int:
"""
Returns trigger mask.
Returns:
object of int value
"""
return self.__trigger_mask
@property
def address(self) -> int:
"""
Returns address.
Returns:
object of int value
"""
return self.__address
@address.setter
def address(self, value: int):
if value < 0:
raise ValueError("Mask must be more than 0.")
self.__address = value
@property
def type(self) -> TriggerTypeEnum.TypeAUX:
"""
Returns type.
Returns:
object of `TriggerTypeEnum.TypeAUX` value
"""
return self.__type
@type.setter
def type(self, value: TriggerTypeEnum.TypeAUX):
self.__type = value
TriggerVarType = TypeVar("TriggerVarType", TriggerType.U1, TriggerType.U2, TriggerType.U3, TriggerType.U4,
TriggerType.U5, TriggerType.U6, TriggerType.U7, TriggerType.U8, TriggerType.U9,
TriggerType.U10, TriggerType.U11, TriggerType.U12, TriggerType.U13, TriggerType.U17)
def _get_trigger_value(value: Optional[TriggerVarType], caps: int):
trigger_config = [0] * 39
trigger_config_ext = [0] * 35
if value is None:
return 0, trigger_config, trigger_config_ext
if isinstance(value, TriggerType.U1) and caps & 0x1:
trigger_config[0] |= value.source_type.value
trigger_config[0] |= (value.position.value << 4)
if isinstance(value, TriggerType.U2) and (caps >> 1) & 0x1:
trigger_config[1] |= value.source_type.value
trigger_config[1] |= (value.position.value << 4)
if isinstance(value, TriggerType.U3) and (caps >> 2) & 0x1:
trigger_config[2] |= value.source.value
if isinstance(value, TriggerType.U4) and (caps >> 3) & 0x1:
trigger_config[3] |= value.source.value
if isinstance(value, TriggerType.U5) and (caps >> 4) & 0x1:
trigger_config[4] |= value.position.value << 4
if isinstance(value, TriggerType.U6) and (caps >> 5) & 0x1:
trigger_config[5] |= value.position.value << 4
if isinstance(value, TriggerType.U7) and (caps >> 6) & 0x1:
trigger_config[6] |= value.source.value
trigger_config[6] |= value.mask.value << 8
if isinstance(value, TriggerType.U8) and (caps >> 7) & 0x1:
trigger_config[7] |= value.source.value
if isinstance(value, TriggerType.U9) and (caps >> 8) & 0x1:
trigger_config[8] |= value.count
if value.symbol_0:
trigger_config_ext[0] |= value.symbol_0
if value.symbol_1:
trigger_config_ext[0] |= (value.symbol_1 << 16)
if value.symbol_0:
trigger_config_ext[1] |= value.symbol_2
if value.symbol_0:
trigger_config_ext[1] |= (value.symbol_3 << 16)
if value.symbol_0:
trigger_config_ext[2] |= value.symbol_4
if value.symbol_0:
trigger_config_ext[2] |= (value.symbol_5 << 16)
if value.symbol_0:
trigger_config_ext[3] |= value.symbol_6
if value.symbol_0:
trigger_config_ext[3] |= (value.symbol_7 << 16)
if isinstance(value, TriggerType.U10) and (caps >> 10) & 0x1:
trigger_config[9] |= value.sdp_type.value
trigger_config[9] |= (value.hb0 << 8)
trigger_config[9] |= (value.hb1 << 16)
if isinstance(value, TriggerType.U11) and (caps >> 11) & 0x1:
trigger_config[10] |= value.source.value
trigger_config[10] |= value.source
trigger_config[10] |= (1 << 8) if value.mvid_flag else 0
trigger_config[10] |= (1 << 9) if value.nvid_flag else 0
trigger_config[10] |= (1 << 10) if value.htotal_flag else 0
trigger_config[10] |= (1 << 11) if value.vtotal_flag else 0
trigger_config[10] |= (1 << 12) if value.hactive_flag else 0
trigger_config[10] |= (1 << 13) if value.vactive_flag else 0
trigger_config[10] |= (1 << 14) if value.hsyncw_flag else 0
trigger_config[10] |= (1 << 15) if value.vsyncw_flag else 0
trigger_config[10] |= (1 << 16) if value.hsyncp_flag else 0
trigger_config[10] |= (1 << 17) if value.vsyncp_flag else 0
trigger_config[10] |= (1 << 18) if value.hsyncs_flag else 0
trigger_config[10] |= (1 << 19) if value.vsyncs_flag else 0
trigger_config[10] |= (1 << 20) if value.misc0_flag else 0
trigger_config[10] |= (1 << 21) if value.misc1_flag else 0
if value.mvid_flag:
trigger_config_ext[4] |= value.mvid & 0xFFFFFF
if value.nvid_flag:
trigger_config_ext[5] |= value.nvid & 0xFFFFFF
if value.htotal_flag:
trigger_config_ext[6] |= value.htotal & 0xFFFF
if value.vtotal_flag:
trigger_config_ext[6] |= ((value.vtotal & 0xFFFF) << 16)
if value.hactive_flag:
trigger_config_ext[7] |= value.hactive & 0xFFFF
if value.vactive_flag:
trigger_config_ext[7] |= ((value.vactive & 0xFFFF) << 16)
if value.hsyncw_flag:
trigger_config_ext[8] |= value.hsyncw & 0x7FFF
if value.hsyncp_flag:
trigger_config_ext[8] |= (value.hsyncp_flag << 15)
if value.vsyncw_flag:
trigger_config_ext[8] |= ((value.vsyncw & 0x7FFF) << 16)
if value.vsyncp_flag:
trigger_config_ext[8] |= (value.vsyncp_flag << 31)
if value.hsyncs_flag:
trigger_config_ext[9] |= value.hsyncs & 0xFFFF
if value.vsyncs_flag:
trigger_config_ext[9] |= ((value.vsyncs & 0xFFFF) << 16)
if value.misc0_flag:
trigger_config_ext[10] |= value.misc0 & 0xFF
if value.misc1_flag:
trigger_config_ext[10] |= ((value.misc1 & 0xFF) << 8)
if isinstance(value, TriggerType.U12) and (caps >> 11) & 0x1:
trigger_config[11] |= value.position
if isinstance(value, TriggerType.U13) and (caps >> 12) & 0x1:
trigger_config[12] |= (value.position << 4)
if isinstance(value, TriggerType.U17) and (caps >> 16) & 0x1:
trigger_config[16] |= (value.type.value << 24) | value.address
return value.trigger_mask, trigger_config, trigger_config_ext

View File

@@ -0,0 +1,45 @@
from ctypes import Structure, c_uint8, c_uint16, c_uint32, c_uint64
class BulkHeader(Structure):
class Attributes(Structure):
_fields_ = [
('LINK_BW', c_uint8),
('LANE_COUNT', c_uint8, 4),
('Packing1', c_uint8, 1),
('Packing', c_uint8, 2),
('MST', c_uint8, 1),
('n_val', c_uint16),
]
_fields_ = [
('Sync', c_uint32),
('Tns', c_uint32),
('SourcId', c_uint32),
('Length', c_uint32),
('TimeStamp', c_uint64),
('Sync', c_uint32),
('Attribute', Attributes)
]
class CaptureCaps(Structure):
_fields_ = [
('triggers', c_uint32, 1),
('multipleTriggersSelection', c_uint32, 1),
('multipleTriggersAndingSelection', c_uint32, 1),
('alpmTriggers', c_uint32, 1),
('', c_uint32, 4),
('grab128_132', c_uint32, 1),
('', c_uint32, 3),
("bitGrabWidth", c_uint32, 4),
("", c_uint32, 16)
]
class SBlock(Structure):
_fields_ = [
('BLOCK', c_uint32),
('OFFSET', c_uint64),
('SIZE', c_uint64),
]

View File

@@ -0,0 +1,60 @@
import os
import warnings
import shutil
from ctypes import sizeof
from datetime import datetime
from UniTAP.dev.modules.capturer.result_object import ResultObject
from UniTAP.dev.modules.capturer.types import CapturedDataType
from UniTAP.dev.ports.modules.capturer.bulk.private_bulk_types import BulkHeader
class ResultBulkObject(ResultObject):
"""
Class `ResultBulkObject` inherited from class `ResultObject`.
Class `ResultBulkObject` allows saving captured data to file `save_to_bin_file`.
Also has all the `ResultObject` functionality.
"""
def __init__(self, assume_scrambler_disabled: bool = False):
super().__init__()
self.assume_scrambler_disabled = assume_scrambler_disabled
def save_to_bin_file(self, directory_name: str):
"""
Saving captured bulk data to file.
Args:
directory_name (str) - path to save
"""
if len(self.buffer) > 0:
if not os.path.exists(directory_name):
os.makedirs(directory_name)
else:
shutil.rmtree(directory_name)
os.makedirs(directory_name)
main_link_file = f'capture_{datetime.now().strftime("%Y%m%d_%H%M%S")}.mainlink.bin'
events_file = f'capture_{datetime.now().strftime("%Y%m%d_%H%M%S")}.events.bin'
for data in self.buffer:
if data.type == CapturedDataType.Event:
e_file = open(os.path.join(directory_name, events_file), 'a+b')
sync = 0x0B41550B
e_file.write(sync.to_bytes(length=4, byteorder='little'))
e_file.write(0x0.to_bytes(length=4, byteorder='little'))
e_file.write(int(data.data[13] & 0xF).to_bytes(length=4, byteorder='little'))
e_file.write(int((len(data.data) - 12) / 4).to_bytes(length=4, byteorder='little'))
e_file.write(data.data[12:])
elif data.type == CapturedDataType.Bulk:
b_file = open(os.path.join(directory_name, main_link_file), 'a+b')
if len(data.data) > sizeof(BulkHeader):
header = BulkHeader.from_buffer(data.data)
if self.assume_scrambler_disabled:
header.Attribute.Packing |= 0x2
b_file.write(bytearray(header))
b_file.write(data.data[sizeof(BulkHeader):])
else:
warnings.warn("Buffer size is equal 0, without bulk data.")
def __str__(self):
return f"Start capture time: {self.start_capture_time}\n" \
f"End capture time: {self.end_capture_time}\n" \
f"Timestamp: {self.timestamp.__str__()}\n"

View File

@@ -0,0 +1,2 @@
from .event_types import EventSDP, EventLinkPattern, EventVBID, EventMSA, EventInfoFrame, EventLCE, EventFileFormat, \
EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc

View File

@@ -0,0 +1,205 @@
import time
import warnings
import copy
from typing import Union, TypeVar, Type, List
from UniTAP.libs.lib_tsi.tsi import TSIX_TS_SetPortConfigItem
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
from UniTAP.dev.modules.capturer.statuses import EventCaptureStatus
from .result_event import ResultEventObject
from .event_types import EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc, EventData
from UniTAP.version import __version__
EventFilterType = TypeVar("EventFilterType",
EventFilterDpRx,
EventFilterDpTx,
EventFilterHdRx,
EventFilterHdTx,
EventFilterUsbc
)
class EventCapturer:
"""
Class `EventCapturer` allows working with capturing events on Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of capturing
`capture_result`.
"""
def __init__(self, capturer: Capturer, port_id: int, event_filter: list):
self.__device = capturer.device
self.__capturer = capturer
self.__status = EventCaptureStatus.Unknown
device_name, serial_number, fw_version, front_end, memory_size = \
capturer.device.get_prepared_fw_info().split("|")
self.__fw_info = {"bundle_version": capturer.device.get_bundle_version(),
"app_version": __version__,
"frontend_version": front_end,
"memory_size": memory_size,
"ucd_device": f"{device_name} [{serial_number.replace(' ','')}]",
"ucd_fw_version": fw_version}
self.__result = ResultEventObject(self.__fw_info)
self.__event_filter = event_filter
self.__port_id = port_id
@property
def status(self) -> EventCaptureStatus:
"""
Returns current event capturer status.
Returns:
object of `VideoCaptureStatus` type
"""
return self.__capturer.event_capturer_status
@property
def capture_result(self) -> ResultEventObject:
"""
Returns result of event capturing.
Returns:
object of `ResultEventObject` type
"""
return self.__result
def event_filter(self, event_filter_type: Type[EventFilterType]) -> EventFilterType:
"""
Returns event filter for current `EventCapturer`.
Returns:
object of one of available [EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx,
EventFilterUsbc] type
"""
for item in self.__event_filter:
if isinstance(item, event_filter_type):
return item
available_variants = '\n'.join([str(type(e)) for e in self.__event_filter])
raise TypeError(f"Unsupported type of EventFilter: {event_filter_type}. Available variants:\n"
f"{available_variants}")
def configure_capturer(self, event_filter: Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx,
EventFilterUsbc]):
"""
Configure
Args:
event_filter (Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx, EventFilterHdTx, EventFilterUsbc])
"""
if not self.__check_event_filter_type(event_filter):
available_variants = '\n'.join([str(type(e)) for e in self.__event_filter])
raise TypeError(f"Unsupported type of EventFilter: {type(event_filter)}. "
f"Available variants:\n{available_variants}")
for i, item in enumerate(self.__event_filter):
if isinstance(item, type(event_filter)):
self.__event_filter[i] = event_filter
for item in event_filter.additional_filter:
TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, item.ci_control, item.value,
data_count=len(item.value) if isinstance(item.value, list) else 1)
TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, event_filter._control_ci,
event_filter.config)
def clear_capturer_config(self):
"""
Clear event captuter configuration (filter).
"""
for item in self.__event_filter:
item.clear()
TSIX_TS_SetPortConfigItem(self.__device.device_handle, self.__port_id, item._control_ci, 0)
def start(self, sec=0, n_elements=0):
"""
Start capturing. Possible some variants of capturing: \n
- Capture with fixed event count (will be captured fixed event count and capturing will be stopped)
- Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped).
- Capture without parameters Live capturing (for getting events you need to use functions `pop_element` and
`pop_all_elements`). Here you need to manually call the `stop` after capture.
All results can be obtained using the function `capture_result`.
Args:
n_elements (int)
sec (int)
"""
config = CaptureConfig()
config.event = True
config.type = CaptureConfig.Type.LIVE
self.__capturer.start_event_capture()
self.__capturer.start_capture(config)
self.__result.clear()
self.__result.start_capture_time = time.time()
if n_elements > 0:
self.__result.buffer.extend(self.__capturer.capture_n_events(n_elements))
self.stop()
elif sec > 0:
time.sleep(sec)
self.stop()
self.__result.buffer.extend(self.__capturer.read_all_events())
def stop(self):
"""
Stop capture events.
"""
config = CaptureConfig()
config.event = True
config.type = CaptureConfig.Type.LIVE
self.__capturer.stop_event_capture()
self.__capturer.stop_capture(config)
self.__result.end_capture_time = time.time()
def pop_element(self) -> EventData:
"""
Return first captured object of `EventData`.
Returns:
object of `EventData` type or `ResultEventObject`
"""
if self.status == EventCaptureStatus.Stop:
warnings.warn("Event capture is not working now. Please, turn it on.")
captured_events = self.__capturer.capture_n_events(1)
self.__result.buffer.extend(copy.deepcopy(captured_events))
return captured_events[0]
def __get_events_count(self) -> int:
return self.__capturer.get_available_events_count()
def __check_event_filter_type(self, event_filter: Union[EventFilterDpRx, EventFilterDpTx, EventFilterHdRx,
EventFilterHdTx, EventFilterUsbc]) -> bool:
for item in self.__event_filter:
if isinstance(item, type(event_filter)):
return True
return False
def pop_all_elements(self) -> List[EventData]:
"""
Return all captured event frames(objects of `EventData`).
Returns:
object of list[`EventData`] type
"""
return self.__capturer.read_all_events()
def pop_all_elements_as_result_object(self) -> ResultEventObject:
"""
Return all captured event frames(objects of `EventData`) as `ResultEventObject`.
Returns:
object of `ResultEventObject` type
"""
captured_events = self.__capturer.read_all_events()
res = ResultEventObject(self.__fw_info)
res.buffer.extend(captured_events)
return res

View File

@@ -0,0 +1,235 @@
event_entry_template = """
var TxtEntry{$num} = [
'Event Details',
'{$num}&nbsp{$event_name} ',
'{$time}<br>Entry type {$entry_type}<br>Sent from {$send_from}<br>',
'<font face="Courier">{$event_data}</font></p>',]
var ShortTxtEntry{$num} = [
'<option style="color:{$text_color};background-color:{$back_color};{$font_family}{$font_italic}{$font_bold}">{$num_just} {$time}&nbsp&nbsp {$send_from_just} {$type_just} {$event_name} </option>',
'</p>']
var Entry{$num} = [
EventProto,
TxtEntry{$num},
ShortEventProto,
ShortTxtEntry{$num} ]
"""
event_table_row = '<option style="color:{$text_color};background-color:{$back_color}";{$font_family}{$font_italic}' \
'{$font_bold} value="{$num}">{$num_just} {$time}&nbsp&nbsp {$send_from_just} {$type_just} ' \
'{$event_name} </option>'
events_template = """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html><head><title>UCD Console event monitoring report</title>
<meta http-equiv="Content-Type" content="text/html; charset=Windows-1252">
<script type="text/javascript">
var tbl_InternalStyleSheet =
[
'<style>',
'TABLE.DOCUMENT{width: 100%;table-layout: fixed;}',
'TABLE.DOCUMENT TD.CONTENTS{border-right-style: solid;border-right-color: #000000;border-right-width: thin;table-layout: fixed;width: 55%;}',
'TABLE.DOCUMENT TD.DOC{table-layout: fixed; width: 45%; word-wrap: break-word;}',
'P.CONTENTS{font-weight: bold;font-family: "Arial";font-size: 10pt;margin-left: 0.3cm;margin-top: 0.5cm;margin-bottom: 0.3cm;color: #000000;}',
'P.CONTENTS SELECT{font-weight: normal;font-family: "Courier";font-size: 10pt;width: 98%;height: 500pt;margin-top: 0;text-align: left;}',
'P.CONTENTS BUTTON{width: 49%;height: 18pt;margin-top: 0.1cm;text-align: center;}',
'P.COMPANYNAME{margin-top: 0cm;margin-bottom: 0.5cm;margin-left: 0.9cm;font-weight: medium;font-family: "Arial";font-size: 14pt;color: #000000;}',
'P.COMPANYINFO{margin-top: 0cm;margin-bottom: 0.5cm;margin-left: 1.1cm;font-weight: medium;font-family: "Arial";font-size: 10pt;color: #000000;}',
'P.HEADING{background-color: #BBC3FF;margin-top: 0;margin-bottom: 0.5cm;margin-left: 0cm;font-weight: bold;font-family: "Arial";font-size: 16pt;color: #000000; word-wrap: break-word;}',
'P.SUBHEADING{background-color: #BBC3FF;margin-top: 1.25cm;margin-bottom: 0.5cm;margin-left: 0.5cm;margin-right: 0;font-weight: bold;font-family: "Arial";font-size: 11pt;color: #000000;text-transform: uppercase; word-wrap: break-word;}',
'P.BLUESUBHEADING{background-color: #BBC3FF;margin-top: 0.5cm;margin-bottom: 0.5cm;margin-left: 0.3cm;margin-right: 0;font-weight: bold;font-family: "Arial";font-size: 11pt;color: #000000; word-wrap: break-word;}',
'P.GREENSUBHEADING{background-color: #DCFABC;margin-top: 0.5cm;margin-bottom: 0.5cm;margin-left: 0.6cm; margin-right: 0; font-weight: bold;font-family: "Arial";font-size: 11pt;color: #000000; word-wrap: break-word;}',
'P.BODYTEXT{margin-top: 0;margin-bottom: 0;margin-left: 0.9cm;font-weight: medium;font-family: "Arial";font-size: 10pt;color: #000000; word-wrap: break-word;}',
'P.INDENTBODYTEXT{margin-top: 0;margin-bottom: 0.5cm;margin-left: 1.2cm;font-weight: medium;font-family: "Arial";font-size: 10pt;color: #000000;}',
'P.PRTGREEN{background-color: #DCFABC;margin-top: 0;margin-bottom: 0.05cm;margin-left: 0.6cm; margin-right: 0; font-weight: medium;font-family: "Courier";font-size: 11pt;color: #000000;}',
'P.PRTROW{margin-top: 0;margin-bottom: 0.05cm;margin-left: 0.6cm; margin-right: 0; font-weight: medium;font-family: "Courier";font-size: 11pt;color: #000000;}',
'P.PRTBODY{margin-top: 0;margin-bottom: 0.1cm;margin-left: 0.6cm;font-weight: medium;font-family: "Courier";font-size: 10pt;color: #000000;}',
'</style>'
]
var tbl_CompanyInfo =
[
'<p class="COMPANYINFO">',
'Created with <a href="http://www.unigraf.fi">Unigraf</a> UCD Console',
'</p>'
]
var tbl_InitialRight =
[
'<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;Transaction / Event details</p>',
'<p class="BODYTEXT">Select an entry from the list to see details</p>'
]
var PacketProto =
[
'<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;',
'</p><p class="BLUESUBHEADING">',
'</p><p class="GREENSUBHEADING">Info</p><p class="BODYTEXT">',
'</p><p class="GREENSUBHEADING">HEX Dump</p><p class="BODYTEXT">',
'</p><p class="GREENSUBHEADING">Content decoder</p><p class="BODYTEXT">'
]
var EventProto =
[
'<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;',
'</p><p class="BLUESUBHEADING">',
'</p><p class="GREENSUBHEADING">Info</p><p class="BODYTEXT">',
'</p><p class="GREENSUBHEADING">Content decoder</p><p class="BODYTEXT">'
]
var TraceProto =
[
'<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;',
'</p><p class="BLUESUBHEADING">',
'</p><p class="GREENSUBHEADING">Info</p><p class="BODYTEXT">',
'</p><p class="GREENSUBHEADING">Content decoder</p><p class="BODYTEXT">'
]
var ShortPacketProto =
[
'<p class="PRTROW" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;',
'<p class="PRTBODY">'
]
var ShortEventProto =
[
'<p class="PRTROW" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;',
'</p><p class="PRTBODY">'
]
var tbl_RepDetails =
[
'<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;Report details</p>',
'<p class="GREENSUBHEADING">Unigraf test equipment</p>',
'<p class="BODYTEXT">Device: {$ucd_device}<br></p>',
'<p class="BODYTEXT">Firmware Package: {$ucd_fw_version}<br></p>',
'<p class="BODYTEXT">Console Application: {$app_version}<br></p>',
'<p class="BODYTEXT">Software Bundle: {$bundle_version}<br></p>',
'<p class="BODYTEXT">Frontend Info: {$frontend_version}<br></p>',
'<p class="BODYTEXT">Memory size: {$memory_size}<br><br></p>',
'<p class="GREENSUBHEADING">Remarks</p>',
'<p class="BODYTEXT">{$remarks}</p>',
'<p class="GREENSUBHEADING">Tested by</p>',
'<p class="BODYTEXT">{$tested}</p>',
]
{$events_data}
var tbl_AllDetails = [
{$events_list}
]
function MakeLongEntry (iEntryTable)
{
var lp;
var res = '';
for (lp in iEntryTable[0])
{
res += iEntryTable [0] [lp];
res += iEntryTable [1] [lp];
}
return res;
}
function MakeShortEntry(iEntryTable)
{
var lp;
var res = '';
for (lp in iEntryTable[2])
{
res += iEntryTable [2] [lp];
res += iEntryTable [3] [lp];
}
return res;
}
function GenerateDocument (iTable)
{
var loop;
var result = '';
for (loop in iTable)
result += iTable [loop];
return result;
}
function ShowTransaction (iTable)
{
var loop;
document.getElementById ('document').innerHTML = '<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;&nbsp;Transaction details</p>';
for(loop in iTable)
if(loop != 0)
document.getElementById ('document').innerHTML += MakeLongEntry(iTable[loop]);
}
function ListHandler (iOptionValue)
{
document.getElementById ('document').innerHTML = MakeLongEntry (tbl_AllDetails [iOptionValue]);
}
function ShowPrinterFriendlyDocument ()
{
var loop;
document.open ();
document.write ('<p class="HEADING">&nbsp;&nbsp;{$report_name}</p>');
document.write (GenerateDocument (tbl_RepDetails));
document.write (GenerateDocument (tbl_InternalStyleSheet));
document.write ('<p class="GREENSUBHEADING">Unigraf Oy contact information</p>');
document.write (GenerateDocument (tbl_CompanyInfo));
document.write ('<br><br><p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">&nbsp;&nbsp;Events list</p><br>');
for(loop in tbl_AllDetails)
document.write (MakeShortEntry (tbl_AllDetails[loop]));
ShowReportDetails ();
document.close ();
}
function ShowReportDetails ()
{
document.getElementById ('document').innerHTML = GenerateDocument (tbl_RepDetails);
}
var styleloop;
for (styleloop in tbl_InternalStyleSheet)
{
document.write (tbl_InternalStyleSheet [styleloop]);
}
</script>
</head>
<body onload="document.getElementById ('document').innerHTML = GenerateDocument (tbl_RepDetails)">
<div>
<p class="HEADING" id="top">&nbsp;&nbsp;{$report_name}</p>
</div>
<div>
<table class="DOCUMENT" cellpadding="0" cellspacing="0">
<colgroup>
</colgroup>
<tr valign="top">
<td class="CONTENTS">
<p class="SUBHEADING" style="margin-left: 0; margin-top: 0;">
&nbsp;&nbsp;&nbsp;Transactions and events
</p>
<p class="CONTENTS">
Click entry to see details</p>
<p class="CONTENTS">
&nbsp# <span style='padding-left:95px;'></span>Timestamp<span style='padding-left:62px;'></span>Source <span style='padding-left:40px;'> </span>Type <span style='padding-left:103px;'> </span> Info
<select id="testlist" size="2" onchange="ListHandler (document.getElementById ('testlist').value)" onkeyup="ListHandler (document.getElementById ('testlist').value)">
{$events_table}
</select>
<button onclick="ShowPrinterFriendlyDocument ()">Show printer friendly format ...</button>
<button onclick="ShowReportDetails ()">Show report information ...</button><br><br>
<script type="text/javascript">
document.write (GenerateDocument (tbl_CompanyInfo));
</script>
</p>
</td>
<td class="DOC">
<div id="document">
&nbsp;
</div>
</td>
</tr>
</table>
</div>
</body>
</html>
"""
packet_entry_template = """
var TxtEntry{$num} = [
'Event Details',
'{$num}&nbsp{$event_name} ',
'{$time}<br>Entry type {$entry_type}<br>Sent from {$send_from}<br>',
'{$hex}',
'<font face="Courier">{$event_data}</font></p>',]
var ShortTxtEntry{$num} = [
'<div style="color:{$text_color};margin-left: 0; margin-top: 0;background-color:{$back_color};{$font_family}{$font_italic}{$font_bold}">{$num_just} {$time}&nbsp&nbsp {$send_from_just} {$type_just} {$event_name}</div>',
'{$hex_just}</p>']
var Entry{$num} = [
PacketProto,
TxtEntry{$num},
ShortPacketProto,
ShortTxtEntry{$num} ]
"""

View File

@@ -0,0 +1,774 @@
from .event_utils import set_bit_in_vector, search_in_list
from .private_event_types import AdditionalSDPEventFilter, AdditionalLinkPatternEventFilter, \
AdditionalInfoFrameEventFilter, AdditionalVBIDEventFilter, AdditionalMSAEventFilter, AdditionalLSEEventFilter
from UniTAP.libs.lib_tsi.tsi_types import *
import warnings
class EventFileFormat(IntEnum):
"""
Describe all supported file formats for saving events:
- BIN.
- TXT (Support will be added later).
"""
UNKNOWN = -1
BIN = 0
TXT = 1
HTML = 2
CSV = 3
class EventSDP(IntEnum):
"""
Describe all supported SDP packets types:
"""
AudioTimeStamp = 0x1
AudioStream = 0x2
Extension = 0x4
AudioCopyManagement = 0x5
ISRC = 0x6
VSC = 0x7
CG0 = 0x8
CG1 = 0x9
CG2 = 0xA
CG3 = 0xB
CG4 = 0xC
CG5 = 0xD
CG6 = 0xE
CG7 = 0xF
PictureParamSet = 0x10
VSC_EXT_VESA = 0x20
VSC_EXT_CTA = 0x21
Adaptive_Sync = 0x22
VS = 0x81
AVI = 0x82
SPD = 0x83
Audio = 0x84
MPEG = 0x85
NTSC_VBI = 0x86
DRM = 0x87
class EventLinkPattern(IntEnum):
"""
Describe all supported Link Pattern packets types:
"""
TPS1Begin = 0
TPS1End = 1
TPS2Begin = 2
TPS2End = 3
TPS3Begin = 4
TPS3End = 5
TPS4Begin = 6
TPS4End = 7
IdleBegin = 10
IdleEnd = 11
ActiveBegin = 12
ActiveEnd = 13
SleepBegin = 14
SleepEnd = 15
StandbyBegin = 16
StandbyEnd = 17
EIEOSBegin = 18
EIEOSEnd = 19
CustomBegin = 20
CustomEnd = 21
Begin25201 = 22
End25201 = 23
Begin25202 = 24
End25202 = 25
PRBS7Begin = 26
PRBS7End = 27
PRBS31Begin = 28
PRBS31End = 29
class EventVBID(IntEnum):
"""
Describe all supported VBID packets types:
"""
SetVBlank = 0x1
ClearVBlank = 0x2
AnyVBlank = 0x3
SetFieldID = 0x4
CleatFieldID = 0x8
AnyFieldID = 0xC
SetInterface = 0x10
CleatInterface = 0x20
AnyInterface = 0x30
SetNoVideo = 0x40
CleatNoVideo = 0x80
AnyNoVideo = 0xC0
SetNoAudio = 0x100
CleatNoAudio = 0x200
AnyNoAudio = 0x300
SetHDCPSYNC = 0x400
CleatHDCPSYNC = 0x800
AnyHDCPSYNC = 0xC00
SetCompressed = 0x1000
CleatCompressed = 0x2000
AnyCompressed = 0x3000
SetReserved = 0x4000
CleatReserved = 0x8000
AnyReserved = 0xC000
MVID = 0x10000
MAUD = 0x20000
class EventMSA(IntEnum):
"""
Describe all supported MSA packets types:
"""
MVID = 0x1
NVID = 0x2
HTOTAL = 0x4
VTOTAL = 0x8
HSTART = 0x10
VSTART = 0x20
HSP = 0x40
HSW = 0x80
VSP = 0x100
VSW = 0x200
HWIDTH = 0x400
VHEIGHT = 0x800
MISC0 = 0x1000
MISC1 = 0x2000
class EventInfoFrame(IntEnum):
"""
Describe all supported Info Frame packets types:
"""
VS = 0x81
AVI = 0x82
SPD = 0x83
Audio = 0x84
MPEG = 0x85
NTSC_VBI = 0x86
DRM = 0x87
class EventData:
"""
Class `EventData` describe one event. Contains following information:
- `source`
- `type`
- `brief` info
- `info`
- `time`
- `duration`
- `data`
"""
def __init__(self):
self.__source = ""
self.__type = ""
self.__brief = ""
self.__info = ""
self.__time = 0
self.__duration = 0
self.__data = bytearray()
@property
def source(self):
return self.__source
@property
def type(self):
return self.__type
@property
def brief(self):
return self.__brief
@property
def info(self):
return self.__info
@property
def time(self):
return self.__time
@property
def duration(self):
return self.__duration
@property
def data(self):
return self.__data
@source.setter
def source(self, source: str):
self.__source = source
@type.setter
def type(self, _type: str):
self.__type = _type
@brief.setter
def brief(self, brief: str):
self.__brief = brief
@info.setter
def info(self, info: str):
self.__info = info
@time.setter
def time(self, time: int):
if time <= 0:
raise ValueError(f"Time must be more than 0.")
self.__time = time
@duration.setter
def duration(self, duration: int):
if duration <= 0:
raise ValueError(f"Duration must be more than 0.")
self.__duration = duration
@data.setter
def data(self, data: bytearray):
if len(data) <= 0:
raise ValueError(f"Data length must be more than 0.")
self.__data = data
class EventLCE:
"""
Describe settings for LCE USB-C events:
- `v_bus`
- `iv_bus`
- `vcc`
- `vsbu`
- `i_vconn`
"""
def __init__(self, v_bus: int, iv_bus: int, vcc: int, vsbu: int, i_vconn: int):
self.__v_bus = v_bus
self.__iv_bus = iv_bus
self.__vcc = vcc
self.__vsbu = vsbu
self.__i_vconn = i_vconn
@property
def v_bus(self):
return self.__v_bus
@property
def iv_bus(self):
return self.__iv_bus
@property
def vcc(self):
return self.__vcc
@property
def vsbu(self):
return self.__vsbu
@property
def i_vconn(self):
return self.__i_vconn
@v_bus.setter
def v_bus(self, v_bus: int):
if v_bus <= 0:
raise ValueError(f"V bus must be more than 0.")
self.__v_bus = v_bus
@iv_bus.setter
def iv_bus(self, iv_bus: int):
if iv_bus <= 0:
raise ValueError(f"I V bus must be more than 0.")
self.__iv_bus = iv_bus
@vcc.setter
def vcc(self, vcc: int):
if vcc <= 0:
raise ValueError(f"VCC must be more than 0.")
self.__vcc = vcc
@vsbu.setter
def vsbu(self, vsbu: int):
if vsbu <= 0:
raise ValueError(f"V sbu must be more than 0.")
self.__vsbu = vsbu
@i_vconn.setter
def i_vconn(self, i_vconn: int):
if i_vconn <= 0:
raise ValueError(f"I vconn must be more than 0.")
self.i_vconn = i_vconn
def values(self) -> list:
"""
Returns all values how list.
Returns:
object of list type
"""
return [self.v_bus, self.iv_bus, self.vcc, self.vsbu, self.i_vconn]
def _set_bit_in_link_pattern(enable, data, bit_no: EventLinkPattern):
_list = list(EventLinkPattern)
_ind = _list.index(bit_no)
if 0 <= _ind <= 7:
if enable:
data[0] |= (1 << bit_no.value)
else:
data[0] &= ~(1 << bit_no.value)
elif 8 <= _ind <= 17:
if enable:
data[1] |= (1 << (bit_no.value - 10))
else:
data[1] &= ~(1 << (bit_no.value - 10))
else:
if enable:
data[2] |= (1 << (bit_no.value - 20))
else:
data[2] &= ~(1 << (bit_no.value - 20))
class EventFilter:
"""
Base class of all filters.
"""
def __init__(self, hw_caps=None):
self.__config = 0
self._control_ci = 0
self._hw_caps = hw_caps
self.__additional_filter = []
@property
def config(self) -> int:
"""
Returns current configuration value of filter.
Returns:
object of int type
"""
return self.__config
@property
def additional_filter(self) -> list:
"""
Returns current additional filters for main filter.
Returns:
object of list type
"""
return self.__additional_filter
@config.setter
def config(self, config: int):
"""
Set configuration value of filter.
Args:
config (int)
"""
if config < 0:
raise ValueError(f"Config value must be more than 0.")
self.__config = config
@additional_filter.setter
def additional_filter(self, additional_filter: list):
"""
Set additional filters of main filter.
Args:
additional_filter (list)
"""
self.__additional_filter = additional_filter
def clear(self):
"""
Clear all config.
"""
self.config = 0
for item in self.additional_filter:
if isinstance(item.value, int):
item.value = 0
elif isinstance(item.value, list):
item.value = [0] * len(item.value)
class EventFilterDpRx(EventFilter):
"""
Class `EventFilterDpRx` allows setting filter for DPRX available events: `config_hpd_events`, `config_aux_events`,
`config_sdp_events`, `config_link_pattern_events`, `config_vb_id_events`, `config_msa_events`,
`config_aux_bw_events`.
Inherited from class `EventFilter`.
"""
def __init__(self, hw_caps):
super().__init__(hw_caps)
self._control_ci = TSI_DPRX_LOG_CONTROL
self.additional_filter = [AdditionalSDPEventFilter()]
if hw_caps.link_pat_log:
self.additional_filter.append(AdditionalLinkPatternEventFilter())
if hw_caps.vbid_hw_log:
self.additional_filter.append(AdditionalVBIDEventFilter())
if self._hw_caps.msa_hw_log:
self.additional_filter.append(AdditionalMSAEventFilter())
def config_hpd_events(self, enable: bool):
"""
Configure HDP events.
Args:
enable (bool) - enable/disable HDP events
"""
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_HPD
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_HPD
def config_aux_events(self, enable: bool):
"""
Configure AUX events.
Args:
enable (bool) - enable/disable AUX events
"""
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_AUX
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_AUX
def config_sdp_events(self, enable: bool, *args: EventSDP):
"""
Configure SDP events.
Args:
enable (bool) - enable/disable SDP events
*args (`EventSDP`) - SDP packet types
"""
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_SDP
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_SDP
if len(args) > 0:
for item in args:
if isinstance(item, EventSDP):
set_bit_in_vector(enable, self.additional_filter[0].value, item.value)
def config_link_pattern_events(self, enable: bool, *args: EventLinkPattern):
"""
Configure Link Pattern events.
Args:
enable (bool) - enable/disable Link Pattern events
*args (`EventLinkPattern`) - Link Pattern packet types
"""
if self._hw_caps is not None and self._hw_caps.link_pat_log:
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_LINK_PAT
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_LINK_PAT
if len(args) > 0:
for item in args:
if isinstance(item, EventLinkPattern):
_set_bit_in_link_pattern(enable,
self.additional_filter[search_in_list(AdditionalLinkPatternEventFilter,
self.additional_filter)].value,
item)
else:
warnings.warn("HW does not support Link Pattern Logger")
def config_vb_id_events(self, enable: bool, *args: EventVBID):
"""
Configure VB ID events.
Args:
enable (bool) - enable/disable VB ID events
*args (`EventVBID`) - VB ID packet types
"""
if self._hw_caps is not None and self._hw_caps.vbid_hw_log:
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_VBID_HW
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_VBID_HW
if len(args) > 0:
for item in args:
if isinstance(item, EventVBID):
if enable:
self.additional_filter[search_in_list(AdditionalVBIDEventFilter,
self.additional_filter)].value |= item.value
else:
self.additional_filter[search_in_list(AdditionalVBIDEventFilter,
self.additional_filter)].value &= ~item.value
else:
warnings.warn("HW does not support Vbid Hw Logger")
def config_msa_events(self, enable: bool, *args: EventMSA):
"""
Configure MSA events.
Args:
enable (bool) - enable/disable MSA events
*args (`EventMSA`) - MSA packet types
"""
if self._hw_caps is not None and self._hw_caps.msa_hw_log:
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_MSA_HW
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_MSA_HW
if len(args) > 0:
for item in args:
if isinstance(item, EventMSA):
if enable:
self.additional_filter[search_in_list(AdditionalMSAEventFilter,
self.additional_filter)].value |= item.value
else:
self.additional_filter[search_in_list(AdditionalMSAEventFilter,
self.additional_filter)].value &= ~item.value
else:
warnings.warn("HW does not support Msa Hw Logger")
def config_aux_bw_events(self, enable: bool):
"""
Configure AUX BW events.
Args:
enable (bool) - enable/disable AUX BW events
"""
if self._hw_caps is not None and self._hw_caps.aux_bw_log:
if enable:
self.config |= TSI_DPRX_LOG_CTRL_VALUE_AUX_BW
else:
self.config &= ~TSI_DPRX_LOG_CTRL_VALUE_AUX_BW
else:
warnings.warn("HW does not support Aux Bw Logger")
class EventFilterDpTx(EventFilter):
"""
Class `EventFilterDpTx` allows setting filter for DPTX available events: `config_hpd_events`, `config_aux_events`.
Inherited from class `EventFilter`.
"""
def __init__(self, hw_caps):
super().__init__(hw_caps)
self._control_ci = TSI_DPTX_LOG_CONTROL
def config_hpd_events(self, enable: bool):
"""
Configure HDP events.
Args:
enable (bool) - enable/disable HDP events
"""
if enable:
self.config |= TSI_DPTX_LOG_CONTROL_VALUE_HPD
else:
self.config &= ~TSI_DPTX_LOG_CONTROL_VALUE_HPD
def config_aux_events(self, enable: bool):
"""
Configure AUX events.
Args:
enable (bool) - enable/disable AUX events
"""
if enable:
self.config |= TSI_DPTX_LOG_CONTROL_VALUE_AUX
else:
self.config &= ~TSI_DPTX_LOG_CONTROL_VALUE_AUX
class EventFilterHdRx(EventFilter):
"""
Class `EventFilterHdRx` allows setting filter for HDRX available events: `config_hpd_events`,
`config_packets_events`, `config_i2c_events`, `config_cec_events`.
Inherited from class `EventFilter`.
"""
def __init__(self, hw_caps):
super().__init__(hw_caps)
self.additional_filter = [AdditionalInfoFrameEventFilter()]
self._control_ci = TSI_HDRX_LOG_CONTROL
def config_hpd_events(self, enable: bool):
"""
Configure HPD events.
Args:
enable (bool) - enable/disable HPD events
"""
if enable:
self.config |= TSI_HDRX_LOG_CTRL_VALUE_HPD
else:
self.config &= ~TSI_HDRX_LOG_CTRL_VALUE_HPD
def config_packets_events(self, enable: bool, *args: EventInfoFrame):
"""
Configure InfoFrame events.
Args:
enable (bool) - enable/disable InfoFrame events
*args (`EventInfoFrame`) - InfoFrame packet types
"""
if enable:
self.config |= TSI_HDRX_LOG_CTRL_VALUE_INFO
else:
self.config &= ~TSI_HDRX_LOG_CTRL_VALUE_INFO
if len(args) > 0:
for item in args:
if isinstance(item, EventInfoFrame):
set_bit_in_vector(enable, self.additional_filter[0].value, item.value)
def config_i2c_events(self, enable: bool):
"""
Configure I2C events.
Args:
enable (bool) - enable/disable I2C events
"""
if enable:
self.config |= TSI_HDRX_LOG_CTRL_VALUE_I2C
else:
self.config &= ~TSI_HDRX_LOG_CTRL_VALUE_I2C
def config_cec_events(self, enable: bool):
"""
Configure CEC events.
Args:
enable (bool) - enable/disable CEC events
"""
if enable:
self.config |= TSI_HDRX_LOG_CTRL_VALUE_CEC
else:
self.config &= ~TSI_HDRX_LOG_CTRL_VALUE_CEC
class EventFilterHdTx(EventFilter):
"""
Class `EventFilterHdTx` allows setting filter for HDTX available events: `config_hpd_events`, `config_i2c_events`,
`config_cec_events`.
Inherited from class `EventFilter`.
"""
def __init__(self, hw_caps):
super().__init__(hw_caps)
self._control_ci = TSI_HDTX_LOG_CONTROL
def config_hpd_events(self, enable: bool):
"""
Configure HPD events.
Args:
enable (bool) - enable/disable HPD events
"""
if enable:
self.config |= TSI_HDTX_LOG_CTRL_VALUE_HPD
else:
self.config &= ~TSI_HDTX_LOG_CTRL_VALUE_HPD
def config_i2c_events(self, enable: bool):
"""
Configure I2C events.
Args:
enable (bool) - enable/disable I2C events
"""
if enable:
self.config |= TSI_HDTX_LOG_CTRL_VALUE_I2C
else:
self.config &= ~TSI_HDTX_LOG_CTRL_VALUE_I2C
def config_cec_events(self, enable: bool):
"""
Configure CEC events.
Args:
enable (bool) - enable/disable CEC events
"""
if enable:
self.config |= TSI_HDTX_LOG_CTRL_VALUE_CEC
else:
self.config &= ~TSI_HDTX_LOG_CTRL_VALUE_CEC
class EventFilterUsbc(EventFilter):
"""
Class `EventFilterUsbc` allows setting filter for USB-C available events: `config_pd_events`,
`config_voltage_events`, `config_usbc_events`, `config_port_state_events`.
Inherited from class `EventFilter`.
"""
def __init__(self, hw_caps):
super().__init__(hw_caps)
self._control_ci = TSI_PDC_LOG_CONTROL
if hw_caps is not None and hw_caps.voltage:
self.additional_filter.append(AdditionalLSEEventFilter())
def config_pd_events(self, enable: bool):
"""
Configure PD events.
Args:
enable (bool) - enable/disable PD events
"""
if enable:
self.config |= TSI_PDC_LOG_CTRL_VALUE_PD
else:
self.config &= ~TSI_PDC_LOG_CTRL_VALUE_PD
def config_voltage_events(self, enable: bool, value: EventLCE):
"""
Configure LCE (voltage) events.
Args:
enable (bool) - enable/disable LCE events
value (`EventLCE`) - LCE packet types
"""
if self._hw_caps is not None and self._hw_caps.voltage:
if enable:
self.config |= TSI_PDC_LOG_CTRL_VALUE_USBC_VOLTAGE
else:
self.config &= ~TSI_PDC_LOG_CTRL_VALUE_USBC_VOLTAGE
for i in enumerate(value.values()):
self.additional_filter[search_in_list(AdditionalLSEEventFilter,
self.additional_filter)].value[i[0]] = i[1]
else:
warnings.warn("HW does not support 'LCE voltage' events")
def config_usbc_events(self, enable: bool):
"""
Configure USB-C events.
Args:
enable (bool) - enable/disable USB-C events
"""
if self._hw_caps is not None and self._hw_caps.events:
if enable:
self.config |= TSI_PDC_LOG_CTRL_VALUE_USBC_EVENT
else:
self.config &= ~TSI_PDC_LOG_CTRL_VALUE_USBC_EVENT
else:
warnings.warn("HW does not support 'USB-C' events")
def config_port_state_events(self, enable: bool):
"""
Configure USB-C state events.
Args:
enable (bool) - enable/disable USB-C state events
"""
if self._hw_caps is not None and self._hw_caps.states:
if enable:
self.config |= TSI_PDC_LOG_CTRL_VALUE_USBC_STATE
else:
self.config &= ~TSI_PDC_LOG_CTRL_VALUE_USBC_STATE
else:
warnings.warn("HW does not support 'USB-C Port State' events")

View File

@@ -0,0 +1,193 @@
import csv
import warnings
from .event_report_template import *
def set_bit_in_vector(enable, data, bit_no):
bits_in_word = 32
total_words = len(data)
selected_word = int(bit_no / bits_in_word)
bit_offset = bits_in_word - (((selected_word + 1) * bits_in_word) - bit_no)
if selected_word >= total_words:
warnings.warn(f"Bit {bit_no} is out of provided vector's range {total_words} words")
return
if enable:
data[selected_word] |= (1 << bit_offset)
else:
data[selected_word] &= ~(1 << bit_offset)
def search_in_list(_type, _list):
for item in enumerate(_list):
if isinstance(item[1], _type):
return item[0]
return None
def byterarray_to_hex_str(data: bytearray) -> str:
return ' '.join(format(x, '02x') for x in data)
def content_to_str(data: str) -> str:
data = data.replace("\r\n", "<br>")
data = data.replace("\n\r", "<br>")
data = data.replace("\r", "<br>")
data = data.replace("\n", "<br>")
data = data.replace("'", "\\'")
data = data.replace(" ", " &nbsp")
return data
def timestamp_to_str(value) -> str:
nsec = value % 1000
value -= nsec
value /= 1000
mcsec = value % 1000
value -= mcsec
value /= 1000
msec = value % 1000
value -= msec
value /= 1000
sec = value % 60
value -= sec
value /= 60
min_v = value % 60
value -= min_v
hour = value / 60
return "{:02d}:{:02d}:{:02d}.{:03d}.{:03d}.{:03d}".format(int(hour), int(min_v), int(sec), int(msec), int(mcsec),
int(nsec))
def save_to_txt_file(path: str, events: list, index=None):
file = open(path, 'w')
if index is not None:
if len(events[index]) == 6:
file.write(f"{events[index].get('content')}\n{events[index].get('brief')}\n"
f"{byterarray_to_hex_str(events[index].get('data'))}\n"
f"{events[index].get('source')}\n")
else:
warnings.warn("Empty event")
else:
for data in events:
if len(data) == 6:
file.write(f"{data.get('content')}\n{data.get('brief')}\n"
f"{byterarray_to_hex_str(data.get('data'))}\n"
f"{data.get('source')}\n")
else:
warnings.warn("Empty event")
file.close()
def save_to_bin_file(path: str, events: list, index=None):
file = open(path, 'wb')
if index is not None:
data = events[index].data
if len(data) > 14:
sync = 0x0B41550B
file.write(sync.to_bytes(length=4, byteorder='little'))
file.write(0x0.to_bytes(length=4, byteorder='little'))
file.write(int(data[13] & 0xF).to_bytes(length=4, byteorder='little'))
file.write(int((len(data) - 12) / 4).to_bytes(length=4, byteorder='little'))
file.write(data[12:])
else:
for data in events:
if len(data.data) > 14:
sync = 0x0B41550B
file.write(sync.to_bytes(length=4, byteorder='little'))
file.write(0x0.to_bytes(length=4, byteorder='little'))
file.write(int(data.data[13] & 0xF).to_bytes(length=4, byteorder='little'))
file.write(int((len(data.data) - 12) / 4).to_bytes(length=4, byteorder='little'))
file.write(data.data[12:])
file.close()
def save_to_csv_file(path: str, events: list):
with (open(path, 'w', newline='') as file):
writer = csv.writer(file, delimiter=';')
writer.writerow(["Source", "Type", "Start", "Brief info", "Full info"])
for data in events:
if len(data) == 6:
processed_info = data.get('content')[:-1] + '.'
processed_info = processed_info.replace("\n", "; ")
writer.writerow([
data.get('source'),
data.get('type'),
timestamp_to_str(data.get('timestamp')),
data.get('brief'),
f'"{processed_info}"'
])
else:
warnings.warn("Empty event")
def save_to_html_file(path: str, events: list, fw_info: dict, long_type_string: bool):
file = open(path, 'w')
new_events_template = events_template.replace("{$report_name}", "Unigraf UCD Console event monitoring report").\
replace("{$bundle_version}", fw_info.get('bundle_version')).\
replace("{$app_version}", fw_info.get('app_version')).\
replace("{$frontend_version}", fw_info.get('frontend_version')).\
replace("{$memory_size}", fw_info.get('memory_size')).\
replace("{$ucd_device}", fw_info.get('ucd_device')).\
replace("{$ucd_fw_version}", fw_info.get('ucd_fw_version')).\
replace("{$remarks}", '').\
replace("{$tested}", "")
events_data = []
entries_list = []
entries_rows = []
for index, item in enumerate(events):
new_entry_value = event_entry_template.replace("{$event_name}", item.get('brief')).\
replace("{$time}", f'{timestamp_to_str(item.get("timestamp"))}').\
replace("{$entry_type}", "").\
replace("{$send_from}", item.get('type')).\
replace("{$num}", f'{index}').\
replace("{$hex}", byterarray_to_hex_str(item.get('data'))).\
replace("{$hex_just}", byterarray_to_hex_str(item.get('data'))).\
replace("{$event_data}", content_to_str(item.get('content'))).\
replace("{$num_just}", f"{index}".ljust(6, " ").replace(" ", "&nbsp")).\
replace("{$send_from_just}", f"{item.get('source')}".ljust(10, " ").replace(" ", "&nbsp")).\
replace("{$type_just}", f"{item.get('type')}".ljust(16 if long_type_string else 8, " ").replace(" ", "&nbsp")).\
replace("{$text_color}", "#000000").\
replace("{$back_color}", '#ffffff').\
replace("{$font_bold}", "").\
replace("{$font_italic}", "").\
replace("{$font_family}", "")
new_event_table_row = event_table_row.replace("{$text_color}", "#000000").\
replace("{$back_color}", '#ffffff').\
replace("{$font_family}", "").\
replace("{$font_italic}", "").\
replace("{$font_bold}", "").\
replace("{$num}", f'{index}').\
replace("{$num_just}", f"{index}".ljust(6, " ").replace(" ", "&nbsp")).\
replace("{$time}", f'{timestamp_to_str(item.get("timestamp"))}').\
replace("{$send_from_just}", f"{item.get('source')}".ljust(10, " ").replace(" ", "&nbsp")).\
replace("{$type_just}", f"{item.get('type')}".ljust(16 if long_type_string else 8, " ").replace(" ", "&nbsp")).\
replace("{$event_name}", item.get('brief'))
events_data.append(new_entry_value)
entries_rows.append(new_event_table_row)
entries_list.append(f"Entry{index},")
new_events_template = new_events_template.replace("{$events_data}", "\n".join(e for e in events_data)).\
replace("{$events_list}", "\n".join(e for e in entries_list)).\
replace("{$events_table}", "\n".join(e for e in entries_rows))
file.write(new_events_template)
file.close()

View File

@@ -0,0 +1,75 @@
from UniTAP.libs.lib_tsi.tsi_types import TSI_DPRX_LOG_IF_SEL, TSI_HDRX_LOG_IF_SEL, TSI_DPRX_LOG_CTRL_VALUE_LINK_PAT, \
TSI_DPRX_VBID_LOG_SEL, TSI_DPRX_MSA_LOG_SEL, TSI_PDC_USBC_LOG_THRESH
class AdditionalEventFilter:
def __init__(self):
self.__ci_control = 0
self.__value = None
@property
def ci_control(self):
return self.__ci_control
@property
def value(self):
return self.__value
@ci_control.setter
def ci_control(self, ci_control: int):
if ci_control <= 0:
raise ValueError(f"CI control must be more than 0.")
self.__ci_control = ci_control
@value.setter
def value(self, value):
self.__value = value
class AdditionalSDPEventFilter(AdditionalEventFilter):
def __init__(self):
super().__init__()
self.ci_control = TSI_DPRX_LOG_IF_SEL
self.value = [0] * 8
class AdditionalInfoFrameEventFilter(AdditionalEventFilter):
def __init__(self):
super().__init__()
self.ci_control = TSI_HDRX_LOG_IF_SEL
self.value = [0] * 8
class AdditionalLinkPatternEventFilter(AdditionalEventFilter):
def __init__(self):
super().__init__()
self.ci_control = TSI_DPRX_LOG_CTRL_VALUE_LINK_PAT
self.value = [0] * 3
class AdditionalVBIDEventFilter(AdditionalEventFilter):
def __init__(self):
super().__init__()
self.ci_control = TSI_DPRX_VBID_LOG_SEL
self.value = 0
class AdditionalMSAEventFilter(AdditionalEventFilter):
def __init__(self):
super().__init__()
self.ci_control = TSI_DPRX_MSA_LOG_SEL
self.value = 0
class AdditionalLSEEventFilter(AdditionalEventFilter):
def __init__(self):
super().__init__()
self.ci_control = TSI_PDC_USBC_LOG_THRESH
self.value = [0] * 5

View File

@@ -0,0 +1,108 @@
import warnings
from UniTAP.dev.modules.capturer.result_object import ResultObject
from UniTAP.libs.lib_tsi.tsi import TSIEventParser
from .event_utils import save_to_bin_file, save_to_csv_file, save_to_txt_file, save_to_html_file
from .event_types import EventFileFormat
class ResultEventObject(ResultObject):
"""
Class `ResultEventObject` inherited from class `ResultObject`.
Class `ResultEventObject` allows saving captured events to file `save_to_file_selected_event` or
`save_to_file_all_events`.
Also has all the `ResultObject` functionality.
"""
def __init__(self, fw_info: dict):
super().__init__()
self.__parsed_buffer = []
self.__fw_info = fw_info
self.__long_type_string = False
def __str__(self):
return f"Start capture time: {self.start_capture_time}\n" \
f"End capture time: {self.end_capture_time}\n" \
f"Timestamp: {self.timestamp.__str__()}\n"
def save_to_file_selected_event(self, file_format: EventFileFormat, path: str, index: int):
"""
Saving selected event to file. Supported file formats describe in `EventFileFormat`.
Args:
file_format (`PictureFileFormat`) - file format
path (str) - path to save
index (int) - number of event in list
"""
if len(self.buffer) > 0:
if file_format == EventFileFormat.BIN:
save_to_bin_file(path=path, events=self.buffer, index=index)
elif file_format == EventFileFormat.TXT:
if len(self.__parsed_buffer) == 0:
self.__parse_buffer()
save_to_txt_file(path=path, events=self.__parsed_buffer, index=index)
elif file_format == EventFileFormat.HTML:
raise NotImplementedError('Temporary not supported to txt files.')
else:
raise ValueError(f"Incorrect file format. Available formats: "
f"{EventFileFormat.BIN.name}, {EventFileFormat.TXT.name}, "
f"{EventFileFormat.HTML.name}.\n"
f"Transferred audio format: {file_format.name}")
self.__parsed_buffer.clear()
else:
warnings.warn("Buffer size is equal 0.")
def save_to_file_all_events(self, file_format: EventFileFormat, path: str):
"""
Saving all events to file. Supported file formats describe in `EventFileFormat`.
Args:
file_format (`EventFileFormat`) - file format
path (str) - path to save
"""
if len(self.buffer) > 0:
if file_format == EventFileFormat.BIN:
if path.find(".bin") == -1:
path += ".bin"
save_to_bin_file(path=path, events=self.buffer)
elif file_format == EventFileFormat.CSV:
if path.find(".csv") == -1:
path += ".csv"
if len(self.__parsed_buffer) == 0:
self.__parse_buffer()
save_to_csv_file(path=path, events=self.__parsed_buffer)
elif file_format == EventFileFormat.TXT:
if path.find(".txt") == -1:
path += ".txt"
if len(self.__parsed_buffer) == 0:
self.__parse_buffer()
save_to_txt_file(path=path, events=self.__parsed_buffer)
elif file_format == EventFileFormat.HTML:
if path.find(".html") == -1:
path += ".html"
if len(self.__parsed_buffer) == 0:
self.__parse_buffer()
save_to_html_file(path=path, events=self.__parsed_buffer, fw_info=self.__fw_info,
long_type_string=self.__long_type_string)
else:
raise ValueError(f"Incorrect file format. Available formats: "
f"{EventFileFormat.BIN.name}, {EventFileFormat.TXT.name}, "
f"{EventFileFormat.HTML.name}.\n"
f"Transferred audio format: {file_format.name}")
self.__parsed_buffer.clear()
else:
warnings.warn("Buffer size is equal 0.")
def __parse_buffer(self):
with TSIEventParser() as parser:
for index, item in enumerate(self.buffer):
if len(item.data) > 12:
res = parser.parse(item.data, to_dict=True)
if res[0] < 0:
warnings.warn(f"Cannot parse element {index}. Error {res[0]}")
else:
self.__parsed_buffer.append(res[1])
if len(res[1].get('type')) > 8 and self.__long_type_string is False:
self.__long_type_string = True

View File

@@ -0,0 +1 @@
from .result_video import PictureFileFormat

View File

@@ -0,0 +1,63 @@
import warnings
from UniTAP.dev.modules.capturer.result_object import ResultObject
from UniTAP.dev.ports.modules.internal_utils.image_formats import PictureFileFormat, VideoFileFormat
from UniTAP.dev.ports.modules.internal_utils.image_utils import save_video_to_bin, save_video_to_mp4
from UniTAP.utils.uicl_api import video_frame_save_to_file, ImageFileFormat
class ResultVideoObject(ResultObject):
"""
Class `ResultVideoObject` inherited from class `ResultObject`.
Class `ResultVideoObject` allows saving captured frames to image `save_image_to_file`.
Also has all the `ResultObject` functionality.
"""
def __init__(self):
super().__init__()
def save_image_to_file(self, file_format: PictureFileFormat, path: str, index: int):
"""
Saving selected video frame to file. Supported file formats describe in `PictureFileFormat`.
Args:
file_format (`PictureFileFormat`) - file format
path (str) - path to save
index (int) - number of video frame in list
"""
if len(self.buffer) > 0:
if file_format == PictureFileFormat.BIN:
video_frame_save_to_file(video_frame=self.buffer[index], path=path, file_type=ImageFileFormat.IFF_BIN)
elif file_format == PictureFileFormat.BMP:
video_frame_save_to_file(video_frame=self.buffer[index], path=path, file_type=ImageFileFormat.IFF_BMP)
elif file_format == PictureFileFormat.PPM:
video_frame_save_to_file(video_frame=self.buffer[index], path=path, file_type=ImageFileFormat.IFF_PPM)
elif file_format == PictureFileFormat.DSC:
video_frame_save_to_file(video_frame=self.buffer[index], path=path, file_type=ImageFileFormat.IFF_DSC)
else:
raise ValueError(f"Incorrect image format. Available formats: "
f"{PictureFileFormat.BIN.name}, {PictureFileFormat.BMP.name}, "
f"{PictureFileFormat.PPM.name}.\n"
f"Transferred image format: {file_format.name}")
else:
warnings.warn("Buffer size is equal 0.")
def __save_to_video_file(self, file_format: VideoFileFormat, path: str):
"""
Will be implemented later.
"""
if len(self.buffer) > 0:
if file_format == VideoFileFormat.BIN:
save_video_to_bin(path=path, data=self.buffer)
elif file_format == VideoFileFormat.MP4:
save_video_to_mp4(path=path, data=self.buffer)
else:
raise ValueError(f"Incorrect video format. Available formats: "
f"{VideoFileFormat.BIN.name}, {VideoFileFormat.MP4.name}.\n"
f"Transferred video format: {file_format.name}")
else:
warnings.warn("Buffer size is equal 0.")
def __str__(self):
return f"Start capture time: {self.start_capture_time}\n" \
f"End capture time: {self.end_capture_time}\n" \
f"Timestamp: {self.timestamp.__str__()}\n"

View File

@@ -0,0 +1,224 @@
import time
import warnings
import copy
from typing import Union, List
from UniTAP.dev.modules.capturer.capture import Capturer, CaptureConfig
from UniTAP.dev.modules.capturer.statuses import VideoCaptureStatus
from UniTAP.common import VideoFrame, VideoFrameDSC
from .result_video import ResultVideoObject
class VideoCapturer:
"""
Class `VideoCapturer` allows working with capturing video frames on Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
capturing `capture_result`.
"""
def __init__(self, capturer: Capturer, max_stream_number: int):
self.__capturer = capturer
self.__result = ResultVideoObject()
self.__max_stream_number = max_stream_number
@property
def status(self) -> VideoCaptureStatus:
"""
Returns current video capturer status.
Returns:
object of `VideoCaptureStatus` type
"""
return self.__capturer.video_capturer_status
@property
def capture_result(self) -> ResultVideoObject:
"""
Returns result of video capturing.
Returns:
object of `ResultVideoObject` type
"""
return self.__result
@property
def max_stream_number(self) -> int:
"""
Returns max stream number supported for capturing.
Returns:
object of `int` type
"""
return self.__max_stream_number
def stop(self):
"""
Stop capture video.
"""
config = CaptureConfig()
config.video = True
config.type = CaptureConfig.Type.LIVE
self.__capturer.stop_capture(config)
self.__result.end_capture_time = time.time()
def pop_element(self) -> Union[VideoFrame, VideoFrameDSC]:
"""
Return first object of `VideoFrame` or `VideoFrameDSC`.
Returns:
object of `VideoFrame` or `VideoFrameDSC` type
"""
if self.status == VideoCaptureStatus.Idle:
warnings.warn("Video capture is not working now. Please, turn it on.")
captured_video_frames = self.__capturer.capture_video_by_n_frames(1)
self.__result.buffer.extend(copy.deepcopy(captured_video_frames))
return captured_video_frames[0]
def pop_element_as_result_object(self) -> ResultVideoObject:
"""
Return captured video frame(objects of `VideoFrame` or VideoFrameDSC`) as `ResultVideoObject`.
Returns:
object of `ResultVideoObject` type
"""
available_frame_count = self.__capturer.get_available_video_frame_count()
captured_video_frames = self.__capturer.capture_video_by_n_frames(available_frame_count)
res = ResultVideoObject()
res.buffer.extend(captured_video_frames)
return res
def pop_all_elements(self) -> Union[List[VideoFrame], List[VideoFrameDSC]]:
"""
Return all captured video frames(objects of `VideoFrame` or `VideoFrameDSC`).
Returns:
object of list[`VideoFrame` or `VideoFrameDSC`] type
"""
if self.status == VideoCaptureStatus.Idle:
warnings.warn("Video capture is not working now. Please, turn it on.")
available_frame_count = self.__capturer.get_available_video_frame_count()
print(f"Available frames count {available_frame_count}")
captured_video_frames = self.__capturer.capture_video_by_n_frames(available_frame_count)
self.__result.buffer.extend(copy.deepcopy(captured_video_frames))
return captured_video_frames
def get_crc(self, crc_frame_count: int = 1) -> List[tuple[int, int, int]]:
"""
Returns captured crc values.
Returns:
list[tuple[int, int, int]]
"""
return self.__capturer.capture_crc(crc_frame_count)
class VideoCapturerDP(VideoCapturer):
"""
Class `VideoCapturerDP` inherited from class `VideoCapturer` and also allows working with capturing video frames
on DP Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
capturing `capture_result`.
"""
def __init__(self, capturer: Capturer, max_stream_number: int):
super().__init__(capturer, max_stream_number)
self.__capturer = capturer
def start(self, frames_count: int = 0, sec: int = 0, stream_number: int = 0,
capture_type: CaptureConfig.Type = CaptureConfig.Type.LIVE):
"""
Start capturing. Possible some variants of capturing:
- Capture with fixed frames count (will be captured fixed frames count and capturing will be stopped).
- Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped).
- Capture without parameters - Live capturing (for getting frames you need to use functions `pop_element` and
`pop_all_elements`)
All results can be obtained using the function `capture_result`.
Args:
frames_count (int)
sec (int)
stream_number (int)
capture_type (CaptureConfig.Type)
"""
if stream_number >= self.max_stream_number:
assert ValueError(f"Incorrect index of stream. Value must be from available options: "
f"{', '.join([str(i) for i in range(self.max_stream_number)])}")
self.__capturer.set_video_stream_number(stream_number)
config = CaptureConfig()
config.video = True
config.type = capture_type
self.__capturer.start_capture(config)
self.capture_result.clear()
self.capture_result.start_capture_time = time.time()
if frames_count > 0:
self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count, capture_type))
elif sec > 0:
self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_sec(sec))
def get_buffer_capacity(self, stream_number: int = 0):
return self.__capturer.get_buffer_capacity(stream_number)
class VideoCapturerHDMI(VideoCapturer):
"""
Class `VideoCapturerHDMI` inherited from class `VideoCapturer` and also allows working with capturing video frames
on HDMI Sink (RX - receiver) side.
You can `start` capturing in several modes, `stop` capturing, getting current `status` and result of
capturing `capture_result`.
"""
def __init__(self, capturer: Capturer, max_stream_number: int):
super().__init__(capturer, max_stream_number)
self.__capturer = capturer
def start(self, frames_count: int = 0, sec: int = 0, stream_number: int = 0):
"""
Start capturing. Possible some variants of capturing:
- Capture with fixed frames count (will be captured fixed frames count and capturing will be stopped).
- Capture with fixed time (capturing will be continued fixed seconds and capturing will be stopped).
- Capture without parameters - Live capturing (for getting frames you need to use functions `pop_element` and
`pop_all_elements`)
All results can be obtained using the function `capture_result`.
Args:
frames_count (int)
sec (int)
stream_number (int)
"""
if stream_number >= self.max_stream_number:
assert ValueError(f"Incorrect index of stream. Value must be from available options: "
f"{', '.join([str(i) for i in range(self.max_stream_number)])}")
config = CaptureConfig()
config.video = True
config.type = CaptureConfig.Type.LIVE
self.__capturer.start_capture(config)
self.capture_result.clear()
self.capture_result.start_capture_time = time.time()
if frames_count > 0:
if frames_count > 1:
self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count))
else:
self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_frames(frames_count))
elif sec > 0:
self.capture_result.buffer.extend(self.__capturer.capture_video_by_n_sec(sec))
def get_buffer_capacity(self):
return self.__capturer.get_buffer_capacity()