Files
pqAutomationApp/drivers/UCD323_Function.py

690 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: UTF-8 -*-
import UniTAP
import time
import gc
from drivers.UCD323_Enum import UCDEnum
class UCDController:
"""UCD323信号发生器控制类"""
def __init__(self):
    """Create the UniTAP library handle and initialise controller state.

    No device is opened here; ``open()`` fills in the session attributes.
    """
    # Library handle used to enumerate and open devices.
    self.lUniTAP = UniTAP.TsiLib()

    # Session state: device handle, selected role and its timing manager.
    self.dev = self.role = None
    self.timing_manager = None
    self.status = False
    self.current_interface = "HDMI"

    # Signal configuration.
    self.config = None
    self.color_mode = None
    self.current_timing = None

    # Pattern selection: the pattern itself, the parameter object built for
    # it, the list of parameter sets to step through and the step position.
    self.current_pattern = None
    self.current_pattern_param = None
    self.current_pattern_params = None
    self.current_pattern_index = 0
def search_device(self):
    """Return the devices reported by the UniTAP library.

    Returns:
        Whatever ``get_list_of_available_devices()`` returns when it is
        truthy, otherwise an empty list.
    """
    found = self.lUniTAP.get_list_of_available_devices()
    if not found:
        return []
    return found
def open(self, device_name):
    """Open a device and select the UCD323 HDMI source role.

    Args:
        device_name: String whose text before the first ``:`` is the
            integer device id passed to the library's ``open``.

    Returns:
        True when the device was opened and configured; False if any step
        raised (in which case all controller state is cleaned up).
    """
    handle = None
    try:
        # Start from a clean slate if a previous session is still around.
        if self.status or self.dev is not None:
            self._force_cleanup()

        id_text = device_name.split(":")[0]
        handle = self.lUniTAP.open(int(id_text))

        try:
            self.role = handle.select_role(UniTAP.dev.UCD323.HDMISource)
            self.dev = handle
            self.current_interface = "HDMI"
        except Exception as role_error:
            # The handle is not yet stored on self, so close it explicitly.
            self._close_device_object(handle)
            raise role_error

        pattern_gen, _ = self.get_tx_modules()
        self.timing_manager = pattern_gen.timing_manager
        self.color_mode = UniTAP.ColorInfo()
        self.status = True
        return True
    except Exception:
        self._force_cleanup()
        return False
def close(self):
    """Close the device and rebuild the UniTAP library handle.

    Returns:
        True when the shutdown and the re-creation of the library handle
        complete without an exception. False otherwise; the controller
        state is still reset and one more best-effort attempt is made to
        recreate the library handle.
    """

    def _reset_state():
        # Put every session attribute back to its constructor value.
        for attr in (
            "dev",
            "role",
            "timing_manager",
            "current_timing",
            "current_pattern",
            "current_pattern_param",
            "current_pattern_params",
        ):
            setattr(self, attr, None)
        self.status = False
        self.current_pattern_index = 0
        self.current_interface = "HDMI"

    try:
        if self.dev:
            self._close_device_object(self.dev)
        _reset_state()

        # Drop the library handle, collect garbage and pause before
        # creating a new one (presumably so native resources are released
        # first - TODO confirm against the UniTAP documentation).
        self.lUniTAP = None
        for _ in range(3):
            gc.collect()
        time.sleep(2.0)
        self.lUniTAP = UniTAP.TsiLib()
        return True
    except Exception:
        _reset_state()
        try:
            self.lUniTAP = None
            gc.collect()
            time.sleep(2.0)
            self.lUniTAP = UniTAP.TsiLib()
        except Exception:
            # Best effort only: lUniTAP may be left as None here.
            pass
        return False
