Files
pqAutomationApp/drivers/UCD323_Function.py

722 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: UTF-8 -*-
import logging
import UniTAP
import time
import gc
from drivers.UCD323_Enum import UCDEnum
log = logging.getLogger(__name__)
class UCDController:
    """Controller class for the UCD323 signal generator."""
def __init__(self):
    """Create the UniTAP library handle and initialise controller state.

    No device is opened here; ``open`` must be called to obtain a device
    handle and a role.
    """
    self.lUniTAP = UniTAP.TsiLib()

    # Device handle / role and objects derived from them (populated by open()).
    self.dev = None
    self.role = None
    self.timing_manager = None
    self.config = None
    self.color_info = None
    self.status = False
    self.current_interface = "HDMI"

    # Currently staged timing and pattern.
    self.current_timing = None
    self.current_pattern = None
    self.current_pattern_param = None
    self.current_pattern_params = None
    self.current_pattern_index = 0

    self.last_error = None

    # These two were previously created only inside set_video_mode(), so
    # reading format_changed before the first mode was sent raised
    # AttributeError. Define them up front with neutral values.
    self.format_changed = False
    self._last_sent_config = None
def search_device(self):
    """Return the devices reported by the UniTAP library.

    Returns:
        The value of ``lUniTAP.get_list_of_available_devices()`` when it is
        truthy, otherwise a new empty list.
    """
    return self.lUniTAP.get_list_of_available_devices() or []
def open(self, device_name):
    """Open a device and select the HDMI source role on it.

    Args:
        device_name: Device descriptor string; the text before the first
            ``:`` is converted to ``int`` and passed to ``lUniTAP.open``.

    Returns:
        True on success. False on any failure; in that case the controller
        state is cleaned up and ``last_error`` describes the failure.
    """
    self.last_error = None
    try:
        # Drop any previously opened device before opening a new one.
        if self.dev is not None or self.status:
            self._force_cleanup()

        device_id = int(device_name.split(":")[0])
        temp_dev = self.lUniTAP.open(device_id)
        try:
            self.role = temp_dev.select_role(UniTAP.dev.UCD323.HDMISource)
        except Exception:
            # self.dev is not assigned yet, so the generic cleanup below
            # would not see this handle; close it here and re-raise.
            self._close_device_object(temp_dev)
            raise
        self.dev = temp_dev
        self.current_interface = "HDMI"

        pg, _ = self.get_tx_modules()
        self.timing_manager = pg.timing_manager
        self.color_info = UniTAP.ColorInfo()
        self.status = True
        return True
    except Exception as e:
        # Previously the failure was swallowed without any trace.
        log.exception("UCDController.open failed device_name=%s", device_name)
        self._force_cleanup()
        self.last_error = f"open failed: {e}"
        return False
def _reset_state(self):
    """Reset all runtime state without closing the device handle.

    The attributes are only overwritten; the caller is responsible for
    closing ``self.dev`` beforehand.
    """
    self.dev = None
    self.role = None
    self.status = False
    self.timing_manager = None
    self.current_timing = None
    self.current_pattern = None
    self.current_pattern_param = None
    self.current_pattern_params = None
    self.current_pattern_index = 0
    self.current_interface = "HDMI"
    # Forget the last video mode that was sent. Otherwise set_video_mode()
    # would compare against a configuration sent before the reset and could
    # report format_changed=False for the first mode sent after a reopen.
    self._last_sent_config = None
def close(self):
    """Close the device, reset state and recreate the UniTAP library handle.

    Returns:
        True when the normal path completed. False when an exception
        occurred; state is reset and one more attempt is made to recreate
        the library handle. If that attempt also fails, ``lUniTAP`` is left
        as None and the error is logged.
    """
    try:
        if self.dev:
            self._close_device_object(self.dev)
        self._reset_state()

        # Drop the library object, collect garbage, pause, then create a
        # fresh library object.
        # NOTE(review): the source indentation was lost; the 2 s sleep is
        # assumed to follow the collection loop rather than run inside it.
        self.lUniTAP = None
        for _ in range(3):
            gc.collect()
        time.sleep(2.0)
        self.lUniTAP = UniTAP.TsiLib()
        return True
    except Exception:
        log.exception("UCDController.close failed; retrying library re-initialisation")
        self._reset_state()
        try:
            self.lUniTAP = None
            gc.collect()
            time.sleep(2.0)
            self.lUniTAP = UniTAP.TsiLib()
        except Exception:
            # Best effort: still do not raise, but make the failure visible.
            log.exception("UCDController.close could not recreate TsiLib; lUniTAP is None")
        return False
