Files
pqAutomationApp/drivers/ucd_driver.py
2026-05-24 10:49:28 +08:00

437 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""UCD 驱动层。
唯一暴露给上层的入口为 :class:`IUcdDevice` 抽象接口,以及实现:
:class:`UCD323Device`(生产)和 :class:`FakeUcdDevice`(单测)。
Phase 1 实现策略
-----------------
为保证零行为变更,:class:`UCD323Device` 当前**内部委托**给已有的
:class:`drivers.UCD323_Function.UCDController`。后续 Phase 2 会将
SDK 调用直接搬入本模块,届时可删除旧 ``UCDController`` 文件。
文件分区:
§1 DeviceInfo / list_devices
§2 IUcdDevice 抽象接口
§3 UCD323Device 真实实现
§4 FakeUcdDevice 单测实现
"""
from __future__ import annotations
import logging
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING
from app.ucd_domain import (
ConnectionChanged,
EventBus,
Interface,
PatternApplied,
PatternKind,
PatternSpec,
SignalApplied,
SignalFormat,
TimingSpec,
UcdApplyFailed,
UcdConfigError,
UcdNotConnected,
UcdSdkError,
UcdState,
UcdStateError,
assert_transition,
)
if TYPE_CHECKING:
from drivers.UCD323_Function import UCDController
log = logging.getLogger(__name__)
# ─── §1 DeviceInfo / list_devices ────────────────────────────────
@dataclass(frozen=True)
class DeviceInfo:
"""UCD 设备发现条目。
``display`` 是 SDK 给出的完整字符串(``"0: UCD-323 #12345678"``
``index`` / ``serial`` / ``model`` 通过解析得到,解析失败时为 None。
"""
display: str
index: int | None = None
serial: str | None = None
model: str | None = None
@classmethod
def parse(cls, display: str) -> "DeviceInfo":
idx: int | None = None
model: str | None = None
serial: str | None = None
try:
head, rest = display.split(":", 1)
idx = int(head.strip())
rest = rest.strip()
# 形如 "UCD-323 #12345678" 或 "UCD-323 #12345678 (in use)"
tokens = rest.split()
if tokens:
model = tokens[0]
for tok in tokens[1:]:
if tok.startswith("#") and len(tok) >= 2:
serial = tok.lstrip("#")
break
except Exception: # noqa: BLE001 - 解析失败保留原 display 即可
pass
return cls(display=display, index=idx, serial=serial, model=model)
def list_devices(controller: "UCDController") -> list[DeviceInfo]:
"""通过给定的底层 controller 枚举可用 UCD 设备。"""
try:
raw_list = controller.search_device()
except Exception as exc: # noqa: BLE001
raise UcdSdkError("枚举 UCD 设备失败") from exc
return [DeviceInfo.parse(s) for s in (raw_list or [])]
# ─── §2 IUcdDevice 抽象接口 ──────────────────────────────────────
class IUcdDevice(ABC):
"""UCD 信号发生器抽象设备。
上层Service / GUI**只**通过本接口操作硬件,不得穿透到
UniTAP SDK 或具体实现细节。
"""
@property
@abstractmethod
def state(self) -> UcdState: ...
@property
@abstractmethod
def info(self) -> DeviceInfo | None: ...
@abstractmethod
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
"""打开设备并选择接口角色。失败抛 :class:`UcdSdkError` 等。"""
@abstractmethod
def close(self) -> None:
"""关闭设备(幂等)。"""
@abstractmethod
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
"""写入信号格式与 timing未 apply。返回 ``format_changed``。"""
@abstractmethod
def set_pattern(self, pattern: PatternSpec) -> None:
"""设置当前图案(未 apply"""
@abstractmethod
def apply(self) -> None:
"""将已配置的信号格式 + 图案一次性提交给硬件。"""
@abstractmethod
def current_resolution(self) -> tuple[int, int]:
"""读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。"""
# ─── §3 UCD323Device 真实实现 ────────────────────────────────────
class UCD323Device(IUcdDevice):
"""生产环境实现。内部委托给传统 :class:`UCDController`Phase 1"""
def __init__(self, bus: EventBus, controller: "UCDController | None" = None):
from drivers.UCD323_Function import UCDController as _UCDController
self._bus = bus
self._controller: "UCDController" = controller or _UCDController()
self._state: UcdState = UcdState.CLOSED
self._info: DeviceInfo | None = None
self._interface: Interface = Interface.HDMI
self._lock = threading.RLock()
self._curr_signal: SignalFormat | None = None
self._curr_timing: TimingSpec | None = None
self._curr_pattern: PatternSpec | None = None
self._last_applied: tuple[SignalFormat, TimingSpec] | None = None
# -- 读访问 --------------------------------------------------
@property
def state(self) -> UcdState:
return self._state
@property
def info(self) -> DeviceInfo | None:
return self._info
@property
def raw_controller(self) -> "UCDController":
"""Phase 1 过渡期:给暂未迁移的旧调用点的逃生通道。
新代码**不**应使用本属性,迁移完成后即可删除。
"""
return self._controller
# -- 生命周期 ------------------------------------------------
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
with self._lock:
assert_transition(self._state, UcdState.OPENED)
if interface is not Interface.HDMI:
# Phase 1底层 UCDController.open() 写死了 HDMISource。
raise UcdConfigError(
f"暂不支持接口 {interface.value};当前仅实现 HDMI"
)
try:
ok = self._controller.open(info.display)
except Exception as exc: # noqa: BLE001
raise UcdSdkError(f"打开设备失败: {info.display}") from exc
if not ok:
raise UcdSdkError(f"打开设备失败: {info.display}")
self._info = info
self._interface = interface
self._state = UcdState.OPENED
self._bus.publish(ConnectionChanged(True, info.serial))
def close(self) -> None:
with self._lock:
if self._state == UcdState.CLOSED:
return
try:
self._controller.close()
except Exception: # noqa: BLE001
log.exception("关闭 UCD 时发生异常")
self._state = UcdState.CLOSED
self._curr_signal = None
self._curr_timing = None
self._curr_pattern = None
self._last_applied = None
prev_serial = self._info.serial if self._info else None
self._info = None
self._bus.publish(ConnectionChanged(False, prev_serial))
# -- 配置 ----------------------------------------------------
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
with self._lock:
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 configure")
try:
# 颜色模式color_format / bpc / colorimetry
if not self._controller.set_color_mode(
signal.color_format.value,
int(signal.bpc),
_colorimetry_to_legacy_key(signal),
):
raise UcdConfigError(
f"set_color_mode 失败: {signal!r}"
)
# dynamic_range 在新接口中是一等公民
self._apply_dynamic_range(signal)
# Timing
if not self._controller.set_timing_from_string(str(timing)):
raise UcdConfigError(f"set_timing_from_string 失败: {timing}")
except UcdConfigError:
raise
except Exception as exc: # noqa: BLE001
raise UcdSdkError("configure 异常") from exc
self._curr_signal = signal
self._curr_timing = timing
self._state = UcdState.CONFIGURED
return (signal, timing) != self._last_applied
def set_pattern(self, pattern: PatternSpec) -> None:
with self._lock:
if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED):
raise UcdStateError(
f"set_pattern 需要 CONFIGURED/APPLIED 状态,当前 {self._state.name}"
)
self._curr_pattern = pattern
# 仅本地暂存,真正写硬件在 apply()
def apply(self) -> None:
with self._lock:
if self._curr_signal is None or self._curr_timing is None:
raise UcdStateError("apply 前必须先 configure")
if self._curr_pattern is None:
raise UcdStateError("apply 前必须先 set_pattern")
try:
ok = self._apply_pattern_via_controller(self._curr_pattern)
except Exception as exc: # noqa: BLE001
raise UcdSdkError("apply 异常") from exc
if not ok:
raise UcdApplyFailed(
f"apply 失败: pattern={self._curr_pattern.kind.value}"
)
changed = (self._curr_signal, self._curr_timing) != self._last_applied
self._last_applied = (self._curr_signal, self._curr_timing)
self._state = UcdState.APPLIED
self._bus.publish(
SignalApplied(self._curr_signal, self._curr_timing, changed)
)
self._bus.publish(PatternApplied(self._curr_pattern))
# -- 查询 ----------------------------------------------------
def current_resolution(self) -> tuple[int, int]:
try:
return self._controller.get_current_resolution((3840, 2160))
except Exception: # noqa: BLE001
return (3840, 2160)
# -- 内部辅助 ------------------------------------------------
def _apply_dynamic_range(self, signal: SignalFormat) -> None:
import UniTAP # 局部导入,避免本模块在无 SDK 环境下导入即失败
from app.ucd_domain import DynamicRange
ci = self._controller.color_info
if ci is None:
return
if signal.dynamic_range is DynamicRange.FULL:
ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA
else:
ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA
def _apply_pattern_via_controller(self, pattern: PatternSpec) -> bool:
"""根据 PatternKind 走最合适的旧 controller 路径。"""
if pattern.kind is PatternKind.IMAGE:
if not pattern.image_path:
raise UcdConfigError("IMAGE pattern 必须提供 image_path")
return bool(self._controller.send_image_pattern(pattern.image_path))
# 预定义图案路径:复用 controller.set_pattern + run()
from drivers.UCD323_Enum import UCDEnum # 局部导入避免循环
video_pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern.kind.value)
if video_pattern is None:
raise UcdConfigError(f"不支持的 PatternKind: {pattern.kind!r}")
self._controller.current_pattern = video_pattern
params: list[int] | None = None
if pattern.kind is PatternKind.SOLID:
if pattern.solid_rgb is None:
raise UcdConfigError("SOLID pattern 必须提供 solid_rgb")
params = list(pattern.solid_rgb)
elif pattern.extras:
params = list(pattern.extras)
if not self._controller.set_pattern(video_pattern, params):
raise UcdApplyFailed("controller.set_pattern 返回 False")
return bool(self._controller.run())
def _colorimetry_to_legacy_key(signal: SignalFormat) -> str:
"""新 :class:`Colorimetry` → 旧 ``UCDEnum.ColorInfo.get_colorimetry`` 的 key。
BT.2020 在 YCbCr / RGB 输出下走不同 SDK 枚举(参考旧
``_get_colorimetry_from_color_space`` 的逻辑),这里也做同样的分支。
"""
from app.ucd_domain import Colorimetry, is_ycbcr
cm = signal.colorimetry
ycbcr = is_ycbcr(signal.color_format)
if cm is Colorimetry.BT2020:
return "bt2020ycbcr" if ycbcr else "bt2020rgb"
return {
Colorimetry.SRGB: "srgb",
Colorimetry.BT709: "bt709",
Colorimetry.BT601: "bt601",
Colorimetry.DCI_P3: "dcip3",
Colorimetry.ADOBE_RGB: "adobergb",
}.get(cm, "srgb")
# ─── §4 FakeUcdDevice 单测实现 ───────────────────────────────────
class FakeUcdDevice(IUcdDevice):
"""无硬件依赖的 Fake 实现;记录调用序列供单测断言。"""
def __init__(self, bus: EventBus | None = None) -> None:
self._bus = bus or EventBus()
self._state = UcdState.CLOSED
self._info: DeviceInfo | None = None
self._signal: SignalFormat | None = None
self._timing: TimingSpec | None = None
self._pattern: PatternSpec | None = None
self._last_applied: tuple[SignalFormat, TimingSpec] | None = None
self.calls: list[tuple] = [] # ("open", info) / ("configure", ...) ...
@property
def state(self) -> UcdState:
return self._state
@property
def info(self) -> DeviceInfo | None:
return self._info
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
assert_transition(self._state, UcdState.OPENED)
self.calls.append(("open", info, interface))
self._info = info
self._state = UcdState.OPENED
self._bus.publish(ConnectionChanged(True, info.serial))
def close(self) -> None:
if self._state == UcdState.CLOSED:
return
self.calls.append(("close",))
self._state = UcdState.CLOSED
prev = self._info.serial if self._info else None
self._info = None
self._signal = self._timing = self._pattern = None
self._last_applied = None
self._bus.publish(ConnectionChanged(False, prev))
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("configure", signal, timing))
self._signal, self._timing = signal, timing
self._state = UcdState.CONFIGURED
return (signal, timing) != self._last_applied
def set_pattern(self, pattern: PatternSpec) -> None:
if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED):
raise UcdStateError(f"非法状态 {self._state.name}")
self.calls.append(("set_pattern", pattern))
self._pattern = pattern
def apply(self) -> None:
if self._signal is None or self._timing is None:
raise UcdStateError("apply 前必须 configure")
if self._pattern is None:
raise UcdStateError("apply 前必须 set_pattern")
self.calls.append(("apply",))
changed = (self._signal, self._timing) != self._last_applied
self._last_applied = (self._signal, self._timing)
self._state = UcdState.APPLIED
self._bus.publish(SignalApplied(self._signal, self._timing, changed))
self._bus.publish(PatternApplied(self._pattern))
def current_resolution(self) -> tuple[int, int]:
if self._timing is None:
return (3840, 2160)
return (self._timing.width, self._timing.height)
__all__ = [
"DeviceInfo",
"list_devices",
"IUcdDevice",
"UCD323Device",
"FakeUcdDevice",
]