def _close_device_object(self, dev_obj):
    """Explicitly close a device object through the library, best effort.

    Does nothing when ``dev_obj`` is None. All exceptions are swallowed.
    """
    try:
        if dev_obj is None:
            return

        library = self.lUniTAP
        if library and hasattr(library, "close"):
            try:
                library.close(dev_obj)
            except Exception:
                # A failing close must not stop the rest of the teardown.
                pass

        # Drop the local reference, collect garbage and pause briefly.
        dev_obj = None
        gc.collect()
        time.sleep(1.0)
    except Exception:
        pass
def _force_cleanup(self):
    """Close any open device and reset all session state, never raising."""
    try:
        device = self.dev
        if device:
            self._close_device_object(device)

        # Attributes that go back to None.
        for attr in (
            "dev",
            "role",
            "timing_manager",
            "current_timing",
            "current_pattern",
            "current_pattern_param",
            "current_pattern_params",
        ):
            setattr(self, attr, None)

        # Attributes with non-None defaults.
        self.status = False
        self.current_pattern_index = 0
        self.current_interface = "HDMI"
    except Exception:
        pass
def get_tx_modules(self):
    """Return the ``(pg, ag)`` modules of the TX block for the current interface.

    ``None`` or ``"HDMI"`` selects ``role.hdtx``; ``"DP"`` or ``"Type-C"``
    selects ``role.dptx``.

    Raises:
        RuntimeError: If no role is selected (device not opened).
        ValueError: If the current interface is not one of the above.
    """
    if not self.role:
        raise RuntimeError("UCD 未打开,无法获取 TX 模块")

    iface = getattr(self, "current_interface", None)
    if iface is None or iface == "HDMI":
        tx = self.role.hdtx
    elif iface == "DP" or iface == "Type-C":
        tx = self.role.dptx
    else:
        raise ValueError(f"不支持的接口类型: {iface}")
    return tx.pg, tx.ag
def _resolve_timing(self, pg=None):
    """Return the active timing, preferring ``current_timing``.

    When ``current_timing`` is None and ``pg`` exposes a callable
    ``get_vm``, the ``timing`` attribute of its result is returned instead.
    Any exception from ``get_vm`` yields None.
    """
    cached = self.current_timing
    if cached is not None:
        return cached

    # Fall back to whatever video mode the TX module currently reports.
    read_vm = None if pg is None else getattr(pg, "get_vm", None)
    if not callable(read_vm):
        return None
    try:
        return getattr(read_vm(), "timing", None)
    except Exception:
        return None
def get_current_resolution(self, default=(3840, 2160)):
    """Return ``(h_active, v_active)`` of the current timing.

    ``default`` is returned when no timing is available, when the timing
    lacks either attribute, or when anything along the way raises.
    """
    try:
        pattern_gen, _ = self.get_tx_modules()
        timing = self._resolve_timing(pattern_gen)
        if not timing:
            return default
        # A missing attribute raises AttributeError, handled below.
        width = timing.h_active
        height = timing.v_active
        return width, height
    except Exception:
        return default
def _cleanup(self):
    """Release the device and run the library's ``cleanup`` hook if it has one.

    All exceptions are swallowed.
    """
    try:
        device = self.dev
        if device:
            self._close_device_object(device)
        self.dev = None

        library = self.lUniTAP
        if hasattr(library, "cleanup"):
            library.cleanup()
    except Exception:
        pass
