From cc7218411c8b914c2645bef929f16f9bc23c23e7 Mon Sep 17 00:00:00 2001 From: "xinzhu.yin" Date: Thu, 11 Jun 2026 15:53:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84UCD=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/device/connection.py | 9 +- app/runner/test_runner.py | 4 +- app/services/pattern_service.py | 11 +- app/services/ucd_service.py | 185 +++++++--------------- app/tests/local_dimming.py | 113 ++++++++++++- app/views/panels/custom_template_panel.py | 6 +- app/views/panels/main_layout.py | 14 +- app_version.py | 4 +- drivers/UCD323_Function.py | 50 +++++- drivers/ucd_driver.py | 173 ++++++++++++++++++-- pqAutomationApp.py | 14 +- 11 files changed, 395 insertions(+), 188 deletions(-) diff --git a/app/device/connection.py b/app/device/connection.py index 742eb5d..15abe17 100644 --- a/app/device/connection.py +++ b/app/device/connection.py @@ -65,7 +65,7 @@ class ConnectionController: def list_ucd_devices(self) -> list[str]: """返回 SDK 给出的设备显示字符串列表。""" try: - return self._device.raw_controller.search_device() or [] + return self._device.search_devices() except Exception as exc: # noqa: BLE001 self._log(f"枚举 UCD 设备失败: {exc}", level="error") return [] @@ -103,11 +103,6 @@ class ConnectionController: self._device.close() except Exception: # noqa: BLE001 pass - # 旧 controller.status 也要清零,兼容仍读取它的代码 - try: - self._app.ucd.status = False - except Exception: # noqa: BLE001 - pass self._log("UCD连接已断开", level="info") # -- CA 连接 ------------------------------------------------- @@ -347,7 +342,7 @@ def update_connection_indicator(self: "PQAutomationApp", indicator, connected): def refresh_connection_indicators(self: "PQAutomationApp"): """根据当前设备状态重画 UCD / CA 指示灯。""" if hasattr(self, "ucd_status_indicator"): - ucd_connected = bool(getattr(self.ucd, "status", False)) + ucd_connected = self.signal_service.is_connected _draw_connection_indicator( self.ucd_status_indicator, "green" if ucd_connected else "gray", diff --git a/app/runner/test_runner.py b/app/runner/test_runner.py index c66ed43..30906d1 100644 --- a/app/runner/test_runner.py +++ b/app/runner/test_runner.py @@ -330,9 +330,7 @@ def send_fix_pattern(self: "PQAutomationApp", mode): if mode == "screen_module": format_changed = True else: - format_changed = bool( - getattr(getattr(self, "ucd", None), "format_changed", True) - ) + format_changed = bool(self.signal_service.format_changed) # 预热提交:prepare_session 仅 stage 了新的 color/timing/pattern, # 真正的 ``pg.apply()`` 要到第一次发图时才发生。提前发送首个 pattern, diff --git a/app/services/pattern_service.py b/app/services/pattern_service.py index 3b7de95..9f80634 100644 --- a/app/services/pattern_service.py +++ b/app/services/pattern_service.py @@ -24,14 +24,9 @@ class PatternService: def _build_apply_config_error(self, test_type): timing = self.app.config.current_test_types.get(test_type, {}).get("timing", "-") detail = "" - try: - ctrl = getattr(self.app.signal_service.device, "raw_controller", None) - if ctrl is not None: - d = getattr(ctrl, "last_error", None) - if d: - detail = f", detail={d}" - except Exception: - pass + err = self.app.signal_service.last_error + if err: + detail = f", detail={err}" return f"UCD profile apply_config failed for {test_type}, timing={timing}{detail}" def prepare_session(self, mode, *, test_type=None, log_details=False): diff --git a/app/services/ucd_service.py b/app/services/ucd_service.py index a333feb..d4b9d8b 100644 --- a/app/services/ucd_service.py +++ b/app/services/ucd_service.py @@ -8,14 +8,13 @@ 本层不直接 import UniTAP,也不读取 :mod:`tkinter` 变量; 所有输入都是显式参数,便于单测。 + +线程安全由 :class:`UCD323Device` 的设备锁统一保证,本层不再重复加锁。 """ from __future__ import annotations -from contextlib import contextmanager import logging -import sys -import threading from app.ucd_domain import ( Colorimetry, @@ -26,6 +25,7 @@ from app.ucd_domain import ( SignalFormat, TimingSpec, UcdError, + UcdState, bit_depth_str_to_bpc, color_space_to_colorimetry, data_range_to_dynamic_range, @@ -36,10 +36,6 @@ from drivers.ucd_driver import IUcdDevice log = logging.getLogger(__name__) -_LOCK_TIMEOUT_SECONDS = 8.0 -_DEBUG_LOCK_TIMEOUT_SECONDS = 0.3 -_PATTERN_LOCK_TIMEOUT_SECONDS = 0.8 - # ─── 视图字符串 → 值对象 转换工具 ──────────────────────────────── @@ -63,6 +59,23 @@ def build_signal_format( ) +def build_signal_format_from_profile( + *, + color_space: str, + color_format: str, + bpc: int, + data_range: str = "Full", +) -> SignalFormat: + """从 PQConfig test_type 条目组装 :class:`SignalFormat`。""" + bit_depth = f"{int(bpc)}bit" + return build_signal_format( + color_space=color_space, + output_format=color_format, + bit_depth=bit_depth, + data_range=data_range, + ) + + def build_timing(timing_str: str) -> TimingSpec: """``"DMT 3840x2160@60Hz"`` → :class:`TimingSpec`。""" return parse_timing_str(timing_str) @@ -81,63 +94,11 @@ def image_pattern(path: str) -> PatternSpec: class SignalService: - """协调 SignalFormat / Timing / Pattern 的写入与提交。 - - 使用线程锁串行化所有对外的 ``apply_*`` 调用,避免多个测试线程 - 同时操作 UCD 造成 SDK 状态错乱。 - """ + """协调 SignalFormat / Timing / Pattern 的写入与提交。""" def __init__(self, device: IUcdDevice, bus: EventBus): self._dev = device self._bus = bus - self._lock = threading.RLock() - self._lock_owner_tid: int | None = None - self._lock_owner_name: str | None = None - - def _effective_lock_timeout(self, timeout_override: float | None = None) -> float: - """调试模式下缩短锁等待,避免单步时表现为 UI 长时间无响应。""" - if timeout_override is not None: - return timeout_override - if sys.gettrace() is not None: - return _DEBUG_LOCK_TIMEOUT_SECONDS - return _LOCK_TIMEOUT_SECONDS - - @contextmanager - def _acquire_service_lock(self, op_name: str, timeout_override: float | None = None): - timeout = self._effective_lock_timeout(timeout_override) - current = threading.current_thread() - log.info( - "SignalService.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s", - op_name, - timeout, - threading.get_ident(), - current.name, - self._lock_owner_tid, - self._lock_owner_name, - ) - acquired = self._lock.acquire(timeout=timeout) - if not acquired: - raise UcdError( - "UCD busy: lock timeout in " - f"SignalService.{op_name} ({timeout:.1f}s), " - f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}" - ) - prev_owner_tid = self._lock_owner_tid - prev_owner_name = self._lock_owner_name - self._lock_owner_tid = threading.get_ident() - self._lock_owner_name = current.name - log.info( - "SignalService.%s lock acquired tid=%s thread=%s", - op_name, - self._lock_owner_tid, - self._lock_owner_name, - ) - try: - yield - finally: - self._lock_owner_tid = prev_owner_tid - self._lock_owner_name = prev_owner_name - self._lock.release() # -- 高层接口 ------------------------------------------------ @@ -153,24 +114,22 @@ class SignalService: Returns: ``format_changed``——本次相对上一次 :meth:`apply` 是否变化。 """ - with self._acquire_service_lock("apply"): - log.info( - "SignalService.apply signal=%s timing=%s pattern=%s", - signal, - timing, - pattern.kind.value, - ) - changed = self._dev.configure(signal, timing) - self._dev.set_pattern(pattern) - self._dev.apply() - return changed + log.info( + "SignalService.apply signal=%s timing=%s pattern=%s", + signal, + timing, + pattern.kind.value, + ) + changed = self._dev.configure(signal, timing) + self._dev.set_pattern(pattern) + self._dev.apply() + return changed def send_pattern(self, pattern: PatternSpec) -> None: """在已 configure 的信号上仅更新图案后 apply。""" - with self._acquire_service_lock("send_pattern", _PATTERN_LOCK_TIMEOUT_SECONDS): - log.info("SignalService.send_pattern pattern=%s", pattern.kind.value) - self._dev.set_pattern(pattern) - self._dev.apply() + log.info("SignalService.send_pattern pattern=%s", pattern.kind.value) + self._dev.set_pattern(pattern) + self._dev.apply() def send_solid_rgb(self, rgb: tuple[int, int, int] | list[int]) -> None: self.send_pattern(solid_rgb_pattern(rgb)) @@ -178,12 +137,6 @@ class SignalService: def send_image(self, path: str) -> None: self.send_pattern(image_pattern(path)) - # -- 过渡期 API(Phase 2)----------------------------------- - # 现有 GUI 回调以"仅更新信号格式、不切换图案"的方式调用 - # ``ucd.apply_signal_format(color_space=..., color_format=..., bit_depth=...)``。 - # 新代码统一通过本方法走 SignalService;内部仍委托给底层 - # controller 的同名旧接口,迁移完成后将替换为纯净实现。 - def update_signal_format( self, *, @@ -194,31 +147,24 @@ class SignalService: max_cll: int | None = None, max_fall: int | None = None, ) -> bool: - """仅将信号格式 stage 到 SDK(沿用上一次的 timing),不切换图案。 + """仅将信号格式提交到 SDK(沿用上一次的 timing),不切换图案。 UI 字符串先经域层解析做参数校验;解析失败抛 :class:`UcdConfigError`。 """ - # 解析仅做校验;当前实现走 raw controller 的旧 API _ = build_signal_format( color_space=color_space, output_format=output_format, bit_depth=bit_depth, data_range=data_range, ) - ctrl = getattr(self._dev, "raw_controller", None) - if ctrl is None: - raise UcdError("update_signal_format 暂仅支持 UCD323Device") - with self._acquire_service_lock("update_signal_format"): - return bool( - ctrl.apply_signal_format( - color_space=color_space, - color_format=output_format, - bit_depth=bit_depth, - data_range=data_range, - max_cll=max_cll, - max_fall=max_fall, - ) - ) + return self._dev.apply_signal_format( + color_space=color_space, + color_format=output_format, + bit_depth=bit_depth, + data_range=data_range, + max_cll=max_cll, + max_fall=max_fall, + ) # -- 透传给上层的查询 --------------------------------------- @@ -232,54 +178,37 @@ class SignalService: @property def is_connected(self) -> bool: """UCD 设备是否已打开。供 GUI 做前置校验。""" - ctrl = getattr(self._dev, "raw_controller", None) - return bool(ctrl and getattr(ctrl, "status", False)) + return self._dev.state != UcdState.CLOSED - # -- 过渡期 API(Phase 5):config 驱动的写入 ----------------- - # 现有 GUI / Service 通过 ``PQConfig`` 对象描述当次测试参数, - # 由 :class:`UCDController.set_ucd_params` 翻译为色彩/Timing/Pattern。 - # 在配置层重构落地前,这两个方法作为 SignalService 的统一入口, - # 让上层不再直接接触 ``self.app.ucd``。 + @property + def format_changed(self) -> bool: + """最近一次视频模式提交是否相对上次发生变化。""" + return self._dev.format_changed + + @property + def last_error(self) -> str | None: + return self._dev.last_error def apply_config(self, config) -> bool: """按 :class:`PQConfig` 写入色彩 / Timing / 当前 Pattern(不 apply 输出)。""" - ctrl = getattr(self._dev, "raw_controller", None) - if ctrl is None: - raise UcdError("apply_config 暂仅支持 UCD323Device") - with self._acquire_service_lock("apply_config"): - return bool(ctrl.set_ucd_params(config)) + return bool(self._dev.set_ucd_params(config)) def send_pattern_params(self, params) -> bool: """以 ``params`` 更新当前 pattern 的参数并 apply。""" - ctrl = getattr(self._dev, "raw_controller", None) - if ctrl is None: - raise UcdError("send_pattern_params 暂仅支持 UCD323Device") - with self._acquire_service_lock("send_pattern_params"): - return bool(ctrl.send_current_pattern_params(params)) + return bool(self._dev.send_current_pattern_params(params)) def apply_and_run(self, config, pattern_params) -> bool: - """``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。 - - 服务于 custom_template_panel 单步流程。 - """ - ctrl = getattr(self._dev, "raw_controller", None) - if ctrl is None: - raise UcdError("apply_and_run 暂仅支持 UCD323Device") - with self._acquire_service_lock("apply_and_run"): - if not ctrl.set_ucd_params(config): - return False - if not ctrl.set_pattern(ctrl.current_pattern, pattern_params): - return False - return bool(ctrl.run()) + """``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。""" + return bool(self._dev.apply_config_and_run(config, pattern_params)) __all__ = [ "SignalService", "build_signal_format", + "build_signal_format_from_profile", "build_timing", "solid_rgb_pattern", "image_pattern", - # 重导出常用域类型方便上层 import 一次到位 "SignalFormat", "TimingSpec", "PatternSpec", diff --git a/app/tests/local_dimming.py b/app/tests/local_dimming.py index 6fa5ec4..4ca3cb3 100644 --- a/app/tests/local_dimming.py +++ b/app/tests/local_dimming.py @@ -157,9 +157,118 @@ def _ensure_checkerboard_image(width, height, grid_size, center_white): _IMAGE_CACHE[key] = path return path + +def _ld_ucd_params_signature(self: "PQAutomationApp") -> tuple: + """Local Dimming 发图前 UCD 参数签名,用于跳过未变化的重复配置。""" + test_type = getattr(self.config, "current_test_type", "screen_module") + cfg = self.config.current_test_types.get(test_type, {}) + timing = cfg.get("timing", "") + + if test_type == "screen_module": + color_space = ( + self.screen_module_color_space_var.get() + if hasattr(self, "screen_module_color_space_var") + else cfg.get("colorimetry", "sRGB") + ) + data_range = ( + self.screen_module_data_range_var.get() + if hasattr(self, "screen_module_data_range_var") + else cfg.get("data_range", "Full") + ) + bit_depth = ( + self.screen_module_bit_depth_var.get() + if hasattr(self, "screen_module_bit_depth_var") + else f"{int(cfg.get('bpc', 8))}bit" + ) + output_format = ( + self.screen_module_output_format_var.get() + if hasattr(self, "screen_module_output_format_var") + else cfg.get("color_format", "RGB") + ) + elif test_type == "sdr_movie": + color_space = ( + self.sdr_color_space_var.get() + if hasattr(self, "sdr_color_space_var") + else cfg.get("colorimetry", "sRGB") + ) + data_range = ( + self.sdr_data_range_var.get() + if hasattr(self, "sdr_data_range_var") + else cfg.get("data_range", "Full") + ) + bit_depth = ( + self.sdr_bit_depth_var.get() + if hasattr(self, "sdr_bit_depth_var") + else f"{int(cfg.get('bpc', 8))}bit" + ) + output_format = ( + self.sdr_output_format_var.get() + if hasattr(self, "sdr_output_format_var") + else cfg.get("color_format", "RGB") + ) + elif test_type == "hdr_movie": + color_space = ( + self.hdr_color_space_var.get() + if hasattr(self, "hdr_color_space_var") + else cfg.get("colorimetry", "sRGB") + ) + data_range = ( + self.hdr_data_range_var.get() + if hasattr(self, "hdr_data_range_var") + else cfg.get("data_range", "Full") + ) + bit_depth = ( + self.hdr_bit_depth_var.get() + if hasattr(self, "hdr_bit_depth_var") + else f"{int(cfg.get('bpc', 8))}bit" + ) + output_format = ( + self.hdr_output_format_var.get() + if hasattr(self, "hdr_output_format_var") + else cfg.get("color_format", "RGB") + ) + max_cll = self.hdr_maxcll_var.get() if hasattr(self, "hdr_maxcll_var") else None + max_fall = self.hdr_maxfall_var.get() if hasattr(self, "hdr_maxfall_var") else None + return (test_type, timing, color_space, data_range, bit_depth, output_format, max_cll, max_fall) + elif test_type == "local_dimming": + color_space = ( + self.local_dimming_color_space_var.get() + if hasattr(self, "local_dimming_color_space_var") + else cfg.get("colorimetry", "sRGB") + ) + data_range = ( + self.local_dimming_data_range_var.get() + if hasattr(self, "local_dimming_data_range_var") + else cfg.get("data_range", "Full") + ) + bit_depth = ( + self.local_dimming_bit_depth_var.get() + if hasattr(self, "local_dimming_bit_depth_var") + else f"{int(cfg.get('bpc', 8))}bit" + ) + output_format = ( + self.local_dimming_output_format_var.get() + if hasattr(self, "local_dimming_output_format_var") + else cfg.get("color_format", "RGB") + ) + else: + return (test_type,) + + return (test_type, timing, color_space, data_range, bit_depth, output_format) + + +def invalidate_ld_ucd_params_cache(self: "PQAutomationApp") -> None: + """信号格式或分辨率变更后,强制下次发图重新写入 UCD 参数。""" + self._last_ld_ucd_signature = None + + def _apply_ld_ucd_params(self: "PQAutomationApp") -> bool: """发送 Local Dimming 图案前,按当前测试类型写入 UCD 参数。""" - test_type = getattr(self.config, "current_test_type", "screen_module") + signature = _ld_ucd_params_signature(self) + if getattr(self, "_last_ld_ucd_signature", None) == signature: + return True + + test_type = signature[0] cfg = self.config.current_test_types.get(test_type, {}) try: @@ -279,6 +388,7 @@ def _apply_ld_ucd_params(self: "PQAutomationApp") -> bool: self._dispatch_ui(self.log_gui.log, "Local Dimming UCD 参数设置失败", "error") return False + self._last_ld_ucd_signature = signature return True except Exception as e: self._dispatch_ui(self.log_gui.log, f"Local Dimming UCD 参数设置异常: {e}", "error") @@ -914,5 +1024,6 @@ class LocalDimmingMixin: clear_ld_records = clear_ld_records save_local_dimming_results = save_local_dimming_results plot_ld_instant_peak_curve = plot_ld_instant_peak_curve + invalidate_ld_ucd_params_cache = invalidate_ld_ucd_params_cache _insert_ld_tree_item = _insert_ld_tree_item diff --git a/app/views/panels/custom_template_panel.py b/app/views/panels/custom_template_panel.py index 35e2a7f..fed5029 100644 --- a/app/views/panels/custom_template_panel.py +++ b/app/views/panels/custom_template_panel.py @@ -218,7 +218,7 @@ def show_custom_result_context_menu(self: "PQAutomationApp", event): can_single_step = ( has_selection and self.ca is not None - and self.ucd is not None + and self.signal_service.is_connected and not self.testing ) try: @@ -259,7 +259,7 @@ def start_custom_row_single_step(self: "PQAutomationApp"): if not hasattr(self, "custom_result_tree"): return - if self.ca is None or self.ucd is None: + if self.ca is None or not self.signal_service.is_connected: messagebox.showerror("错误", "请先连接CA410和信号发生器") return @@ -570,7 +570,7 @@ def append_custom_template_result(self: "PQAutomationApp", row_no, result_data): def start_custom_template_test(self: "PQAutomationApp"): """开始客户模板测试(SDR)""" - if self.ca is None or self.ucd is None: + if self.ca is None or not self.signal_service.is_connected: messagebox.showerror("错误", "请先连接CA410和信号发生器") return diff --git a/app/views/panels/main_layout.py b/app/views/panels/main_layout.py index 10af4b0..3cb4222 100644 --- a/app/views/panels/main_layout.py +++ b/app/views/panels/main_layout.py @@ -1077,7 +1077,7 @@ def on_screen_module_signal_format_changed(self: "PQAutomationApp", event=None): self.save_pq_config() return - if getattr(self.ucd, "status", False): + if self.signal_service.is_connected: ok = self.signal_service.update_signal_format( color_space=color_space, data_range=data_range, @@ -1121,7 +1121,7 @@ def on_sdr_output_format_changed(self: "PQAutomationApp", event=None): self.log_gui.log("警告: 测试进行中,格式更改将在下次测试时生效", level="error") return - if getattr(self.ucd, "status", False): + if self.signal_service.is_connected: ok = self.signal_service.update_signal_format( color_space=self.sdr_color_space_var.get(), data_range=self.sdr_data_range_var.get(), @@ -1145,7 +1145,7 @@ def on_hdr_output_format_changed(self: "PQAutomationApp", event=None): self.log_gui.log("警告: 测试进行中,格式更改将在下次测试时生效", level="error") return - if getattr(self.ucd, "status", False): + if self.signal_service.is_connected: ok = self.signal_service.update_signal_format( color_space=self.hdr_color_space_var.get(), data_range=self.hdr_data_range_var.get(), @@ -1169,6 +1169,9 @@ def on_local_dimming_timing_changed(self: "PQAutomationApp", event=None): self.config.current_test_types.setdefault("local_dimming", {})["timing"] = selected_timing + if hasattr(self, "invalidate_ld_ucd_params_cache"): + self.invalidate_ld_ucd_params_cache() + if self.testing: self.log_gui.log("警告: 测试进行中,分辨率更改将在下次测试时生效", level="error") @@ -1191,6 +1194,9 @@ def on_local_dimming_signal_format_changed(self: "PQAutomationApp", event=None): ld_cfg["bpc"] = UCDEnum.SignalFormat.BitDepth.get_bit_value(bit_depth) ld_cfg["data_range"] = data_range + if hasattr(self, "invalidate_ld_ucd_params_cache"): + self.invalidate_ld_ucd_params_cache() + self.log_gui.log( ( "Local Dimming 信号格式已更新: " @@ -1205,7 +1211,7 @@ def on_local_dimming_signal_format_changed(self: "PQAutomationApp", event=None): self.save_pq_config() return - if getattr(self.ucd, "status", False): + if self.signal_service.is_connected: ok = self.signal_service.update_signal_format( color_space=color_space, data_range=data_range, diff --git a/app_version.py b/app_version.py index f27b34d..2fbc144 100644 --- a/app_version.py +++ b/app_version.py @@ -3,10 +3,10 @@ APP_VERSION = "106.26.0.0" def is_beta_version(version: str = APP_VERSION) -> bool: - """版本号第3、4段均为 '0' 时(格式 x.x.0.0)判定为测试版。""" + """版本号第3、4段均为 '0' 时(格式 x.x.0.x)判定为测试版。""" parts = version.split(".") if len(parts) >= 4: - return parts[2] == "0" and parts[3] == "0" + return parts[2] == "0" return False diff --git a/drivers/UCD323_Function.py b/drivers/UCD323_Function.py index 746a320..2bb7331 100644 --- a/drivers/UCD323_Function.py +++ b/drivers/UCD323_Function.py @@ -54,9 +54,10 @@ class UCDController: self._close_device_object(temp_dev) raise role_error - pg, _ = self.get_tx_modules() + pg, ag = self.get_tx_modules() self.timing_manager = pg.timing_manager self.color_info = UniTAP.ColorInfo() + self._stop_audio_output(ag) self.status = True return True @@ -82,6 +83,10 @@ class UCDController: """关闭设备""" try: if self.dev: + try: + self._stop_audio_output() + except Exception: + pass self._close_device_object(self.dev) self._reset_state() @@ -152,6 +157,28 @@ class UCDController: return self.role.dptx.pg, self.role.dptx.ag raise ValueError(f"不支持的接口类型: {interface}") + def _stop_audio_output(self, ag=None) -> None: + """关闭 HDMI/DP 音频发生器。PQ 测试仅需视频图案,避免电视持续输出测试音。""" + if not self.status or not self.role: + return + try: + if ag is None: + _, ag = self.get_tx_modules() + ag.stop_generate() + log.info("UCDController._stop_audio_output done") + except Exception: + log.exception("UCDController._stop_audio_output failed") + + def _apply_pg_output(self, pg) -> bool: + """提交 PG 输出,并确保音频发生器处于关闭状态。""" + try: + ok = bool(pg.apply()) + except Exception: + log.exception("UCDController._apply_pg_output pg.apply failed") + return False + self._stop_audio_output() + return ok + def _resolve_timing(self, pg=None): """优先从 current_timing 读取 timing,必要时回退到 TX 模块。""" if self.current_timing is not None: @@ -253,9 +280,9 @@ class UCDController: self.apply_pattern() pg, _ = self.get_tx_modules() log.info("UCDController.run calling pg.apply()") - pg.apply() - log.info("UCDController.run done") - return True + ok = self._apply_pg_output(pg) + log.info("UCDController.run done ok=%s", ok) + return ok def send_image_pattern(self, image_path): """发送图片 Pattern(依赖当前 timing/color_info 状态)。""" @@ -264,10 +291,11 @@ class UCDController: try: pg, _ = self.get_tx_modules() - self.apply_video_mode() + # 仅切换图案,不重复 set_vm;重复 apply video mode 会触发电视 HDMI 重锁发声。 + if getattr(self, "_last_sent_config", None) is None: + self.apply_video_mode() pg.set_pattern(pattern=image_path) - pg.apply() - return True + return self._apply_pg_output(pg) except Exception: return False @@ -323,6 +351,9 @@ class UCDController: current_dynamic_range = self.color_info.dynamic_range color_format = UCDEnum.ColorInfo.get_color_format(cf) + if color_format is None: + fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(cf) + color_format = UCDEnum.ColorInfo.get_color_format(fmt_key) if color_format is None: return False @@ -369,12 +400,17 @@ class UCDController: self.color_info.dynamic_range, self.color_info.bpc, ) + if not self.format_changed: + log.info("UCDController.set_video_mode skipped pg.set_vm(): config unchanged") + return True + video_mode = UniTAP.VideoMode( timing=self.current_timing, color_info=self.color_info ) pg, _ = self.get_tx_modules() log.info("UCDController.set_video_mode calling pg.set_vm()") pg.set_vm(vm=video_mode) + self._stop_audio_output() log.info("UCDController.set_video_mode done") self._last_sent_config = current_config return True diff --git a/drivers/ucd_driver.py b/drivers/ucd_driver.py index 43a6efe..de04318 100644 --- a/drivers/ucd_driver.py +++ b/drivers/ucd_driver.py @@ -3,11 +3,12 @@ 唯一暴露给上层的入口为 :class:`IUcdDevice` 抽象接口,以及实现: :class:`UCD323Device`(生产)和 :class:`FakeUcdDevice`(单测)。 -Phase 1 实现策略 ------------------ -为保证零行为变更,:class:`UCD323Device` 当前**内部委托**给已有的 -:class:`drivers.UCD323_Function.UCDController`。后续 Phase 2 会将 -SDK 调用直接搬入本模块,届时可删除旧 ``UCDController`` 文件。 +实现策略 +-------- +:class:`UCD323Device` 对外暴露完整的 :class:`IUcdDevice` 接口;SDK 调用 +当前仍委托给 :class:`drivers.UCD323_Function.UCDController`。 +上层(Service / GUI)**不得**直接访问 ``UCDController``。 +后续可将 SDK 调用逐步迁入本模块并删除旧文件。 文件分区: §1 DeviceInfo / list_devices @@ -141,6 +142,45 @@ class IUcdDevice(ABC): def current_resolution(self) -> tuple[int, int]: """读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。""" + @abstractmethod + def search_devices(self) -> list[str]: + """枚举可用设备的 SDK 显示字符串列表。""" + + @property + @abstractmethod + def format_changed(self) -> bool: + """最近一次视频模式提交是否相对上次发生变化。""" + + @property + @abstractmethod + def last_error(self) -> str | None: + """最近一次配置/应用失败时的错误描述。""" + + @abstractmethod + def apply_signal_format( + self, + *, + color_space: str | None = None, + data_range: str | None = None, + bit_depth: str | None = None, + color_format: str | None = None, + max_cll: int | None = None, + max_fall: int | None = None, + ) -> bool: + """仅更新信号格式(沿用当前 timing),不切换图案。""" + + @abstractmethod + def set_ucd_params(self, config) -> bool: + """按 PQConfig stage 色彩 / Timing / Pattern 类型(不 apply 输出)。""" + + @abstractmethod + def send_current_pattern_params(self, pattern_params) -> bool: + """更新当前 pattern 参数并 apply 到硬件。""" + + @abstractmethod + def apply_config_and_run(self, config, pattern_params) -> bool: + """set_ucd_params + set_pattern + run 复合操作。""" + # ─── §3 UCD323Device 真实实现 ──────────────────────────────────── @@ -211,14 +251,6 @@ class UCD323Device(IUcdDevice): def info(self) -> DeviceInfo | None: return self._info - @property - def raw_controller(self) -> "UCDController": - """Phase 1 过渡期:给暂未迁移的旧调用点的逃生通道。 - - 新代码**不**应使用本属性,迁移完成后即可删除。 - """ - return self._controller - # -- 生命周期 ------------------------------------------------ def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: @@ -336,6 +368,74 @@ class UCD323Device(IUcdDevice): except Exception: # noqa: BLE001 return (3840, 2160) + def search_devices(self) -> list[str]: + try: + return self._controller.search_device() or [] + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("枚举 UCD 设备失败") from exc + + @property + def format_changed(self) -> bool: + return bool(getattr(self._controller, "format_changed", True)) + + @property + def last_error(self) -> str | None: + err = getattr(self._controller, "last_error", None) + return str(err) if err else None + + def apply_signal_format( + self, + *, + color_space: str | None = None, + data_range: str | None = None, + bit_depth: str | None = None, + color_format: str | None = None, + max_cll: int | None = None, + max_fall: int | None = None, + ) -> bool: + with self._acquire_device_lock("apply_signal_format"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 apply_signal_format") + return bool( + self._controller.apply_signal_format( + color_space=color_space, + data_range=data_range, + bit_depth=bit_depth, + color_format=color_format, + max_cll=max_cll, + max_fall=max_fall, + ) + ) + + def set_ucd_params(self, config) -> bool: + with self._acquire_device_lock("set_ucd_params"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 set_ucd_params") + return bool(self._controller.set_ucd_params(config)) + + def send_current_pattern_params(self, pattern_params) -> bool: + with self._acquire_device_lock("send_current_pattern_params"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 send_current_pattern_params") + ok = bool(self._controller.send_current_pattern_params(pattern_params)) + if ok: + self._state = UcdState.APPLIED + return ok + + def apply_config_and_run(self, config, pattern_params) -> bool: + with self._acquire_device_lock("apply_config_and_run"): + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 apply_config_and_run") + ctrl = self._controller + if not ctrl.set_ucd_params(config): + return False + if not ctrl.set_pattern(ctrl.current_pattern, pattern_params): + return False + ok = bool(ctrl.run()) + if ok: + self._state = UcdState.APPLIED + return ok + # -- 内部辅助 ------------------------------------------------ def _apply_dynamic_range(self, signal: SignalFormat) -> None: @@ -387,7 +487,10 @@ class UCD323Device(IUcdDevice): ) try: pg, _ = self._controller.get_tx_modules() - pg.apply() + if not self._controller._apply_pg_output(pg): + raise UcdApplyFailed("controller.apply_pg_output 返回 False") + except UcdApplyFailed: + raise except Exception as exc: raise UcdSdkError("pg.apply() 失败") from exc return True @@ -466,8 +569,8 @@ class FakeUcdDevice(IUcdDevice): return (signal, timing) != self._last_applied def set_pattern(self, pattern: PatternSpec) -> None: - if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED): - raise UcdStateError(f"非法状态 {self._state.name}") + if self._state == UcdState.CLOSED: + raise UcdNotConnected() self.calls.append(("set_pattern", pattern)) self._pattern = pattern @@ -488,6 +591,44 @@ class FakeUcdDevice(IUcdDevice): return (3840, 2160) return (self._timing.width, self._timing.height) + def search_devices(self) -> list[str]: + return [] + + @property + def format_changed(self) -> bool: + return self._last_applied is None + + @property + def last_error(self) -> str | None: + return None + + def apply_signal_format(self, **kwargs) -> bool: + if self._state == UcdState.CLOSED: + raise UcdNotConnected() + self.calls.append(("apply_signal_format", kwargs)) + return True + + def set_ucd_params(self, config) -> bool: + if self._state == UcdState.CLOSED: + raise UcdNotConnected() + self.calls.append(("set_ucd_params", config)) + self._state = UcdState.OPENED + return True + + def send_current_pattern_params(self, pattern_params) -> bool: + if self._state == UcdState.CLOSED: + raise UcdNotConnected() + self.calls.append(("send_current_pattern_params", pattern_params)) + self._state = UcdState.APPLIED + return True + + def apply_config_and_run(self, config, pattern_params) -> bool: + if self._state == UcdState.CLOSED: + raise UcdNotConnected() + self.calls.append(("apply_config_and_run", config, pattern_params)) + self._state = UcdState.APPLIED + return True + __all__ = [ "DeviceInfo", diff --git a/pqAutomationApp.py b/pqAutomationApp.py index 33380b1..e946f0a 100644 --- a/pqAutomationApp.py +++ b/pqAutomationApp.py @@ -9,7 +9,6 @@ import traceback import matplotlib import matplotlib.pyplot as plt from app_version import APP_NAME, APP_VERSION, get_app_title -from drivers.UCD323_Function import UCDController from drivers.ucd_driver import UCD323Device from app.ucd_domain import EventBus from app.services.ucd_service import SignalService @@ -100,13 +99,10 @@ class PQAutomationApp( # 初始化设备连接状态 self.ca = None # CA410色度计 - self.ucd = UCDController() # 信号发生器(旧接口,过渡期保留) - # 新架构:EventBus + 设备抽象 + 服务层。 - # UCD323Device 内部委托 self.ucd,保证零行为变更; - # 新代码统一走 self.signal_service。 + # UCD:EventBus + 设备抽象 + 服务层;上层统一走 signal_service / ucd_device。 self.event_bus = EventBus() - self.ucd_device = UCD323Device(self.event_bus, self.ucd) + self.ucd_device = UCD323Device(self.event_bus) self.signal_service = SignalService(self.ucd_device, self.event_bus) # 连接控制器:统一管理 CA/UCD 生命周期。 @@ -467,7 +463,7 @@ class PQAutomationApp( def _check_start_preconditions(self): """检查开始测试前置条件:设备连接 & 未在测试中。""" - if self.ca is None or self.ucd is None: + if self.ca is None or not self.signal_service.is_connected: messagebox.showerror("错误", "请先连接CA410和信号发生器") return False if self.testing: @@ -808,8 +804,8 @@ class PQAutomationApp( print("配置已清理,不再保存") # 断开设备连接 - if self.ucd.status: - self.ucd.close() + if self.signal_service.is_connected: + self.connection.disconnect_ucd() if self.ca is not None: self.ca.close()