def _close_device_object(self, dev_obj):
    """Explicitly close a device object on a best-effort basis.

    Errors are logged and never propagated.

    Args:
        dev_obj: Device object to close. ``None`` is a no-op.
    """
    if dev_obj is None:
        return
    try:
        if self.lUniTAP and hasattr(self.lUniTAP, "close"):
            try:
                self.lUniTAP.close(dev_obj)
            except Exception:
                log.warning(
                    "UCDController._close_device_object lUniTAP.close failed",
                    exc_info=True,
                )
        # The original also rebound the local ``dev_obj`` to None here; that
        # had no effect because the caller still holds its own reference.
        gc.collect()
        time.sleep(1.0)
    except Exception:
        log.warning("UCDController._close_device_object failed", exc_info=True)
def _force_cleanup(self):
    """Close the current device, if any, and always reset runtime state.

    Never raises from the close step; a failure there is logged and the
    state is still reset.
    """
    try:
        if self.dev:
            self._close_device_object(self.dev)
    except Exception:
        log.exception("UCDController._force_cleanup close failed")
    finally:
        # Previously a failure while closing skipped the reset and left
        # stale dev/role/status behind.
        self._reset_state()
def get_tx_modules(self):
    """Return the ``(pg, ag)`` modules of the TX port for the current interface.

    ``None`` and ``"HDMI"`` select ``role.hdtx``; ``"DP"`` and ``"Type-C"``
    select ``role.dptx``.

    Raises:
        RuntimeError: If no role is selected (device not opened).
        ValueError: If ``current_interface`` is any other value.
    """
    if not self.role:
        raise RuntimeError("UCD 未打开,无法获取 TX 模块")

    interface = getattr(self, "current_interface", None)
    log.info("UCDController.get_tx_modules interface=%s", interface)

    if interface is None or interface == "HDMI":
        tx = self.role.hdtx
    elif interface == "DP" or interface == "Type-C":
        tx = self.role.dptx
    else:
        raise ValueError(f"不支持的接口类型: {interface}")
    return tx.pg, tx.ag
def _resolve_timing(self, pg=None):
    """Return the active timing, preferring ``current_timing``.

    When ``current_timing`` is None and ``pg`` exposes a callable
    ``get_vm``, the ``timing`` attribute of the object it returns is used.

    Args:
        pg: Optional TX pattern-generator module used as a fallback source.

    Returns:
        The timing object, or None when nothing could be resolved
        (including when ``get_vm`` raises).
    """
    cached = self.current_timing
    if cached is not None:
        return cached
    if pg is None:
        return None

    vm_getter = getattr(pg, "get_vm", None)
    if not callable(vm_getter):
        return None
    try:
        return getattr(vm_getter(), "timing", None)
    except Exception:
        return None
def get_current_resolution(self, default=(3840, 2160)):
    """Return ``(width, height)`` of the current timing.

    Args:
        default: Value returned when the timing cannot be resolved, lacks
            ``h_active``/``v_active``, or any exception occurs.

    Returns:
        ``(timing.h_active, timing.v_active)`` or ``default``.
    """
    try:
        pg, _ag = self.get_tx_modules()
        timing = self._resolve_timing(pg)
        has_size = (
            bool(timing)
            and hasattr(timing, "h_active")
            and hasattr(timing, "v_active")
        )
        resolution = (timing.h_active, timing.v_active) if has_size else default
    except Exception:
        resolution = default
    return resolution