def set_ucd_params(self, config):
    """Load colour mode, timing and pattern settings from ``config``.

    Args:
        config: Object providing ``current_test_type``,
            ``current_test_types[test_type]`` (with ``color_format``,
            ``timing`` and, for non-"hdr_movie" tests, ``bpc`` and
            ``colorimetry``) and ``current_pattern`` (with
            ``pattern_mode`` and ``pattern_params``).

    Returns:
        True when every setting was resolved. False when the colour
        format / colour mode, the timing or the pattern mode could not be
        resolved.

    Raises:
        ValueError: Propagated from ``set_timing_from_string`` when the
            timing string cannot be parsed.
    """
    self.config = config
    test_type = self.config.current_test_type
    pg, _ = self.get_tx_modules()
    self.timing_manager = pg.timing_manager

    settings = self.config.current_test_types[test_type]
    if test_type == "hdr_movie":
        # For this test type only the colour format is updated; bpc and
        # colorimetry on the existing colour mode are left as they are.
        color_format = UCDEnum.ColorInfo.get_color_format(settings["color_format"])
        if not color_format:
            return False
        self.color_mode.color_format = color_format
    elif not self.set_color_mode(
        settings["color_format"], settings["bpc"], settings["colorimetry"]
    ):
        return False

    # A timing that cannot be resolved must fail the whole configuration;
    # otherwise a stale (or missing) timing would be used silently.
    if not self.set_timing_from_string(settings["timing"]):
        return False

    self.current_pattern_index = 0
    pattern = UCDEnum.VideoPatternInfo.get_video_pattern(
        self.config.current_pattern["pattern_mode"]
    )
    if pattern is None:
        return False
    self.current_pattern = pattern
    self.current_pattern_params = self.config.current_pattern["pattern_params"]
    return True
def run(self):
    """Push the current video mode and pattern to the device and apply them.

    The results of ``apply_video_mode`` and ``apply_pattern`` are not
    checked; the method always returns True unless something raises.
    """
    self.apply_video_mode()
    self.apply_pattern()
    pattern_gen, _ = self.get_tx_modules()
    pattern_gen.apply()
    return True
def send_image_pattern(self, image_path):
    """Output an image as the pattern, using the current timing and colour mode.

    Returns:
        True on success; False when the device is not open or any step
        raises.
    """
    if not (self.status and self.role):
        return False
    try:
        pattern_gen, _ = self.get_tx_modules()
        self.apply_video_mode()
        pattern_gen.set_pattern(pattern=image_path)
        pattern_gen.apply()
    except Exception:
        return False
    return True
def send_solid_rgb_pattern(self, rgb):
    """Output a solid colour, using the current timing and colour mode.

    Args:
        rgb: Iterable of colour components; converted to a list and passed
            on as the pattern parameters.

    Returns:
        The result of ``send_current_pattern_params``; False when the
        device is not open, the "solidcolor" pattern cannot be resolved, or
        anything raises.
    """
    if not (self.status and self.role):
        return False
    try:
        solid = UCDEnum.VideoPatternInfo.get_video_pattern("solidcolor")
        self.current_pattern = solid
        if solid is None:
            return False
        return self.send_current_pattern_params(list(rgb))
    except Exception:
        return False
def send_current_pattern_params(self, pattern_params):
    """Output the configured pattern, optionally updating its parameters first.

    Args:
        pattern_params: Parameters handed to ``set_pattern`` together with
            the current pattern; None skips that step.

    Returns:
        True once ``run()`` has completed. False when the device is not
        open, no pattern is configured, ``set_pattern`` reports failure,
        or anything raises.
    """
    if not (self.status and self.role):
        return False
    try:
        pattern = self.current_pattern
        if pattern is None:
            return False
        if pattern_params is not None:
            if not self.set_pattern(pattern, pattern_params):
                return False
        self.run()
    except Exception:
        return False
    return True
def set_color_mode(self, cf, bpc, cm):
    """Update colour format, bit depth and colorimetry on ``color_mode``.

    Args:
        cf: Colour format key resolved through ``UCDEnum.ColorInfo``.
        bpc: Bits per component; must be a positive ``int``.
        cm: Colorimetry key resolved through ``UCDEnum.ColorInfo``.

    Returns:
        True when all three values were valid and applied; False (without
        modifying ``color_mode``) when any of them is rejected.
    """
    # Capture the values that are written back after the update below.
    saved_range = getattr(self.color_mode, "dynamic_range", None)
    saved_transfer = getattr(self.color_mode, "transfer_characteristic", None)

    resolved_format = UCDEnum.ColorInfo.get_color_format(cf)
    if resolved_format is None:
        return False
    if not (isinstance(bpc, int) and bpc > 0):
        return False
    resolved_colorimetry = UCDEnum.ColorInfo.get_colorimetry(cm)
    if resolved_colorimetry is None:
        return False

    mode = self.color_mode
    mode.color_format = resolved_format
    mode.bpc = bpc
    mode.colorimetry = resolved_colorimetry

    # Re-assert the previously captured range / transfer settings.
    if saved_range is not None:
        mode.dynamic_range = saved_range
    if saved_transfer is not None:
        mode.transfer_characteristic = saved_transfer
    return True
