Files

291 lines
10 KiB
Python
Raw Permalink Normal View History

2026-05-24 10:49:28 +08:00
"""UCD 信号 / 图案应用服务层。
服务层是 GUI Driver 的唯一通道负责
- UI 字符串"BT.709""10bit""YCbCr 4:4:4" 翻译成 :class:`SignalFormat`
- 将各 panel timing 字符串翻译成 :class:`TimingSpec`
- 协调 :meth:`IUcdDevice.configure` / ``set_pattern`` / ``apply`` 的调用顺序
- 通过 :class:`EventBus` GUI 订阅状态变化而非主动轮询
本层不直接 import UniTAP也不读取 :mod:`tkinter` 变量
所有输入都是显式参数便于单测
"""
from __future__ import annotations
from contextlib import contextmanager
2026-05-24 10:49:28 +08:00
import logging
import sys
2026-05-24 10:49:28 +08:00
import threading
from app.ucd_domain import (
Colorimetry,
DynamicRange,
EventBus,
PatternKind,
PatternSpec,
SignalFormat,
TimingSpec,
UcdError,
bit_depth_str_to_bpc,
color_space_to_colorimetry,
data_range_to_dynamic_range,
output_format_to_color_format,
parse_timing_str,
)
from drivers.ucd_driver import IUcdDevice
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Default wait (seconds) for the service lock; returned by
# SignalService._effective_lock_timeout when nothing else applies.
_LOCK_TIMEOUT_SECONDS = 8.0
# Shorter wait used when a trace function is installed (sys.gettrace() is not
# None), which is typically the case while single-stepping in a debugger.
_DEBUG_LOCK_TIMEOUT_SECONDS = 0.3
# Explicit lock wait that SignalService.send_pattern passes as its override.
_PATTERN_LOCK_TIMEOUT_SECONDS = 0.8
2026-05-24 10:49:28 +08:00
# ─── View strings → value objects: conversion helpers ───────────
def build_signal_format(
    *,
    color_space: str,
    output_format: str,
    bit_depth: str,
    data_range: str = "Full",
) -> SignalFormat:
    """Assemble a :class:`SignalFormat` from the UI drop-down strings.

    Each string is handed to its domain-layer parser. Per the original notes
    a value that cannot be parsed raises :class:`UcdConfigError` (that type is
    defined outside this module).

    Args:
        color_space: Colorimetry label, e.g. ``"BT.709"``.
        output_format: Color format label, e.g. ``"YCbCr 4:4:4"``.
        bit_depth: Bit depth label, e.g. ``"10bit"``.
        data_range: Dynamic range label; defaults to ``"Full"``.

    Returns:
        The populated :class:`SignalFormat`.
    """
    # The parsers run in this fixed order (color format first), so when
    # several inputs are invalid the first error reported stays the same.
    parsed_color_format = output_format_to_color_format(output_format)
    parsed_colorimetry = color_space_to_colorimetry(color_space)
    parsed_bpc = bit_depth_str_to_bpc(bit_depth)
    parsed_dynamic_range = data_range_to_dynamic_range(data_range)
    return SignalFormat(
        color_format=parsed_color_format,
        colorimetry=parsed_colorimetry,
        bpc=parsed_bpc,
        dynamic_range=parsed_dynamic_range,
    )
def build_timing(timing_str: str) -> TimingSpec:
    """Parse a timing label such as ``"DMT 3840x2160@60Hz"`` into a :class:`TimingSpec`.

    Thin wrapper that delegates to the domain-layer ``parse_timing_str``.
    """
    timing_spec = parse_timing_str(timing_str)
    return timing_spec
def solid_rgb_pattern(rgb: tuple[int, int, int] | list[int]) -> PatternSpec:
    """Build a solid-color :class:`PatternSpec` from the first three items of *rgb*.

    The three channels are read by index and then each converted with
    ``int()``; any items beyond the third are ignored.
    """
    # Index all three channels before converting any of them, so a too-short
    # sequence fails on the lookup rather than on a conversion.
    channels = (rgb[0], rgb[1], rgb[2])
    red, green, blue = (int(channel) for channel in channels)
    return PatternSpec(kind=PatternKind.SOLID, solid_rgb=(red, green, blue))
def image_pattern(path: str) -> PatternSpec:
    """Build an image :class:`PatternSpec` whose ``image_path`` is *path*."""
    spec = PatternSpec(kind=PatternKind.IMAGE, image_path=path)
    return spec