def _cleanup(self):
    """Release device resources on a best-effort basis.

    Closes ``self.dev`` when set, then calls ``lUniTAP.cleanup()`` if the
    library object provides it. Exceptions are logged, not propagated.
    """
    try:
        if self.dev:
            self._close_device_object(self.dev)
            self.dev = None
        if hasattr(self.lUniTAP, "cleanup"):
            self.lUniTAP.cleanup()
    except Exception:
        # Previously swallowed silently.
        log.exception("UCDController._cleanup failed")
def set_ucd_params(self, config):
    """Apply colour mode, timing and pattern selection from ``config``.

    Args:
        config: Object exposing ``current_test_type``,
            ``current_test_types[test_type]`` (a mapping with
            ``color_format``, ``bpc``, ``colorimetry`` and ``timing``) and
            ``current_pattern`` (a mapping with ``pattern_mode`` and
            ``pattern_params``).

    Returns:
        True when every step succeeded. False otherwise, with
        ``last_error`` describing the step that failed.
    """
    self.last_error = None
    self.config = config
    test_type = self.config.current_test_type
    pg, _ = self.get_tx_modules()
    self.timing_manager = pg.timing_manager

    params = self.config.current_test_types[test_type]
    color_format = params["color_format"]
    bpc = params["bpc"]
    colorimetry = params["colorimetry"]
    if not self.set_color_mode(color_format, bpc, colorimetry):
        self.last_error = (
            f"set_color_mode failed: color_format={color_format}, bpc={bpc}, colorimetry={colorimetry}"
        )
        log.error(
            "UCDController.set_ucd_params set_color_mode failed test_type=%s color_format=%s bpc=%s colorimetry=%s",
            test_type,
            color_format,
            bpc,
            colorimetry,
        )
        return False

    timing_str = params["timing"]
    if not self.set_timing_from_string(timing_str):
        self.last_error = f"set_timing_from_string failed: timing={timing_str}"
        log.error(
            "UCDController.set_ucd_params set_timing_from_string failed test_type=%s timing=%s",
            test_type,
            timing_str,
        )
        return False

    self.current_pattern_index = 0
    pattern_mode = self.config.current_pattern["pattern_mode"]
    pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern_mode)
    if pattern is None:
        self.last_error = f"get_video_pattern failed: pattern_mode={pattern_mode}"
        # This was the only failure branch that did not log.
        log.error(
            "UCDController.set_ucd_params get_video_pattern failed test_type=%s pattern_mode=%s",
            test_type,
            pattern_mode,
        )
        return False

    self.current_pattern = pattern
    self.current_pattern_params = self.config.current_pattern["pattern_params"]
    return True
def run(self):
    """Push the staged video mode and pattern to the device.

    Calls ``apply_video_mode`` and ``apply_pattern`` (their return values
    are not checked), then ``pg.apply()`` on the current TX module.

    Returns:
        Always True; exceptions from the called methods propagate.
    """
    pattern_label = getattr(self.current_pattern, "name", self.current_pattern)
    has_param = self.current_pattern_param is not None
    log.info(
        "UCDController.run start current_pattern=%s has_pattern_param=%s",
        pattern_label,
        has_param,
    )

    self.apply_video_mode()
    self.apply_pattern()

    pattern_gen, _ = self.get_tx_modules()
    log.info("UCDController.run calling pg.apply()")
    pattern_gen.apply()
    log.info("UCDController.run done")
    return True
def send_image_pattern(self, image_path):
    """Send an image as the pattern, using the current timing/color_info state.

    Args:
        image_path: Value passed to ``pg.set_pattern(pattern=...)``.

    Returns:
        True when the pattern was applied. False when the device is not
        open or any step raised (the exception is logged).
    """
    if not self.status or not self.role:
        return False
    try:
        pg, _ = self.get_tx_modules()
        self.apply_video_mode()
        pg.set_pattern(pattern=image_path)
        pg.apply()
        return True
    except Exception:
        # Log like the sibling send_* methods instead of failing silently.
        log.exception("UCDController.send_image_pattern exception image_path=%s", image_path)
        return False