def apply_video_mode(self):
    """Send the current colour mode and timing to the device.

    Returns:
        False when no timing is selected, otherwise True after
        ``set_video_mode()`` has run.
    """
    if not self.current_timing:
        return False
    self.set_video_mode()
    return True
def set_video_mode(self):
    """Build a video mode from the current timing and colour mode and set it.

    Returns:
        Always True (exceptions from the library propagate).
    """
    mode = UniTAP.VideoMode(
        timing=self.current_timing,
        color_info=self.color_mode,
    )
    pattern_gen, _ = self.get_tx_modules()
    pattern_gen.set_vm(vm=mode)
    return True
def set_pattern(self, pattern, pattern_params=None):
    """Record the parameters for a parameterised pattern.

    Args:
        pattern: Pattern identifier compared against
            ``UCDEnum.VideoPatternInfo.VideoPatternParams`` members.
        pattern_params: Parameters for the pattern; ignored when empty or
            None.

    Returns:
        False when no timing is selected, otherwise True.
    """
    if not self.current_timing:
        return False

    kinds = UCDEnum.VideoPatternInfo.VideoPatternParams
    # The parentheses matter: ``and`` binds tighter than ``or``, so without
    # them the ``pattern_params`` check would apply to SquareWindow only and
    # a SolidColor pattern with no parameters would be indexed and raise.
    takes_params = (
        pattern == kinds.SolidColor
        or pattern == kinds.WhiteVStrips
        or pattern == kinds.GradientRGBStripes
        or pattern == kinds.MotionPattern
        or pattern == kinds.SquareWindow
    )
    if takes_params and pattern_params:
        self.set_pattern_params(pattern, pattern_params)
    return True
def set_next_pattern(self):
    """Advance to the next entry of ``current_pattern_params``.

    The entry at ``current_pattern_index`` is passed to ``set_pattern``
    together with the current pattern, then the index is incremented.

    Raises:
        IndexError: When every entry has already been used, or when no
            parameter list has been configured yet.
    """
    params_list = self.current_pattern_params
    if params_list is None:
        # Nothing configured: treat as an exhausted list so callers get the
        # same IndexError instead of a TypeError from len(None).
        params_list = []

    index = self.current_pattern_index
    if index >= len(params_list):
        error_msg = (
            f"No more patterns to set. (已设置 {self.current_pattern_index} 个图案)"
        )
        raise IndexError(error_msg)

    self.set_pattern(self.current_pattern, params_list[index])
    self.current_pattern_index = index + 1
def set_pattern_params(self, pattern, pattern_params):
    """Build the parameter object for ``pattern``.

    Only the SolidColor pattern is handled: its first three parameters
    become a ``UniTAP.SolidColorParams`` stored in
    ``current_pattern_param``. Other patterns leave it untouched.

    Returns:
        False when ``pattern`` is falsy, otherwise True.
    """
    if not pattern:
        return False
    if pattern == UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor:
        first, second, third = (
            pattern_params[0],
            pattern_params[1],
            pattern_params[2],
        )
        self.current_pattern_param = UniTAP.SolidColorParams(
            first=first, second=second, third=third
        )
    return True
