Files
pqAutomationApp/drivers/ucd_driver.py

437 lines
16 KiB
Python
Raw Normal View History

2026-05-24 10:49:28 +08:00
"""UCD 驱动层。
唯一暴露给上层的入口为 :class:`IUcdDevice` 抽象接口以及实现
:class:`UCD323Device`生产 :class:`FakeUcdDevice`单测
Phase 1 实现策略
-----------------
为保证零行为变更:class:`UCD323Device` 当前**内部委托**给已有的
:class:`drivers.UCD323_Function.UCDController`后续 Phase 2 会将
SDK 调用直接搬入本模块届时可删除旧 ``UCDController`` 文件
文件分区
§1 DeviceInfo / list_devices
§2 IUcdDevice 抽象接口
§3 UCD323Device 真实实现
§4 FakeUcdDevice 单测实现
"""
from __future__ import annotations
import logging
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING
from app.ucd_domain import (
ConnectionChanged,
EventBus,
Interface,
PatternApplied,
PatternKind,
PatternSpec,
SignalApplied,
SignalFormat,
TimingSpec,
UcdApplyFailed,
UcdConfigError,
UcdNotConnected,
UcdSdkError,
UcdState,
UcdStateError,
assert_transition,
)
if TYPE_CHECKING:
from drivers.UCD323_Function import UCDController
log = logging.getLogger(__name__)
# ─── §1 DeviceInfo / list_devices ────────────────────────────────
@dataclass(frozen=True)
class DeviceInfo:
"""UCD 设备发现条目。
``display`` SDK 给出的完整字符串``"0: UCD-323 #12345678"``
``index`` / ``serial`` / ``model`` 通过解析得到解析失败时为 None
"""
display: str
index: int | None = None
serial: str | None = None
model: str | None = None
@classmethod
def parse(cls, display: str) -> "DeviceInfo":
idx: int | None = None
model: str | None = None
serial: str | None = None
try:
head, rest = display.split(":", 1)
idx = int(head.strip())
rest = rest.strip()
# 形如 "UCD-323 #12345678" 或 "UCD-323 #12345678 (in use)"
tokens = rest.split()
if tokens:
model = tokens[0]
for tok in tokens[1:]:
if tok.startswith("#") and len(tok) >= 2:
serial = tok.lstrip("#")
break
except Exception: # noqa: BLE001 - 解析失败保留原 display 即可
pass
return cls(display=display, index=idx, serial=serial, model=model)
def list_devices(controller: "UCDController") -> list[DeviceInfo]:
"""通过给定的底层 controller 枚举可用 UCD 设备。"""
try:
raw_list = controller.search_device()
except Exception as exc: # noqa: BLE001
raise UcdSdkError("枚举 UCD 设备失败") from exc
return [DeviceInfo.parse(s) for s in (raw_list or [])]
# ─── §2 IUcdDevice 抽象接口 ──────────────────────────────────────
class IUcdDevice(ABC):
"""UCD 信号发生器抽象设备。
上层Service / GUI****通过本接口操作硬件不得穿透到
UniTAP SDK 或具体实现细节
"""
@property
@abstractmethod
def state(self) -> UcdState: ...
@property
@abstractmethod
def info(self) -> DeviceInfo | None: ...
@abstractmethod
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
"""打开设备并选择接口角色。失败抛 :class:`UcdSdkError` 等。"""
@abstractmethod
def close(self) -> None:
"""关闭设备(幂等)。"""
@abstractmethod
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
"""写入信号格式与 timing未 apply。返回 ``format_changed``。"""
@abstractmethod
def set_pattern(self, pattern: PatternSpec) -> None:
"""设置当前图案(未 apply"""
@abstractmethod
def apply(self) -> None:
"""将已配置的信号格式 + 图案一次性提交给硬件。"""
@abstractmethod
def current_resolution(self) -> tuple[int, int]:
"""读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。"""
# ─── §3 UCD323Device 真实实现 ────────────────────────────────────
class UCD323Device(IUcdDevice):
"""生产环境实现。内部委托给传统 :class:`UCDController`Phase 1"""
def __init__(self, bus: EventBus, controller: "UCDController | None" = None):
from drivers.UCD323_Function import UCDController as _UCDController
self._bus = bus
self._controller: "UCDController" = controller or _UCDController()
self._state: UcdState = UcdState.CLOSED
self._info: DeviceInfo | None = None
self._interface: Interface = Interface.HDMI
self._lock = threading.RLock()
self._curr_signal: SignalFormat | None = None
self._curr_timing: TimingSpec | None = None
self._curr_pattern: PatternSpec | None = None
self._last_applied: tuple[SignalFormat, TimingSpec] | None = None
# -- 读访问 --------------------------------------------------
@property
def state(self) -> UcdState:
return self._state
@property
def info(self) -> DeviceInfo | None:
return self._info
@property
def raw_controller(self) -> "UCDController":
"""Phase 1 过渡期:给暂未迁移的旧调用点的逃生通道。
新代码****应使用本属性迁移完成后即可删除
"""
return self._controller
# -- 生命周期 ------------------------------------------------
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
with self._lock:
assert_transition(self._state, UcdState.OPENED)
if interface is not Interface.HDMI:
# Phase 1底层 UCDController.open() 写死了 HDMISource。
raise UcdConfigError(
f"暂不支持接口 {interface.value};当前仅实现 HDMI"
)
try:
ok = self._controller.open(info.display)
except Exception as exc: # noqa: BLE001
raise UcdSdkError(f"打开设备失败: {info.display}") from exc
if not ok:
raise UcdSdkError(f"打开设备失败: {info.display}")
self._info = info
self._interface = interface
self._state = UcdState.OPENED
self._bus.publish(ConnectionChanged(True, info.serial))
def close(self) -> None:
with self._lock:
if self._state == UcdState.CLOSED:
return
try:
self._controller.close()
except Exception: # noqa: BLE001
log.exception("关闭 UCD 时发生异常")
self._state = UcdState.CLOSED
self._curr_signal = None
self._curr_timing = None
self._curr_pattern = None
self._last_applied = None
prev_serial = self._info.serial if self._info else None
self._info = None
self._bus.publish(ConnectionChanged(False, prev_serial))
# -- 配置 ----------------------------------------------------
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
with self._lock:
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 configure")
try:
# 颜色模式color_format / bpc / colorimetry
if not self._controller.set_color_mode(
signal.color_format.value,
int(signal.bpc),
_colorimetry_to_legacy_key(signal),
):
raise UcdConfigError(
f"set_color_mode 失败: {signal!r}"
)
# dynamic_range 在新接口中是一等公民
self._apply_dynamic_range(signal)
# Timing
if not self._controller.set_timing_from_string(str(timing)):
raise UcdConfigError(f"set_timing_from_string 失败: {timing}")
except UcdConfigError:
raise
except Exception as exc: # noqa: BLE001
raise UcdSdkError("configure 异常") from exc
self._curr_signal = signal
self._curr_timing = timing
self._state = UcdState.CONFIGURED
return (signal, timing) != self._last_applied
def set_pattern(self, pattern: PatternSpec) -> None:
with self._lock:
if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED):
raise UcdStateError(
f"set_pattern 需要 CONFIGURED/APPLIED 状态,当前 {self._state.name}"
)
self._curr_pattern = pattern
# 仅本地暂存,真正写硬件在 apply()
def apply(self) -> None:
with self._lock:
if self._curr_signal is None or self._curr_timing is None:
raise UcdStateError("apply 前必须先 configure")
if self._curr_pattern is None:
raise UcdStateError("apply 前必须先 set_pattern")
try:
ok = self._apply_pattern_via_controller(self._curr_pattern)
except Exception as exc: # noqa: BLE001
raise UcdSdkError("apply 异常") from exc
if not ok:
raise UcdApplyFailed(
f"apply 失败: pattern={self._curr_pattern.kind.value}"
)
changed = (self._curr_signal, self._curr_timing) != self._last_applied
self._last_applied = (self._curr_signal, self._curr_timing)
self._state = UcdState.APPLIED
self._bus.publish(
SignalApplied(self._curr_signal, self._curr_timing, changed)
)
self._bus.publish(PatternApplied(self._curr_pattern))
# -- 查询 ----------------------------------------------------
def current_resolution(self) -> tuple[int, int]:
try:
return self._controller.get_current_resolution((3840, 2160))
except Exception: # noqa: BLE001
return (3840, 2160)
# -- 内部辅助 ------------------------------------------------
def _apply_dynamic_range(self, signal: SignalFormat) -> None:
import UniTAP # 局部导入,避免本模块在无 SDK 环境下导入即失败
from app.ucd_domain import DynamicRange
ci = self._controller.color_info
if ci is None:
return
if signal.dynamic_range is DynamicRange.FULL:
ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA
else:
ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA
def _apply_pattern_via_controller(self, pattern: PatternSpec) -> bool:
"""根据 PatternKind 走最合适的旧 controller 路径。"""
if pattern.kind is PatternKind.IMAGE:
if not pattern.image_path:
raise UcdConfigError("IMAGE pattern 必须提供 image_path")
return bool(self._controller.send_image_pattern(pattern.image_path))
# 预定义图案路径:复用 controller.set_pattern + run()
from drivers.UCD323_Enum import UCDEnum # 局部导入避免循环
video_pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern.kind.value)
if video_pattern is None:
raise UcdConfigError(f"不支持的 PatternKind: {pattern.kind!r}")
self._controller.current_pattern = video_pattern
params: list[int] | None = None
if pattern.kind is PatternKind.SOLID:
if pattern.solid_rgb is None:
raise UcdConfigError("SOLID pattern 必须提供 solid_rgb")
params = list(pattern.solid_rgb)
elif pattern.extras:
params = list(pattern.extras)
if not self._controller.set_pattern(video_pattern, params):
raise UcdApplyFailed("controller.set_pattern 返回 False")
return bool(self._controller.run())
def _colorimetry_to_legacy_key(signal: SignalFormat) -> str:
"""新 :class:`Colorimetry` → 旧 ``UCDEnum.ColorInfo.get_colorimetry`` 的 key。
BT.2020 YCbCr / RGB 输出下走不同 SDK 枚举参考旧
``_get_colorimetry_from_color_space`` 的逻辑这里也做同样的分支
"""
from app.ucd_domain import Colorimetry, is_ycbcr
cm = signal.colorimetry
ycbcr = is_ycbcr(signal.color_format)
if cm is Colorimetry.BT2020:
return "bt2020ycbcr" if ycbcr else "bt2020rgb"
return {
Colorimetry.SRGB: "srgb",
Colorimetry.BT709: "bt709",
Colorimetry.BT601: "bt601",
Colorimetry.DCI_P3: "dcip3",
Colorimetry.ADOBE_RGB: "adobergb",
}.get(cm, "srgb")
# ─── §4 FakeUcdDevice 单测实现 ───────────────────────────────────
class FakeUcdDevice(IUcdDevice):
"""无硬件依赖的 Fake 实现;记录调用序列供单测断言。"""
def __init__(self, bus: EventBus | None = None) -> None:
self._bus = bus or EventBus()
self._state = UcdState.CLOSED
self._info: DeviceInfo | None = None
self._signal: SignalFormat | None = None
self._timing: TimingSpec | None = None
self._pattern: PatternSpec | None = None
self._last_applied: tuple[SignalFormat, TimingSpec] | None = None
self.calls: list[tuple] = [] # ("open", info) / ("configure", ...) ...
@property
def state(self) -> UcdState:
return self._state
@property
def info(self) -> DeviceInfo | None:
return self._info
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
assert_transition(self._state, UcdState.OPENED)
self.calls.append(("open", info, interface))
self._info = info
self._state = UcdState.OPENED
self._bus.publish(ConnectionChanged(True, info.serial))
def close(self) -> None:
if self._state == UcdState.CLOSED:
return
self.calls.append(("close",))
self._state = UcdState.CLOSED
prev = self._info.serial if self._info else None
self._info = None
self._signal = self._timing = self._pattern = None
self._last_applied = None
self._bus.publish(ConnectionChanged(False, prev))
def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool:
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("configure", signal, timing))
self._signal, self._timing = signal, timing
self._state = UcdState.CONFIGURED
return (signal, timing) != self._last_applied
def set_pattern(self, pattern: PatternSpec) -> None:
if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED):
raise UcdStateError(f"非法状态 {self._state.name}")
self.calls.append(("set_pattern", pattern))
self._pattern = pattern
def apply(self) -> None:
if self._signal is None or self._timing is None:
raise UcdStateError("apply 前必须 configure")
if self._pattern is None:
raise UcdStateError("apply 前必须 set_pattern")
self.calls.append(("apply",))
changed = (self._signal, self._timing) != self._last_applied
self._last_applied = (self._signal, self._timing)
self._state = UcdState.APPLIED
self._bus.publish(SignalApplied(self._signal, self._timing, changed))
self._bus.publish(PatternApplied(self._pattern))
def current_resolution(self) -> tuple[int, int]:
if self._timing is None:
return (3840, 2160)
return (self._timing.width, self._timing.height)
__all__ = [
"DeviceInfo",
"list_devices",
"IUcdDevice",
"UCD323Device",
"FakeUcdDevice",
]