def send_solid_rgb_pattern(self, rgb):
    """Send a solid-colour RGB pattern, using the current timing/color_info state.

    Args:
        rgb: Iterable of colour components; converted to a list and passed
            to ``send_current_pattern_params``.

    Returns:
        The result of ``send_current_pattern_params``; False when the
        device is not open, the solid-colour pattern cannot be looked up,
        or an exception occurred.
    """
    if not self.status or not self.role:
        return False
    try:
        log.info("UCDController.send_solid_rgb_pattern rgb=%s", rgb)
        solid = UCDEnum.VideoPatternInfo.get_video_pattern("solidcolor")
        if solid is None:
            # Keep the previously staged pattern; the original overwrote
            # current_pattern with None before checking the lookup result.
            log.error("UCDController.send_solid_rgb_pattern failed: solidcolor pattern not found")
            return False
        self.current_pattern = solid
        return self.send_current_pattern_params(list(rgb))
    except Exception:
        log.exception("UCDController.send_solid_rgb_pattern exception")
        return False
def send_current_pattern_params(self, pattern_params):
    """Send the currently configured pattern, optionally staging parameters first.

    Args:
        pattern_params: Parameters handed to ``set_pattern`` together with
            ``current_pattern``; ``None`` skips the staging step.

    Returns:
        True after ``run()`` completed. False when the device is not open,
        no pattern is configured, staging returned a falsy value, or an
        exception occurred.
    """
    if not (self.status and self.role):
        return False
    try:
        pattern = self.current_pattern
        if pattern is None:
            log.error("UCDController.send_current_pattern_params failed: current_pattern is None")
            return False

        log.info(
            "UCDController.send_current_pattern_params pattern=%s params=%s",
            getattr(pattern, "name", pattern),
            pattern_params,
        )
        if pattern_params is not None:
            staged = self.set_pattern(pattern, pattern_params)
            if not staged:
                log.error("UCDController.send_current_pattern_params failed: set_pattern returned False")
                return False

        log.info("UCDController.send_current_pattern_params calling run()")
        self.run()
        log.info("UCDController.send_current_pattern_params done")
        return True
    except Exception:
        log.exception("UCDController.send_current_pattern_params exception")
        return False
def set_color_mode(self, cf, bpc, cm):
    """Set colour format, bit depth and colorimetry on ``color_info``.

    Args:
        cf: Key resolved through ``UCDEnum.ColorInfo.get_color_format``.
        bpc: Bits per component; must be a positive ``int``.
        cm: Key resolved through ``UCDEnum.ColorInfo.get_colorimetry``.

    Returns:
        True when all three values were valid and applied; False (with
        ``color_info`` untouched) otherwise.
    """
    info = self.color_info
    # The dynamic range is captured first and written back last.
    # NOTE(review): presumably assigning the other fields can reset it — confirm.
    saved_range = info.dynamic_range

    color_format = UCDEnum.ColorInfo.get_color_format(cf)
    if color_format is None:
        return False
    if not (isinstance(bpc, int) and bpc > 0):
        return False
    colorimetry = UCDEnum.ColorInfo.get_colorimetry(cm)
    if colorimetry is None:
        return False

    info.color_format = color_format
    info.bpc = bpc
    info.colorimetry = colorimetry
    info.dynamic_range = saved_range
    return True
def apply_video_mode(self):
    """Apply the current ``color_info`` and timing via ``set_video_mode``.

    Returns:
        True when a timing is set and ``set_video_mode`` was called;
        False (with a warning logged) when ``current_timing`` is falsy.
    """
    if not self.current_timing:
        log.warning("UCDController.apply_video_mode skipped: current_timing is None")
        return False

    log.info("UCDController.apply_video_mode start timing=%s", self.current_timing)
    self.set_video_mode()
    log.info("UCDController.apply_video_mode done")
    return True
