Files
pqAutomationApp/app/services/ucd_service.py

291 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""UCD 信号 / 图案应用服务层。
服务层是 GUI ↔ Driver 的唯一通道,负责:
- 将 UI 字符串("BT.709""10bit""YCbCr 4:4:4" 等)翻译成 :class:`SignalFormat`
- 将各 panel 的 timing 字符串翻译成 :class:`TimingSpec`
- 协调 :meth:`IUcdDevice.configure` / ``set_pattern`` / ``apply`` 的调用顺序;
- 通过 :class:`EventBus` 让 GUI 订阅状态变化,而非主动轮询。
本层不直接 import UniTAP也不读取 :mod:`tkinter` 变量;
所有输入都是显式参数,便于单测。
"""
from __future__ import annotations
from contextlib import contextmanager
import logging
import sys
import threading
from app.ucd_domain import (
Colorimetry,
DynamicRange,
EventBus,
PatternKind,
PatternSpec,
SignalFormat,
TimingSpec,
UcdError,
bit_depth_str_to_bpc,
color_space_to_colorimetry,
data_range_to_dynamic_range,
output_format_to_color_format,
parse_timing_str,
)
from drivers.ucd_driver import IUcdDevice
log = logging.getLogger(__name__)
_LOCK_TIMEOUT_SECONDS = 8.0
_DEBUG_LOCK_TIMEOUT_SECONDS = 0.3
_PATTERN_LOCK_TIMEOUT_SECONDS = 0.8
# ─── 视图字符串 → 值对象 转换工具 ────────────────────────────────
def build_signal_format(
*,
color_space: str,
output_format: str,
bit_depth: str,
data_range: str = "Full",
) -> SignalFormat:
"""根据下拉框字符串组装 :class:`SignalFormat`。
各参数解析失败抛 :class:`UcdConfigError`。
"""
return SignalFormat(
color_format=output_format_to_color_format(output_format),
colorimetry=color_space_to_colorimetry(color_space),
bpc=bit_depth_str_to_bpc(bit_depth),
dynamic_range=data_range_to_dynamic_range(data_range),
)
def build_timing(timing_str: str) -> TimingSpec:
"""``"DMT 3840x2160@60Hz"`` → :class:`TimingSpec`。"""
return parse_timing_str(timing_str)
def solid_rgb_pattern(rgb: tuple[int, int, int] | list[int]) -> PatternSpec:
r, g, b = rgb[0], rgb[1], rgb[2]
return PatternSpec(kind=PatternKind.SOLID, solid_rgb=(int(r), int(g), int(b)))
def image_pattern(path: str) -> PatternSpec:
return PatternSpec(kind=PatternKind.IMAGE, image_path=path)
# ─── 服务 ────────────────────────────────────────────────────────
class SignalService:
"""协调 SignalFormat / Timing / Pattern 的写入与提交。
使用线程锁串行化所有对外的 ``apply_*`` 调用,避免多个测试线程
同时操作 UCD 造成 SDK 状态错乱。
"""
def __init__(self, device: IUcdDevice, bus: EventBus):
self._dev = device
self._bus = bus
self._lock = threading.RLock()
self._lock_owner_tid: int | None = None
self._lock_owner_name: str | None = None
def _effective_lock_timeout(self, timeout_override: float | None = None) -> float:
"""调试模式下缩短锁等待,避免单步时表现为 UI 长时间无响应。"""
if timeout_override is not None:
return timeout_override
if sys.gettrace() is not None:
return _DEBUG_LOCK_TIMEOUT_SECONDS
return _LOCK_TIMEOUT_SECONDS
@contextmanager
def _acquire_service_lock(self, op_name: str, timeout_override: float | None = None):
timeout = self._effective_lock_timeout(timeout_override)
current = threading.current_thread()
log.info(
"SignalService.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s",
op_name,
timeout,
threading.get_ident(),
current.name,
self._lock_owner_tid,
self._lock_owner_name,
)
acquired = self._lock.acquire(timeout=timeout)
if not acquired:
raise UcdError(
"UCD busy: lock timeout in "
f"SignalService.{op_name} ({timeout:.1f}s), "
f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}"
)
prev_owner_tid = self._lock_owner_tid
prev_owner_name = self._lock_owner_name
self._lock_owner_tid = threading.get_ident()
self._lock_owner_name = current.name
log.info(
"SignalService.%s lock acquired tid=%s thread=%s",
op_name,
self._lock_owner_tid,
self._lock_owner_name,
)
try:
yield
finally:
self._lock_owner_tid = prev_owner_tid
self._lock_owner_name = prev_owner_name
self._lock.release()
# -- 高层接口 ------------------------------------------------
def apply(
self,
*,
signal: SignalFormat,
timing: TimingSpec,
pattern: PatternSpec,
) -> bool:
"""一次性提交信号格式 + timing + 图案。
Returns:
``format_changed``——本次相对上一次 :meth:`apply` 是否变化。
"""
with self._acquire_service_lock("apply"):
log.info(
"SignalService.apply signal=%s timing=%s pattern=%s",
signal,
timing,
pattern.kind.value,
)
changed = self._dev.configure(signal, timing)
self._dev.set_pattern(pattern)
self._dev.apply()
return changed
def send_pattern(self, pattern: PatternSpec) -> None:
"""在已 configure 的信号上仅更新图案后 apply。"""
with self._acquire_service_lock("send_pattern", _PATTERN_LOCK_TIMEOUT_SECONDS):
log.info("SignalService.send_pattern pattern=%s", pattern.kind.value)
self._dev.set_pattern(pattern)
self._dev.apply()
def send_solid_rgb(self, rgb: tuple[int, int, int] | list[int]) -> None:
self.send_pattern(solid_rgb_pattern(rgb))
def send_image(self, path: str) -> None:
self.send_pattern(image_pattern(path))
# -- 过渡期 APIPhase 2-----------------------------------
# 现有 GUI 回调以"仅更新信号格式、不切换图案"的方式调用
# ``ucd.apply_signal_format(color_space=..., color_format=..., bit_depth=...)``。
# 新代码统一通过本方法走 SignalService内部仍委托给底层
# controller 的同名旧接口,迁移完成后将替换为纯净实现。
def update_signal_format(
self,
*,
color_space: str,
output_format: str,
bit_depth: str,
data_range: str = "Full",
max_cll: int | None = None,
max_fall: int | None = None,
) -> bool:
"""仅将信号格式 stage 到 SDK沿用上一次的 timing不切换图案。
UI 字符串先经域层解析做参数校验;解析失败抛 :class:`UcdConfigError`。
"""
# 解析仅做校验;当前实现走 raw controller 的旧 API
_ = build_signal_format(
color_space=color_space,
output_format=output_format,
bit_depth=bit_depth,
data_range=data_range,
)
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("update_signal_format 暂仅支持 UCD323Device")
with self._acquire_service_lock("update_signal_format"):
return bool(
ctrl.apply_signal_format(
color_space=color_space,
color_format=output_format,
bit_depth=bit_depth,
data_range=data_range,
max_cll=max_cll,
max_fall=max_fall,
)
)
# -- 透传给上层的查询 ---------------------------------------
@property
def device(self) -> IUcdDevice:
return self._dev
def current_resolution(self) -> tuple[int, int]:
return self._dev.current_resolution()
@property
def is_connected(self) -> bool:
"""UCD 设备是否已打开。供 GUI 做前置校验。"""
ctrl = getattr(self._dev, "raw_controller", None)
return bool(ctrl and getattr(ctrl, "status", False))
# -- 过渡期 APIPhase 5config 驱动的写入 -----------------
# 现有 GUI / Service 通过 ``PQConfig`` 对象描述当次测试参数,
# 由 :class:`UCDController.set_ucd_params` 翻译为色彩/Timing/Pattern。
# 在配置层重构落地前,这两个方法作为 SignalService 的统一入口,
# 让上层不再直接接触 ``self.app.ucd``。
def apply_config(self, config) -> bool:
"""按 :class:`PQConfig` 写入色彩 / Timing / 当前 Pattern不 apply 输出)。"""
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("apply_config 暂仅支持 UCD323Device")
with self._acquire_service_lock("apply_config"):
return bool(ctrl.set_ucd_params(config))
def send_pattern_params(self, params) -> bool:
"""以 ``params`` 更新当前 pattern 的参数并 apply。"""
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("send_pattern_params 暂仅支持 UCD323Device")
with self._acquire_service_lock("send_pattern_params"):
return bool(ctrl.send_current_pattern_params(params))
def apply_and_run(self, config, pattern_params) -> bool:
"""``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。
服务于 custom_template_panel 单步流程。
"""
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("apply_and_run 暂仅支持 UCD323Device")
with self._acquire_service_lock("apply_and_run"):
if not ctrl.set_ucd_params(config):
return False
if not ctrl.set_pattern(ctrl.current_pattern, pattern_params):
return False
return bool(ctrl.run())
__all__ = [
"SignalService",
"build_signal_format",
"build_timing",
"solid_rgb_pattern",
"image_pattern",
# 重导出常用域类型方便上层 import 一次到位
"SignalFormat",
"TimingSpec",
"PatternSpec",
"PatternKind",
"Colorimetry",
"DynamicRange",
"UcdError",
]