# ─── Service ─────────────────────────────────────────────────────
class SignalService:
    """Coordinate writing and committing SignalFormat / Timing / Pattern.

    Every operation that writes to the device runs under one re-entrant
    thread lock, so that several test threads cannot drive the UCD at the
    same time and leave the SDK in an inconsistent state.
    """

    def __init__(self, device: IUcdDevice, bus: EventBus):
        """Store the device and event bus and create the service lock."""
        self._dev = device
        self._bus = bus
        self._lock = threading.RLock()
        # Diagnostics only: which thread currently holds the lock. Reported
        # in log lines and in the lock-timeout error message.
        self._lock_owner_tid: int | None = None
        self._lock_owner_name: str | None = None

    def _effective_lock_timeout(self, timeout_override: float | None = None) -> float:
        """Return how long (seconds) to wait for the service lock.

        An explicit *timeout_override* always wins. Otherwise the wait is
        shortened while a trace function is installed (debugging), so that
        single-stepping does not look like a long UI freeze.
        """
        if timeout_override is not None:
            return timeout_override
        if sys.gettrace() is not None:
            return _DEBUG_LOCK_TIMEOUT_SECONDS
        return _LOCK_TIMEOUT_SECONDS

    @contextmanager
    def _acquire_service_lock(self, op_name: str, timeout_override: float | None = None):
        """Hold the service lock for the duration of the ``with`` body.

        Args:
            op_name: Name of the calling operation, used in logs and errors.
            timeout_override: Explicit wait in seconds; ``None`` selects the
                default from :meth:`_effective_lock_timeout`.

        Raises:
            UcdError: The lock could not be acquired within the timeout.
        """
        timeout = self._effective_lock_timeout(timeout_override)
        current = threading.current_thread()
        log.info(
            "SignalService.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s",
            op_name,
            timeout,
            threading.get_ident(),
            current.name,
            self._lock_owner_tid,
            self._lock_owner_name,
        )
        if not self._lock.acquire(timeout=timeout):
            raise UcdError(
                "UCD busy: lock timeout in "
                f"SignalService.{op_name} ({timeout:.1f}s), "
                f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}"
            )
        # The lock is re-entrant, so remember the previous owner (possibly
        # this same thread in an outer call) and restore it on exit.
        prev_owner_tid = self._lock_owner_tid
        prev_owner_name = self._lock_owner_name
        # Everything after a successful acquire runs inside try/finally so
        # the lock is released even if bookkeeping or logging raises.
        try:
            self._lock_owner_tid = threading.get_ident()
            self._lock_owner_name = current.name
            log.info(
                "SignalService.%s lock acquired tid=%s thread=%s",
                op_name,
                self._lock_owner_tid,
                self._lock_owner_name,
            )
            yield
        finally:
            self._lock_owner_tid = prev_owner_tid
            self._lock_owner_name = prev_owner_name
            self._lock.release()

    def _require_raw_controller(self, op_name: str):
        """Return the device's ``raw_controller`` or raise if it has none.

        Args:
            op_name: Name of the calling operation, used in the error text.

        Raises:
            UcdError: The device exposes no ``raw_controller`` (or it is ``None``).
        """
        ctrl = getattr(self._dev, "raw_controller", None)
        if ctrl is None:
            raise UcdError(f"{op_name} 暂仅支持 UCD323Device")
        return ctrl

    # -- High-level interface ------------------------------------
    def apply(
        self,
        *,
        signal: SignalFormat,
        timing: TimingSpec,
        pattern: PatternSpec,
    ) -> bool:
        """Commit signal format + timing + pattern in one go.

        Calls ``configure``, ``set_pattern`` and ``apply`` on the device, in
        that order, under the service lock.

        Returns:
            ``format_changed`` as returned by the device's ``configure``:
            whether this call differs from the previous :meth:`apply`.
        """
        with self._acquire_service_lock("apply"):
            log.info(
                "SignalService.apply signal=%s timing=%s pattern=%s",
                signal,
                timing,
                pattern.kind.value,
            )
            changed = self._dev.configure(signal, timing)
            self._dev.set_pattern(pattern)
            self._dev.apply()
            return changed

    def send_pattern(self, pattern: PatternSpec) -> None:
        """Update only the pattern on an already configured signal, then apply."""
        with self._acquire_service_lock("send_pattern", _PATTERN_LOCK_TIMEOUT_SECONDS):
            log.info("SignalService.send_pattern pattern=%s", pattern.kind.value)
            self._dev.set_pattern(pattern)
            self._dev.apply()

    def send_solid_rgb(self, rgb: tuple[int, int, int] | list[int]) -> None:
        """Send a solid-color pattern built from *rgb*."""
        self.send_pattern(solid_rgb_pattern(rgb))

    def send_image(self, path: str) -> None:
        """Send an image pattern loaded from *path*."""
        self.send_pattern(image_pattern(path))

    # -- Transitional API (Phase 2) ------------------------------
    # Existing GUI callbacks update only the signal format, without switching
    # the pattern, by calling
    # ``ucd.apply_signal_format(color_space=..., color_format=..., bit_depth=...)``.
    # New code goes through this method on SignalService instead. Internally
    # it still delegates to the controller's legacy method of the same name;
    # it will be replaced by a clean implementation once migration is done.
    def update_signal_format(
        self,
        *,
        color_space: str,
        output_format: str,
        bit_depth: str,
        data_range: str = "Full",
        max_cll: int | None = None,
        max_fall: int | None = None,
    ) -> bool:
        """Stage only the signal format to the SDK.

        Per the original notes the previous timing is kept and the pattern is
        not switched. The UI strings are first parsed by the domain layer
        purely as validation; a parse failure raises :class:`UcdConfigError`.

        Raises:
            UcdError: The device exposes no ``raw_controller``, or the lock
                timed out.
        """
        # Parsed only for validation; the write itself still goes through
        # the raw controller's legacy API with the original strings.
        _ = build_signal_format(
            color_space=color_space,
            output_format=output_format,
            bit_depth=bit_depth,
            data_range=data_range,
        )
        ctrl = self._require_raw_controller("update_signal_format")
        with self._acquire_service_lock("update_signal_format"):
            return bool(
                ctrl.apply_signal_format(
                    color_space=color_space,
                    color_format=output_format,
                    bit_depth=bit_depth,
                    data_range=data_range,
                    max_cll=max_cll,
                    max_fall=max_fall,
                )
            )

    # -- Queries passed through to upper layers ------------------
    @property
    def device(self) -> IUcdDevice:
        """The wrapped device object."""
        return self._dev

    def current_resolution(self) -> tuple[int, int]:
        """Return the device's current resolution (delegates to the device)."""
        return self._dev.current_resolution()

    @property
    def is_connected(self) -> bool:
        """Whether the UCD device is open; meant for GUI pre-checks.

        True only when the device exposes a ``raw_controller`` whose
        ``status`` attribute is truthy.
        """
        ctrl = getattr(self._dev, "raw_controller", None)
        # Compare against None, as the other methods do, so a controller
        # object that happens to be falsy is not reported as disconnected.
        return ctrl is not None and bool(getattr(ctrl, "status", False))

    # -- Transitional API (Phase 5): config-driven writes --------
    # The existing GUI / services describe one test run with a ``PQConfig``
    # object, which :class:`UCDController.set_ucd_params` translates into
    # color / timing / pattern settings. Until the config-layer refactor
    # lands, the methods below are the single entry point on SignalService,
    # so upper layers no longer touch ``self.app.ucd`` directly.
    def apply_config(self, config) -> bool:
        """Write color / timing / current pattern from a :class:`PQConfig`.

        Per the original notes this does not apply the output. Delegates to
        the controller's ``set_ucd_params`` under the service lock.

        Raises:
            UcdError: The device exposes no ``raw_controller``, or the lock
                timed out.
        """
        ctrl = self._require_raw_controller("apply_config")
        with self._acquire_service_lock("apply_config"):
            return bool(ctrl.set_ucd_params(config))

    def send_pattern_params(self, params) -> bool:
        """Update the current pattern's parameters with *params* and apply.

        Delegates to the controller's ``send_current_pattern_params``.

        Raises:
            UcdError: The device exposes no ``raw_controller``, or the lock
                timed out.
        """
        ctrl = self._require_raw_controller("send_pattern_params")
        with self._acquire_service_lock("send_pattern_params"):
            return bool(ctrl.send_current_pattern_params(params))

    def apply_and_run(self, config, pattern_params) -> bool:
        """Composite of ``set_ucd_params`` + ``set_pattern`` + ``run``.

        Serves the single-step flow of custom_template_panel. Stops and
        returns ``False`` as soon as one step reports failure.

        Raises:
            UcdError: The device exposes no ``raw_controller``, or the lock
                timed out.
        """
        ctrl = self._require_raw_controller("apply_and_run")
        with self._acquire_service_lock("apply_and_run"):
            if not ctrl.set_ucd_params(config):
                return False
            if not ctrl.set_pattern(ctrl.current_pattern, pattern_params):
                return False
            return bool(ctrl.run())
2026-05-24 10:49:28 +08:00
# Public API of this module.
__all__ = [
"SignalService",
"build_signal_format",
"build_timing",
"solid_rgb_pattern",
"image_pattern",
# Re-export commonly used domain types so upper layers need only one import
"SignalFormat",
"TimingSpec",
"PatternSpec",
"PatternKind",
"Colorimetry",
"DynamicRange",
"UcdError",
]