def set_video_mode(self):
    """Build a ``UniTAP.VideoMode`` from the current state and set it on the TX module.

    Also updates ``format_changed`` (whether the configuration differs from
    the last one sent) and records the configuration just sent.

    Returns:
        Always True; exceptions from the SDK calls propagate.
    """
    info = self.color_info
    # Compare against the configuration sent last time, to judge whether
    # this call will make the TV re-lock onto the signal.
    current_config = (
        self.current_timing,
        info.color_format,
        info.colorimetry,
        info.dynamic_range,
        info.bpc,
    )
    previous_config = getattr(self, "_last_sent_config", None)
    self.format_changed = current_config != previous_config
    log.info(
        "UCDController.set_video_mode format_changed=%s color_format=%s colorimetry=%s dynamic_range=%s bpc=%s",
        self.format_changed,
        info.color_format,
        info.colorimetry,
        info.dynamic_range,
        info.bpc,
    )

    video_mode = UniTAP.VideoMode(timing=self.current_timing, color_info=info)
    pattern_gen, _ = self.get_tx_modules()
    log.info("UCDController.set_video_mode calling pg.set_vm()")
    pattern_gen.set_vm(vm=video_mode)
    log.info("UCDController.set_video_mode done")

    self._last_sent_config = current_config
    return True
def set_pattern(self, pattern, pattern_params=None):
    """Stage parameters for ``pattern`` when it is a parameterised pattern.

    Args:
        pattern: Pattern enum member to stage parameters for.
        pattern_params: Parameters forwarded to ``set_pattern_params``;
            ignored when None or when the pattern takes no parameters.

    Returns:
        Always True; the result of ``set_pattern_params`` is not propagated.
    """
    if self.current_timing is None:
        # Pattern-only updates (e.g. Calman patch click) can still be applied on
        # an already active output mode. Missing timing should not block pattern staging.
        log.warning("UCDController.set_pattern current_timing is None; continue with pattern-only apply")

    pattern_info = UCDEnum.VideoPatternInfo
    parameterised = {
        pattern_info.VideoPatternParams.SolidColor,
        pattern_info.VideoPattern.SolidColor,
        pattern_info.VideoPatternParams.WhiteVStrips,
        pattern_info.VideoPatternParams.GradientRGBStripes,
        pattern_info.VideoPatternParams.MotionPattern,
        pattern_info.VideoPatternParams.SquareWindow,
    }
    requires_params = pattern in parameterised
    log.info(
        "UCDController.set_pattern pattern=%s pattern_params=%s needs_params=%s",
        getattr(pattern, "name", pattern),
        pattern_params,
        requires_params,
    )

    if requires_params and pattern_params is not None:
        self.set_pattern_params(pattern, pattern_params)
    return True
def set_next_pattern(self):
    """Stage the next entry of ``current_pattern_params`` and advance the index.

    Raises:
        IndexError: When every entry has already been staged, or when no
            parameter list is configured (``current_pattern_params`` is
            None). The None case previously surfaced as a TypeError from
            ``len(None)``.
    """
    pending = self.current_pattern_params
    if pending is None:
        pending = ()

    if self.current_pattern_index >= len(pending):
        error_msg = (
            f"No more patterns to set. (已设置 {self.current_pattern_index} 个图案)"
        )
        raise IndexError(error_msg)

    self.set_pattern(self.current_pattern, pending[self.current_pattern_index])
    self.current_pattern_index += 1
def set_pattern_params(self, pattern, pattern_params):
    """Build and store the parameter object for ``pattern``.

    Only the solid-colour patterns are handled: the first three items of
    ``pattern_params`` become a ``UniTAP.SolidColorParams`` stored in
    ``current_pattern_param``.

    Returns:
        True when parameters were stored; False (with a warning logged)
        for None or any other pattern.
    """
    is_solid_color = pattern is not None and pattern in {
        UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor,
        UCDEnum.VideoPatternInfo.VideoPattern.SolidColor,
    }
    if not is_solid_color:
        log.warning(
            "UCDController.set_pattern_params unsupported pattern=%s",
            getattr(pattern, "name", pattern),
        )
        return False

    log.info("UCDController.set_pattern_params solid_color rgb=%s", pattern_params)
    first = pattern_params[0]
    second = pattern_params[1]
    third = pattern_params[2]
    self.current_pattern_param = UniTAP.SolidColorParams(
        first=first, second=second, third=third
    )
    return True