def apply_pattern(self):
    """Set the current pattern (and its parameters, if any) on the TX module.

    Returns:
        False when no pattern is selected, otherwise True.
    """
    if not self.current_pattern:
        return False
    pattern_gen, _ = self.get_tx_modules()
    pattern_gen.set_pattern(self.current_pattern)
    params = self.current_pattern_param
    if params:
        pattern_gen.set_pattern_params(params)
    return True
def search_timing(self, width, height, refresh_rate, resolution_type=None):
    """Search the timing manager for a timing matching the given parameters.

    Args:
        width: Active width passed as ``h_active``.
        height: Active height passed as ``v_active``.
        refresh_rate: Refresh rate in Hz; fractional values such as 59.94
            are supported.
        resolution_type: "dmt", "cta" or "cvt" (case-insensitive) restricts
            the search to that standard; any other non-empty value searches
            with ``standard=None``. When falsy, "dmt", "cta", "cvt" and
            "ovt" are tried in that order.

    Returns:
        The first truthy search result, or None when nothing matches.
    """
    if not resolution_type:
        for candidate in ("dmt", "cta", "cvt", "ovt"):
            found = self.search_timing(width, height, refresh_rate, candidate)
            if found:
                return found
        return None

    # Resolve the standard lazily so UniTAP is only touched when needed.
    standard_names = {"dmt": "SD_DMT", "cta": "SD_CTA", "cvt": "SD_CVT"}
    standard_name = standard_names.get(resolution_type.lower())
    standard = None
    if standard_name is not None:
        standard = getattr(UniTAP.common.timing.Timing.Standard, standard_name)

    # The rate is passed as Hz * 1000. Scale before converting to int so a
    # fractional rate keeps its decimals (59.94 -> 59940, not 59000).
    f_rate = int(round(float(refresh_rate) * 1000))

    timing = self.timing_manager.search(
        h_active=width,
        v_active=height,
        f_rate=f_rate,
        standard=standard,
    )
    return timing if timing else None
def parse_formatted_timing(self, timing_str):
    """Parse a timing string of the form ``"<TYPE> <W>x<H> @ <RATE>Hz"``.

    ``TYPE`` is one of DMT, CTA, CVT or OVT (case-insensitive). Whitespace
    is normalised, spaces around the ``x`` are tolerated and the ``Hz``
    suffix is optional and matched case-insensitively.

    Returns:
        dict with keys ``resolution_type`` (lower-case type), ``width``,
        ``height``, ``refresh_rate`` (float) and ``resolution_id`` (the id
        from the matching ``UCDEnum.TimingInfo`` map whose refresh rate is
        closest and within 1.0 of the requested one, or None).

    Raises:
        ValueError: If ``timing_str`` is not a string or cannot be parsed.
    """
    if not isinstance(timing_str, str):
        raise ValueError("timing_str 必须是字符串")

    normalized = " ".join(timing_str.strip().split())
    normalized = normalized.replace(" x", "x").replace("x ", "x")

    type_part, separator, rest = normalized.partition(" ")
    if not separator:
        raise ValueError(f"无法解析timing: {timing_str}")
    type_str = type_part.strip().upper()
    rest = rest.strip()

    if "@" not in rest:
        raise ValueError(f"无法解析timing(缺少 @): {timing_str}")
    left, right = [p.strip() for p in rest.split("@", 1)]

    if "x" not in left:
        raise ValueError(f"无法解析分辨率(缺少 x): {timing_str}")
    wh = left.split("x")
    if len(wh) != 2:
        raise ValueError(f"无法解析分辨率: {timing_str}")
    try:
        width = int(wh[0])
        height = int(wh[1])
    except ValueError as err:
        raise ValueError(f"分辨率数字解析失败: {timing_str}") from err

    # Lower-casing first makes the suffix match "Hz", "HZ", "hz" and "hZ".
    hz_str = right.lower().replace("hz", "").strip()
    try:
        refresh_rate = float(hz_str)
    except ValueError as err:
        raise ValueError(f"刷新率解析失败: {timing_str}") from err

    if type_str not in ("DMT", "CTA", "CVT", "OVT"):
        raise ValueError(f"未知的分辨率类型: {type_str}")
    resolution_type = type_str.lower()

    def _closest_id(res_map, grouped):
        # Return the id whose entry matches width/height with the nearest
        # refresh rate (first one wins on ties), or None if none is within
        # 1.0. ``grouped`` maps hold a list of entries per id; the others
        # hold a single entry per id.
        best_id, best_diff = None, float("inf")
        for rid, entry in res_map.items():
            for info in entry if grouped else (entry,):
                if info["width"] == width and info["height"] == height:
                    diff = abs(float(info["refresh_rate"]) - refresh_rate)
                    if diff < best_diff:
                        best_id, best_diff = rid, diff
        return best_id if best_diff <= 1.0 else None

    # Only the map for the requested type is looked up on UCDEnum.
    res_map = getattr(UCDEnum.TimingInfo, f"{resolution_type}_resolution_map")
    resolution_id = _closest_id(res_map, grouped=resolution_type in ("cvt", "ovt"))

    return {
        "resolution_type": resolution_type,
        "width": width,
        "height": height,
        "refresh_rate": refresh_rate,
        "resolution_id": resolution_id,
    }
