Files
pqAutomationApp/app/ucd/device.py

1267 lines
46 KiB
Python
Raw Normal View History

2026-06-11 16:29:36 +08:00
"""UCD 设备层UniTAP SDK 后端与 IUcdDevice 实现。
文件结构
--------
§1 模块级辅助函数
§2 ``_UcdSdkBackend`` UniTAP 直接调用内部类上层勿用
§3 ``DeviceInfo`` / ``list_devices``
§4 ``IUcdDevice`` 抽象接口
§5 ``UCD323Device`` 生产实现线程锁 + 状态机 + EventBus
"""
from __future__ import annotations
import gc
import logging
import threading
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
import UniTAP
from app.ucd.domain import (
Colorimetry,
ConnectionChanged,
DeviceKind,
DynamicRange,
EventBus,
Interface,
PatternApplied,
PatternKind,
PatternSpec,
SignalApplied,
SignalFormat,
TimingSpec,
UcdApplyFailed,
UcdConfigError,
UcdNotConnected,
UcdSdkError,
UcdState,
UcdStateError,
assert_transition,
is_ycbcr,
)
from app.ucd.enum import UCDEnum
log = logging.getLogger(__name__)
_DEVICE_LOCK_TIMEOUT_SECONDS = 8.0
# ─── §1 模块级辅助函数 ─────────────────────────────────────────
def _find_best_id_in_dict(res_map, width, height, refresh_rate):
best_id, best_diff = None, float("inf")
for rid, info in res_map.items():
if info["width"] == width and info["height"] == height:
diff = abs(float(info["refresh_rate"]) - refresh_rate)
if diff < best_diff:
best_diff = diff
best_id = rid
return best_id if best_diff <= 1.0 else None
def _find_best_id_in_list_map(res_map, width, height, refresh_rate):
best_id, best_diff = None, float("inf")
for rid, infos in res_map.items():
for info in infos:
if info["width"] == width and info["height"] == height:
diff = abs(float(info["refresh_rate"]) - refresh_rate)
if diff < best_diff:
best_diff = diff
best_id = rid
return best_id if best_diff <= 1.0 else None
def _colorimetry_to_legacy_key(signal: SignalFormat) -> str:
"""domain.Colorimetry → UCDEnum.ColorInfo 查找 key。"""
cm = signal.colorimetry
ycbcr = is_ycbcr(signal.color_format)
if cm is Colorimetry.BT2020:
return "bt2020ycbcr" if ycbcr else "bt2020rgb"
return {
Colorimetry.SRGB: "srgb",
Colorimetry.BT709: "bt709",
Colorimetry.BT601: "bt601",
Colorimetry.DCI_P3: "dcip3",
Colorimetry.ADOBE_RGB: "adobergb",
}.get(cm, "srgb")
# ─── §2 _UcdSdkBackendUniTAP──────────────────────────────────
class _UcdSdkBackend:
"""UniTAP SDK 封装。仅由 UCD323Device 调用。"""
def __init__(self):
self.lUniTAP = UniTAP.TsiLib()
self.dev = None
self.role = None
self.timing_manager = None
self.config = None
self.color_info = None
self.status = False
self.current_interface = "HDMI"
self.current_timing = None
self.current_pattern = None
self.current_pattern_param = None
self.current_pattern_params = None
self.current_pattern_index = 0
self.last_error = None
def search_device(self):
"""搜索可用设备"""
available_devices = self.lUniTAP.get_list_of_available_devices()
return available_devices if available_devices else []
def open(self, device_name):
"""打开设备"""
temp_dev = None
try:
if self.dev is not None or self.status:
self._force_cleanup()
device_id = int(device_name.split(":")[0])
temp_dev = self.lUniTAP.open(device_id)
try:
self.role = temp_dev.select_role(UniTAP.dev.UCD323.HDMISource)
self.dev = temp_dev
self.current_interface = "HDMI"
except Exception as role_error:
self._close_device_object(temp_dev)
raise role_error
pg, ag = self.get_tx_modules()
self.timing_manager = pg.timing_manager
self.color_info = UniTAP.ColorInfo()
self._stop_audio_output(ag)
self.status = True
return True
except Exception as e:
self._force_cleanup()
return False
def close(self):
"""关闭设备"""
try:
if self.dev:
try:
self._stop_audio_output()
except Exception:
pass
self._close_device_object(self.dev)
self._reset_state()
self.lUniTAP = None
for i in range(3):
gc.collect()
time.sleep(2.0)
self.lUniTAP = UniTAP.TsiLib()
return True
except Exception as e:
self._reset_state()
try:
self.lUniTAP = None
gc.collect()
time.sleep(2.0)
self.lUniTAP = UniTAP.TsiLib()
except Exception as init_error:
pass
return False
def _reset_state(self):
"""重置所有运行时状态(不关闭设备句柄)"""
self.dev = None
self.role = None
self.status = False
self.timing_manager = None
self.current_timing = None
self.current_pattern = None
self.current_pattern_param = None
self.current_pattern_params = None
self.current_pattern_index = 0
self.current_interface = "HDMI"
def _force_cleanup(self):
"""强制清理所有状态"""
try:
if self.dev:
self._close_device_object(self.dev)
self._reset_state()
except Exception as e:
pass
def _close_device_object(self, dev_obj):
"""显式关闭设备对象"""
try:
if dev_obj is None:
return
if self.lUniTAP and hasattr(self.lUniTAP, "close"):
try:
self.lUniTAP.close(dev_obj)
except Exception as e:
pass
dev_obj = None
gc.collect()
time.sleep(1.0)
except Exception as e:
pass
def _cleanup(self):
"""清理设备资源"""
try:
if self.dev:
self._close_device_object(self.dev)
self.dev = None
if hasattr(self.lUniTAP, "cleanup"):
self.lUniTAP.cleanup()
except Exception as e:
pass
def get_tx_modules(self):
"""根据当前接口返回 (pg, ag) 模块。"""
if not self.role:
raise RuntimeError("UCD 未打开,无法获取 TX 模块")
interface = getattr(self, "current_interface", None)
log.info("UcdSdk.get_tx_modules interface=%s", interface)
if interface in (None, "HDMI"):
return self.role.hdtx.pg, self.role.hdtx.ag
if interface in ("DP", "Type-C"):
return self.role.dptx.pg, self.role.dptx.ag
raise ValueError(f"不支持的接口类型: {interface}")
def _stop_audio_output(self, ag=None) -> None:
"""关闭 HDMI/DP 音频发生器。PQ 测试仅需视频图案,避免电视持续输出测试音。"""
if not self.status or not self.role:
return
try:
if ag is None:
_, ag = self.get_tx_modules()
ag.stop_generate()
log.info("UcdSdk._stop_audio_output done")
except Exception:
log.exception("UcdSdk._stop_audio_output failed")
def _apply_pg_output(self, pg) -> bool:
"""提交 PG 输出,并确保音频发生器处于关闭状态。"""
try:
ok = bool(pg.apply())
except Exception:
log.exception("UcdSdk._apply_pg_output pg.apply failed")
return False
self._stop_audio_output()
return ok
def _resolve_timing(self, pg=None):
"""优先从 current_timing 读取 timing必要时回退到 TX 模块。"""
if self.current_timing is not None:
return self.current_timing
if pg is not None:
get_vm = getattr(pg, "get_vm", None)
if callable(get_vm):
try:
vm = get_vm()
return getattr(vm, "timing", None)
except Exception:
pass
return None
def get_current_resolution(self, default=(3840, 2160)):
"""从当前 timing 获取 (width, height),失败时返回 default。"""
try:
pg, _ = self.get_tx_modules()
timing = self._resolve_timing(pg)
if timing and hasattr(timing, "h_active") and hasattr(timing, "v_active"):
return timing.h_active, timing.v_active
except Exception:
pass
return default
def set_color_mode(self, cf, bpc, cm):
"""设置颜色模式"""
current_dynamic_range = self.color_info.dynamic_range
color_format = UCDEnum.ColorInfo.get_color_format(cf)
if color_format is None:
fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(cf)
color_format = UCDEnum.ColorInfo.get_color_format(fmt_key)
if color_format is None:
return False
if not isinstance(bpc, int) or bpc <= 0:
return False
colorimetry = UCDEnum.ColorInfo.get_colorimetry(cm)
if colorimetry is None:
return False
self.color_info.color_format = color_format
self.color_info.bpc = bpc
self.color_info.colorimetry = colorimetry
self.color_info.dynamic_range = current_dynamic_range
return True
def apply_signal_format(
self, color_space=None, data_range=None, bit_depth=None, color_format=None, **_
):
"""统一设置信号格式color_format / colorimetry / dynamic_range / bpc
Gamma/EOTF 传输特性在 ColorInfo API 中不存在
max_cll / max_fall 暂无对应 SDK 接口通过 **_ 接收后忽略
"""
try:
if color_format:
fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(color_format)
cf = UCDEnum.ColorInfo.get_color_format(fmt_key)
if cf is not None:
self.color_info.color_format = cf
if color_space:
colorimetry = self._get_colorimetry_from_color_space(color_space, color_format)
if colorimetry:
self.color_info.colorimetry = colorimetry
if data_range:
if data_range == "Full":
self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA
elif data_range == "Limited":
self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA
if bit_depth:
bpc = UCDEnum.SignalFormat.BitDepth.get_bit_value(bit_depth)
self.color_info.bpc = bpc
if self.current_timing:
self.set_video_mode()
return True
except Exception:
return False
def _get_colorimetry_from_color_space(self, color_space, color_format=None):
"""将色彩空间字符串转换为UniTAP.ColorInfo.Colorimetry。
BT.2020 YCbCr 输出时使用 CM_ITUR_BT2020_YCbCrRGB 输出时使用 CM_ITUR_BT2020_RGB
"""
is_ycbcr = UCDEnum.SignalFormat.OutputFormat.is_ycbcr(color_format)
bt2020_cm = (
UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT2020_YCbCr
if is_ycbcr
else UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT2020_RGB
)
colorimetry_map = {
"sRGB": UniTAP.ColorInfo.Colorimetry.CM_sRGB,
"BT.709": UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT709,
"BT.601": UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT601,
"BT.2020": bt2020_cm,
"DCI-P3": UniTAP.ColorInfo.Colorimetry.CM_DCI_P3,
}
return colorimetry_map.get(color_space)
def apply_video_mode(self):
"""应用当前 color_info 和 timing"""
if self.current_timing:
log.info("UcdSdk.apply_video_mode start timing=%s", self.current_timing)
self.set_video_mode()
log.info("UcdSdk.apply_video_mode done")
return True
log.warning("UcdSdk.apply_video_mode skipped: current_timing is None")
return False
def set_video_mode(self):
"""设置视频模式"""
# 对比上次发出的配置,判断是否会触发电视重新锁定信号
current_config = (
self.current_timing,
self.color_info.color_format,
self.color_info.colorimetry,
self.color_info.dynamic_range,
self.color_info.bpc,
)
self.format_changed = (current_config != getattr(self, "_last_sent_config", None))
log.info(
"UcdSdk.set_video_mode format_changed=%s color_format=%s colorimetry=%s dynamic_range=%s bpc=%s",
self.format_changed,
self.color_info.color_format,
self.color_info.colorimetry,
self.color_info.dynamic_range,
self.color_info.bpc,
)
if not self.format_changed:
log.info("UcdSdk.set_video_mode skipped pg.set_vm(): config unchanged")
return True
video_mode = UniTAP.VideoMode(
timing=self.current_timing, color_info=self.color_info
)
pg, _ = self.get_tx_modules()
log.info("UcdSdk.set_video_mode calling pg.set_vm()")
pg.set_vm(vm=video_mode)
self._stop_audio_output()
log.info("UcdSdk.set_video_mode done")
self._last_sent_config = current_config
return True
def parse_formatted_timing(self, timing_str):
"""解析格式化的timing字符串"""
if not isinstance(timing_str, str):
raise ValueError("timing_str 必须是字符串")
s = " ".join(timing_str.strip().split())
s = s.replace(" x", "x").replace("x ", "x")
parts = s.split(" ", 1)
if len(parts) < 2:
raise ValueError(f"无法解析timing: {timing_str}")
type_str = parts[0].strip().upper()
rest = parts[1].strip()
if "@" not in rest:
raise ValueError(f"无法解析timing(缺少 @): {timing_str}")
left, right = [p.strip() for p in rest.split("@", 1)]
if "x" not in left:
raise ValueError(f"无法解析分辨率(缺少 x): {timing_str}")
wh = left.split("x")
if len(wh) != 2:
raise ValueError(f"无法解析分辨率: {timing_str}")
try:
width = int(wh[0])
height = int(wh[1])
except Exception:
raise ValueError(f"分辨率数字解析失败: {timing_str}")
hz_str = right.replace("Hz", "").replace("HZ", "").strip()
try:
refresh_rate = float(hz_str)
except Exception:
raise ValueError(f"刷新率解析失败: {timing_str}")
rtype_map = {
"DMT": "dmt",
"CTA": "cta",
"CVT": "cvt",
"OVT": "ovt",
}
if type_str not in rtype_map:
raise ValueError(f"未知的分辨率类型: {type_str}")
resolution_type = rtype_map[type_str]
resolution_id = None
if resolution_type == "dmt":
resolution_id = _find_best_id_in_dict(
UCDEnum.TimingInfo.dmt_resolution_map, width, height, refresh_rate
)
elif resolution_type == "cta":
resolution_id = _find_best_id_in_dict(
UCDEnum.TimingInfo.cta_resolution_map, width, height, refresh_rate
)
elif resolution_type == "cvt":
resolution_id = _find_best_id_in_list_map(
UCDEnum.TimingInfo.cvt_resolution_map, width, height, refresh_rate
)
elif resolution_type == "ovt":
resolution_id = _find_best_id_in_list_map(
UCDEnum.TimingInfo.ovt_resolution_map, width, height, refresh_rate
)
result = {
"resolution_type": resolution_type,
"width": width,
"height": height,
"refresh_rate": refresh_rate,
"resolution_id": resolution_id,
}
return result
def search_timing(self, width, height, refresh_rate, resolution_type=None):
"""根据分辨率参数搜索合适的timing"""
if resolution_type:
resolution_type = resolution_type.lower()
standard = None
if resolution_type == "dmt":
standard = UniTAP.common.timing.Timing.Standard.SD_DMT
elif resolution_type == "cta":
standard = UniTAP.common.timing.Timing.Standard.SD_CTA
elif resolution_type == "cvt":
standard = UniTAP.common.timing.Timing.Standard.SD_CVT
rr = float(refresh_rate)
# Try both exact and NTSC-compatible rates (e.g. 120000 / 119880).
f_rate_candidates = [
int(round(rr * 1000)),
int(rr * 1000),
int(round((rr * 1000.0) * 1000.0 / 1001.0)),
]
# 去重并保持顺序
f_rate_candidates = list(dict.fromkeys(f_rate_candidates))
standards = [standard]
if standard is not None:
standards.append(None)
for std in standards:
for f_rate in f_rate_candidates:
timing = self.timing_manager.search(
h_active=width,
v_active=height,
f_rate=f_rate,
standard=std,
)
if timing:
return timing
else:
for res_type in ["dmt", "cta", "cvt", "ovt"]:
result = self.search_timing(width, height, refresh_rate, res_type)
if result:
return result
return None
def set_timing_from_id(self, rtype, rid):
"""根据(type, id)设置设备timing"""
timing = None
if rtype.lower() == "dmt":
timing = self.timing_manager.get_dmt(rid)
elif rtype.lower() == "cta":
timing = self.timing_manager.get_cta(rid)
elif rtype.lower() == "cvt":
timing = self.timing_manager.get_cvt(rid)
elif rtype.lower() == "ovt":
get_ovt = getattr(self.timing_manager, "get_ovt", None)
if callable(get_ovt):
timing = get_ovt(rid)
else:
return False
else:
raise ValueError(f"不支持的分辨率类型: {rtype}")
if timing:
self.current_timing = timing
return True
else:
return False
def set_timing_from_string(self, timing_str):
"""根据格式化timing字符串设置设备timing"""
try:
spec = self.parse_formatted_timing(timing_str)
except Exception:
log.exception("UcdSdk.set_timing_from_string parse failed timing=%s", timing_str)
return False
rtype = spec["resolution_type"]
rid = spec.get("resolution_id")
width = spec["width"]
height = spec["height"]
fr = spec["refresh_rate"]
if rid is not None and self.set_timing_from_id(rtype, rid):
log.info(
"UcdSdk.set_timing_from_string success by id timing=%s parsed=(%s id=%s)",
timing_str,
rtype,
rid,
)
return True
# Respect selected timing family first (DMT/CTA/CVT/OVT).
timing = self.search_timing(width, height, fr, rtype)
if timing is None:
# Fallback only for robustness: some SDKs may not classify a timing
# exactly as requested family even though width/height/fps matches.
timing = self.search_timing(width, height, fr, None)
if timing:
self.current_timing = timing
log.info(
"UcdSdk.set_timing_from_string success timing=%s parsed=(%s %sx%s@%s)",
timing_str,
rtype,
width,
height,
fr,
)
return True
log.error(
"UcdSdk.set_timing_from_string no timing matched timing=%s parsed=(%s %sx%s@%s)",
timing_str,
rtype,
width,
height,
fr,
)
return False
def set_pattern(self, pattern, pattern_params=None):
"""设置pattern"""
if self.current_timing is None:
# Pattern-only updates (e.g. Calman patch click) can still be applied on
# an already active output mode. Missing timing should not block pattern staging.
log.warning("UcdSdk.set_pattern current_timing is None; continue with pattern-only apply")
needs_params = {
UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor,
UCDEnum.VideoPatternInfo.VideoPattern.SolidColor,
UCDEnum.VideoPatternInfo.VideoPatternParams.WhiteVStrips,
UCDEnum.VideoPatternInfo.VideoPatternParams.GradientRGBStripes,
UCDEnum.VideoPatternInfo.VideoPatternParams.MotionPattern,
UCDEnum.VideoPatternInfo.VideoPatternParams.SquareWindow,
}
log.info(
"UcdSdk.set_pattern pattern=%s pattern_params=%s needs_params=%s",
getattr(pattern, "name", pattern),
pattern_params,
pattern in needs_params,
)
if pattern in needs_params and pattern_params is not None:
self.set_pattern_params(pattern, pattern_params)
return True
def set_pattern_params(self, pattern, pattern_params):
"""设置pattern参数"""
if pattern is not None:
solid_color_patterns = {
UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor,
UCDEnum.VideoPatternInfo.VideoPattern.SolidColor,
}
if pattern in solid_color_patterns:
log.info("UcdSdk.set_pattern_params solid_color rgb=%s", pattern_params)
self.current_pattern_param = UniTAP.SolidColorParams(
first=pattern_params[0],
second=pattern_params[1],
third=pattern_params[2],
)
return True
log.warning("UcdSdk.set_pattern_params unsupported pattern=%s", getattr(pattern, "name", pattern))
return False
def apply_pattern(self):
"""应用当前pattern"""
if self.current_pattern is not None:
log.info(
"UcdSdk.apply_pattern start pattern=%s has_params=%s",
getattr(self.current_pattern, "name", self.current_pattern),
self.current_pattern_param is not None,
)
pg, _ = self.get_tx_modules()
log.info("UcdSdk.apply_pattern calling pg.set_pattern()")
pg.set_pattern(self.current_pattern)
if self.current_pattern_param is not None:
log.info("UcdSdk.apply_pattern calling pg.set_pattern_params()")
pg.set_pattern_params(self.current_pattern_param)
log.info("UcdSdk.apply_pattern done")
return True
log.warning("UcdSdk.apply_pattern skipped: current_pattern is None")
return False
def set_next_pattern(self):
"""设置下一个pattern"""
if self.current_pattern_index < len(self.current_pattern_params):
p = self.current_pattern_params[self.current_pattern_index]
self.set_pattern(self.current_pattern, p)
self.current_pattern_index += 1
else:
error_msg = (
f"No more patterns to set. (已设置 {self.current_pattern_index} 个图案)"
)
raise IndexError(error_msg)
def set_ucd_params(self, config):
"""设置UCD323参数"""
self.last_error = None
self.config = config
test_type = self.config.current_test_type
pg, _ = self.get_tx_modules()
self.timing_manager = pg.timing_manager
color_format = self.config.current_test_types[test_type]["color_format"]
bpc = self.config.current_test_types[test_type]["bpc"]
colorimetry = self.config.current_test_types[test_type]["colorimetry"]
if not self.set_color_mode(color_format, bpc, colorimetry):
self.last_error = (
f"set_color_mode failed: color_format={color_format}, bpc={bpc}, colorimetry={colorimetry}"
)
log.error(
"UcdSdk.set_ucd_params set_color_mode failed test_type=%s color_format=%s bpc=%s colorimetry=%s",
test_type,
color_format,
bpc,
colorimetry,
)
return False
timing_str = self.config.current_test_types[test_type]["timing"]
if not self.set_timing_from_string(timing_str):
self.last_error = f"set_timing_from_string failed: timing={timing_str}"
log.error(
"UcdSdk.set_ucd_params set_timing_from_string failed test_type=%s timing=%s",
test_type,
timing_str,
)
return False
self.current_pattern_index = 0
pattern_mode = self.config.current_pattern["pattern_mode"]
pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern_mode)
if pattern is None:
self.last_error = f"get_video_pattern failed: pattern_mode={pattern_mode}"
return False
self.current_pattern = pattern
self.current_pattern_params = self.config.current_pattern["pattern_params"]
return True
def run(self):
"""运行设备"""
log.info(
"UcdSdk.run start current_pattern=%s has_pattern_param=%s",
getattr(self.current_pattern, "name", self.current_pattern),
self.current_pattern_param is not None,
)
self.apply_video_mode()
self.apply_pattern()
pg, _ = self.get_tx_modules()
log.info("UcdSdk.run calling pg.apply()")
ok = self._apply_pg_output(pg)
log.info("UcdSdk.run done ok=%s", ok)
return ok
def send_current_pattern_params(self, pattern_params):
"""发送当前已配置的 pattern并可附带当前 pattern 参数。"""
if not self.status or not self.role:
return False
try:
if self.current_pattern is None:
log.error("UcdSdk.send_current_pattern_params failed: current_pattern is None")
return False
log.info(
"UcdSdk.send_current_pattern_params pattern=%s params=%s",
getattr(self.current_pattern, "name", self.current_pattern),
pattern_params,
)
if pattern_params is not None and not self.set_pattern(
self.current_pattern,
pattern_params,
):
log.error("UcdSdk.send_current_pattern_params failed: set_pattern returned False")
return False
log.info("UcdSdk.send_current_pattern_params calling run()")
self.run()
log.info("UcdSdk.send_current_pattern_params done")
return True
except Exception:
log.exception("UcdSdk.send_current_pattern_params exception")
return False
def send_solid_rgb_pattern(self, rgb):
"""发送纯色 RGB Pattern依赖当前 timing/color_info 状态)。"""
if not self.status or not self.role:
return False
try:
log.info("UcdSdk.send_solid_rgb_pattern rgb=%s", rgb)
self.current_pattern = UCDEnum.VideoPatternInfo.get_video_pattern("solidcolor")
if self.current_pattern is None:
log.error("UcdSdk.send_solid_rgb_pattern failed: solidcolor pattern not found")
return False
return self.send_current_pattern_params(list(rgb))
except Exception:
log.exception("UcdSdk.send_solid_rgb_pattern exception")
return False
def send_image_pattern(self, image_path):
"""发送图片 Pattern依赖当前 timing/color_info 状态)。"""
if not self.status or not self.role:
return False
try:
pg, _ = self.get_tx_modules()
# 仅切换图案,不重复 set_vm重复 apply video mode 会触发电视 HDMI 重锁发声。
if getattr(self, "_last_sent_config", None) is None:
self.apply_video_mode()
pg.set_pattern(pattern=image_path)
return self._apply_pg_output(pg)
except Exception:
return False
# ─── §3 DeviceInfo / list_devices ────────────────────────────────
@dataclass(frozen=True)
class DeviceInfo:
"""UCD 设备发现条目。
``display`` SDK 给出的完整字符串``"0: UCD-323 #12345678"``
``index`` / ``serial`` / ``model`` 通过解析得到解析失败时为 None
"""
display: str
index: int | None = None
serial: str | None = None
model: str | None = None
@classmethod
def parse(cls, display: str) -> "DeviceInfo":
idx: int | None = None
model: str | None = None
serial: str | None = None
try:
head, rest = display.split(":", 1)
idx = int(head.strip())
rest = rest.strip()
# 形如 "UCD-323 #12345678" 或 "UCD-323 #12345678 (in use)"
tokens = rest.split()
if tokens:
model = tokens[0]
for tok in tokens[1:]:
if tok.startswith("#") and len(tok) >= 2:
serial = tok.lstrip("#")
break
except Exception: # noqa: BLE001 - 解析失败保留原 display 即可
pass
return cls(display=display, index=idx, serial=serial, model=model)
def list_devices(sdk: "_UcdSdkBackend") -> list[DeviceInfo]:
"""通过给定的 SDK 后端枚举可用 UCD 设备。"""
try:
raw_list = sdk.search_device()
except Exception as exc: # noqa: BLE001
raise UcdSdkError("枚举 UCD 设备失败") from exc
return [DeviceInfo.parse(s) for s in (raw_list or [])]
# ─── §4 IUcdDevice 抽象接口 ──────────────────────────────────────
class IUcdDevice(ABC):
"""UCD 信号发生器抽象设备。
上层Service / GUI****通过本接口操作硬件不得穿透到
UniTAP SDK 或具体实现细节
"""
@property
@abstractmethod
def state(self) -> UcdState: ...
@property
@abstractmethod
def info(self) -> DeviceInfo | None: ...
@abstractmethod
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
"""打开设备并选择接口角色。失败抛 :class:`UcdSdkError` 等。"""
@abstractmethod
def close(self) -> None:
"""关闭设备(幂等)。"""
@abstractmethod
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
"""写入信号格式与 timing未 apply。返回 ``format_changed``。"""
@abstractmethod
def set_pattern(self, pattern: PatternSpec) -> None:
"""设置当前图案(未 apply"""
@abstractmethod
def apply(self) -> None:
"""将已配置的信号格式 + 图案一次性提交给硬件。"""
@abstractmethod
def current_resolution(self) -> tuple[int, int]:
"""读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。"""
@abstractmethod
def search_devices(self) -> list[str]:
"""枚举可用设备的 SDK 显示字符串列表。"""
@property
@abstractmethod
def format_changed(self) -> bool:
"""最近一次视频模式提交是否相对上次发生变化。"""
@property
@abstractmethod
def last_error(self) -> str | None:
"""最近一次配置/应用失败时的错误描述。"""
@abstractmethod
# --- 测试 / GUI 遗留快捷 API ---
def apply_signal_format(
self,
*,
color_space: str | None = None,
data_range: str | None = None,
bit_depth: str | None = None,
color_format: str | None = None,
max_cll: int | None = None,
max_fall: int | None = None,
) -> bool:
"""仅更新信号格式(沿用当前 timing不切换图案。"""
@abstractmethod
def set_ucd_params(self, config) -> bool:
"""按 PQConfig stage 色彩 / Timing / Pattern 类型(不 apply 输出)。"""
@abstractmethod
def send_current_pattern_params(self, pattern_params) -> bool:
"""更新当前 pattern 参数并 apply 到硬件。"""
@abstractmethod
def apply_config_and_run(self, config, pattern_params) -> bool:
"""set_ucd_params + set_pattern + run 复合操作。"""
# ─── §5 UCD323Device 真实实现 ────────────────────────────────────
class UCD323Device(IUcdDevice):
"""生产环境实现。内部委托给本模块的 :class:`_UcdSdkBackend`。"""
def __init__(self, bus: EventBus, sdk: "_UcdSdkBackend | None" = None):
self._bus = bus
self._sdk: "_UcdSdkBackend" = sdk or _UcdSdkBackend()
self._state: UcdState = UcdState.CLOSED
self._info: DeviceInfo | None = None
self._interface: Interface = Interface.HDMI
self._lock = threading.RLock()
self._lock_owner_tid: int | None = None
self._lock_owner_name: str | None = None
self._curr_signal: SignalFormat | None = None
self._curr_timing: TimingSpec | None = None
self._curr_pattern: PatternSpec | None = None
self._last_applied: tuple[SignalFormat, TimingSpec] | None = None
@contextmanager
def _acquire_device_lock(self, op_name: str):
current = threading.current_thread()
log.info(
"UCD323Device.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s",
op_name,
_DEVICE_LOCK_TIMEOUT_SECONDS,
threading.get_ident(),
current.name,
self._lock_owner_tid,
self._lock_owner_name,
)
acquired = self._lock.acquire(timeout=_DEVICE_LOCK_TIMEOUT_SECONDS)
if not acquired:
raise UcdStateError(
"UCD device busy: lock timeout in "
f"UCD323Device.{op_name}, "
f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}"
)
prev_owner_tid = self._lock_owner_tid
prev_owner_name = self._lock_owner_name
self._lock_owner_tid = threading.get_ident()
self._lock_owner_name = current.name
log.info(
"UCD323Device.%s lock acquired tid=%s thread=%s",
op_name,
self._lock_owner_tid,
self._lock_owner_name,
)
try:
yield
finally:
self._lock_owner_tid = prev_owner_tid
self._lock_owner_name = prev_owner_name
self._lock.release()
# --- 状态查询 ---
@property
def state(self) -> UcdState:
return self._state
@property
def info(self) -> DeviceInfo | None:
return self._info
# --- 生命周期 ---
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
with self._acquire_device_lock("open"):
assert_transition(self._state, UcdState.OPENED)
if interface is not Interface.HDMI:
# SDK open() 当前写死 HDMISource 角色。
raise UcdConfigError(
f"暂不支持接口 {interface.value};当前仅实现 HDMI"
)
try:
ok = self._sdk.open(info.display)
except Exception as exc: # noqa: BLE001
raise UcdSdkError(f"打开设备失败: {info.display}") from exc
if not ok:
raise UcdSdkError(f"打开设备失败: {info.display}")
self._info = info
self._interface = interface
self._state = UcdState.OPENED
self._bus.publish(ConnectionChanged(DeviceKind.UCD, True, info.serial))
def close(self) -> None:
with self._acquire_device_lock("close"):
if self._state == UcdState.CLOSED:
return
try:
self._sdk.close()
except Exception: # noqa: BLE001
log.exception("关闭 UCD 时发生异常")
self._state = UcdState.CLOSED
self._curr_signal = None
self._curr_timing = None
self._curr_pattern = None
self._last_applied = None
prev_serial = self._info.serial if self._info else None
self._info = None
self._bus.publish(ConnectionChanged(DeviceKind.UCD, False, prev_serial))
# --- 信号 / 图案配置 ---
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
with self._acquire_device_lock("configure"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 configure")
try:
# 颜色模式color_format / bpc / colorimetry
if not self._sdk.set_color_mode(
signal.color_format.value,
int(signal.bpc),
_colorimetry_to_legacy_key(signal),
):
raise UcdConfigError(
f"set_color_mode 失败: {signal!r}"
)
# dynamic_range 在新接口中是一等公民
self._apply_dynamic_range(signal)
# Timing
if not self._sdk.set_timing_from_string(str(timing)):
raise UcdConfigError(f"set_timing_from_string 失败: {timing}")
except UcdConfigError:
raise
except Exception as exc: # noqa: BLE001
raise UcdSdkError("configure 异常") from exc
self._curr_signal = signal
self._curr_timing = timing
self._state = UcdState.CONFIGURED
return (signal, timing) != self._last_applied
def set_pattern(self, pattern: PatternSpec) -> None:
with self._acquire_device_lock("set_pattern"):
# Phase 2 过渡:允许从 OPENED 直接 set_pattern——遗留路径
# test_runner 等)通过旧 controller.apply_signal_format 写入
# 信号格式,未经过本设备的 configure。此时 self._state 仍为
# OPENED但硬件实际已处于可接收 pattern 状态。
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 set_pattern")
self._curr_pattern = pattern
# 仅本地暂存,真正写硬件在 apply()
def apply(self) -> None:
with self._acquire_device_lock("apply"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 apply")
if self._curr_pattern is None:
raise UcdStateError("apply 前必须先 set_pattern")
try:
ok = self._apply_pattern_via_controller(self._curr_pattern)
except Exception as exc: # noqa: BLE001
raise UcdSdkError(
f"apply 异常: {type(exc).__name__}: {exc}"
) from exc
if not ok:
raise UcdApplyFailed(
f"apply 失败: pattern={self._curr_pattern.kind.value}"
)
# SignalApplied 事件仅在通过新 API configure 过时发出;
# 遗留路径下 self._curr_signal/_curr_timing 可能为 None。
if self._curr_signal is not None and self._curr_timing is not None:
changed = (self._curr_signal, self._curr_timing) != self._last_applied
self._last_applied = (self._curr_signal, self._curr_timing)
self._bus.publish(
SignalApplied(self._curr_signal, self._curr_timing, changed)
)
self._state = UcdState.APPLIED
self._bus.publish(PatternApplied(self._curr_pattern))
# --- 查询 ---
def current_resolution(self) -> tuple[int, int]:
try:
return self._sdk.get_current_resolution((3840, 2160))
except Exception: # noqa: BLE001
return (3840, 2160)
def search_devices(self) -> list[str]:
try:
return self._sdk.search_device() or []
except Exception as exc: # noqa: BLE001
raise UcdSdkError("枚举 UCD 设备失败") from exc
@property
def format_changed(self) -> bool:
return bool(getattr(self._sdk, "format_changed", True))
@property
def last_error(self) -> str | None:
err = getattr(self._sdk, "last_error", None)
return str(err) if err else None
# --- 测试 / GUI 遗留快捷 API ---
def apply_signal_format(
self,
*,
color_space: str | None = None,
data_range: str | None = None,
bit_depth: str | None = None,
color_format: str | None = None,
max_cll: int | None = None,
max_fall: int | None = None,
) -> bool:
with self._acquire_device_lock("apply_signal_format"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 apply_signal_format")
return bool(
self._sdk.apply_signal_format(
color_space=color_space,
data_range=data_range,
bit_depth=bit_depth,
color_format=color_format,
max_cll=max_cll,
max_fall=max_fall,
)
)
def set_ucd_params(self, config) -> bool:
with self._acquire_device_lock("set_ucd_params"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 set_ucd_params")
return bool(self._sdk.set_ucd_params(config))
def send_current_pattern_params(self, pattern_params) -> bool:
with self._acquire_device_lock("send_current_pattern_params"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 send_current_pattern_params")
ok = bool(self._sdk.send_current_pattern_params(pattern_params))
if ok:
self._state = UcdState.APPLIED
return ok
def apply_config_and_run(self, config, pattern_params) -> bool:
with self._acquire_device_lock("apply_config_and_run"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 apply_config_and_run")
ctrl = self._sdk
if not ctrl.set_ucd_params(config):
return False
if not ctrl.set_pattern(ctrl.current_pattern, pattern_params):
return False
ok = bool(ctrl.run())
if ok:
self._state = UcdState.APPLIED
return ok
# --- 内部 ---
def _apply_dynamic_range(self, signal: SignalFormat) -> None:
ci = self._sdk.color_info
if ci is None:
return
if signal.dynamic_range is DynamicRange.FULL:
ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA
else:
ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA
def _apply_pattern_via_controller(self, pattern: PatternSpec) -> bool:
"""根据 PatternKind 走最合适的旧 controller 路径。"""
if pattern.kind is PatternKind.IMAGE:
if not pattern.image_path:
raise UcdConfigError("IMAGE pattern 必须提供 image_path")
return bool(self._sdk.send_image_pattern(pattern.image_path))
# 预定义图案路径:复用 controller.set_pattern + run()
video_pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern.kind.value)
if video_pattern is None:
raise UcdConfigError(f"不支持的 PatternKind: {pattern.kind!r}")
self._sdk.current_pattern = video_pattern
params: list[int] | None = None
if pattern.kind is PatternKind.SOLID:
if pattern.solid_rgb is None:
raise UcdConfigError("SOLID pattern 必须提供 solid_rgb")
params = list(pattern.solid_rgb)
elif pattern.extras:
params = list(pattern.extras)
if not self._sdk.set_pattern(video_pattern, params):
raise UcdApplyFailed("controller.set_pattern 返回 False")
# Skip apply_video_mode() (i.e. pg.set_vm) the video format is already
# configured by the main signal panel and re-applying it blocks until the
# device re-locks, causing an apparent UI freeze for pattern-only sends.
if not self._sdk.apply_pattern():
raise UcdApplyFailed("controller.apply_pattern 返回 False")
if getattr(self._sdk, "current_timing", None) is None:
raise UcdConfigError(
"current_timing is None; please apply selected test profile/timing before sending pattern"
)
try:
pg, _ = self._sdk.get_tx_modules()
if not self._sdk._apply_pg_output(pg):
raise UcdApplyFailed("controller.apply_pg_output 返回 False")
except UcdApplyFailed:
raise
except Exception as exc:
raise UcdSdkError("pg.apply() 失败") from exc
return True
__all__ = [
"DeviceInfo",
"list_devices",
"IUcdDevice",
"UCD323Device",
]