def apply_pattern(self):
    """Set the current pattern, and its parameters when present, on the TX module.

    Returns:
        True when a pattern was set; False (with a warning logged) when
        ``current_pattern`` is None.
    """
    pattern = self.current_pattern
    if pattern is None:
        log.warning("UCDController.apply_pattern skipped: current_pattern is None")
        return False

    param = self.current_pattern_param
    log.info(
        "UCDController.apply_pattern start pattern=%s has_params=%s",
        getattr(pattern, "name", pattern),
        param is not None,
    )
    pattern_gen, _ = self.get_tx_modules()
    log.info("UCDController.apply_pattern calling pg.set_pattern()")
    pattern_gen.set_pattern(pattern)
    if param is not None:
        log.info("UCDController.apply_pattern calling pg.set_pattern_params()")
        pattern_gen.set_pattern_params(param)
    log.info("UCDController.apply_pattern done")
    return True
def search_timing(self, width, height, refresh_rate, resolution_type=None):
    """Search the timing manager for a timing matching the given resolution.

    Args:
        width: Active width passed as ``h_active``.
        height: Active height passed as ``v_active``.
        refresh_rate: Refresh rate in Hz; converted to ``float``.
        resolution_type: Optional family name (case-insensitive). "dmt",
            "cta" and "cvt" map to a UniTAP timing standard; any other
            non-empty value searches without a standard. When falsy, each
            of "dmt", "cta", "cvt", "ovt" is tried in turn.

    Returns:
        The first truthy timing found, or None.
    """
    if not resolution_type:
        for family in ["dmt", "cta", "cvt", "ovt"]:
            found = self.search_timing(width, height, refresh_rate, family)
            if found:
                return found
        return None

    family = resolution_type.lower()
    standard_names = {"dmt": "SD_DMT", "cta": "SD_CTA", "cvt": "SD_CVT"}
    standard = None
    if family in standard_names:
        standard = getattr(
            UniTAP.common.timing.Timing.Standard, standard_names[family]
        )

    millihertz = float(refresh_rate) * 1000
    # Try both exact and NTSC-compatible rates (e.g. 120000 / 119880).
    raw_rates = (
        int(round(millihertz)),
        int(millihertz),
        int(round(millihertz * 1000.0 / 1001.0)),
    )
    # De-duplicate while keeping the original order.
    rates = []
    for rate in raw_rates:
        if rate not in rates:
            rates.append(rate)

    # With a specific standard, also retry without any standard filter.
    standards = [standard] if standard is None else [standard, None]
    for std in standards:
        for f_rate in rates:
            timing = self.timing_manager.search(
                h_active=width,
                v_active=height,
                f_rate=f_rate,
                standard=std,
            )
            if timing:
                return timing
    return None