def set_timing_from_string(self, timing_str):
    """Select the device timing described by a formatted timing string.

    Returns:
        True when a timing was found and stored in ``current_timing``;
        False when the type is "ovt" or no timing matches.

    Raises:
        ValueError: Propagated from ``parse_formatted_timing``.
    """
    spec = self.parse_formatted_timing(timing_str)
    kind = spec["resolution_type"]
    width, height = spec["width"], spec["height"]
    rate = spec["refresh_rate"]

    # "ovt" timings are not searched here.
    if kind == "ovt":
        return False

    found = self.search_timing(width, height, rate, kind)
    if not found:
        return False
    self.current_timing = found
    return True
def set_timing_from_id(self, rtype, rid):
    """Select the device timing identified by ``(rtype, rid)``.

    Args:
        rtype: "dmt", "cta" or "cvt" (case-insensitive).
        rid: Identifier handed to the matching timing-manager getter.

    Returns:
        True when the getter returned a truthy timing, which is then stored
        in ``current_timing``; False otherwise.

    Raises:
        ValueError: If ``rtype`` is not one of the supported types.
    """
    getter_names = {"dmt": "get_dmt", "cta": "get_cta", "cvt": "get_cvt"}
    getter_name = getter_names.get(rtype.lower())
    if getter_name is None:
        raise ValueError(f"不支持的分辨率类型: {rtype}")

    timing = getattr(self.timing_manager, getter_name)(rid)
    if not timing:
        return False
    self.current_timing = timing
    return True
