diff --git a/app/device/connection.py b/app/device/connection.py index 15abe17..0f2e143 100644 --- a/app/device/connection.py +++ b/app/device/connection.py @@ -6,9 +6,8 @@ "把模块级函数当类方法装到 PQAutomationApp 上" 的写法。 - 拆掉 ``check_port_connection(is_ucd)`` 的布尔旗参数反模式, 分离为 ``connect_ucd`` / ``connect_ca`` 两条独立路径。 -- UCD 这一侧不再直接调用旧 ``UCDController``,而是通过 - :class:`UCD323Device` + :class:`EventBus`;指示器更新由订阅 - :class:`ConnectionChanged` 事件触发,与 GUI 解耦。 +- UCD 经由 :class:`UCD323Device` + :class:`EventBus` 管理; + 指示灯由 GUI 订阅带 :class:`DeviceKind` 的 :class:`ConnectionChanged` 事件更新。 模块底部仍保留 7 个旧名字函数作为薄兼容层(``check_com_connections`` 等),供 ``pqAutomationApp.PQAutomationApp`` 类体继续以同名属性挂接, @@ -22,9 +21,8 @@ import time from tkinter import messagebox from typing import TYPE_CHECKING -from app.ucd_domain import ConnectionChanged, UcdError +from app.ucd import ConnectionChanged, DeviceKind, DeviceInfo, UCD323Device, UcdError from drivers.caSerail import CASerail -from drivers.ucd_driver import DeviceInfo from app.views.modern_styles import get_theme_palette from typing import TYPE_CHECKING @@ -34,8 +32,7 @@ if TYPE_CHECKING: if TYPE_CHECKING: - from app.ucd_domain import EventBus - from drivers.ucd_driver import UCD323Device + from app.ucd import EventBus # ─── ConnectionController ──────────────────────────────────────── @@ -133,7 +130,7 @@ class ConnectionController: channel_value = self._app.ca_channel_var.get() ca.setChannel(f"{int(channel_value):02d}") self._app.ca = ca - self._bus.publish(ConnectionChanged(True, None)) + self._bus.publish(ConnectionChanged(DeviceKind.CA, True, None)) return True except Exception as exc: # noqa: BLE001 self._log(f"CA410 连接失败: {exc}", level="error") @@ -147,7 +144,7 @@ class ConnectionController: except Exception: # noqa: BLE001 pass self._app.ca = None - self._bus.publish(ConnectionChanged(False, None)) + self._bus.publish(ConnectionChanged(DeviceKind.CA, False, None)) self._log("CA连接已断开", level="info") # -- 一次性入口 ---------------------------------------------- diff --git a/app/services/__init__.py b/app/services/__init__.py index a27abe7..4ea00b4 100644 --- a/app/services/__init__.py +++ b/app/services/__init__.py @@ -1 +1,3 @@ -from app.services.pattern_service import PatternService, PatternSession +from app.ucd import PatternService, PatternSession + +__all__ = ["PatternService", "PatternSession"] diff --git a/app/services/ucd_service.py b/app/services/ucd_service.py deleted file mode 100644 index d4b9d8b..0000000 --- a/app/services/ucd_service.py +++ /dev/null @@ -1,219 +0,0 @@ -"""UCD 信号 / 图案应用服务层。 - -服务层是 GUI ↔ Driver 的唯一通道,负责: -- 将 UI 字符串("BT.709"、"10bit"、"YCbCr 4:4:4" 等)翻译成 :class:`SignalFormat`; -- 将各 panel 的 timing 字符串翻译成 :class:`TimingSpec`; -- 协调 :meth:`IUcdDevice.configure` / ``set_pattern`` / ``apply`` 的调用顺序; -- 通过 :class:`EventBus` 让 GUI 订阅状态变化,而非主动轮询。 - -本层不直接 import UniTAP,也不读取 :mod:`tkinter` 变量; -所有输入都是显式参数,便于单测。 - -线程安全由 :class:`UCD323Device` 的设备锁统一保证,本层不再重复加锁。 -""" - -from __future__ import annotations - -import logging - -from app.ucd_domain import ( - Colorimetry, - DynamicRange, - EventBus, - PatternKind, - PatternSpec, - SignalFormat, - TimingSpec, - UcdError, - UcdState, - bit_depth_str_to_bpc, - color_space_to_colorimetry, - data_range_to_dynamic_range, - output_format_to_color_format, - parse_timing_str, -) -from drivers.ucd_driver import IUcdDevice - -log = logging.getLogger(__name__) - - -# ─── 视图字符串 → 值对象 转换工具 ──────────────────────────────── - - -def build_signal_format( - *, - color_space: str, - output_format: str, - bit_depth: str, - data_range: str = "Full", -) -> SignalFormat: - """根据下拉框字符串组装 :class:`SignalFormat`。 - - 各参数解析失败抛 :class:`UcdConfigError`。 - """ - return SignalFormat( - color_format=output_format_to_color_format(output_format), - colorimetry=color_space_to_colorimetry(color_space), - bpc=bit_depth_str_to_bpc(bit_depth), - dynamic_range=data_range_to_dynamic_range(data_range), - ) - - -def build_signal_format_from_profile( - *, - color_space: str, - color_format: str, - bpc: int, - data_range: str = "Full", -) -> SignalFormat: - """从 PQConfig test_type 条目组装 :class:`SignalFormat`。""" - bit_depth = f"{int(bpc)}bit" - return build_signal_format( - color_space=color_space, - output_format=color_format, - bit_depth=bit_depth, - data_range=data_range, - ) - - -def build_timing(timing_str: str) -> TimingSpec: - """``"DMT 3840x2160@60Hz"`` → :class:`TimingSpec`。""" - return parse_timing_str(timing_str) - - -def solid_rgb_pattern(rgb: tuple[int, int, int] | list[int]) -> PatternSpec: - r, g, b = rgb[0], rgb[1], rgb[2] - return PatternSpec(kind=PatternKind.SOLID, solid_rgb=(int(r), int(g), int(b))) - - -def image_pattern(path: str) -> PatternSpec: - return PatternSpec(kind=PatternKind.IMAGE, image_path=path) - - -# ─── 服务 ──────────────────────────────────────────────────────── - - -class SignalService: - """协调 SignalFormat / Timing / Pattern 的写入与提交。""" - - def __init__(self, device: IUcdDevice, bus: EventBus): - self._dev = device - self._bus = bus - - # -- 高层接口 ------------------------------------------------ - - def apply( - self, - *, - signal: SignalFormat, - timing: TimingSpec, - pattern: PatternSpec, - ) -> bool: - """一次性提交信号格式 + timing + 图案。 - - Returns: - ``format_changed``——本次相对上一次 :meth:`apply` 是否变化。 - """ - log.info( - "SignalService.apply signal=%s timing=%s pattern=%s", - signal, - timing, - pattern.kind.value, - ) - changed = self._dev.configure(signal, timing) - self._dev.set_pattern(pattern) - self._dev.apply() - return changed - - def send_pattern(self, pattern: PatternSpec) -> None: - """在已 configure 的信号上仅更新图案后 apply。""" - log.info("SignalService.send_pattern pattern=%s", pattern.kind.value) - self._dev.set_pattern(pattern) - self._dev.apply() - - def send_solid_rgb(self, rgb: tuple[int, int, int] | list[int]) -> None: - self.send_pattern(solid_rgb_pattern(rgb)) - - def send_image(self, path: str) -> None: - self.send_pattern(image_pattern(path)) - - def update_signal_format( - self, - *, - color_space: str, - output_format: str, - bit_depth: str, - data_range: str = "Full", - max_cll: int | None = None, - max_fall: int | None = None, - ) -> bool: - """仅将信号格式提交到 SDK(沿用上一次的 timing),不切换图案。 - - UI 字符串先经域层解析做参数校验;解析失败抛 :class:`UcdConfigError`。 - """ - _ = build_signal_format( - color_space=color_space, - output_format=output_format, - bit_depth=bit_depth, - data_range=data_range, - ) - return self._dev.apply_signal_format( - color_space=color_space, - color_format=output_format, - bit_depth=bit_depth, - data_range=data_range, - max_cll=max_cll, - max_fall=max_fall, - ) - - # -- 透传给上层的查询 --------------------------------------- - - @property - def device(self) -> IUcdDevice: - return self._dev - - def current_resolution(self) -> tuple[int, int]: - return self._dev.current_resolution() - - @property - def is_connected(self) -> bool: - """UCD 设备是否已打开。供 GUI 做前置校验。""" - return self._dev.state != UcdState.CLOSED - - @property - def format_changed(self) -> bool: - """最近一次视频模式提交是否相对上次发生变化。""" - return self._dev.format_changed - - @property - def last_error(self) -> str | None: - return self._dev.last_error - - def apply_config(self, config) -> bool: - """按 :class:`PQConfig` 写入色彩 / Timing / 当前 Pattern(不 apply 输出)。""" - return bool(self._dev.set_ucd_params(config)) - - def send_pattern_params(self, params) -> bool: - """以 ``params`` 更新当前 pattern 的参数并 apply。""" - return bool(self._dev.send_current_pattern_params(params)) - - def apply_and_run(self, config, pattern_params) -> bool: - """``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。""" - return bool(self._dev.apply_config_and_run(config, pattern_params)) - - -__all__ = [ - "SignalService", - "build_signal_format", - "build_signal_format_from_profile", - "build_timing", - "solid_rgb_pattern", - "image_pattern", - "SignalFormat", - "TimingSpec", - "PatternSpec", - "PatternKind", - "Colorimetry", - "DynamicRange", - "UcdError", -] diff --git a/app/ucd/__init__.py b/app/ucd/__init__.py new file mode 100644 index 0000000..283ef4e --- /dev/null +++ b/app/ucd/__init__.py @@ -0,0 +1,28 @@ +"""UCD 信号发生器 — domain / enum / device / service。 + +GUI 与测试代码通常只需:: + + from app.ucd import SignalService, UCD323Device, EventBus +""" +from app.ucd.domain import * # noqa: F403 +from app.ucd.enum import UCDEnum +from app.ucd.device import ( + DeviceInfo, + IUcdDevice, + UCD323Device, + list_devices, +) +from app.ucd.service import PatternService, PatternSession, SignalService + +__all__ = [ + "SignalService", + "PatternService", + "PatternSession", + "UCD323Device", + "IUcdDevice", + "DeviceInfo", + "UCDEnum", + "EventBus", + "ConnectionChanged", + "DeviceKind", +] diff --git a/app/ucd/device.py b/app/ucd/device.py new file mode 100644 index 0000000..9296b65 --- /dev/null +++ b/app/ucd/device.py @@ -0,0 +1,1267 @@ +"""UCD 设备层:UniTAP SDK 后端与 IUcdDevice 实现。 + +文件结构 +-------- +§1 模块级辅助函数 +§2 ``_UcdSdkBackend`` — UniTAP 直接调用(内部类,上层勿用) +§3 ``DeviceInfo`` / ``list_devices`` +§4 ``IUcdDevice`` 抽象接口 +§5 ``UCD323Device`` 生产实现(线程锁 + 状态机 + EventBus) +""" +from __future__ import annotations + +import gc +import logging +import threading +import time +from abc import ABC, abstractmethod +from contextlib import contextmanager +from dataclasses import dataclass + +import UniTAP + +from app.ucd.domain import ( + Colorimetry, + ConnectionChanged, + DeviceKind, + DynamicRange, + EventBus, + Interface, + PatternApplied, + PatternKind, + PatternSpec, + SignalApplied, + SignalFormat, + TimingSpec, + UcdApplyFailed, + UcdConfigError, + UcdNotConnected, + UcdSdkError, + UcdState, + UcdStateError, + assert_transition, + is_ycbcr, +) +from app.ucd.enum import UCDEnum + +log = logging.getLogger(__name__) +_DEVICE_LOCK_TIMEOUT_SECONDS = 8.0 + +# ─── §1 模块级辅助函数 ───────────────────────────────────────── + + +def _find_best_id_in_dict(res_map, width, height, refresh_rate): + best_id, best_diff = None, float("inf") + for rid, info in res_map.items(): + if info["width"] == width and info["height"] == height: + diff = abs(float(info["refresh_rate"]) - refresh_rate) + if diff < best_diff: + best_diff = diff + best_id = rid + return best_id if best_diff <= 1.0 else None + + +def _find_best_id_in_list_map(res_map, width, height, refresh_rate): + best_id, best_diff = None, float("inf") + for rid, infos in res_map.items(): + for info in infos: + if info["width"] == width and info["height"] == height: + diff = abs(float(info["refresh_rate"]) - refresh_rate) + if diff < best_diff: + best_diff = diff + best_id = rid + return best_id if best_diff <= 1.0 else None + + +def _colorimetry_to_legacy_key(signal: SignalFormat) -> str: + """domain.Colorimetry → UCDEnum.ColorInfo 查找 key。""" + cm = signal.colorimetry + ycbcr = is_ycbcr(signal.color_format) + if cm is Colorimetry.BT2020: + return "bt2020ycbcr" if ycbcr else "bt2020rgb" + return { + Colorimetry.SRGB: "srgb", + Colorimetry.BT709: "bt709", + Colorimetry.BT601: "bt601", + Colorimetry.DCI_P3: "dcip3", + Colorimetry.ADOBE_RGB: "adobergb", + }.get(cm, "srgb") + +# ─── §2 _UcdSdkBackend(UniTAP)────────────────────────────────── + +class _UcdSdkBackend: + """UniTAP SDK 封装。仅由 UCD323Device 调用。""" + + def __init__(self): + self.lUniTAP = UniTAP.TsiLib() + self.dev = None + self.role = None + self.timing_manager = None + self.config = None + self.color_info = None + self.status = False + self.current_interface = "HDMI" + + self.current_timing = None + self.current_pattern = None + self.current_pattern_param = None + self.current_pattern_params = None + self.current_pattern_index = 0 + self.last_error = None + + def search_device(self): + """搜索可用设备""" + available_devices = self.lUniTAP.get_list_of_available_devices() + return available_devices if available_devices else [] + + def open(self, device_name): + """打开设备""" + temp_dev = None + + try: + if self.dev is not None or self.status: + self._force_cleanup() + + device_id = int(device_name.split(":")[0]) + temp_dev = self.lUniTAP.open(device_id) + + try: + self.role = temp_dev.select_role(UniTAP.dev.UCD323.HDMISource) + self.dev = temp_dev + self.current_interface = "HDMI" + + except Exception as role_error: + self._close_device_object(temp_dev) + raise role_error + + pg, ag = self.get_tx_modules() + self.timing_manager = pg.timing_manager + self.color_info = UniTAP.ColorInfo() + self._stop_audio_output(ag) + self.status = True + + return True + + except Exception as e: + self._force_cleanup() + return False + + def close(self): + """关闭设备""" + try: + if self.dev: + try: + self._stop_audio_output() + except Exception: + pass + self._close_device_object(self.dev) + + self._reset_state() + self.lUniTAP = None + + for i in range(3): + gc.collect() + + time.sleep(2.0) + + self.lUniTAP = UniTAP.TsiLib() + + return True + + except Exception as e: + self._reset_state() + + try: + self.lUniTAP = None + gc.collect() + time.sleep(2.0) + self.lUniTAP = UniTAP.TsiLib() + except Exception as init_error: + pass + + return False + + def _reset_state(self): + """重置所有运行时状态(不关闭设备句柄)""" + self.dev = None + self.role = None + self.status = False + self.timing_manager = None + self.current_timing = None + self.current_pattern = None + self.current_pattern_param = None + self.current_pattern_params = None + self.current_pattern_index = 0 + self.current_interface = "HDMI" + + def _force_cleanup(self): + """强制清理所有状态""" + try: + if self.dev: + self._close_device_object(self.dev) + + self._reset_state() + + except Exception as e: + pass + + def _close_device_object(self, dev_obj): + """显式关闭设备对象""" + try: + if dev_obj is None: + return + + if self.lUniTAP and hasattr(self.lUniTAP, "close"): + try: + self.lUniTAP.close(dev_obj) + except Exception as e: + pass + + dev_obj = None + gc.collect() + time.sleep(1.0) + + except Exception as e: + pass + + def _cleanup(self): + """清理设备资源""" + try: + if self.dev: + self._close_device_object(self.dev) + self.dev = None + + if hasattr(self.lUniTAP, "cleanup"): + self.lUniTAP.cleanup() + + except Exception as e: + pass + + def get_tx_modules(self): + """根据当前接口返回 (pg, ag) 模块。""" + if not self.role: + raise RuntimeError("UCD 未打开,无法获取 TX 模块") + + interface = getattr(self, "current_interface", None) + log.info("UcdSdk.get_tx_modules interface=%s", interface) + if interface in (None, "HDMI"): + return self.role.hdtx.pg, self.role.hdtx.ag + if interface in ("DP", "Type-C"): + return self.role.dptx.pg, self.role.dptx.ag + raise ValueError(f"不支持的接口类型: {interface}") + + def _stop_audio_output(self, ag=None) -> None: + """关闭 HDMI/DP 音频发生器。PQ 测试仅需视频图案,避免电视持续输出测试音。""" + if not self.status or not self.role: + return + try: + if ag is None: + _, ag = self.get_tx_modules() + ag.stop_generate() + log.info("UcdSdk._stop_audio_output done") + except Exception: + log.exception("UcdSdk._stop_audio_output failed") + + def _apply_pg_output(self, pg) -> bool: + """提交 PG 输出,并确保音频发生器处于关闭状态。""" + try: + ok = bool(pg.apply()) + except Exception: + log.exception("UcdSdk._apply_pg_output pg.apply failed") + return False + self._stop_audio_output() + return ok + + def _resolve_timing(self, pg=None): + """优先从 current_timing 读取 timing,必要时回退到 TX 模块。""" + if self.current_timing is not None: + return self.current_timing + + if pg is not None: + get_vm = getattr(pg, "get_vm", None) + if callable(get_vm): + try: + vm = get_vm() + return getattr(vm, "timing", None) + except Exception: + pass + + return None + + def get_current_resolution(self, default=(3840, 2160)): + """从当前 timing 获取 (width, height),失败时返回 default。""" + try: + pg, _ = self.get_tx_modules() + timing = self._resolve_timing(pg) + if timing and hasattr(timing, "h_active") and hasattr(timing, "v_active"): + return timing.h_active, timing.v_active + except Exception: + pass + + return default + + def set_color_mode(self, cf, bpc, cm): + """设置颜色模式""" + current_dynamic_range = self.color_info.dynamic_range + + color_format = UCDEnum.ColorInfo.get_color_format(cf) + if color_format is None: + fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(cf) + color_format = UCDEnum.ColorInfo.get_color_format(fmt_key) + if color_format is None: + return False + + if not isinstance(bpc, int) or bpc <= 0: + return False + + colorimetry = UCDEnum.ColorInfo.get_colorimetry(cm) + if colorimetry is None: + return False + + self.color_info.color_format = color_format + self.color_info.bpc = bpc + self.color_info.colorimetry = colorimetry + self.color_info.dynamic_range = current_dynamic_range + + return True + + def apply_signal_format( + self, color_space=None, data_range=None, bit_depth=None, color_format=None, **_ + ): + """统一设置信号格式(color_format / colorimetry / dynamic_range / bpc)。 + 注:Gamma/EOTF 传输特性在 ColorInfo API 中不存在; + max_cll / max_fall 暂无对应 SDK 接口,通过 **_ 接收后忽略。 + """ + try: + if color_format: + fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(color_format) + cf = UCDEnum.ColorInfo.get_color_format(fmt_key) + if cf is not None: + self.color_info.color_format = cf + + if color_space: + colorimetry = self._get_colorimetry_from_color_space(color_space, color_format) + if colorimetry: + self.color_info.colorimetry = colorimetry + + if data_range: + if data_range == "Full": + self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA + elif data_range == "Limited": + self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA + + if bit_depth: + bpc = UCDEnum.SignalFormat.BitDepth.get_bit_value(bit_depth) + self.color_info.bpc = bpc + + if self.current_timing: + self.set_video_mode() + + return True + + except Exception: + return False + + def _get_colorimetry_from_color_space(self, color_space, color_format=None): + """将色彩空间字符串转换为UniTAP.ColorInfo.Colorimetry。 + BT.2020 在 YCbCr 输出时使用 CM_ITUR_BT2020_YCbCr,RGB 输出时使用 CM_ITUR_BT2020_RGB。 + """ + is_ycbcr = UCDEnum.SignalFormat.OutputFormat.is_ycbcr(color_format) + bt2020_cm = ( + UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT2020_YCbCr + if is_ycbcr + else UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT2020_RGB + ) + colorimetry_map = { + "sRGB": UniTAP.ColorInfo.Colorimetry.CM_sRGB, + "BT.709": UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT709, + "BT.601": UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT601, + "BT.2020": bt2020_cm, + "DCI-P3": UniTAP.ColorInfo.Colorimetry.CM_DCI_P3, + } + return colorimetry_map.get(color_space) + + def apply_video_mode(self): + """应用当前 color_info 和 timing""" + if self.current_timing: + log.info("UcdSdk.apply_video_mode start timing=%s", self.current_timing) + self.set_video_mode() + log.info("UcdSdk.apply_video_mode done") + return True + log.warning("UcdSdk.apply_video_mode skipped: current_timing is None") + return False + + def set_video_mode(self): + """设置视频模式""" + # 对比上次发出的配置,判断是否会触发电视重新锁定信号 + current_config = ( + self.current_timing, + self.color_info.color_format, + self.color_info.colorimetry, + self.color_info.dynamic_range, + self.color_info.bpc, + ) + self.format_changed = (current_config != getattr(self, "_last_sent_config", None)) + log.info( + "UcdSdk.set_video_mode format_changed=%s color_format=%s colorimetry=%s dynamic_range=%s bpc=%s", + self.format_changed, + self.color_info.color_format, + self.color_info.colorimetry, + self.color_info.dynamic_range, + self.color_info.bpc, + ) + if not self.format_changed: + log.info("UcdSdk.set_video_mode skipped pg.set_vm(): config unchanged") + return True + + video_mode = UniTAP.VideoMode( + timing=self.current_timing, color_info=self.color_info + ) + pg, _ = self.get_tx_modules() + log.info("UcdSdk.set_video_mode calling pg.set_vm()") + pg.set_vm(vm=video_mode) + self._stop_audio_output() + log.info("UcdSdk.set_video_mode done") + self._last_sent_config = current_config + return True + + def parse_formatted_timing(self, timing_str): + """解析格式化的timing字符串""" + if not isinstance(timing_str, str): + raise ValueError("timing_str 必须是字符串") + + s = " ".join(timing_str.strip().split()) + s = s.replace(" x", "x").replace("x ", "x") + + parts = s.split(" ", 1) + if len(parts) < 2: + raise ValueError(f"无法解析timing: {timing_str}") + type_str = parts[0].strip().upper() + rest = parts[1].strip() + + if "@" not in rest: + raise ValueError(f"无法解析timing(缺少 @): {timing_str}") + left, right = [p.strip() for p in rest.split("@", 1)] + + if "x" not in left: + raise ValueError(f"无法解析分辨率(缺少 x): {timing_str}") + wh = left.split("x") + if len(wh) != 2: + raise ValueError(f"无法解析分辨率: {timing_str}") + try: + width = int(wh[0]) + height = int(wh[1]) + except Exception: + raise ValueError(f"分辨率数字解析失败: {timing_str}") + + hz_str = right.replace("Hz", "").replace("HZ", "").strip() + try: + refresh_rate = float(hz_str) + except Exception: + raise ValueError(f"刷新率解析失败: {timing_str}") + + rtype_map = { + "DMT": "dmt", + "CTA": "cta", + "CVT": "cvt", + "OVT": "ovt", + } + if type_str not in rtype_map: + raise ValueError(f"未知的分辨率类型: {type_str}") + resolution_type = rtype_map[type_str] + + resolution_id = None + if resolution_type == "dmt": + resolution_id = _find_best_id_in_dict( + UCDEnum.TimingInfo.dmt_resolution_map, width, height, refresh_rate + ) + elif resolution_type == "cta": + resolution_id = _find_best_id_in_dict( + UCDEnum.TimingInfo.cta_resolution_map, width, height, refresh_rate + ) + elif resolution_type == "cvt": + resolution_id = _find_best_id_in_list_map( + UCDEnum.TimingInfo.cvt_resolution_map, width, height, refresh_rate + ) + elif resolution_type == "ovt": + resolution_id = _find_best_id_in_list_map( + UCDEnum.TimingInfo.ovt_resolution_map, width, height, refresh_rate + ) + + result = { + "resolution_type": resolution_type, + "width": width, + "height": height, + "refresh_rate": refresh_rate, + "resolution_id": resolution_id, + } + + return result + + def search_timing(self, width, height, refresh_rate, resolution_type=None): + """根据分辨率参数搜索合适的timing""" + if resolution_type: + resolution_type = resolution_type.lower() + standard = None + if resolution_type == "dmt": + standard = UniTAP.common.timing.Timing.Standard.SD_DMT + elif resolution_type == "cta": + standard = UniTAP.common.timing.Timing.Standard.SD_CTA + elif resolution_type == "cvt": + standard = UniTAP.common.timing.Timing.Standard.SD_CVT + + rr = float(refresh_rate) + # Try both exact and NTSC-compatible rates (e.g. 120000 / 119880). + f_rate_candidates = [ + int(round(rr * 1000)), + int(rr * 1000), + int(round((rr * 1000.0) * 1000.0 / 1001.0)), + ] + # 去重并保持顺序 + f_rate_candidates = list(dict.fromkeys(f_rate_candidates)) + + standards = [standard] + if standard is not None: + standards.append(None) + + for std in standards: + for f_rate in f_rate_candidates: + timing = self.timing_manager.search( + h_active=width, + v_active=height, + f_rate=f_rate, + standard=std, + ) + if timing: + return timing + else: + for res_type in ["dmt", "cta", "cvt", "ovt"]: + result = self.search_timing(width, height, refresh_rate, res_type) + if result: + return result + + return None + + def set_timing_from_id(self, rtype, rid): + """根据(type, id)设置设备timing""" + timing = None + if rtype.lower() == "dmt": + timing = self.timing_manager.get_dmt(rid) + elif rtype.lower() == "cta": + timing = self.timing_manager.get_cta(rid) + elif rtype.lower() == "cvt": + timing = self.timing_manager.get_cvt(rid) + elif rtype.lower() == "ovt": + get_ovt = getattr(self.timing_manager, "get_ovt", None) + if callable(get_ovt): + timing = get_ovt(rid) + else: + return False + else: + raise ValueError(f"不支持的分辨率类型: {rtype}") + + if timing: + self.current_timing = timing + return True + else: + return False + + def set_timing_from_string(self, timing_str): + """根据格式化timing字符串设置设备timing""" + try: + spec = self.parse_formatted_timing(timing_str) + except Exception: + log.exception("UcdSdk.set_timing_from_string parse failed timing=%s", timing_str) + return False + + rtype = spec["resolution_type"] + rid = spec.get("resolution_id") + width = spec["width"] + height = spec["height"] + fr = spec["refresh_rate"] + + if rid is not None and self.set_timing_from_id(rtype, rid): + log.info( + "UcdSdk.set_timing_from_string success by id timing=%s parsed=(%s id=%s)", + timing_str, + rtype, + rid, + ) + return True + + # Respect selected timing family first (DMT/CTA/CVT/OVT). + timing = self.search_timing(width, height, fr, rtype) + if timing is None: + # Fallback only for robustness: some SDKs may not classify a timing + # exactly as requested family even though width/height/fps matches. + timing = self.search_timing(width, height, fr, None) + + if timing: + self.current_timing = timing + log.info( + "UcdSdk.set_timing_from_string success timing=%s parsed=(%s %sx%s@%s)", + timing_str, + rtype, + width, + height, + fr, + ) + return True + + log.error( + "UcdSdk.set_timing_from_string no timing matched timing=%s parsed=(%s %sx%s@%s)", + timing_str, + rtype, + width, + height, + fr, + ) + return False + + def set_pattern(self, pattern, pattern_params=None): + """设置pattern""" + if self.current_timing is None: + # Pattern-only updates (e.g. Calman patch click) can still be applied on + # an already active output mode. Missing timing should not block pattern staging. + log.warning("UcdSdk.set_pattern current_timing is None; continue with pattern-only apply") + + needs_params = { + UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, + UCDEnum.VideoPatternInfo.VideoPattern.SolidColor, + UCDEnum.VideoPatternInfo.VideoPatternParams.WhiteVStrips, + UCDEnum.VideoPatternInfo.VideoPatternParams.GradientRGBStripes, + UCDEnum.VideoPatternInfo.VideoPatternParams.MotionPattern, + UCDEnum.VideoPatternInfo.VideoPatternParams.SquareWindow, + } + log.info( + "UcdSdk.set_pattern pattern=%s pattern_params=%s needs_params=%s", + getattr(pattern, "name", pattern), + pattern_params, + pattern in needs_params, + ) + if pattern in needs_params and pattern_params is not None: + self.set_pattern_params(pattern, pattern_params) + return True + + def set_pattern_params(self, pattern, pattern_params): + """设置pattern参数""" + if pattern is not None: + solid_color_patterns = { + UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, + UCDEnum.VideoPatternInfo.VideoPattern.SolidColor, + } + if pattern in solid_color_patterns: + log.info("UcdSdk.set_pattern_params solid_color rgb=%s", pattern_params) + self.current_pattern_param = UniTAP.SolidColorParams( + first=pattern_params[0], + second=pattern_params[1], + third=pattern_params[2], + ) + return True + log.warning("UcdSdk.set_pattern_params unsupported pattern=%s", getattr(pattern, "name", pattern)) + return False + + def apply_pattern(self): + """应用当前pattern""" + if self.current_pattern is not None: + log.info( + "UcdSdk.apply_pattern start pattern=%s has_params=%s", + getattr(self.current_pattern, "name", self.current_pattern), + self.current_pattern_param is not None, + ) + pg, _ = self.get_tx_modules() + log.info("UcdSdk.apply_pattern calling pg.set_pattern()") + pg.set_pattern(self.current_pattern) + + if self.current_pattern_param is not None: + log.info("UcdSdk.apply_pattern calling pg.set_pattern_params()") + pg.set_pattern_params(self.current_pattern_param) + log.info("UcdSdk.apply_pattern done") + return True + log.warning("UcdSdk.apply_pattern skipped: current_pattern is None") + return False + + def set_next_pattern(self): + """设置下一个pattern""" + if self.current_pattern_index < len(self.current_pattern_params): + p = self.current_pattern_params[self.current_pattern_index] + self.set_pattern(self.current_pattern, p) + self.current_pattern_index += 1 + else: + error_msg = ( + f"No more patterns to set. (已设置 {self.current_pattern_index} 个图案)" + ) + raise IndexError(error_msg) + + def set_ucd_params(self, config): + """设置UCD323参数""" + self.last_error = None + self.config = config + test_type = self.config.current_test_type + + pg, _ = self.get_tx_modules() + self.timing_manager = pg.timing_manager + + color_format = self.config.current_test_types[test_type]["color_format"] + bpc = self.config.current_test_types[test_type]["bpc"] + colorimetry = self.config.current_test_types[test_type]["colorimetry"] + + if not self.set_color_mode(color_format, bpc, colorimetry): + self.last_error = ( + f"set_color_mode failed: color_format={color_format}, bpc={bpc}, colorimetry={colorimetry}" + ) + log.error( + "UcdSdk.set_ucd_params set_color_mode failed test_type=%s color_format=%s bpc=%s colorimetry=%s", + test_type, + color_format, + bpc, + colorimetry, + ) + return False + + timing_str = self.config.current_test_types[test_type]["timing"] + if not self.set_timing_from_string(timing_str): + self.last_error = f"set_timing_from_string failed: timing={timing_str}" + log.error( + "UcdSdk.set_ucd_params set_timing_from_string failed test_type=%s timing=%s", + test_type, + timing_str, + ) + return False + + self.current_pattern_index = 0 + pattern_mode = self.config.current_pattern["pattern_mode"] + pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern_mode) + + if pattern is None: + self.last_error = f"get_video_pattern failed: pattern_mode={pattern_mode}" + return False + + self.current_pattern = pattern + self.current_pattern_params = self.config.current_pattern["pattern_params"] + + return True + + def run(self): + """运行设备""" + log.info( + "UcdSdk.run start current_pattern=%s has_pattern_param=%s", + getattr(self.current_pattern, "name", self.current_pattern), + self.current_pattern_param is not None, + ) + self.apply_video_mode() + self.apply_pattern() + pg, _ = self.get_tx_modules() + log.info("UcdSdk.run calling pg.apply()") + ok = self._apply_pg_output(pg) + log.info("UcdSdk.run done ok=%s", ok) + return ok + + def send_current_pattern_params(self, pattern_params): + """发送当前已配置的 pattern,并可附带当前 pattern 参数。""" + if not self.status or not self.role: + return False + + try: + if self.current_pattern is None: + log.error("UcdSdk.send_current_pattern_params failed: current_pattern is None") + return False + + log.info( + "UcdSdk.send_current_pattern_params pattern=%s params=%s", + getattr(self.current_pattern, "name", self.current_pattern), + pattern_params, + ) + if pattern_params is not None and not self.set_pattern( + self.current_pattern, + pattern_params, + ): + log.error("UcdSdk.send_current_pattern_params failed: set_pattern returned False") + return False + + log.info("UcdSdk.send_current_pattern_params calling run()") + self.run() + log.info("UcdSdk.send_current_pattern_params done") + return True + except Exception: + log.exception("UcdSdk.send_current_pattern_params exception") + return False + + def send_solid_rgb_pattern(self, rgb): + """发送纯色 RGB Pattern(依赖当前 timing/color_info 状态)。""" + if not self.status or not self.role: + return False + + try: + log.info("UcdSdk.send_solid_rgb_pattern rgb=%s", rgb) + self.current_pattern = UCDEnum.VideoPatternInfo.get_video_pattern("solidcolor") + if self.current_pattern is None: + log.error("UcdSdk.send_solid_rgb_pattern failed: solidcolor pattern not found") + return False + + return self.send_current_pattern_params(list(rgb)) + except Exception: + log.exception("UcdSdk.send_solid_rgb_pattern exception") + return False + + def send_image_pattern(self, image_path): + """发送图片 Pattern(依赖当前 timing/color_info 状态)。""" + if not self.status or not self.role: + return False + + try: + pg, _ = self.get_tx_modules() + # 仅切换图案,不重复 set_vm;重复 apply video mode 会触发电视 HDMI 重锁发声。 + if getattr(self, "_last_sent_config", None) is None: + self.apply_video_mode() + pg.set_pattern(pattern=image_path) + return self._apply_pg_output(pg) + except Exception: + return False + +# ─── §3 DeviceInfo / list_devices ──────────────────────────────── + + +@dataclass(frozen=True) +class DeviceInfo: + """UCD 设备发现条目。 + + ``display`` 是 SDK 给出的完整字符串(``"0: UCD-323 #12345678"``); + ``index`` / ``serial`` / ``model`` 通过解析得到,解析失败时为 None。 + """ + + display: str + index: int | None = None + serial: str | None = None + model: str | None = None + + @classmethod + def parse(cls, display: str) -> "DeviceInfo": + idx: int | None = None + model: str | None = None + serial: str | None = None + try: + head, rest = display.split(":", 1) + idx = int(head.strip()) + rest = rest.strip() + # 形如 "UCD-323 #12345678" 或 "UCD-323 #12345678 (in use)" + tokens = rest.split() + if tokens: + model = tokens[0] + for tok in tokens[1:]: + if tok.startswith("#") and len(tok) >= 2: + serial = tok.lstrip("#") + break + except Exception: # noqa: BLE001 - 解析失败保留原 display 即可 + pass + return cls(display=display, index=idx, serial=serial, model=model) + + +def list_devices(sdk: "_UcdSdkBackend") -> list[DeviceInfo]: + """通过给定的 SDK 后端枚举可用 UCD 设备。""" + try: + raw_list = sdk.search_device() + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("枚举 UCD 设备失败") from exc + return [DeviceInfo.parse(s) for s in (raw_list or [])] + + +# ─── §4 IUcdDevice 抽象接口 ────────────────────────────────────── + + +class IUcdDevice(ABC): + """UCD 信号发生器抽象设备。 + + 上层(Service / GUI)**只**通过本接口操作硬件,不得穿透到 + UniTAP SDK 或具体实现细节。 + """ + + @property + @abstractmethod + def state(self) -> UcdState: ... + + @property + @abstractmethod + def info(self) -> DeviceInfo | None: ... + + @abstractmethod + def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: + """打开设备并选择接口角色。失败抛 :class:`UcdSdkError` 等。""" + + @abstractmethod + def close(self) -> None: + """关闭设备(幂等)。""" + + @abstractmethod + def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: + """写入信号格式与 timing(未 apply)。返回 ``format_changed``。""" + + @abstractmethod + def set_pattern(self, pattern: PatternSpec) -> None: + """设置当前图案(未 apply)。""" + + @abstractmethod + def apply(self) -> None: + """将已配置的信号格式 + 图案一次性提交给硬件。""" + + @abstractmethod + def current_resolution(self) -> tuple[int, int]: + """读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。""" + + @abstractmethod + def search_devices(self) -> list[str]: + """枚举可用设备的 SDK 显示字符串列表。""" + + @property + @abstractmethod + def format_changed(self) -> bool: + """最近一次视频模式提交是否相对上次发生变化。""" + + @property + @abstractmethod + def last_error(self) -> str | None: + """最近一次配置/应用失败时的错误描述。""" + + @abstractmethod + # --- 测试 / GUI 遗留快捷 API --- + + def apply_signal_format( + self, + *, + color_space: str | None = None, + data_range: str | None = None, + bit_depth: str | None = None, + color_format: str | None = None, + max_cll: int | None = None, + max_fall: int | None = None, + ) -> bool: + """仅更新信号格式(沿用当前 timing),不切换图案。""" + + @abstractmethod + def set_ucd_params(self, config) -> bool: + """按 PQConfig stage 色彩 / Timing / Pattern 类型(不 apply 输出)。""" + + @abstractmethod + def send_current_pattern_params(self, pattern_params) -> bool: + """更新当前 pattern 参数并 apply 到硬件。""" + + @abstractmethod + def apply_config_and_run(self, config, pattern_params) -> bool: + """set_ucd_params + set_pattern + run 复合操作。""" + + +# ─── §5 UCD323Device 真实实现 ──────────────────────────────────── + + +class UCD323Device(IUcdDevice): + """生产环境实现。内部委托给本模块的 :class:`_UcdSdkBackend`。""" + + def __init__(self, bus: EventBus, sdk: "_UcdSdkBackend | None" = None): + self._bus = bus + self._sdk: "_UcdSdkBackend" = sdk or _UcdSdkBackend() + self._state: UcdState = UcdState.CLOSED + self._info: DeviceInfo | None = None + self._interface: Interface = Interface.HDMI + self._lock = threading.RLock() + self._lock_owner_tid: int | None = None + self._lock_owner_name: str | None = None + + self._curr_signal: SignalFormat | None = None + self._curr_timing: TimingSpec | None = None + self._curr_pattern: PatternSpec | None = None + self._last_applied: tuple[SignalFormat, TimingSpec] | None = None + + @contextmanager + def _acquire_device_lock(self, op_name: str): + current = threading.current_thread() + log.info( + "UCD323Device.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s", + op_name, + _DEVICE_LOCK_TIMEOUT_SECONDS, + threading.get_ident(), + current.name, + self._lock_owner_tid, + self._lock_owner_name, + ) + acquired = self._lock.acquire(timeout=_DEVICE_LOCK_TIMEOUT_SECONDS) + if not acquired: + raise UcdStateError( + "UCD device busy: lock timeout in " + f"UCD323Device.{op_name}, " + f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}" + ) + prev_owner_tid = self._lock_owner_tid + prev_owner_name = self._lock_owner_name + self._lock_owner_tid = threading.get_ident() + self._lock_owner_name = current.name + log.info( + "UCD323Device.%s lock acquired tid=%s thread=%s", + op_name, + self._lock_owner_tid, + self._lock_owner_name, + ) + try: + yield + finally: + self._lock_owner_tid = prev_owner_tid + self._lock_owner_name = prev_owner_name + self._lock.release() + + # --- 状态查询 --- + + @property + def state(self) -> UcdState: + return self._state + + @property + def info(self) -> DeviceInfo | None: + return self._info + + # --- 生命周期 --- + + def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: + with self._acquire_device_lock("open"): + assert_transition(self._state, UcdState.OPENED) + if interface is not Interface.HDMI: + # SDK open() 当前写死 HDMISource 角色。 + raise UcdConfigError( + f"暂不支持接口 {interface.value};当前仅实现 HDMI" + ) + try: + ok = self._sdk.open(info.display) + except Exception as exc: # noqa: BLE001 + raise UcdSdkError(f"打开设备失败: {info.display}") from exc + if not ok: + raise UcdSdkError(f"打开设备失败: {info.display}") + self._info = info + self._interface = interface + self._state = UcdState.OPENED + self._bus.publish(ConnectionChanged(DeviceKind.UCD, True, info.serial)) + + def close(self) -> None: + with self._acquire_device_lock("close"): + if self._state == UcdState.CLOSED: + return + try: + self._sdk.close() + except Exception: # noqa: BLE001 + log.exception("关闭 UCD 时发生异常") + self._state = UcdState.CLOSED + self._curr_signal = None + self._curr_timing = None + self._curr_pattern = None + self._last_applied = None + prev_serial = self._info.serial if self._info else None + self._info = None + self._bus.publish(ConnectionChanged(DeviceKind.UCD, False, prev_serial)) + + # --- 信号 / 图案配置 --- + + def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: + with self._acquire_device_lock("configure"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 configure") + try: + # 颜色模式(color_format / bpc / colorimetry) + if not self._sdk.set_color_mode( + signal.color_format.value, + int(signal.bpc), + _colorimetry_to_legacy_key(signal), + ): + raise UcdConfigError( + f"set_color_mode 失败: {signal!r}" + ) + # dynamic_range 在新接口中是一等公民 + self._apply_dynamic_range(signal) + + # Timing + if not self._sdk.set_timing_from_string(str(timing)): + raise UcdConfigError(f"set_timing_from_string 失败: {timing}") + except UcdConfigError: + raise + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("configure 异常") from exc + + self._curr_signal = signal + self._curr_timing = timing + self._state = UcdState.CONFIGURED + return (signal, timing) != self._last_applied + + def set_pattern(self, pattern: PatternSpec) -> None: + with self._acquire_device_lock("set_pattern"): + # Phase 2 过渡:允许从 OPENED 直接 set_pattern——遗留路径 + # (test_runner 等)通过旧 controller.apply_signal_format 写入 + # 信号格式,未经过本设备的 configure。此时 self._state 仍为 + # OPENED,但硬件实际已处于可接收 pattern 状态。 + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 set_pattern") + self._curr_pattern = pattern + # 仅本地暂存,真正写硬件在 apply() + + def apply(self) -> None: + with self._acquire_device_lock("apply"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 apply") + if self._curr_pattern is None: + raise UcdStateError("apply 前必须先 set_pattern") + try: + ok = self._apply_pattern_via_controller(self._curr_pattern) + except Exception as exc: # noqa: BLE001 + raise UcdSdkError( + f"apply 异常: {type(exc).__name__}: {exc}" + ) from exc + if not ok: + raise UcdApplyFailed( + f"apply 失败: pattern={self._curr_pattern.kind.value}" + ) + + # SignalApplied 事件仅在通过新 API configure 过时发出; + # 遗留路径下 self._curr_signal/_curr_timing 可能为 None。 + if self._curr_signal is not None and self._curr_timing is not None: + changed = (self._curr_signal, self._curr_timing) != self._last_applied + self._last_applied = (self._curr_signal, self._curr_timing) + self._bus.publish( + SignalApplied(self._curr_signal, self._curr_timing, changed) + ) + self._state = UcdState.APPLIED + self._bus.publish(PatternApplied(self._curr_pattern)) + + # --- 查询 --- + + def current_resolution(self) -> tuple[int, int]: + try: + return self._sdk.get_current_resolution((3840, 2160)) + except Exception: # noqa: BLE001 + return (3840, 2160) + + def search_devices(self) -> list[str]: + try: + return self._sdk.search_device() or [] + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("枚举 UCD 设备失败") from exc + + @property + def format_changed(self) -> bool: + return bool(getattr(self._sdk, "format_changed", True)) + + @property + def last_error(self) -> str | None: + err = getattr(self._sdk, "last_error", None) + return str(err) if err else None + + # --- 测试 / GUI 遗留快捷 API --- + + def apply_signal_format( + self, + *, + color_space: str | None = None, + data_range: str | None = None, + bit_depth: str | None = None, + color_format: str | None = None, + max_cll: int | None = None, + max_fall: int | None = None, + ) -> bool: + with self._acquire_device_lock("apply_signal_format"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 apply_signal_format") + return bool( + self._sdk.apply_signal_format( + color_space=color_space, + data_range=data_range, + bit_depth=bit_depth, + color_format=color_format, + max_cll=max_cll, + max_fall=max_fall, + ) + ) + + def set_ucd_params(self, config) -> bool: + with self._acquire_device_lock("set_ucd_params"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 set_ucd_params") + return bool(self._sdk.set_ucd_params(config)) + + def send_current_pattern_params(self, pattern_params) -> bool: + with self._acquire_device_lock("send_current_pattern_params"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 send_current_pattern_params") + ok = bool(self._sdk.send_current_pattern_params(pattern_params)) + if ok: + self._state = UcdState.APPLIED + return ok + + def apply_config_and_run(self, config, pattern_params) -> bool: + with self._acquire_device_lock("apply_config_and_run"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 apply_config_and_run") + ctrl = self._sdk + if not ctrl.set_ucd_params(config): + return False + if not ctrl.set_pattern(ctrl.current_pattern, pattern_params): + return False + ok = bool(ctrl.run()) + if ok: + self._state = UcdState.APPLIED + return ok + + # --- 内部 --- + + def _apply_dynamic_range(self, signal: SignalFormat) -> None: + ci = self._sdk.color_info + if ci is None: + return + if signal.dynamic_range is DynamicRange.FULL: + ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA + else: + ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA + + def _apply_pattern_via_controller(self, pattern: PatternSpec) -> bool: + """根据 PatternKind 走最合适的旧 controller 路径。""" + if pattern.kind is PatternKind.IMAGE: + if not pattern.image_path: + raise UcdConfigError("IMAGE pattern 必须提供 image_path") + return bool(self._sdk.send_image_pattern(pattern.image_path)) + + # 预定义图案路径:复用 controller.set_pattern + run() + + video_pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern.kind.value) + if video_pattern is None: + raise UcdConfigError(f"不支持的 PatternKind: {pattern.kind!r}") + self._sdk.current_pattern = video_pattern + + params: list[int] | None = None + if pattern.kind is PatternKind.SOLID: + if pattern.solid_rgb is None: + raise UcdConfigError("SOLID pattern 必须提供 solid_rgb") + params = list(pattern.solid_rgb) + elif pattern.extras: + params = list(pattern.extras) + + if not self._sdk.set_pattern(video_pattern, params): + raise UcdApplyFailed("controller.set_pattern 返回 False") + # Skip apply_video_mode() (i.e. pg.set_vm) – the video format is already + # configured by the main signal panel and re-applying it blocks until the + # device re-locks, causing an apparent UI freeze for pattern-only sends. + if not self._sdk.apply_pattern(): + raise UcdApplyFailed("controller.apply_pattern 返回 False") + if getattr(self._sdk, "current_timing", None) is None: + raise UcdConfigError( + "current_timing is None; please apply selected test profile/timing before sending pattern" + ) + try: + pg, _ = self._sdk.get_tx_modules() + if not self._sdk._apply_pg_output(pg): + raise UcdApplyFailed("controller.apply_pg_output 返回 False") + except UcdApplyFailed: + raise + except Exception as exc: + raise UcdSdkError("pg.apply() 失败") from exc + return True + + +__all__ = [ + "DeviceInfo", + "list_devices", + "IUcdDevice", + "UCD323Device", +] \ No newline at end of file diff --git a/app/ucd_domain.py b/app/ucd/domain.py similarity index 74% rename from app/ucd_domain.py rename to app/ucd/domain.py index dd23ebd..232f4b6 100644 --- a/app/ucd_domain.py +++ b/app/ucd/domain.py @@ -1,17 +1,4 @@ -"""UCD 控制 Domain 层。 - -纯数据 + 纯函数:枚举、值对象、状态机、错误体系、事件总线、 -业务字符串解析与映射。本模块**不**依赖 UniTAP / 任何硬件; -可用纯单测覆盖。 - -文件分区: - §1 枚举与值对象 - §2 状态机 - §3 错误体系 - §4 事件总线 - §5 业务字符串解析 / 映射 -""" - +"""UCD 域层:类型、状态机、事件、字符串与 PQConfig 映射。""" from __future__ import annotations import logging @@ -25,6 +12,14 @@ log = logging.getLogger(__name__) # ─── §1 枚举与值对象 ────────────────────────────────────────────── + +class DeviceKind(str, Enum): + """连接状态事件所指的设备类型。""" + + UCD = "ucd" + CA = "ca" + + class Interface(str, Enum): """UCD 物理输出接口。""" @@ -192,6 +187,7 @@ class UcdEvent: @dataclass(frozen=True) class ConnectionChanged(UcdEvent): + device: DeviceKind connected: bool serial: str | None = None @@ -353,6 +349,7 @@ def parse_timing_str(timing_str: str) -> TimingSpec: __all__ = [ # §1 + "DeviceKind", "Interface", "ColorFormat", "Colorimetry", @@ -386,3 +383,104 @@ __all__ = [ "is_ycbcr", "parse_timing_str", ] + + +# --- PQConfig / pattern 映射 --- + +# PQ pattern_mode 字符串 → PatternKind(大小写不敏感) +_PQ_PATTERN_MODE_TO_KIND: dict[str, PatternKind] = { + "disabled": PatternKind.DISABLED, + "solidcolor": PatternKind.SOLID, + "solidwhite": PatternKind.SOLID_WHITE, + "solidred": PatternKind.SOLID_RED, + "solidgreen": PatternKind.SOLID_GREEN, + "solidblue": PatternKind.SOLID_BLUE, + "colorbars": PatternKind.COLOR_BARS, + "chessboard": PatternKind.CHESSBOARD, + "whitevstrips": PatternKind.WHITE_VSTRIPS, + "gradientrgbstripes": PatternKind.GRADIENT_RGB_STRIPES, + "colorramp": PatternKind.COLOR_RAMP, + "coloursquares": PatternKind.COLOR_SQUARES, + "motionpattern": PatternKind.MOTION, + "squarewindow": PatternKind.SQUARE_WINDOW, +} + + +def pattern_mode_to_kind(pattern_mode: str) -> PatternKind: + key = (pattern_mode or "solidcolor").strip().lower() + kind = _PQ_PATTERN_MODE_TO_KIND.get(key) + if kind is None: + raise UcdConfigError(f"不支持的 pattern_mode: {pattern_mode!r}") + return kind + + +def build_profile_from_config(config, test_type: str | None = None): + """从 PQConfig 当前 test_type 条目构建 SignalFormat + TimingSpec。""" + test_type = test_type or config.current_test_type + profile = config.current_test_types[test_type] + signal = build_signal_format_from_profile( + color_space=profile["colorimetry"], + color_format=profile["color_format"], + bpc=int(profile["bpc"]), + data_range=profile.get("data_range", "Full"), + ) + timing = build_timing(profile["timing"]) + return signal, timing + + +def build_pattern_spec(config, params: list[int] | None = None) -> PatternSpec: + """将 PQConfig 当前 pattern 与一组参数转为 :class:`PatternSpec`。""" + pattern_mode = config.current_pattern["pattern_mode"] + kind = pattern_mode_to_kind(pattern_mode) + if params is None: + params = config.current_pattern.get("pattern_params", [[]])[0] + if kind is PatternKind.SOLID and params and len(params) >= 3: + return PatternSpec( + kind=kind, + solid_rgb=(int(params[0]), int(params[1]), int(params[2])), + ) + if params: + return PatternSpec(kind=kind, extras=tuple(int(v) for v in params)) + return PatternSpec(kind=kind) + +def build_signal_format( + *, + color_space: str, + output_format: str, + bit_depth: str, + data_range: str = "Full", +) -> SignalFormat: + return SignalFormat( + color_format=output_format_to_color_format(output_format), + colorimetry=color_space_to_colorimetry(color_space), + bpc=bit_depth_str_to_bpc(bit_depth), + dynamic_range=data_range_to_dynamic_range(data_range), + ) + + +def build_signal_format_from_profile( + *, + color_space: str, + color_format: str, + bpc: int, + data_range: str = "Full", +) -> SignalFormat: + return build_signal_format( + color_space=color_space, + output_format=color_format, + bit_depth=f"{int(bpc)}bit", + data_range=data_range, + ) + + +def build_timing(timing_str: str) -> TimingSpec: + return parse_timing_str(timing_str) + + +def solid_rgb_pattern(rgb: tuple[int, int, int] | list[int]) -> PatternSpec: + r, g, b = rgb[0], rgb[1], rgb[2] + return PatternSpec(kind=PatternKind.SOLID, solid_rgb=(int(r), int(g), int(b))) + + +def image_pattern(path: str) -> PatternSpec: + return PatternSpec(kind=PatternKind.IMAGE, image_path=path) diff --git a/drivers/UCD323_Enum.py b/app/ucd/enum.py similarity index 99% rename from drivers/UCD323_Enum.py rename to app/ucd/enum.py index 18d17bb..22da7a0 100644 --- a/drivers/UCD323_Enum.py +++ b/app/ucd/enum.py @@ -1,3 +1,5 @@ +"""UCD SDK 枚举与 UI/SDK 字符串映射。""" + from enum import IntEnum import UniTAP @@ -620,3 +622,4 @@ class UCDEnum: "DSC": "dsc", } return fmt_map.get(format_str, "rgb") + diff --git a/app/services/pattern_service.py b/app/ucd/service.py similarity index 50% rename from app/services/pattern_service.py rename to app/ucd/service.py index 9f80634..3c3924e 100644 --- a/app/services/pattern_service.py +++ b/app/ucd/service.py @@ -1,5 +1,179 @@ +"""UCD 服务层:SignalService(硬件编排)+ PatternService(测试发图)。""" from __future__ import annotations +import logging + +from app.ucd.domain import ( + EventBus, + PatternSpec, + SignalFormat, + TimingSpec, + UcdState, + image_pattern, + solid_rgb_pattern, +) +from app.ucd.device import IUcdDevice + +log = logging.getLogger(__name__) + + +# ─── SignalService ──────────────────────────────────────────────────────── + + +class SignalService: + """协调 SignalFormat / Timing / Pattern 的写入与提交。""" + + def __init__(self, device: IUcdDevice, bus: EventBus): + self._dev = device + self._bus = bus + + # -- 高层接口 ------------------------------------------------ + + def apply( + self, + *, + signal: SignalFormat, + timing: TimingSpec, + pattern: PatternSpec, + ) -> bool: + """一次性提交信号格式 + timing + 图案。 + + Returns: + ``format_changed``——本次相对上一次 :meth:`apply` 是否变化。 + """ + log.info( + "SignalService.apply signal=%s timing=%s pattern=%s", + signal, + timing, + pattern.kind.value, + ) + changed = self._dev.configure(signal, timing) + self._dev.set_pattern(pattern) + self._dev.apply() + return changed + + def send_pattern(self, pattern: PatternSpec) -> None: + """在已 configure 的信号上仅更新图案后 apply。""" + log.info("SignalService.send_pattern pattern=%s", pattern.kind.value) + self._dev.set_pattern(pattern) + self._dev.apply() + + def send_solid_rgb(self, rgb: tuple[int, int, int] | list[int]) -> None: + self.send_pattern(solid_rgb_pattern(rgb)) + + def send_image(self, path: str) -> None: + self.send_pattern(image_pattern(path)) + + def update_signal_format( + self, + *, + color_space: str, + output_format: str, + bit_depth: str, + data_range: str = "Full", + max_cll: int | None = None, + max_fall: int | None = None, + ) -> bool: + """仅将信号格式提交到 SDK(沿用上一次的 timing),不切换图案。 + + UI 字符串先经域层解析做参数校验;解析失败抛 :class:`UcdConfigError`。 + """ + _ = build_signal_format( + color_space=color_space, + output_format=output_format, + bit_depth=bit_depth, + data_range=data_range, + ) + return self._dev.apply_signal_format( + color_space=color_space, + color_format=output_format, + bit_depth=bit_depth, + data_range=data_range, + max_cll=max_cll, + max_fall=max_fall, + ) + + # -- 透传给上层的查询 --------------------------------------- + + @property + def device(self) -> IUcdDevice: + return self._dev + + def current_resolution(self) -> tuple[int, int]: + return self._dev.current_resolution() + + @property + def is_connected(self) -> bool: + """UCD 设备是否已打开。供 GUI 做前置校验。""" + return self._dev.state != UcdState.CLOSED + + @property + def format_changed(self) -> bool: + """最近一次视频模式提交是否相对上次发生变化。""" + return self._dev.format_changed + + @property + def last_error(self) -> str | None: + return self._dev.last_error + + def apply_config(self, config) -> bool: + """按 :class:`PQConfig` 写入色彩 / Timing / 当前 Pattern(不 apply 输出)。""" + return bool(self._dev.set_ucd_params(config)) + + def send_pattern_params(self, params) -> bool: + """以 ``params`` 更新当前 pattern 的参数并 apply。""" + return bool(self._dev.send_current_pattern_params(params)) + + def apply_and_run(self, config, pattern_params) -> bool: + """``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。""" + return bool(self._dev.apply_config_and_run(config, pattern_params)) + + def stage_test_profile( + self, + config, + *, + color_space: str, + output_format: str, + bit_depth: str, + data_range: str = "Full", + max_cll: int | None = None, + max_fall: int | None = None, + ) -> bool: + """按 PQConfig stage 色彩/Timing/Pattern 类型,并提交 UI 覆盖的信号格式。 + + 自动化测试在发图前调用;等价于 ``apply_config`` + ``update_signal_format``。 + """ + if not self.apply_config(config): + return False + return self.update_signal_format( + color_space=color_space, + output_format=output_format, + bit_depth=bit_depth, + data_range=data_range, + max_cll=max_cll, + max_fall=max_fall, + ) + + +__all__ = [ + "SignalService", + "build_signal_format", + "build_signal_format_from_profile", + "build_timing", + "solid_rgb_pattern", + "image_pattern", + "SignalFormat", + "TimingSpec", + "PatternSpec", + "PatternKind", + "Colorimetry", + "DynamicRange", + "UcdError", +] + + +# --- PatternService --- + import copy from dataclasses import dataclass @@ -29,6 +203,42 @@ class PatternService: detail = f", detail={err}" return f"UCD profile apply_config failed for {test_type}, timing={timing}{detail}" + def _stage_profile( + self, + active_config, + *, + color_space: str, + output_format: str, + bit_depth: str, + data_range: str = "Full", + max_cll: int | None = None, + max_fall: int | None = None, + test_type: str, + log_details: bool = False, + log_title: str = "", + log_fields: list[tuple[str, str]] | None = None, + ) -> None: + if log_details and log_title: + self._log("=" * 50, "separator") + self._log(log_title, "info") + self._log("=" * 50, "separator") + for label, value in log_fields or []: + self._log(f" {label}: {value}", "info") + + if not self.app.signal_service.stage_test_profile( + active_config, + color_space=color_space, + output_format=output_format, + bit_depth=bit_depth, + data_range=data_range, + max_cll=max_cll, + max_fall=max_fall, + ): + raise RuntimeError(self._build_apply_config_error(test_type)) + + if log_details: + self._log("信号格式设置成功", "success") + def prepare_session(self, mode, *, test_type=None, log_details=False): test_type = test_type or self.app.config.current_test_type if hasattr(self.app.config, "set_current_test_type"): @@ -62,46 +272,26 @@ class PatternService: else screen_cfg.get("color_format", "RGB") ) - if log_details: - self._log("=" * 50, "separator") - self._log("设置屏模组信号格式:", "info") - self._log("=" * 50, "separator") - for label, value in [ + self._stage_profile( + active_config, + color_space=color_space, + data_range=data_range, + bit_depth=bit_depth, + output_format=output_format, + test_type=test_type, + log_details=log_details, + log_title="设置屏模组信号格式:", + log_fields=[ ("色彩空间", color_space), ("色彩格式", output_format), ("数据范围", data_range), ("编码位深", bit_depth), ("Timing", self.app.config.current_test_types[test_type]["timing"]), - ]: - self._log(f" {label}: {value}", "info") - if not self.app.signal_service.apply_config(active_config): - raise RuntimeError(self._build_apply_config_error(test_type)) - success = self.app.signal_service.update_signal_format( - color_space=color_space, - data_range=data_range, - bit_depth=bit_depth, - output_format=output_format, + ], ) - if log_details: - self._log( - f"屏模组信号格式设置{'成功' if success else '失败'}", - "success" if success else "error", - ) elif test_type == "sdr_movie": data_range = self.app.sdr_data_range_var.get() - if log_details: - self._log("=" * 50, "separator") - self._log("设置 SDR 信号格式:", "info") - self._log("=" * 50, "separator") - for label, value in [ - ("色彩空间", self.app.sdr_color_space_var.get()), - ("色彩格式", self.app.sdr_output_format_var.get()), - ("Gamma", self.app.sdr_gamma_type_var.get()), - ("数据范围", data_range), - ("编码位深", self.app.sdr_bit_depth_var.get()), - ]: - self._log(f" {label}: {value}", "info") converted_params = convert_pattern_params( source_params, data_range=data_range, verbose=False ) @@ -110,33 +300,29 @@ class PatternService: ) if hasattr(active_config, "set_current_test_type"): active_config.set_current_test_type(test_type) - if not self.app.signal_service.apply_config(active_config): - raise RuntimeError(self._build_apply_config_error(test_type)) - success = self.app.signal_service.update_signal_format( + + self._stage_profile( + active_config, color_space=self.app.sdr_color_space_var.get(), data_range=data_range, bit_depth=self.app.sdr_bit_depth_var.get(), output_format=self.app.sdr_output_format_var.get(), + test_type=test_type, + log_details=log_details, + log_title="设置 SDR 信号格式:", + log_fields=[ + ("色彩空间", self.app.sdr_color_space_var.get()), + ("色彩格式", self.app.sdr_output_format_var.get()), + ("Gamma", self.app.sdr_gamma_type_var.get()), + ("数据范围", data_range), + ("编码位深", self.app.sdr_bit_depth_var.get()), + ], ) if log_details: - self._log(f"SDR 信号格式设置{'成功' if success else '失败'}", "success" if success else "error") self._log(f"图案参数已设置,共 {len(converted_params)} 个图案", "success") elif test_type == "hdr_movie": data_range = self.app.hdr_data_range_var.get() - if log_details: - self._log("=" * 50, "separator") - self._log("设置 HDR 信号格式:", "info") - self._log("=" * 50, "separator") - for label, value in [ - ("色彩空间", self.app.hdr_color_space_var.get()), - ("色彩格式", self.app.hdr_output_format_var.get()), - ("数据范围", data_range), - ("编码位深", self.app.hdr_bit_depth_var.get()), - ("MaxCLL", self.app.hdr_maxcll_var.get()), - ("MaxFALL", self.app.hdr_maxfall_var.get()), - ]: - self._log(f" {label}: {value}", "info") converted_params = convert_pattern_params( source_params, data_range=data_range, verbose=False ) @@ -145,18 +331,28 @@ class PatternService: ) if hasattr(active_config, "set_current_test_type"): active_config.set_current_test_type(test_type) - if not self.app.signal_service.apply_config(active_config): - raise RuntimeError(self._build_apply_config_error(test_type)) - success = self.app.signal_service.update_signal_format( + + self._stage_profile( + active_config, color_space=self.app.hdr_color_space_var.get(), data_range=data_range, bit_depth=self.app.hdr_bit_depth_var.get(), output_format=self.app.hdr_output_format_var.get(), max_cll=self.app.hdr_maxcll_var.get(), max_fall=self.app.hdr_maxfall_var.get(), + test_type=test_type, + log_details=log_details, + log_title="设置 HDR 信号格式:", + log_fields=[ + ("色彩空间", self.app.hdr_color_space_var.get()), + ("色彩格式", self.app.hdr_output_format_var.get()), + ("数据范围", data_range), + ("编码位深", self.app.hdr_bit_depth_var.get()), + ("MaxCLL", self.app.hdr_maxcll_var.get()), + ("MaxFALL", self.app.hdr_maxfall_var.get()), + ], ) if log_details: - self._log(f"HDR 信号格式设置{'成功' if success else '失败'}", "success" if success else "error") self._log(f"图案参数已设置,共 {len(converted_params)} 个图案", "success") else: @@ -213,4 +409,4 @@ class PatternService: def _log(self, message, level): if hasattr(self.app, "log_gui"): - self.app.log_gui.log(message, level=level) \ No newline at end of file + self.app.log_gui.log(message, level=level) diff --git a/app/views/panels/main_layout.py b/app/views/panels/main_layout.py index 3cb4222..4de66fb 100644 --- a/app/views/panels/main_layout.py +++ b/app/views/panels/main_layout.py @@ -4,7 +4,7 @@ import re import tkinter as tk import ttkbootstrap as ttk -from drivers.UCD323_Enum import UCDEnum +from app.ucd import UCDEnum from app.views.collapsing_frame import CollapsingFrame from app.resources import load_icon diff --git a/drivers/UCD323_Function.py b/drivers/UCD323_Function.py deleted file mode 100644 index 2bb7331..0000000 --- a/drivers/UCD323_Function.py +++ /dev/null @@ -1,757 +0,0 @@ -# -*- coding: UTF-8 -*- -import logging -import UniTAP -import time -import gc -from drivers.UCD323_Enum import UCDEnum - - -log = logging.getLogger(__name__) - - -class UCDController: - """UCD323信号发生器控制类""" - - def __init__(self): - self.lUniTAP = UniTAP.TsiLib() - self.dev = None - self.role = None - self.timing_manager = None - self.config = None - self.color_info = None - self.status = False - self.current_interface = "HDMI" - - self.current_timing = None - self.current_pattern = None - self.current_pattern_param = None - self.current_pattern_params = None - self.current_pattern_index = 0 - self.last_error = None - - def search_device(self): - """搜索可用设备""" - available_devices = self.lUniTAP.get_list_of_available_devices() - return available_devices if available_devices else [] - - def open(self, device_name): - """打开设备""" - temp_dev = None - - try: - if self.dev is not None or self.status: - self._force_cleanup() - - device_id = int(device_name.split(":")[0]) - temp_dev = self.lUniTAP.open(device_id) - - try: - self.role = temp_dev.select_role(UniTAP.dev.UCD323.HDMISource) - self.dev = temp_dev - self.current_interface = "HDMI" - - except Exception as role_error: - self._close_device_object(temp_dev) - raise role_error - - pg, ag = self.get_tx_modules() - self.timing_manager = pg.timing_manager - self.color_info = UniTAP.ColorInfo() - self._stop_audio_output(ag) - self.status = True - - return True - - except Exception as e: - self._force_cleanup() - return False - - def _reset_state(self): - """重置所有运行时状态(不关闭设备句柄)""" - self.dev = None - self.role = None - self.status = False - self.timing_manager = None - self.current_timing = None - self.current_pattern = None - self.current_pattern_param = None - self.current_pattern_params = None - self.current_pattern_index = 0 - self.current_interface = "HDMI" - - def close(self): - """关闭设备""" - try: - if self.dev: - try: - self._stop_audio_output() - except Exception: - pass - self._close_device_object(self.dev) - - self._reset_state() - self.lUniTAP = None - - for i in range(3): - gc.collect() - - time.sleep(2.0) - - self.lUniTAP = UniTAP.TsiLib() - - return True - - except Exception as e: - self._reset_state() - - try: - self.lUniTAP = None - gc.collect() - time.sleep(2.0) - self.lUniTAP = UniTAP.TsiLib() - except Exception as init_error: - pass - - return False - - def _close_device_object(self, dev_obj): - """显式关闭设备对象""" - try: - if dev_obj is None: - return - - if self.lUniTAP and hasattr(self.lUniTAP, "close"): - try: - self.lUniTAP.close(dev_obj) - except Exception as e: - pass - - dev_obj = None - gc.collect() - time.sleep(1.0) - - except Exception as e: - pass - - def _force_cleanup(self): - """强制清理所有状态""" - try: - if self.dev: - self._close_device_object(self.dev) - - self._reset_state() - - except Exception as e: - pass - - def get_tx_modules(self): - """根据当前接口返回 (pg, ag) 模块。""" - if not self.role: - raise RuntimeError("UCD 未打开,无法获取 TX 模块") - - interface = getattr(self, "current_interface", None) - log.info("UCDController.get_tx_modules interface=%s", interface) - if interface in (None, "HDMI"): - return self.role.hdtx.pg, self.role.hdtx.ag - if interface in ("DP", "Type-C"): - return self.role.dptx.pg, self.role.dptx.ag - raise ValueError(f"不支持的接口类型: {interface}") - - def _stop_audio_output(self, ag=None) -> None: - """关闭 HDMI/DP 音频发生器。PQ 测试仅需视频图案,避免电视持续输出测试音。""" - if not self.status or not self.role: - return - try: - if ag is None: - _, ag = self.get_tx_modules() - ag.stop_generate() - log.info("UCDController._stop_audio_output done") - except Exception: - log.exception("UCDController._stop_audio_output failed") - - def _apply_pg_output(self, pg) -> bool: - """提交 PG 输出,并确保音频发生器处于关闭状态。""" - try: - ok = bool(pg.apply()) - except Exception: - log.exception("UCDController._apply_pg_output pg.apply failed") - return False - self._stop_audio_output() - return ok - - def _resolve_timing(self, pg=None): - """优先从 current_timing 读取 timing,必要时回退到 TX 模块。""" - if self.current_timing is not None: - return self.current_timing - - if pg is not None: - get_vm = getattr(pg, "get_vm", None) - if callable(get_vm): - try: - vm = get_vm() - return getattr(vm, "timing", None) - except Exception: - pass - - return None - - def get_current_resolution(self, default=(3840, 2160)): - """从当前 timing 获取 (width, height),失败时返回 default。""" - try: - pg, _ = self.get_tx_modules() - timing = self._resolve_timing(pg) - if timing and hasattr(timing, "h_active") and hasattr(timing, "v_active"): - return timing.h_active, timing.v_active - except Exception: - pass - - return default - - def _cleanup(self): - """清理设备资源""" - try: - if self.dev: - self._close_device_object(self.dev) - self.dev = None - - if hasattr(self.lUniTAP, "cleanup"): - self.lUniTAP.cleanup() - - except Exception as e: - pass - - def set_ucd_params(self, config): - """设置UCD323参数""" - self.last_error = None - self.config = config - test_type = self.config.current_test_type - - pg, _ = self.get_tx_modules() - self.timing_manager = pg.timing_manager - - color_format = self.config.current_test_types[test_type]["color_format"] - bpc = self.config.current_test_types[test_type]["bpc"] - colorimetry = self.config.current_test_types[test_type]["colorimetry"] - - if not self.set_color_mode(color_format, bpc, colorimetry): - self.last_error = ( - f"set_color_mode failed: color_format={color_format}, bpc={bpc}, colorimetry={colorimetry}" - ) - log.error( - "UCDController.set_ucd_params set_color_mode failed test_type=%s color_format=%s bpc=%s colorimetry=%s", - test_type, - color_format, - bpc, - colorimetry, - ) - return False - - timing_str = self.config.current_test_types[test_type]["timing"] - if not self.set_timing_from_string(timing_str): - self.last_error = f"set_timing_from_string failed: timing={timing_str}" - log.error( - "UCDController.set_ucd_params set_timing_from_string failed test_type=%s timing=%s", - test_type, - timing_str, - ) - return False - - self.current_pattern_index = 0 - pattern_mode = self.config.current_pattern["pattern_mode"] - pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern_mode) - - if pattern is None: - self.last_error = f"get_video_pattern failed: pattern_mode={pattern_mode}" - return False - - self.current_pattern = pattern - self.current_pattern_params = self.config.current_pattern["pattern_params"] - - return True - - def run(self): - """运行设备""" - log.info( - "UCDController.run start current_pattern=%s has_pattern_param=%s", - getattr(self.current_pattern, "name", self.current_pattern), - self.current_pattern_param is not None, - ) - self.apply_video_mode() - self.apply_pattern() - pg, _ = self.get_tx_modules() - log.info("UCDController.run calling pg.apply()") - ok = self._apply_pg_output(pg) - log.info("UCDController.run done ok=%s", ok) - return ok - - def send_image_pattern(self, image_path): - """发送图片 Pattern(依赖当前 timing/color_info 状态)。""" - if not self.status or not self.role: - return False - - try: - pg, _ = self.get_tx_modules() - # 仅切换图案,不重复 set_vm;重复 apply video mode 会触发电视 HDMI 重锁发声。 - if getattr(self, "_last_sent_config", None) is None: - self.apply_video_mode() - pg.set_pattern(pattern=image_path) - return self._apply_pg_output(pg) - except Exception: - return False - - def send_solid_rgb_pattern(self, rgb): - """发送纯色 RGB Pattern(依赖当前 timing/color_info 状态)。""" - if not self.status or not self.role: - return False - - try: - log.info("UCDController.send_solid_rgb_pattern rgb=%s", rgb) - self.current_pattern = UCDEnum.VideoPatternInfo.get_video_pattern("solidcolor") - if self.current_pattern is None: - log.error("UCDController.send_solid_rgb_pattern failed: solidcolor pattern not found") - return False - - return self.send_current_pattern_params(list(rgb)) - except Exception: - log.exception("UCDController.send_solid_rgb_pattern exception") - return False - - def send_current_pattern_params(self, pattern_params): - """发送当前已配置的 pattern,并可附带当前 pattern 参数。""" - if not self.status or not self.role: - return False - - try: - if self.current_pattern is None: - log.error("UCDController.send_current_pattern_params failed: current_pattern is None") - return False - - log.info( - "UCDController.send_current_pattern_params pattern=%s params=%s", - getattr(self.current_pattern, "name", self.current_pattern), - pattern_params, - ) - if pattern_params is not None and not self.set_pattern( - self.current_pattern, - pattern_params, - ): - log.error("UCDController.send_current_pattern_params failed: set_pattern returned False") - return False - - log.info("UCDController.send_current_pattern_params calling run()") - self.run() - log.info("UCDController.send_current_pattern_params done") - return True - except Exception: - log.exception("UCDController.send_current_pattern_params exception") - return False - - def set_color_mode(self, cf, bpc, cm): - """设置颜色模式""" - current_dynamic_range = self.color_info.dynamic_range - - color_format = UCDEnum.ColorInfo.get_color_format(cf) - if color_format is None: - fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(cf) - color_format = UCDEnum.ColorInfo.get_color_format(fmt_key) - if color_format is None: - return False - - if not isinstance(bpc, int) or bpc <= 0: - return False - - colorimetry = UCDEnum.ColorInfo.get_colorimetry(cm) - if colorimetry is None: - return False - - self.color_info.color_format = color_format - self.color_info.bpc = bpc - self.color_info.colorimetry = colorimetry - self.color_info.dynamic_range = current_dynamic_range - - return True - - def apply_video_mode(self): - """应用当前 color_info 和 timing""" - if self.current_timing: - log.info("UCDController.apply_video_mode start timing=%s", self.current_timing) - self.set_video_mode() - log.info("UCDController.apply_video_mode done") - return True - log.warning("UCDController.apply_video_mode skipped: current_timing is None") - return False - - def set_video_mode(self): - """设置视频模式""" - # 对比上次发出的配置,判断是否会触发电视重新锁定信号 - current_config = ( - self.current_timing, - self.color_info.color_format, - self.color_info.colorimetry, - self.color_info.dynamic_range, - self.color_info.bpc, - ) - self.format_changed = (current_config != getattr(self, "_last_sent_config", None)) - log.info( - "UCDController.set_video_mode format_changed=%s color_format=%s colorimetry=%s dynamic_range=%s bpc=%s", - self.format_changed, - self.color_info.color_format, - self.color_info.colorimetry, - self.color_info.dynamic_range, - self.color_info.bpc, - ) - if not self.format_changed: - log.info("UCDController.set_video_mode skipped pg.set_vm(): config unchanged") - return True - - video_mode = UniTAP.VideoMode( - timing=self.current_timing, color_info=self.color_info - ) - pg, _ = self.get_tx_modules() - log.info("UCDController.set_video_mode calling pg.set_vm()") - pg.set_vm(vm=video_mode) - self._stop_audio_output() - log.info("UCDController.set_video_mode done") - self._last_sent_config = current_config - return True - - def set_pattern(self, pattern, pattern_params=None): - """设置pattern""" - if self.current_timing is None: - # Pattern-only updates (e.g. Calman patch click) can still be applied on - # an already active output mode. Missing timing should not block pattern staging. - log.warning("UCDController.set_pattern current_timing is None; continue with pattern-only apply") - - needs_params = { - UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, - UCDEnum.VideoPatternInfo.VideoPattern.SolidColor, - UCDEnum.VideoPatternInfo.VideoPatternParams.WhiteVStrips, - UCDEnum.VideoPatternInfo.VideoPatternParams.GradientRGBStripes, - UCDEnum.VideoPatternInfo.VideoPatternParams.MotionPattern, - UCDEnum.VideoPatternInfo.VideoPatternParams.SquareWindow, - } - log.info( - "UCDController.set_pattern pattern=%s pattern_params=%s needs_params=%s", - getattr(pattern, "name", pattern), - pattern_params, - pattern in needs_params, - ) - if pattern in needs_params and pattern_params is not None: - self.set_pattern_params(pattern, pattern_params) - return True - - def set_next_pattern(self): - """设置下一个pattern""" - if self.current_pattern_index < len(self.current_pattern_params): - p = self.current_pattern_params[self.current_pattern_index] - self.set_pattern(self.current_pattern, p) - self.current_pattern_index += 1 - else: - error_msg = ( - f"No more patterns to set. (已设置 {self.current_pattern_index} 个图案)" - ) - raise IndexError(error_msg) - - def set_pattern_params(self, pattern, pattern_params): - """设置pattern参数""" - if pattern is not None: - solid_color_patterns = { - UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, - UCDEnum.VideoPatternInfo.VideoPattern.SolidColor, - } - if pattern in solid_color_patterns: - log.info("UCDController.set_pattern_params solid_color rgb=%s", pattern_params) - self.current_pattern_param = UniTAP.SolidColorParams( - first=pattern_params[0], - second=pattern_params[1], - third=pattern_params[2], - ) - return True - log.warning("UCDController.set_pattern_params unsupported pattern=%s", getattr(pattern, "name", pattern)) - return False - - def apply_pattern(self): - """应用当前pattern""" - if self.current_pattern is not None: - log.info( - "UCDController.apply_pattern start pattern=%s has_params=%s", - getattr(self.current_pattern, "name", self.current_pattern), - self.current_pattern_param is not None, - ) - pg, _ = self.get_tx_modules() - log.info("UCDController.apply_pattern calling pg.set_pattern()") - pg.set_pattern(self.current_pattern) - - if self.current_pattern_param is not None: - log.info("UCDController.apply_pattern calling pg.set_pattern_params()") - pg.set_pattern_params(self.current_pattern_param) - log.info("UCDController.apply_pattern done") - return True - log.warning("UCDController.apply_pattern skipped: current_pattern is None") - return False - - def search_timing(self, width, height, refresh_rate, resolution_type=None): - """根据分辨率参数搜索合适的timing""" - if resolution_type: - resolution_type = resolution_type.lower() - standard = None - if resolution_type == "dmt": - standard = UniTAP.common.timing.Timing.Standard.SD_DMT - elif resolution_type == "cta": - standard = UniTAP.common.timing.Timing.Standard.SD_CTA - elif resolution_type == "cvt": - standard = UniTAP.common.timing.Timing.Standard.SD_CVT - - rr = float(refresh_rate) - # Try both exact and NTSC-compatible rates (e.g. 120000 / 119880). - f_rate_candidates = [ - int(round(rr * 1000)), - int(rr * 1000), - int(round((rr * 1000.0) * 1000.0 / 1001.0)), - ] - # 去重并保持顺序 - f_rate_candidates = list(dict.fromkeys(f_rate_candidates)) - - standards = [standard] - if standard is not None: - standards.append(None) - - for std in standards: - for f_rate in f_rate_candidates: - timing = self.timing_manager.search( - h_active=width, - v_active=height, - f_rate=f_rate, - standard=std, - ) - if timing: - return timing - else: - for res_type in ["dmt", "cta", "cvt", "ovt"]: - result = self.search_timing(width, height, refresh_rate, res_type) - if result: - return result - - return None - - def parse_formatted_timing(self, timing_str): - """解析格式化的timing字符串""" - if not isinstance(timing_str, str): - raise ValueError("timing_str 必须是字符串") - - s = " ".join(timing_str.strip().split()) - s = s.replace(" x", "x").replace("x ", "x") - - parts = s.split(" ", 1) - if len(parts) < 2: - raise ValueError(f"无法解析timing: {timing_str}") - type_str = parts[0].strip().upper() - rest = parts[1].strip() - - if "@" not in rest: - raise ValueError(f"无法解析timing(缺少 @): {timing_str}") - left, right = [p.strip() for p in rest.split("@", 1)] - - if "x" not in left: - raise ValueError(f"无法解析分辨率(缺少 x): {timing_str}") - wh = left.split("x") - if len(wh) != 2: - raise ValueError(f"无法解析分辨率: {timing_str}") - try: - width = int(wh[0]) - height = int(wh[1]) - except Exception: - raise ValueError(f"分辨率数字解析失败: {timing_str}") - - hz_str = right.replace("Hz", "").replace("HZ", "").strip() - try: - refresh_rate = float(hz_str) - except Exception: - raise ValueError(f"刷新率解析失败: {timing_str}") - - rtype_map = { - "DMT": "dmt", - "CTA": "cta", - "CVT": "cvt", - "OVT": "ovt", - } - if type_str not in rtype_map: - raise ValueError(f"未知的分辨率类型: {type_str}") - resolution_type = rtype_map[type_str] - - def find_best_id_in_dict(res_map): - best_id, best_diff = None, float("inf") - for rid, info in res_map.items(): - if info["width"] == width and info["height"] == height: - diff = abs(float(info["refresh_rate"]) - refresh_rate) - if diff < best_diff: - best_diff = diff - best_id = rid - return best_id if best_diff <= 1.0 else None - - def find_best_id_in_list_map(res_map): - best_id, best_diff = None, float("inf") - for rid, infos in res_map.items(): - for info in infos: - if info["width"] == width and info["height"] == height: - diff = abs(float(info["refresh_rate"]) - refresh_rate) - if diff < best_diff: - best_diff = diff - best_id = rid - return best_id if best_diff <= 1.0 else None - - resolution_id = None - if resolution_type == "dmt": - resolution_id = find_best_id_in_dict(UCDEnum.TimingInfo.dmt_resolution_map) - elif resolution_type == "cta": - resolution_id = find_best_id_in_dict(UCDEnum.TimingInfo.cta_resolution_map) - elif resolution_type == "cvt": - resolution_id = find_best_id_in_list_map( - UCDEnum.TimingInfo.cvt_resolution_map - ) - elif resolution_type == "ovt": - resolution_id = find_best_id_in_list_map( - UCDEnum.TimingInfo.ovt_resolution_map - ) - - result = { - "resolution_type": resolution_type, - "width": width, - "height": height, - "refresh_rate": refresh_rate, - "resolution_id": resolution_id, - } - - return result - - def set_timing_from_string(self, timing_str): - """根据格式化timing字符串设置设备timing""" - try: - spec = self.parse_formatted_timing(timing_str) - except Exception: - log.exception("UCDController.set_timing_from_string parse failed timing=%s", timing_str) - return False - - rtype = spec["resolution_type"] - rid = spec.get("resolution_id") - width = spec["width"] - height = spec["height"] - fr = spec["refresh_rate"] - - if rid is not None and self.set_timing_from_id(rtype, rid): - log.info( - "UCDController.set_timing_from_string success by id timing=%s parsed=(%s id=%s)", - timing_str, - rtype, - rid, - ) - return True - - # Respect selected timing family first (DMT/CTA/CVT/OVT). - timing = self.search_timing(width, height, fr, rtype) - if timing is None: - # Fallback only for robustness: some SDKs may not classify a timing - # exactly as requested family even though width/height/fps matches. - timing = self.search_timing(width, height, fr, None) - - if timing: - self.current_timing = timing - log.info( - "UCDController.set_timing_from_string success timing=%s parsed=(%s %sx%s@%s)", - timing_str, - rtype, - width, - height, - fr, - ) - return True - - log.error( - "UCDController.set_timing_from_string no timing matched timing=%s parsed=(%s %sx%s@%s)", - timing_str, - rtype, - width, - height, - fr, - ) - return False - - def set_timing_from_id(self, rtype, rid): - """根据(type, id)设置设备timing""" - timing = None - if rtype.lower() == "dmt": - timing = self.timing_manager.get_dmt(rid) - elif rtype.lower() == "cta": - timing = self.timing_manager.get_cta(rid) - elif rtype.lower() == "cvt": - timing = self.timing_manager.get_cvt(rid) - elif rtype.lower() == "ovt": - get_ovt = getattr(self.timing_manager, "get_ovt", None) - if callable(get_ovt): - timing = get_ovt(rid) - else: - return False - else: - raise ValueError(f"不支持的分辨率类型: {rtype}") - - if timing: - self.current_timing = timing - return True - else: - return False - - def apply_signal_format( - self, color_space=None, data_range=None, bit_depth=None, color_format=None, **_ - ): - """统一设置信号格式(color_format / colorimetry / dynamic_range / bpc)。 - 注:Gamma/EOTF 传输特性在 ColorInfo API 中不存在; - max_cll / max_fall 暂无对应 SDK 接口,通过 **_ 接收后忽略。 - """ - try: - if color_format: - fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(color_format) - cf = UCDEnum.ColorInfo.get_color_format(fmt_key) - if cf is not None: - self.color_info.color_format = cf - - if color_space: - colorimetry = self._get_colorimetry_from_color_space(color_space, color_format) - if colorimetry: - self.color_info.colorimetry = colorimetry - - if data_range: - if data_range == "Full": - self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA - elif data_range == "Limited": - self.color_info.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA - - if bit_depth: - bpc = UCDEnum.SignalFormat.BitDepth.get_bit_value(bit_depth) - self.color_info.bpc = bpc - - if self.current_timing: - self.set_video_mode() - - return True - - except Exception: - return False - - def _get_colorimetry_from_color_space(self, color_space, color_format=None): - """将色彩空间字符串转换为UniTAP.ColorInfo.Colorimetry。 - BT.2020 在 YCbCr 输出时使用 CM_ITUR_BT2020_YCbCr,RGB 输出时使用 CM_ITUR_BT2020_RGB。 - """ - is_ycbcr = UCDEnum.SignalFormat.OutputFormat.is_ycbcr(color_format) - bt2020_cm = ( - UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT2020_YCbCr - if is_ycbcr - else UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT2020_RGB - ) - colorimetry_map = { - "sRGB": UniTAP.ColorInfo.Colorimetry.CM_sRGB, - "BT.709": UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT709, - "BT.601": UniTAP.ColorInfo.Colorimetry.CM_ITUR_BT601, - "BT.2020": bt2020_cm, - "DCI-P3": UniTAP.ColorInfo.Colorimetry.CM_DCI_P3, - } - return colorimetry_map.get(color_space) diff --git a/drivers/ucd_driver.py b/drivers/ucd_driver.py deleted file mode 100644 index de04318..0000000 --- a/drivers/ucd_driver.py +++ /dev/null @@ -1,639 +0,0 @@ -"""UCD 驱动层。 - -唯一暴露给上层的入口为 :class:`IUcdDevice` 抽象接口,以及实现: -:class:`UCD323Device`(生产)和 :class:`FakeUcdDevice`(单测)。 - -实现策略 --------- -:class:`UCD323Device` 对外暴露完整的 :class:`IUcdDevice` 接口;SDK 调用 -当前仍委托给 :class:`drivers.UCD323_Function.UCDController`。 -上层(Service / GUI)**不得**直接访问 ``UCDController``。 -后续可将 SDK 调用逐步迁入本模块并删除旧文件。 - -文件分区: - §1 DeviceInfo / list_devices - §2 IUcdDevice 抽象接口 - §3 UCD323Device 真实实现 - §4 FakeUcdDevice 单测实现 -""" - -from __future__ import annotations - -from contextlib import contextmanager -import logging -import threading -from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import TYPE_CHECKING - -from app.ucd_domain import ( - ConnectionChanged, - EventBus, - Interface, - PatternApplied, - PatternKind, - PatternSpec, - SignalApplied, - SignalFormat, - TimingSpec, - UcdApplyFailed, - UcdConfigError, - UcdNotConnected, - UcdSdkError, - UcdState, - UcdStateError, - assert_transition, -) - -if TYPE_CHECKING: - from drivers.UCD323_Function import UCDController - -log = logging.getLogger(__name__) - -_DEVICE_LOCK_TIMEOUT_SECONDS = 8.0 - - -# ─── §1 DeviceInfo / list_devices ──────────────────────────────── - - -@dataclass(frozen=True) -class DeviceInfo: - """UCD 设备发现条目。 - - ``display`` 是 SDK 给出的完整字符串(``"0: UCD-323 #12345678"``); - ``index`` / ``serial`` / ``model`` 通过解析得到,解析失败时为 None。 - """ - - display: str - index: int | None = None - serial: str | None = None - model: str | None = None - - @classmethod - def parse(cls, display: str) -> "DeviceInfo": - idx: int | None = None - model: str | None = None - serial: str | None = None - try: - head, rest = display.split(":", 1) - idx = int(head.strip()) - rest = rest.strip() - # 形如 "UCD-323 #12345678" 或 "UCD-323 #12345678 (in use)" - tokens = rest.split() - if tokens: - model = tokens[0] - for tok in tokens[1:]: - if tok.startswith("#") and len(tok) >= 2: - serial = tok.lstrip("#") - break - except Exception: # noqa: BLE001 - 解析失败保留原 display 即可 - pass - return cls(display=display, index=idx, serial=serial, model=model) - - -def list_devices(controller: "UCDController") -> list[DeviceInfo]: - """通过给定的底层 controller 枚举可用 UCD 设备。""" - try: - raw_list = controller.search_device() - except Exception as exc: # noqa: BLE001 - raise UcdSdkError("枚举 UCD 设备失败") from exc - return [DeviceInfo.parse(s) for s in (raw_list or [])] - - -# ─── §2 IUcdDevice 抽象接口 ────────────────────────────────────── - - -class IUcdDevice(ABC): - """UCD 信号发生器抽象设备。 - - 上层(Service / GUI)**只**通过本接口操作硬件,不得穿透到 - UniTAP SDK 或具体实现细节。 - """ - - @property - @abstractmethod - def state(self) -> UcdState: ... - - @property - @abstractmethod - def info(self) -> DeviceInfo | None: ... - - @abstractmethod - def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: - """打开设备并选择接口角色。失败抛 :class:`UcdSdkError` 等。""" - - @abstractmethod - def close(self) -> None: - """关闭设备(幂等)。""" - - @abstractmethod - def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: - """写入信号格式与 timing(未 apply)。返回 ``format_changed``。""" - - @abstractmethod - def set_pattern(self, pattern: PatternSpec) -> None: - """设置当前图案(未 apply)。""" - - @abstractmethod - def apply(self) -> None: - """将已配置的信号格式 + 图案一次性提交给硬件。""" - - @abstractmethod - def current_resolution(self) -> tuple[int, int]: - """读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。""" - - @abstractmethod - def search_devices(self) -> list[str]: - """枚举可用设备的 SDK 显示字符串列表。""" - - @property - @abstractmethod - def format_changed(self) -> bool: - """最近一次视频模式提交是否相对上次发生变化。""" - - @property - @abstractmethod - def last_error(self) -> str | None: - """最近一次配置/应用失败时的错误描述。""" - - @abstractmethod - def apply_signal_format( - self, - *, - color_space: str | None = None, - data_range: str | None = None, - bit_depth: str | None = None, - color_format: str | None = None, - max_cll: int | None = None, - max_fall: int | None = None, - ) -> bool: - """仅更新信号格式(沿用当前 timing),不切换图案。""" - - @abstractmethod - def set_ucd_params(self, config) -> bool: - """按 PQConfig stage 色彩 / Timing / Pattern 类型(不 apply 输出)。""" - - @abstractmethod - def send_current_pattern_params(self, pattern_params) -> bool: - """更新当前 pattern 参数并 apply 到硬件。""" - - @abstractmethod - def apply_config_and_run(self, config, pattern_params) -> bool: - """set_ucd_params + set_pattern + run 复合操作。""" - - -# ─── §3 UCD323Device 真实实现 ──────────────────────────────────── - - -class UCD323Device(IUcdDevice): - """生产环境实现。内部委托给传统 :class:`UCDController`(Phase 1)。""" - - def __init__(self, bus: EventBus, controller: "UCDController | None" = None): - from drivers.UCD323_Function import UCDController as _UCDController - - self._bus = bus - self._controller: "UCDController" = controller or _UCDController() - self._state: UcdState = UcdState.CLOSED - self._info: DeviceInfo | None = None - self._interface: Interface = Interface.HDMI - self._lock = threading.RLock() - self._lock_owner_tid: int | None = None - self._lock_owner_name: str | None = None - - self._curr_signal: SignalFormat | None = None - self._curr_timing: TimingSpec | None = None - self._curr_pattern: PatternSpec | None = None - self._last_applied: tuple[SignalFormat, TimingSpec] | None = None - - @contextmanager - def _acquire_device_lock(self, op_name: str): - current = threading.current_thread() - log.info( - "UCD323Device.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s", - op_name, - _DEVICE_LOCK_TIMEOUT_SECONDS, - threading.get_ident(), - current.name, - self._lock_owner_tid, - self._lock_owner_name, - ) - acquired = self._lock.acquire(timeout=_DEVICE_LOCK_TIMEOUT_SECONDS) - if not acquired: - raise UcdStateError( - "UCD device busy: lock timeout in " - f"UCD323Device.{op_name}, " - f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}" - ) - prev_owner_tid = self._lock_owner_tid - prev_owner_name = self._lock_owner_name - self._lock_owner_tid = threading.get_ident() - self._lock_owner_name = current.name - log.info( - "UCD323Device.%s lock acquired tid=%s thread=%s", - op_name, - self._lock_owner_tid, - self._lock_owner_name, - ) - try: - yield - finally: - self._lock_owner_tid = prev_owner_tid - self._lock_owner_name = prev_owner_name - self._lock.release() - - # -- 读访问 -------------------------------------------------- - - @property - def state(self) -> UcdState: - return self._state - - @property - def info(self) -> DeviceInfo | None: - return self._info - - # -- 生命周期 ------------------------------------------------ - - def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: - with self._acquire_device_lock("open"): - assert_transition(self._state, UcdState.OPENED) - if interface is not Interface.HDMI: - # Phase 1:底层 UCDController.open() 写死了 HDMISource。 - raise UcdConfigError( - f"暂不支持接口 {interface.value};当前仅实现 HDMI" - ) - try: - ok = self._controller.open(info.display) - except Exception as exc: # noqa: BLE001 - raise UcdSdkError(f"打开设备失败: {info.display}") from exc - if not ok: - raise UcdSdkError(f"打开设备失败: {info.display}") - self._info = info - self._interface = interface - self._state = UcdState.OPENED - self._bus.publish(ConnectionChanged(True, info.serial)) - - def close(self) -> None: - with self._acquire_device_lock("close"): - if self._state == UcdState.CLOSED: - return - try: - self._controller.close() - except Exception: # noqa: BLE001 - log.exception("关闭 UCD 时发生异常") - self._state = UcdState.CLOSED - self._curr_signal = None - self._curr_timing = None - self._curr_pattern = None - self._last_applied = None - prev_serial = self._info.serial if self._info else None - self._info = None - self._bus.publish(ConnectionChanged(False, prev_serial)) - - # -- 配置 ---------------------------------------------------- - - def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: - with self._acquire_device_lock("configure"): - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 configure") - try: - # 颜色模式(color_format / bpc / colorimetry) - if not self._controller.set_color_mode( - signal.color_format.value, - int(signal.bpc), - _colorimetry_to_legacy_key(signal), - ): - raise UcdConfigError( - f"set_color_mode 失败: {signal!r}" - ) - # dynamic_range 在新接口中是一等公民 - self._apply_dynamic_range(signal) - - # Timing - if not self._controller.set_timing_from_string(str(timing)): - raise UcdConfigError(f"set_timing_from_string 失败: {timing}") - except UcdConfigError: - raise - except Exception as exc: # noqa: BLE001 - raise UcdSdkError("configure 异常") from exc - - self._curr_signal = signal - self._curr_timing = timing - self._state = UcdState.CONFIGURED - return (signal, timing) != self._last_applied - - def set_pattern(self, pattern: PatternSpec) -> None: - with self._acquire_device_lock("set_pattern"): - # Phase 2 过渡:允许从 OPENED 直接 set_pattern——遗留路径 - # (test_runner 等)通过旧 controller.apply_signal_format 写入 - # 信号格式,未经过本设备的 configure。此时 self._state 仍为 - # OPENED,但硬件实际已处于可接收 pattern 状态。 - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 set_pattern") - self._curr_pattern = pattern - # 仅本地暂存,真正写硬件在 apply() - - def apply(self) -> None: - with self._acquire_device_lock("apply"): - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 apply") - if self._curr_pattern is None: - raise UcdStateError("apply 前必须先 set_pattern") - try: - ok = self._apply_pattern_via_controller(self._curr_pattern) - except Exception as exc: # noqa: BLE001 - raise UcdSdkError( - f"apply 异常: {type(exc).__name__}: {exc}" - ) from exc - if not ok: - raise UcdApplyFailed( - f"apply 失败: pattern={self._curr_pattern.kind.value}" - ) - - # SignalApplied 事件仅在通过新 API configure 过时发出; - # 遗留路径下 self._curr_signal/_curr_timing 可能为 None。 - if self._curr_signal is not None and self._curr_timing is not None: - changed = (self._curr_signal, self._curr_timing) != self._last_applied - self._last_applied = (self._curr_signal, self._curr_timing) - self._bus.publish( - SignalApplied(self._curr_signal, self._curr_timing, changed) - ) - self._state = UcdState.APPLIED - self._bus.publish(PatternApplied(self._curr_pattern)) - - # -- 查询 ---------------------------------------------------- - - def current_resolution(self) -> tuple[int, int]: - try: - return self._controller.get_current_resolution((3840, 2160)) - except Exception: # noqa: BLE001 - return (3840, 2160) - - def search_devices(self) -> list[str]: - try: - return self._controller.search_device() or [] - except Exception as exc: # noqa: BLE001 - raise UcdSdkError("枚举 UCD 设备失败") from exc - - @property - def format_changed(self) -> bool: - return bool(getattr(self._controller, "format_changed", True)) - - @property - def last_error(self) -> str | None: - err = getattr(self._controller, "last_error", None) - return str(err) if err else None - - def apply_signal_format( - self, - *, - color_space: str | None = None, - data_range: str | None = None, - bit_depth: str | None = None, - color_format: str | None = None, - max_cll: int | None = None, - max_fall: int | None = None, - ) -> bool: - with self._acquire_device_lock("apply_signal_format"): - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 apply_signal_format") - return bool( - self._controller.apply_signal_format( - color_space=color_space, - data_range=data_range, - bit_depth=bit_depth, - color_format=color_format, - max_cll=max_cll, - max_fall=max_fall, - ) - ) - - def set_ucd_params(self, config) -> bool: - with self._acquire_device_lock("set_ucd_params"): - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 set_ucd_params") - return bool(self._controller.set_ucd_params(config)) - - def send_current_pattern_params(self, pattern_params) -> bool: - with self._acquire_device_lock("send_current_pattern_params"): - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 send_current_pattern_params") - ok = bool(self._controller.send_current_pattern_params(pattern_params)) - if ok: - self._state = UcdState.APPLIED - return ok - - def apply_config_and_run(self, config, pattern_params) -> bool: - with self._acquire_device_lock("apply_config_and_run"): - if self._state == UcdState.CLOSED: - raise UcdNotConnected("UCD 未连接,无法 apply_config_and_run") - ctrl = self._controller - if not ctrl.set_ucd_params(config): - return False - if not ctrl.set_pattern(ctrl.current_pattern, pattern_params): - return False - ok = bool(ctrl.run()) - if ok: - self._state = UcdState.APPLIED - return ok - - # -- 内部辅助 ------------------------------------------------ - - def _apply_dynamic_range(self, signal: SignalFormat) -> None: - import UniTAP # 局部导入,避免本模块在无 SDK 环境下导入即失败 - - from app.ucd_domain import DynamicRange - - ci = self._controller.color_info - if ci is None: - return - if signal.dynamic_range is DynamicRange.FULL: - ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA - else: - ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA - - def _apply_pattern_via_controller(self, pattern: PatternSpec) -> bool: - """根据 PatternKind 走最合适的旧 controller 路径。""" - if pattern.kind is PatternKind.IMAGE: - if not pattern.image_path: - raise UcdConfigError("IMAGE pattern 必须提供 image_path") - return bool(self._controller.send_image_pattern(pattern.image_path)) - - # 预定义图案路径:复用 controller.set_pattern + run() - from drivers.UCD323_Enum import UCDEnum # 局部导入避免循环 - - video_pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern.kind.value) - if video_pattern is None: - raise UcdConfigError(f"不支持的 PatternKind: {pattern.kind!r}") - self._controller.current_pattern = video_pattern - - params: list[int] | None = None - if pattern.kind is PatternKind.SOLID: - if pattern.solid_rgb is None: - raise UcdConfigError("SOLID pattern 必须提供 solid_rgb") - params = list(pattern.solid_rgb) - elif pattern.extras: - params = list(pattern.extras) - - if not self._controller.set_pattern(video_pattern, params): - raise UcdApplyFailed("controller.set_pattern 返回 False") - # Skip apply_video_mode() (i.e. pg.set_vm) – the video format is already - # configured by the main signal panel and re-applying it blocks until the - # device re-locks, causing an apparent UI freeze for pattern-only sends. - if not self._controller.apply_pattern(): - raise UcdApplyFailed("controller.apply_pattern 返回 False") - if getattr(self._controller, "current_timing", None) is None: - raise UcdConfigError( - "current_timing is None; please apply selected test profile/timing before sending pattern" - ) - try: - pg, _ = self._controller.get_tx_modules() - if not self._controller._apply_pg_output(pg): - raise UcdApplyFailed("controller.apply_pg_output 返回 False") - except UcdApplyFailed: - raise - except Exception as exc: - raise UcdSdkError("pg.apply() 失败") from exc - return True - - -def _colorimetry_to_legacy_key(signal: SignalFormat) -> str: - """新 :class:`Colorimetry` → 旧 ``UCDEnum.ColorInfo.get_colorimetry`` 的 key。 - - BT.2020 在 YCbCr / RGB 输出下走不同 SDK 枚举(参考旧 - ``_get_colorimetry_from_color_space`` 的逻辑),这里也做同样的分支。 - """ - from app.ucd_domain import Colorimetry, is_ycbcr - - cm = signal.colorimetry - ycbcr = is_ycbcr(signal.color_format) - - if cm is Colorimetry.BT2020: - return "bt2020ycbcr" if ycbcr else "bt2020rgb" - return { - Colorimetry.SRGB: "srgb", - Colorimetry.BT709: "bt709", - Colorimetry.BT601: "bt601", - Colorimetry.DCI_P3: "dcip3", - Colorimetry.ADOBE_RGB: "adobergb", - }.get(cm, "srgb") - - -# ─── §4 FakeUcdDevice 单测实现 ─────────────────────────────────── - - -class FakeUcdDevice(IUcdDevice): - """无硬件依赖的 Fake 实现;记录调用序列供单测断言。""" - - def __init__(self, bus: EventBus | None = None) -> None: - self._bus = bus or EventBus() - self._state = UcdState.CLOSED - self._info: DeviceInfo | None = None - self._signal: SignalFormat | None = None - self._timing: TimingSpec | None = None - self._pattern: PatternSpec | None = None - self._last_applied: tuple[SignalFormat, TimingSpec] | None = None - self.calls: list[tuple] = [] # ("open", info) / ("configure", ...) ... - - @property - def state(self) -> UcdState: - return self._state - - @property - def info(self) -> DeviceInfo | None: - return self._info - - def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: - assert_transition(self._state, UcdState.OPENED) - self.calls.append(("open", info, interface)) - self._info = info - self._state = UcdState.OPENED - self._bus.publish(ConnectionChanged(True, info.serial)) - - def close(self) -> None: - if self._state == UcdState.CLOSED: - return - self.calls.append(("close",)) - self._state = UcdState.CLOSED - prev = self._info.serial if self._info else None - self._info = None - self._signal = self._timing = self._pattern = None - self._last_applied = None - self._bus.publish(ConnectionChanged(False, prev)) - - def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: - if self._state == UcdState.CLOSED: - raise UcdNotConnected() - self.calls.append(("configure", signal, timing)) - self._signal, self._timing = signal, timing - self._state = UcdState.CONFIGURED - return (signal, timing) != self._last_applied - - def set_pattern(self, pattern: PatternSpec) -> None: - if self._state == UcdState.CLOSED: - raise UcdNotConnected() - self.calls.append(("set_pattern", pattern)) - self._pattern = pattern - - def apply(self) -> None: - if self._signal is None or self._timing is None: - raise UcdStateError("apply 前必须 configure") - if self._pattern is None: - raise UcdStateError("apply 前必须 set_pattern") - self.calls.append(("apply",)) - changed = (self._signal, self._timing) != self._last_applied - self._last_applied = (self._signal, self._timing) - self._state = UcdState.APPLIED - self._bus.publish(SignalApplied(self._signal, self._timing, changed)) - self._bus.publish(PatternApplied(self._pattern)) - - def current_resolution(self) -> tuple[int, int]: - if self._timing is None: - return (3840, 2160) - return (self._timing.width, self._timing.height) - - def search_devices(self) -> list[str]: - return [] - - @property - def format_changed(self) -> bool: - return self._last_applied is None - - @property - def last_error(self) -> str | None: - return None - - def apply_signal_format(self, **kwargs) -> bool: - if self._state == UcdState.CLOSED: - raise UcdNotConnected() - self.calls.append(("apply_signal_format", kwargs)) - return True - - def set_ucd_params(self, config) -> bool: - if self._state == UcdState.CLOSED: - raise UcdNotConnected() - self.calls.append(("set_ucd_params", config)) - self._state = UcdState.OPENED - return True - - def send_current_pattern_params(self, pattern_params) -> bool: - if self._state == UcdState.CLOSED: - raise UcdNotConnected() - self.calls.append(("send_current_pattern_params", pattern_params)) - self._state = UcdState.APPLIED - return True - - def apply_config_and_run(self, config, pattern_params) -> bool: - if self._state == UcdState.CLOSED: - raise UcdNotConnected() - self.calls.append(("apply_config_and_run", config, pattern_params)) - self._state = UcdState.APPLIED - return True - - -__all__ = [ - "DeviceInfo", - "list_devices", - "IUcdDevice", - "UCD323Device", - "FakeUcdDevice", -] diff --git a/pqAutomationApp.py b/pqAutomationApp.py index e946f0a..c7d3a1f 100644 --- a/pqAutomationApp.py +++ b/pqAutomationApp.py @@ -9,9 +9,14 @@ import traceback import matplotlib import matplotlib.pyplot as plt from app_version import APP_NAME, APP_VERSION, get_app_title -from drivers.ucd_driver import UCD323Device -from app.ucd_domain import EventBus -from app.services.ucd_service import SignalService +from app.ucd import ( + ConnectionChanged, + DeviceKind, + EventBus, + PatternService, + SignalService, + UCD323Device, +) from app.pq.pq_config import PQConfig from app.pq.pq_result import PQResultStore from app.export import ( @@ -55,7 +60,6 @@ from app.plots.plot_gamut import PlotGamutMixin from app.views.chart_frame import ChartFrameMixin from app.config_io import ConfigIOMixin from app.tests.local_dimming import LocalDimmingMixin -from app.services import PatternService from app.device.connection import DeviceConnectionMixin from app.runner.test_runner import TestRunnerMixin @@ -201,6 +205,7 @@ class PQAutomationApp( self.create_calman_panel() # 创建测试类型选择区域 self.create_test_type_frame() + self._setup_connection_event_handlers() # 创建操作按钮区域 self.create_operation_frame() # 创建结果图表区域 @@ -229,6 +234,23 @@ class PQAutomationApp( anchor=tk.E, ).pack(side=tk.RIGHT) + def _setup_connection_event_handlers(self) -> None: + """订阅连接事件,驱动 UCD / CA 指示灯(替代轮询 controller.status)。""" + + def on_connection_changed(evt: ConnectionChanged) -> None: + if evt.device is DeviceKind.UCD: + indicator = getattr(self, "ucd_status_indicator", None) + elif evt.device is DeviceKind.CA: + indicator = getattr(self, "ca_status_indicator", None) + else: + return + if indicator is None: + return + state = "green" if evt.connected else "gray" + self._dispatch_ui(self.update_connection_indicator, indicator, state) + + self.event_bus.subscribe(ConnectionChanged, on_connection_changed) + def _dispatch_ui(self, fn, *args, **kwargs): """把 ``fn(*args, **kwargs)`` 调度到 Tk 主线程执行。 diff --git a/pqAutomationApp.spec b/pqAutomationApp.spec index 409f435..21099fa 100644 --- a/pqAutomationApp.spec +++ b/pqAutomationApp.spec @@ -108,11 +108,11 @@ a = Analysis( 'drivers.baseSerail', 'drivers.caSerail', 'drivers.tvSerail', - 'drivers.UCD323_Enum', - 'drivers.UCD323_Function', - 'drivers.ucd_driver', - 'app.ucd_domain', - 'app.services.ucd_service', + 'app.ucd', + 'app.ucd.domain', + 'app.ucd.enum', + 'app.ucd.device', + 'app.ucd.service', ], hookspath=[], hooksconfig={},