def parse_formatted_timing(self, timing_str):
    """Parse a timing string of the form ``"<TYPE> <W>x<H> @ <rate>Hz"``.

    Whitespace around ``x`` and ``@`` is optional, and the type, the ``x``
    separator and the ``Hz`` suffix are matched case-insensitively.

    Args:
        timing_str: Text such as ``"CTA 3840x2160 @ 60Hz"``.

    Returns:
        A dict with ``resolution_type`` (lower-case family), ``width``,
        ``height``, ``refresh_rate`` (float) and ``resolution_id``: the id
        from the matching ``UCDEnum.TimingInfo`` table whose refresh rate is
        closest (within 1.0), or None.

    Raises:
        ValueError: If the input is not a string or cannot be parsed.
    """
    if not isinstance(timing_str, str):
        raise ValueError("timing_str 必须是字符串")

    # Collapse whitespace and lower-case once so that "X"/"x" and
    # "Hz"/"HZ"/"hz" are all accepted; messages keep the original text.
    s = " ".join(timing_str.split()).lower()
    s = s.replace(" x", "x").replace("x ", "x")
    parts = s.split(" ", 1)
    if len(parts) < 2:
        raise ValueError(f"无法解析timing: {timing_str}")
    type_str = parts[0].strip().upper()
    rest = parts[1].strip()

    if "@" not in rest:
        raise ValueError(f"无法解析timing(缺少 @): {timing_str}")
    left, right = (p.strip() for p in rest.split("@", 1))

    if "x" not in left:
        raise ValueError(f"无法解析分辨率(缺少 x): {timing_str}")
    wh = left.split("x")
    if len(wh) != 2:
        raise ValueError(f"无法解析分辨率: {timing_str}")
    try:
        width = int(wh[0])
        height = int(wh[1])
    except ValueError as exc:
        raise ValueError(f"分辨率数字解析失败: {timing_str}") from exc

    hz_str = right.replace("hz", "").strip()
    try:
        refresh_rate = float(hz_str)
    except ValueError as exc:
        raise ValueError(f"刷新率解析失败: {timing_str}") from exc

    rtype_map = {
        "DMT": "dmt",
        "CTA": "cta",
        "CVT": "cvt",
        "OVT": "ovt",
    }
    if type_str not in rtype_map:
        raise ValueError(f"未知的分辨率类型: {type_str}")
    resolution_type = rtype_map[type_str]

    def find_best_id(res_map, multi):
        """Return the id with the closest refresh rate for width x height.

        ``multi`` is True when each map value is a list of info dicts and
        False when it is a single info dict.
        """
        best_id, best_diff = None, float("inf")
        for rid, entry in res_map.items():
            for info in entry if multi else (entry,):
                if info["width"] == width and info["height"] == height:
                    diff = abs(float(info["refresh_rate"]) - refresh_rate)
                    if diff < best_diff:
                        best_diff = diff
                        best_id = rid
        return best_id if best_diff <= 1.0 else None

    resolution_id = None
    if resolution_type == "dmt":
        resolution_id = find_best_id(UCDEnum.TimingInfo.dmt_resolution_map, False)
    elif resolution_type == "cta":
        resolution_id = find_best_id(UCDEnum.TimingInfo.cta_resolution_map, False)
    elif resolution_type == "cvt":
        resolution_id = find_best_id(UCDEnum.TimingInfo.cvt_resolution_map, True)
    elif resolution_type == "ovt":
        resolution_id = find_best_id(UCDEnum.TimingInfo.ovt_resolution_map, True)

    return {
        "resolution_type": resolution_type,
        "width": width,
        "height": height,
        "refresh_rate": refresh_rate,
        "resolution_id": resolution_id,
    }
def set_timing_from_string(self, timing_str):
    """Set ``current_timing`` from a formatted timing string.

    The string is parsed with ``parse_formatted_timing``. A resolved table
    id is tried first through ``set_timing_from_id``; otherwise the timing
    is searched within the requested family and then across all families.

    Returns:
        True when a timing was selected, False when parsing failed or
        nothing matched.
    """
    try:
        spec = self.parse_formatted_timing(timing_str)
    except Exception:
        log.exception("UCDController.set_timing_from_string parse failed timing=%s", timing_str)
        return False

    rtype = spec["resolution_type"]
    rid = spec.get("resolution_id")
    width = spec["width"]
    height = spec["height"]
    fr = spec["refresh_rate"]

    if rid is not None and self.set_timing_from_id(rtype, rid):
        log.info(
            "UCDController.set_timing_from_string success by id timing=%s parsed=(%s id=%s)",
            timing_str,
            rtype,
            rid,
        )
        return True

    # Respect selected timing family first (DMT/CTA/CVT/OVT).
    timing = self.search_timing(width, height, fr, rtype)
    if timing is None:
        # Fallback only for robustness: some SDKs may not classify a timing
        # exactly as requested family even though width/height/fps matches.
        timing = self.search_timing(width, height, fr, None)

    if not timing:
        log.error(
            "UCDController.set_timing_from_string no timing matched timing=%s parsed=(%s %sx%s@%s)",
            timing_str,
            rtype,
            width,
            height,
            fr,
        )
        return False

    self.current_timing = timing
    log.info(
        "UCDController.set_timing_from_string success timing=%s parsed=(%s %sx%s@%s)",
        timing_str,
        rtype,
        width,
        height,
        fr,
    )
    return True
