Files
pqAutomationApp/UniTAP/dev/ports/modules/vtg/pg.py
xinzhu.yin c157e774e5 1.1.0版本
2026-04-16 16:51:05 +08:00

899 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import copy
import os.path
from typing import Union, List, Optional
from PIL import Image
from UniTAP.utils import function_scheduler
from UniTAP.libs.lib_tsi.tsi import *
from UniTAP.libs.lib_tsi.tsi_io import PortIO
from UniTAP.dev.modules import MemoryManager
from UniTAP.dev.ports.modules.internal_utils.math import aligned
from UniTAP.common import VideoMode, Timing, ColorInfo, VideoFrame, VideoFrameDSC
from UniTAP.dev.ports.modules.internal_utils import uicl_image_from_vm_and_data, uicl_image_convert_to_vm
from .types import VideoPattern, PGVideoMode, PGAdaptiveSyncPatternType, PGColorimetry, \
PGColorInfo, PGColorDepth, PGStandard, PGDynamicRange, PGAspectRatio, ConstantASParams, SquareASParams, \
ZigzagASParams, FixedASParams, ASParams
from .private_types import get_as_params_value, PgCaps, CustomTimingFlags, PGPatternID, get_pattern_params_value, \
PGPatternType
from .pg_utils import pg_timingflags_from_vm, pg_pixel_format_from_vm, pg_colorformat_to_ci_colorformat, \
pg_dynamicrange_to_ci_dynamicrange, pg_colorimetry_to_ci_colorimetry, pg_bpc_to_ci_bpc
from .timing_manager import TimingManager
from .pg_pattern_params import *
from ..panel_replay import *
# Override MAX_IMAGE_PIXELS
Image.MAX_IMAGE_PIXELS = None
def _read_predefined_timings(io: PortIO) -> List[Timing]:
count = io.get(TSI_R_PG_PREDEF_TIMING_COUNT)[1]
predefined_list = []
for i in range(count):
io.set(TSI_W_PG_PREDEF_TIMING_SELECT, i)
timing = Timing()
timing.htotal = io.get(TSI_PG_CUSTOM_TIMING_HTOTAL)[1]
timing.vtotal = io.get(TSI_PG_CUSTOM_TIMING_VTOTAL)[1]
timing.hactive = io.get(TSI_PG_CUSTOM_TIMING_HACTIVE)[1]
timing.vactive = io.get(TSI_PG_CUSTOM_TIMING_VACTIVE)[1]
timing.hstart = io.get(TSI_PG_CUSTOM_TIMING_HSTART)[1]
timing.vstart = io.get(TSI_PG_CUSTOM_TIMING_VSTART)[1]
timing.hswidth = io.get(TSI_PG_CUSTOM_TIMING_HSYNCW)[1]
timing.vswidth = io.get(TSI_PG_CUSTOM_TIMING_VSYNCW)[1]
timing.id = io.get(TSI_PG_CUSTOM_TIMING_ID)[1]
# TODO: add conversion from pg AR to Timing AR
timing.aspect_ratio = io.get(TSI_PG_CUSTOM_TIMING_ASPECT_RATIO)[1]
timing.frame_rate = io.get(TSI_PG_CUSTOM_TIMING_FIELD_RATE)[1]
timing.standard = Timing.Standard(io.get(TSI_PG_CUSTOM_TIMING_STANDARD)[1])
timing_flags = io.get(TSI_PG_CUSTOM_TIMING_FLAGS)[1]
timing.hswidth *= (-1 if timing_flags >> 9 & 1 else 1)
timing.vswidth *= (-1 if timing_flags >> 10 & 1 else 1)
timing.reduce_blanking = Timing.ReduceBlanking((timing_flags >> 24) & 0x3)
predefined_list.append(timing)
return predefined_list
class PGStatus:
"""
Class `PGStatus` describes possible states of `PatternGenerator`.
"""
class PGError(IntEnum):
"""
Class `PGError` contains codes of errors with the possibility of string representation.
"""
NotReady = -1
OK = 0
HWFault = 1
PixelClock = 2
MemoryError = 3
DscFileZero = 4
DscPixelClockExceeds = 5
DscSourceNotSupport = 6
DscSinkNotSupport = 7
DscFailReadDpcd = 8
DscFailWriteDpcd = 9
WrongColorFormat = 10
WrongColorimetry = 11
WrongBitsPerComponent = 12
WrongDynamicRange = 13
NotEnoughMemoryForPattern = 14
def __str__(self):
if self.value == self.OK:
return "OK"
if self.value == self.HWFault:
return "HW Fault."
if self.value == self.PixelClock:
return "Pixel clock exceed HW Caps."
if self.value == self.MemoryError:
return "Not enough memory for custom pattern."
if self.value == self.DscFileZero:
return "DSC isnt enabled. DSC file has zero size."
if self.value == self.DscPixelClockExceeds:
return "DSC isnt enabled. Video pixel clock exceeds hardware capabilities."
if self.value == self.DscSourceNotSupport:
return "DSC isnt enabled. Source does not support DSC."
if self.value == self.DscSinkNotSupport:
return "DSC isnt enabled. Sink does not support DSC."
if self.value == self.DscFailReadDpcd:
return "DSC isnt enabled. Fail to read DSC DPCD 0x160."
if self.value == self.DscFailWriteDpcd:
return "DSC isnt enabled. Fail to write DSC DPCD 0x160."
if self.value == self.WrongColorFormat:
return "Selected Color Format does not match the pattern."
if self.value == self.WrongColorimetry:
return "Selected Colorimetry does not match the pattern."
if self.value == self.WrongBitsPerComponent:
return "Selected BPC does not match the port capabilities."
if self.value == self.WrongDynamicRange:
return "Selected Dynamic Range does not match the pattern."
else:
return f"Unknown error: {self.value}"
def __init__(self, value: int):
self._error = self.PGError(value & 0xFF)
self._is_video_produced = (value >> 30) & 1 != 0
self._non_applied_changes = (value >> 31) & 1 != 0
@property
def error(self) -> PGError:
"""
Returns pg error.
Returns:
object of `PGError`
"""
return self._error
@property
def is_video_produced(self) -> bool:
"""
Returns state of video produced.
Returns:
object of bool - is video produced or not
"""
return self._is_video_produced
@property
def non_applied_changes(self) -> bool:
"""
Returns state of applied changes.
Returns:
object of bool - were there any changes or not
"""
return self._non_applied_changes
class PatternGenerator:
"""
Main class `PatternGenerator` allows working with PG functionality on the device: set different types of pattern
`set_pattern`, set different video modes `set_vm`, set additional parameters for some patterns `set_pattern_params`,
get information about current video mode on stream `get_stream_video_mode`, `apply` all transferred setting,
`reset` settings and read pattern generator `status`.
"""
__available_patterns = '\n'.join([e.name for e in VideoPattern])
__hardware_generated_patterns = ''.join([(e.name + '\n') if i <= 7 else '' for i, e in enumerate(VideoPattern)])
_MAP_PATTERN_PARAMETER = {PGPatternID.SolidColor: SolidColorParams,
PGPatternID.WhiteVStrips: WhiteVStripsParams,
PGPatternID.GradientRGBStripes: GradientStripsParams,
PGPatternID.Motion_Pattern: MotionParams,
PGPatternID.SquareWindow: SquareWindowParams}
def __init__(self, port_io: PortIO, memory_manager: MemoryManager, stream: int):
self.__io = port_io
self.__memory_manager = memory_manager
self.__stream = stream
self.__caps = self.__read_caps()
self._pattern = None
self._pattern_id = PGPatternID.Disabled
self._pattern_params = None
self._vm = None
self.__panel_replay = PanelReplay(port_io, self.__caps)
def __read_caps(self):
self._become_active()
return self.__io.get(TSI_PG_CAPS_R, PgCaps)[1]
def _caps(self) -> PgCaps:
return self.__caps
def __data_from_image_file(self, target_video_mode: VideoMode, path: str) -> bytearray:
if target_video_mode is None:
target_video_mode = self.get_stream_video_mode()
current_video_mode = copy.deepcopy(target_video_mode)
target_size = (target_video_mode.timing.hactive, target_video_mode.timing.vactive)
image = Image.open(path)
image = image.resize(target_size)
image = image.convert('RGB')
data = bytearray([x for sets in list(image.getdata()) for x in sets])
current_video_mode.color_info.color_format = ColorInfo.ColorFormat.CF_RGB
current_video_mode.color_info.bpc = 8
current_video_mode.color_info.colorimetry = ColorInfo.Colorimetry.CM_sRGB
if current_video_mode != target_video_mode:
image = uicl_image_from_vm_and_data(current_video_mode, data)
data = uicl_image_convert_to_vm(image, target_video_mode)
return data
@staticmethod
def __data_from_dsc_file(target_video_mode: VideoMode, path: str) -> bytearray:
with open(path, "rb") as dsc_file:
data = bytearray(dsc_file.read())
assert data[:4] == b"DSCF", 'DSC File missing \'DSCF\' header.'
return data
def __load_custom_image(self, video_mode: VideoMode, content_data: Union[str, bytearray, VideoFrame]):
if video_mode is None:
video_mode = self.get_stream_video_mode()
if isinstance(content_data, str):
data = self.__data_from_image_file(video_mode, str(content_data))
elif isinstance(content_data, VideoFrame):
data = content_data.data
else:
image = uicl_image_from_vm_and_data(video_mode, content_data)
data = uicl_image_convert_to_vm(image, video_mode)
block_sizes = PatternGenerator.__calculate_frame_size(video_mode.timing.hactive,
video_mode.timing.vactive)
self.__memory_manager.set_memory_layout([block_sizes])
self.__memory_manager.set_memory_block_index(MemoryManager.MemoryOwner.MO_PatternGenerator)
self.__io.set(TSI_PG_CUSTOM_PATTERN_MEMORY_BLOCK_INDEX, 0)
self.__io.set(TSI_PG_CUSTOM_PATTERN_WIDTH, video_mode.timing.hactive)
self.__io.set(TSI_PG_CUSTOM_PATTERN_HEIGHT, video_mode.timing.vactive)
self.__io.set(TSI_PG_CUSTOM_PATTERN_PIXEL_FORMAT, pg_pixel_format_from_vm(video_mode))
self.__io.set(TSI_PG_CUSTOM_PATTERN_DATA, data, c_uint8, data_count=len(data))
def __load_dsc_image(self, video_mode: VideoMode, content_data: Union[str, bytearray, VideoFrameDSC]):
if isinstance(content_data, str):
data = self.__data_from_dsc_file(video_mode, str(content_data))
elif isinstance(content_data, VideoFrameDSC):
data = content_data.data
else:
data = content_data
block_sizes = len(data)
self.__memory_manager.set_memory_layout([block_sizes])
self.__memory_manager.set_memory_block_index(MemoryManager.MemoryOwner.MO_PatternGenerator)
self.__io.set(TSI_PG_CUSTOM_PATTERN_MEMORY_BLOCK_INDEX, 0)
self.__io.set(TSI_PG_CUSTOM_PATTERN_DATA, data, c_uint8, data_count=len(data))
@staticmethod
def __calculate_frame_size(width: int, height: int) -> int:
line_stride_bytes = width * 4
line_stride_aligned_bytes = aligned(line_stride_bytes, 1024)
line_stride_bytes2 = width * 2
line_stride_aligned_bytes2 = aligned(line_stride_bytes2, 1024)
return int((line_stride_aligned_bytes + line_stride_aligned_bytes2) * height)
def set_pattern(self, pattern: Union[VideoPattern, str, bytearray, VideoFrame, VideoFrameDSC]):
"""
Allows setting video pattern on current stream.
Possible variants:
- type `VideoPattern` - value from enum `VideoPattern` (one of th e possible predefined patterns).
- type str - path to image (bmp, png, jpeg, dsc and so on).
- type bytearray - raw image data, which will be loaded to device memory.
- type `VideoFrame` - object of class that contains the image data.
- type `VideoFrameDSC` - object of class that contains dsc image data.
Args:
pattern (Union[`VideoPattern`, str, bytearray, `VideoFrame`, `VideoFrameDSC`])
"""
self._pattern = pattern
def _setup_pattern(self):
if self._pattern is not None:
if isinstance(self._pattern, VideoPattern):
self._pattern_id = PGPatternID(self._pattern.value)
if not self.__caps.flags.custom_vp and self._pattern_id in [PGPatternID.WhiteVStrips,
PGPatternID.GradientRGBStripes,
PGPatternID.ColorRamp,
PGPatternID.ColorSquares,
PGPatternID.Motion_Pattern]:
raise ValueError(f'Current PG (stream {self.__stream}) does not support pattern '
f'{self._pattern.name}. Please select another supported pattern from '
f'"VideoPattern":\n{self.__hardware_generated_patterns}')
elif isinstance(self._pattern, str) and os.path.exists(self._pattern):
filename, file_extension = os.path.splitext(self._pattern)
if file_extension.lower() == '.dsc' and self.__support_dsc():
self.__load_dsc_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.DscImage
else:
self.__load_custom_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.ImageFile
elif isinstance(self._pattern, VideoFrame):
if self._pattern.color_info.color_format == ColorInfo.ColorFormat.CF_DSC and self.__support_dsc():
self.__load_dsc_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.DscImage
else:
self.__load_custom_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.ImageFile
elif isinstance(self._pattern, VideoFrameDSC):
if self._pattern.data[:4] == b'DSCF' and self.__support_dsc():
self.__load_dsc_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.DscImage
elif isinstance(self._pattern, bytearray):
if self._pattern[:4] == b'DSCF' and self.__support_dsc():
self.__load_dsc_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.DscImage
else:
self.__load_custom_image(self._vm, self._pattern)
self._pattern_id = PGPatternID.ImageFile
else:
raise TypeError(f"Incorrect input value: {self._pattern}.\n"
f"If you want to select pattern, see following list of available patterns:\n"
f"{self.__available_patterns}")
self.__io.set(TSI_PG_PREDEF_PATTERN_SELECT, self._pattern_id.value)
return True
return False
def __support_dsc(self) -> bool:
if not self.__caps.flags.dsc_supported:
warnings.warn("Display Stream Compression is not supported.")
return False
return True
def set_vm(self, vm: VideoMode):
"""
Allows setting `VideoMode` on current stream.
Args:
vm (VideoMode)
"""
self._vm = vm
def _setup_vm(self):
if self._vm is None:
self.__read_pg_settings()
return False
if not self._vm.is_valid():
raise ValueError(f"Incorrect Video Mode")
if self.__caps.max_h_active >= self._vm.timing.hactive and \
self.__caps.max_v_active >= self._vm.timing.vactive and \
self.__caps.max_h_total >= self._vm.timing.htotal and \
self.__caps.max_v_total >= self._vm.timing.vtotal and \
self.__caps.max_frame >= int(self._vm.timing.frame_rate / 1000):
self.__io.set(TSI_PG_CUSTOM_TIMING_FLAGS, pg_timingflags_from_vm(self._vm))
self.__io.set(TSI_PG_CUSTOM_TIMING_HTOTAL, self._vm.timing.htotal, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_VTOTAL, self._vm.timing.vtotal, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_HACTIVE, self._vm.timing.hactive, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_VACTIVE, self._vm.timing.vactive, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_HSTART, self._vm.timing.hstart, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_VSTART, self._vm.timing.vstart, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_HSYNCW, abs(self._vm.timing.hswidth), c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_VSYNCW, abs(self._vm.timing.vswidth), c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_FIELD_RATE, self._vm.timing.frame_rate, c_uint)
self.__io.set(TSI_PG_CUSTOM_TIMING_STANDARD, self._vm.timing.standard.value, c_uint)
return True
else:
raise ValueError(f"Incorrect Video Mode (incorrect resolution and(or) frame rate")
def set_pattern_params(self, pattern_params: PGPatternParams):
"""
Allows setting additional parameters for some patters on current stream.
See available `PGPatternParams` types: `SolidColorParams`, `WhiteVStripsParams`, `GradientStripsParams`,
`MotionParams`,`SquareWindowParams` (see in pg pattern params).
Args:
pattern_params (PGPatternParams)
"""
self._pattern_params = pattern_params
def _setup_pattern_params(self):
if self._pattern_params is not None and isinstance(self._pattern, VideoPattern):
if self._MAP_PATTERN_PARAMETER.get(PGPatternID(self._pattern_id.value)) is not None:
if isinstance(self._pattern_params,
self._MAP_PATTERN_PARAMETER.get(PGPatternID(self._pattern_id.value))):
self.__io.set(TSI_PG_PREDEF_PATTERN_PARAMS, get_pattern_params_value(self._pattern_params,
self._vm.color_info.bpc),
data_count=len(get_pattern_params_value(self._pattern_params)))
return True
else:
warnings.warn(f"Incorrect pattern name {self._pattern} and "
f"pattern params {type(self._pattern_params)}.\n"
f"For {self._pattern} you need to use "
f"{self._MAP_PATTERN_PARAMETER.get(self._pattern_id)}")
return False
def __get_custom_timing_flags_to_video_mode(self) -> (ColorInfo, bool, bool):
timing_flags = self.__io.get(TSI_PG_CUSTOM_TIMING_FLAGS, CustomTimingFlags)[1]
vm_color_format = pg_colorformat_to_ci_colorformat(PGColorInfo(timing_flags.color_space))
vm_colorimetry = pg_colorimetry_to_ci_colorimetry(PGColorInfo(timing_flags.color_space),
PGColorimetry(timing_flags.colorimetry))
vm_dynamic_range = pg_dynamicrange_to_ci_dynamicrange(PGDynamicRange(timing_flags.dynamic_range))
vm_h_sync_polarity = bool(timing_flags.h_sync_polarity)
vm_v_sync_polarity = bool(timing_flags.v_sync_polarity)
vm_bpc = pg_bpc_to_ci_bpc(timing_flags.color_depth)
color_info = ColorInfo()
color_info.color_format = vm_color_format
color_info.colorimetry = vm_colorimetry
color_info.dynamic_range = vm_dynamic_range
color_info.bpc = vm_bpc
return color_info, vm_h_sync_polarity, vm_v_sync_polarity
def get_stream_video_mode(self) -> VideoMode:
"""
Returns `VideoMode` information about current stream.
Returns:
object of VideoMode type
"""
self.__read_pg_settings()
video_mode = VideoMode()
video_mode.timing.hactive = self.__io.get(TSI_PG_CUSTOM_TIMING_HACTIVE, c_uint32)[1]
video_mode.timing.vactive = self.__io.get(TSI_PG_CUSTOM_TIMING_VACTIVE, c_uint32)[1]
video_mode.timing.htotal = self.__io.get(TSI_PG_CUSTOM_TIMING_HTOTAL, c_uint32)[1]
video_mode.timing.vtotal = self.__io.get(TSI_PG_CUSTOM_TIMING_VTOTAL, c_uint32)[1]
video_mode.timing.hstart = self.__io.get(TSI_PG_CUSTOM_TIMING_HSTART, c_uint32)[1]
video_mode.timing.vstart = self.__io.get(TSI_PG_CUSTOM_TIMING_VSTART, c_uint32)[1]
video_mode.color_info, h_sync_polarity, v_sync_polarity = self.__get_custom_timing_flags_to_video_mode()
video_mode.timing.hswidth = self.__io.get(TSI_PG_CUSTOM_TIMING_HSYNCW, c_uint32)[1] * (
-1 if h_sync_polarity else 1)
video_mode.timing.vswidth = self.__io.get(TSI_PG_CUSTOM_TIMING_VSYNCW, c_uint32)[1] * (
-1 if v_sync_polarity else 1)
video_mode.timing.frame_rate = self.__io.get(TSI_PG_CUSTOM_TIMING_FIELD_RATE, c_uint32)[1]
return video_mode
def _become_active(self):
self.__io.set(TSI_PG_STREAM_SELECT, self.__stream)
def __read_pg_settings(self):
self.__io.set(TSI_PG_COMMAND_W, 2)
def apply(self) -> bool:
"""
Apply all settings on current stream.
Returns:
object of bool type - settings were set successfully or not
"""
self.__io.set(TSI_PG_COMMAND_W, 3)
def is_apply_pg_success(pg: PatternGenerator):
return pg.status().error == PGStatus.PGError.OK
return function_scheduler(is_apply_pg_success, self, interval=1, timeout=10)
def status(self) -> PGStatus:
"""
Returns `PGStatus` of current stream.
Returns:
object of PGStatus type.
"""
self._become_active()
return PGStatus(self.__io.get(TSI_PG_STATUS_R, c_uint32)[1])
def reset(self):
"""
Reset all setting on current stream.
"""
self._vm = VideoMode()
self._pattern = None
self._pattern_id = PGPatternID.Disabled
self._pattern_params = None
def get_pixel_rate(self) -> int:
"""
Returns current pixel rate.
Returns:
object of int type
"""
vm = self.get_stream_video_mode()
return vm.timing.htotal * vm.timing.vtotal * int(vm.timing.frame_rate / 1000)
def panel_replay(self) -> Optional[PanelReplay]:
"""
Returns object of PanelReplay if device supports this feature.
Returns:
object of 'PanelReplay' type or None
"""
if self.__caps.flags.panel_replay != 0 or self.__caps.flags.psr != 0:
return self.__panel_replay
return None
class HdmiPatternGenerator(PatternGenerator):
"""
Class `HdmiPatternGenerator` inherited from class `PatternGenerator`.
Allows getting `timing_manager`, `max_stream_count`, `apply` PG settings.
Also has all the `PatternGenerator` functionality.
"""
def __init__(self, port_io: PortIO, memory_manager: MemoryManager):
super().__init__(port_io, memory_manager, 0)
self.__timing_manager = TimingManager(_read_predefined_timings(port_io))
@property
def timing_manager(self) -> TimingManager:
"""
Should be used for working with available timings on device.
Returns:
object of `TimingManager` type.
"""
return self.__timing_manager
@property
def max_stream_count(self) -> int:
"""
Returns maximum count of available streams.
Returns:
object of int type.
"""
return 1
def apply(self) -> bool:
"""
Apply all settings.
Returns:
object of bool type - settings were set successfully or not
"""
self._become_active()
res = self._setup_vm()
res += self._setup_pattern()
res += self._setup_pattern_params()
if res != 0:
return super().apply()
else:
return False
class DpPatternGenerator(PatternGenerator):
"""
Class `DpPatternGenerator` inherited from class `PatternGenerator`.
Allows getting `timing_manager`, `adaptive_sync_status`, `apply` and `reset` PG settings and set additional settings:
`set_as_config`, `set_scrolling_params`.
Also has all the `PatternGenerator` functionality.
"""
def __init__(self, port_io: PortIO, memory_manager: MemoryManager, stream: int):
super().__init__(port_io, memory_manager, stream)
self.__scrolling_params = None
self.__as_config = None
self.__io = port_io
self.__timing_manager = TimingManager(_read_predefined_timings(port_io))
@property
def timing_manager(self) -> TimingManager:
"""
Should be used for working with available timings on device.
Returns:
object of `TimingManager` type.
"""
return self.__timing_manager
def set_as_config(self, as_config: ASParams):
"""
Allows setting adaptive sync configuration.
See available `ASParams` types: `ConstantASParams`, `SquareASParams`, `ZigzagASParams`, `FixedASParams`
(see in types).
Args:
as_config (ASParams)
"""
self.__as_config = as_config
def set_scrolling_params(self, scrolling_params: PGScrollingParams):
"""
Allows setting additional configuration for "Scrolling pattern". See available `PGScrollingParams` types:
`StepsScrollingParams` (see in pg pattern params).
Args:
scrolling_params (PGScrollingParams)
"""
self.__scrolling_params = scrolling_params
def _setup_as_config(self):
if self.__as_config is not None:
if not self._caps().flags.adaptive_sync:
warnings.warn("AdaptiveSync is not supported")
self.__as_config = None
return False
self.__io.set(TSI_PG_ADAPTIVE_SYNC_CTRL,
get_as_params_value(self.__as_config),
data_count=len(get_as_params_value(self.__as_config)))
return True
return False
def _setup_scrolling_params(self):
if self.__scrolling_params is not None:
if not self._caps().flags.scrolling_pattern:
warnings.warn("Scrolling params is not supported")
self.__scrolling_params = None
return False
if self._vm.color_info.color_format == ColorInfo.ColorFormat.CF_YCbCr_422 and \
self.__scrolling_params.horizontally % 2:
warnings.warn(f"Wrong horizontally param. Number must be even. Will be used param - 1: "
f"{self.__scrolling_params.horizontally - 1}")
self.__scrolling_params.horizontally -= 1
elif self._vm.color_info.color_format == ColorInfo.ColorFormat.CF_YCbCr_420 and \
self.__scrolling_params.vertically % 2:
warnings.warn(f"Wrong vertically param. Number must be even. Will be used param - 1: "
f"{self.__scrolling_params.vertically - 1}")
self.__scrolling_params.vertically -= 1
if not (self._caps().patterns & (1 << self._pattern_id.value)):
warnings.warn(f"Patterns scrolling feature is not supported on pattern {self._pattern_id.name}")
return False
value = 1
value |= (1 << 31)
self.__io.set(TSI_PG_CUSTOM_PATTERN_SCROLLING_CONTROL, value, c_uint32)
self.__io.set(TSI_PG_CUSTOM_PATTERN_SCROLLING_CONFIG, get_pattern_params_value(self.__scrolling_params),
data_count=len(get_pattern_params_value(self.__scrolling_params)))
return True
return False
def adaptive_sync_status(self) -> bool:
"""
Returns work status of adaptive sync.
Returns:
object of bool type - adaptive sync enabled or not
"""
self._become_active()
res = self.__io.get(TSI_PG_ADAPTIVE_SYNC_STS, c_uint32)
if res[0] < TSI_SUCCESS:
return False
return res[1] & 1 != 0
def apply(self) -> bool:
"""
Apply all settings.
Returns:
object of bool type - settings were set successfully or not
"""
self._become_active()
res = self._setup_vm()
res += self._setup_pattern()
res += self._setup_pattern_params()
res += self._setup_scrolling_params()
if res != 0:
res = super().apply()
else:
res = False
self._setup_as_config()
return res
def reset(self):
"""
Reset all setting.
"""
self.__scrolling_params = None
self.__as_config = None
super().reset()
class DpMstPatternGenerator:
"""
Class `DpMstPatternGenerator` allows working with one of the supported streams on the device (contains list of the
`DpPatternGenerator` objects). To access the selected stream, use an override of `[ ]`.
Also, allows working with stream number 0 directly and applying all settings of all streams together.
"""
def __init__(self, port_io: PortIO, memory_manager: MemoryManager, max_stream_count: int):
self.__pg_list = []
self.__io = port_io
self.__MAX_STREAM_COUNT = max_stream_count
for i in range(self.max_stream_count):
self.__pg_list.append(DpPatternGenerator(port_io, memory_manager, i))
self.__timing_manager = self.__pg_list[0].timing_manager
def __getitem__(self, index: int):
"""
Allows selecting one of the supported streams.
Example:
`'object of device role'.pg[index]`
"""
if not (0 <= index < self.max_stream_count):
raise ValueError(f"Index of PG must be in range [0..{self.max_stream_count}].")
return self.__pg_list[index]
@property
def timing_manager(self) -> TimingManager:
"""
Should be used for working with available timings on device.
Returns:
object of `TimingManager` type.
"""
return self.__timing_manager
@property
def max_stream_count(self) -> int:
"""
Returns maximum count of available streams.
Returns:
object of int type.
"""
return self.__MAX_STREAM_COUNT
def set_pattern(self, pattern: Union[VideoPattern, str, bytearray, VideoFrame, VideoFrameDSC]):
"""
Allows setting video pattern on stream number 0 of pattern generator.
Possible variants:
- type `VideoPattern` - value from enum `VideoPattern` (one of th e possible predefined patterns).
- type str - path to image (bmp, png, jpeg, dsc and so on).
- type bytearray - raw image data, which will be loaded to device memory.
- type `VideoFrame` - object of class that contains the image data.
- type `VideoFrameDSC` - object of class that contains the dsc image data.
Args:
pattern (Union[`VideoPattern`, str, bytearray, `VideoFrame`, `VideoFrameDSC`])
"""
self.__pg_list[0].set_pattern(pattern)
def set_vm(self, vm: VideoMode):
"""
Allows setting `VideoMode` on stream number 0 of pattern generator.
Args:
vm (VideoMode)
"""
self.__pg_list[0].set_vm(vm)
def set_pattern_params(self, pattern_params: PGPatternParams):
"""
Allows setting additional parameters for some patters on stream number 0 of pattern generator.
See available `PGPatternParams` types: `SolidColorParams`, `WhiteVStripsParams`, `GradientStripsParams`,
`MotionParams`,`SquareWindowParams` (see in pg pattern params).
Args:
pattern_params (PGPatternParams)
"""
self.__pg_list[0].set_pattern_params(pattern_params)
def set_as_config(self, as_config: ASParams):
"""
Allows setting adaptive sync configuration on stream number 0 of pattern generator.
See available `ASParams` types: `ConstantASParams`, `SquareASParams`, `ZigzagASParams`, `FixedASParams`.
(see in types).
Args:
as_config (ASParams)
"""
self.__pg_list[0].set_as_config(as_config)
def set_scrolling_params(self, scrolling_params: PGScrollingParams):
"""
Allows setting additional configuration for "Scrolling pattern" on stream number 0 of pattern generator.
See available `PGScrollingParams` types: `StepsScrollingParams` (see in pg pattern params).
Args:
scrolling_params (PGScrollingParams)
"""
self.__pg_list[0].set_scrolling_params(scrolling_params)
def get_stream_video_mode(self) -> VideoMode:
"""
Returns `VideoMode` information about current configuration of PG on stream 0.
Returns:
object of VideoMode type
"""
return self.__pg_list[0].get_stream_video_mode()
def apply(self) -> bool:
"""
Apply all setting on stream number 0 of pattern generator.
Returns:
object of bool type - settings were set successfully or not
"""
return self.__pg_list[0].apply()
def status(self) -> PGStatus:
"""
Returns `PGStatus` on stream number 0 of pattern generator.
Returns:
object of PGStatus type.
"""
return self.__pg_list[0].status()
def reset(self):
"""
Reset all setting on stream number 0 of pattern generator.
"""
self.__pg_list[0].reset()
def apply_all(self):
"""
Apply all setting on all supported streams of pattern generator.
"""
for i in range(self.max_stream_count):
self.__pg_list[i].apply()
def get_pixel_rate(self, stream: int = 0) -> int:
"""
Returns current pixel rate for selected stream.
Returns:
object of int type
"""
return self.__pg_list[stream].get_pixel_rate()
@property
def panel_replay(self) -> PanelReplay:
"""
Returns object of PanelReplay if device supports this feature (first stream).
Returns:
object of 'PanelReplay' type or None
"""
return self.__pg_list[0].panel_replay()
def __read_max_stream_count(self) -> int:
result = self.__io.get(TSI_PG_MAX_STREAM_COUNT_R, c_uint32)
return result[1]