def set_sdr_format(
    self, color_space=None, gamma=None, data_range=None, bit_depth=None
):
    """Update the SDR signal format on ``color_mode``.

    Each argument is optional; falsy values are skipped.

    Args:
        color_space: "BT.709", "BT.601" or "BT.2020"; any other value falls
            back to BT.709.
        gamma: Gamma value convertible to float; values within 0.1 of 2.2,
            2.4 or 2.6 select a transfer characteristic.
        data_range: "Full" or "Limited"; other values are ignored.
        bit_depth: Key resolved through ``UCDEnum.SignalFormat.BitDepth``.

    Returns:
        True on completion (the video mode is re-sent when a timing is
        selected); False if anything raised.
    """

    def _colorimetry_for(name):
        # Map a colour-space name to a colorimetry value, defaulting to BT.709.
        try:
            cm = UniTAP.ColorInfo.Colorimetry
            table = {
                "BT.709": cm.CM_ITUR_BT709,
                "BT.601": cm.CM_ITUR_BT601,
                "BT.2020": cm.CM_ITUR_BT2020_RGB,
            }
            return table.get(name) or cm.CM_ITUR_BT709
        except Exception:
            return UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT709

    def _apply_gamma(gamma_text):
        # Returns True only when the exact transfer characteristic for the
        # requested gamma was set; 2.4 / 2.6 fall back to BT.709 (and return
        # False) when the library lacks the dedicated constant.
        try:
            value = float(gamma_text)
            transfer = UniTAP.ColorInfo.TransferCharacteristic
            if abs(value - 2.2) < 0.1:
                self.color_mode.transfer_characteristic = transfer.TRANSFER_BT709
                return True
            for target, attr in (
                (2.4, "TRANSFER_GAMMA24"),
                (2.6, "TRANSFER_GAMMA26"),
            ):
                if abs(value - target) < 0.1:
                    if hasattr(transfer, attr):
                        self.color_mode.transfer_characteristic = getattr(
                            transfer, attr
                        )
                        return True
                    self.color_mode.transfer_characteristic = transfer.TRANSFER_BT709
                    return False
            return False
        except Exception:
            return False

    try:
        if color_space:
            colorimetry = _colorimetry_for(color_space)
            if colorimetry:
                self.color_mode.colorimetry = colorimetry

        if gamma:
            _apply_gamma(gamma)

        if data_range:
            range_names = {"Full": "DR_VESA", "Limited": "DR_CTA"}
            range_name = range_names.get(data_range)
            if range_name is not None:
                self.color_mode.dynamic_range = getattr(
                    UniTAP.ColorInfo.DynamicRange, range_name
                )

        if bit_depth:
            self.color_mode.bpc = UCDEnum.SignalFormat.BitDepth.get_bit_value(
                bit_depth
            )

        if self.current_timing:
            self.set_video_mode()
        return True
    except Exception:
        return False
def set_hdr_format(
    self,
    color_space=None,
    data_range=None,
    bit_depth=None,
    max_cll=None,
    max_fall=None,
):
    """Update the HDR signal format on ``color_mode``.

    Each argument is optional; falsy values are skipped.

    Args:
        color_space: Name resolved by ``_get_colorimetry_from_color_space``;
            unknown names leave the colorimetry unchanged.
        data_range: "Full" or "Limited"; other values are ignored.
        bit_depth: Key resolved through ``UCDEnum.SignalFormat.BitDepth``.
        max_cll: Accepted but not used by this method.
        max_fall: Accepted but not used by this method.

    Returns:
        True on completion (the video mode is re-sent when a timing is
        selected); False if anything raised.
    """
    try:
        if color_space:
            colorimetry = self._get_colorimetry_from_color_space(color_space)
            if colorimetry:
                self.color_mode.colorimetry = colorimetry

        if data_range:
            range_names = {"Full": "DR_VESA", "Limited": "DR_CTA"}
            range_name = range_names.get(data_range)
            if range_name is not None:
                self.color_mode.dynamic_range = getattr(
                    UniTAP.ColorInfo.DynamicRange, range_name
                )

        if bit_depth:
            self.color_mode.bpc = UCDEnum.SignalFormat.BitDepth.get_bit_value(
                bit_depth
            )

        if self.current_timing:
            self.set_video_mode()
        return True
    except Exception:
        return False
def _get_colorimetry_from_color_space(self, color_space):
    """Map a colour-space name to a ``UniTAP.ColorInfo.Colorimetry`` value.

    Returns:
        The colorimetry for "BT.709", "BT.601", "BT.2020" or "DCI-P3";
        None for any other name.
    """
    cm = UniTAP.ColorInfo.Colorimetry
    lookup = {
        "BT.709": cm.CM_ITUR_BT709,
        "BT.601": cm.CM_ITUR_BT601,
        "BT.2020": cm.CM_ITUR_BT2020_RGB,
        "DCI-P3": cm.CM_DCI_P3,
    }
    if color_space in lookup:
        return lookup[color_space]
    return None