def set_timing_from_id(self, rtype, rid):
    """Set ``current_timing`` from a ``(type, id)`` pair.

    Args:
        rtype: Timing family, case-insensitive: "dmt", "cta", "cvt" or "ovt".
        rid: Id passed to the matching ``timing_manager.get_*`` method.

    Returns:
        True when a truthy timing was obtained and stored. False when the
        lookup returned a falsy value, or when "ovt" is requested and the
        timing manager has no callable ``get_ovt``.

    Raises:
        ValueError: For any other ``rtype``.
    """
    family = rtype.lower()
    if family in ("dmt", "cta", "cvt"):
        timing = getattr(self.timing_manager, "get_" + family)(rid)
    elif family == "ovt":
        ovt_getter = getattr(self.timing_manager, "get_ovt", None)
        if not callable(ovt_getter):
            return False
        timing = ovt_getter(rid)
    else:
        raise ValueError(f"不支持的分辨率类型: {rtype}")

    if not timing:
        return False
    self.current_timing = timing
    return True
def apply_signal_format(
    self, color_space=None, data_range=None, bit_depth=None, color_format=None, **_
):
    """Set the signal format: color_format / colorimetry / dynamic_range / bpc.

    Each argument that is falsy is left unchanged. When a timing is set,
    the resulting video mode is sent via ``set_video_mode``.

    Gamma/EOTF transfer characteristics do not exist in the ColorInfo API;
    max_cll / max_fall have no SDK counterpart yet, so they are accepted
    through ``**_`` and ignored.

    Args:
        color_space: Colour-space name mapped to a colorimetry.
        data_range: "Full" or "Limited"; other values are ignored.
        bit_depth: Key resolved via ``UCDEnum.SignalFormat.BitDepth``.
        color_format: Output format resolved via ``UCDEnum.SignalFormat``.

    Returns:
        True on success, False when any step raised (logged).
    """
    try:
        if color_format:
            fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(color_format)
            cf = UCDEnum.ColorInfo.get_color_format(fmt_key)
            if cf is not None:
                self.color_info.color_format = cf

        if color_space:
            colorimetry = self._get_colorimetry_from_color_space(color_space, color_format)
            if colorimetry:
                self.color_info.colorimetry = colorimetry

        if data_range == "Full":
            self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA
        elif data_range == "Limited":
            self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA

        if bit_depth:
            bpc = UCDEnum.SignalFormat.BitDepth.get_bit_value(bit_depth)
            # Like the other fields, only overwrite on a successful lookup
            # so an unknown bit depth cannot store None as the bpc.
            if bpc is not None:
                self.color_info.bpc = bpc

        if self.current_timing:
            self.set_video_mode()
        return True
    except Exception:
        # Previously swallowed silently.
        log.exception("UCDController.apply_signal_format failed")
        return False
def _get_colorimetry_from_color_space(self, color_space, color_format=None):
    """Convert a colour-space string into a ``UniTAP.ColorInfo.Colorimetry``.

    BT.2020 maps to ``CM_ITUR_BT2020_YCbCr`` for YCbCr output and to
    ``CM_ITUR_BT2020_RGB`` for RGB output.

    Args:
        color_space: One of "sRGB", "BT.709", "BT.601", "BT.2020", "DCI-P3".
        color_format: Output format used to decide between the two BT.2020
            variants.

    Returns:
        The colorimetry value, or None for an unknown ``color_space``.
    """
    ycbcr_output = UCDEnum.SignalFormat.OutputFormat.is_ycbcr(color_format)
    cm = UniTAP.ColorInfo.Colorimetry
    if ycbcr_output:
        bt2020 = cm.CM_ITUR_BT2020_YCbCr
    else:
        bt2020 = cm.CM_ITUR_BT2020_RGB

    by_color_space = {
        "sRGB": cm.CM_sRGB,
        "BT.709": cm.CM_ITUR_BT709,
        "BT.601": cm.CM_ITUR_BT601,
        "BT.2020": bt2020,
        "DCI-P3": cm.CM_DCI_P3,
    }
    return by_color_space.get(color_space)