diff --git a/app/services/ucd_service.py b/app/services/ucd_service.py new file mode 100644 index 0000000..925c0a6 --- /dev/null +++ b/app/services/ucd_service.py @@ -0,0 +1,152 @@ +"""UCD 信号 / 图案应用服务层。 + +服务层是 GUI ↔ Driver 的唯一通道,负责: +- 将 UI 字符串("BT.709"、"10bit"、"YCbCr 4:4:4" 等)翻译成 :class:`SignalFormat`; +- 将各 panel 的 timing 字符串翻译成 :class:`TimingSpec`; +- 协调 :meth:`IUcdDevice.configure` / ``set_pattern`` / ``apply`` 的调用顺序; +- 通过 :class:`EventBus` 让 GUI 订阅状态变化,而非主动轮询。 + +本层不直接 import UniTAP,也不读取 :mod:`tkinter` 变量; +所有输入都是显式参数,便于单测。 +""" + +from __future__ import annotations + +import logging +import threading + +from app.ucd_domain import ( + Colorimetry, + DynamicRange, + EventBus, + PatternKind, + PatternSpec, + SignalFormat, + TimingSpec, + UcdError, + bit_depth_str_to_bpc, + color_space_to_colorimetry, + data_range_to_dynamic_range, + output_format_to_color_format, + parse_timing_str, +) +from drivers.ucd_driver import IUcdDevice + +log = logging.getLogger(__name__) + + +# ─── 视图字符串 → 值对象 转换工具 ──────────────────────────────── + + +def build_signal_format( + *, + color_space: str, + output_format: str, + bit_depth: str, + data_range: str = "Full", +) -> SignalFormat: + """根据下拉框字符串组装 :class:`SignalFormat`。 + + 各参数解析失败抛 :class:`UcdConfigError`。 + """ + return SignalFormat( + color_format=output_format_to_color_format(output_format), + colorimetry=color_space_to_colorimetry(color_space), + bpc=bit_depth_str_to_bpc(bit_depth), + dynamic_range=data_range_to_dynamic_range(data_range), + ) + + +def build_timing(timing_str: str) -> TimingSpec: + """``"DMT 3840x2160@60Hz"`` → :class:`TimingSpec`。""" + return parse_timing_str(timing_str) + + +def solid_rgb_pattern(rgb: tuple[int, int, int] | list[int]) -> PatternSpec: + r, g, b = rgb[0], rgb[1], rgb[2] + return PatternSpec(kind=PatternKind.SOLID, solid_rgb=(int(r), int(g), int(b))) + + +def image_pattern(path: str) -> PatternSpec: + return PatternSpec(kind=PatternKind.IMAGE, image_path=path) + + +# ─── 服务 ──────────────────────────────────────────────────────── + + +class SignalService: + """协调 SignalFormat / Timing / Pattern 的写入与提交。 + + 使用线程锁串行化所有对外的 ``apply_*`` 调用,避免多个测试线程 + 同时操作 UCD 造成 SDK 状态错乱。 + """ + + def __init__(self, device: IUcdDevice, bus: EventBus): + self._dev = device + self._bus = bus + self._lock = threading.RLock() + + # -- 高层接口 ------------------------------------------------ + + def apply( + self, + *, + signal: SignalFormat, + timing: TimingSpec, + pattern: PatternSpec, + ) -> bool: + """一次性提交信号格式 + timing + 图案。 + + Returns: + ``format_changed``——本次相对上一次 :meth:`apply` 是否变化。 + """ + with self._lock: + log.info( + "SignalService.apply signal=%s timing=%s pattern=%s", + signal, + timing, + pattern.kind.value, + ) + changed = self._dev.configure(signal, timing) + self._dev.set_pattern(pattern) + self._dev.apply() + return changed + + def send_pattern(self, pattern: PatternSpec) -> None: + """在已 configure 的信号上仅更新图案后 apply。""" + with self._lock: + log.info("SignalService.send_pattern pattern=%s", pattern.kind.value) + self._dev.set_pattern(pattern) + self._dev.apply() + + def send_solid_rgb(self, rgb: tuple[int, int, int] | list[int]) -> None: + self.send_pattern(solid_rgb_pattern(rgb)) + + def send_image(self, path: str) -> None: + self.send_pattern(image_pattern(path)) + + # -- 透传给上层的查询 --------------------------------------- + + @property + def device(self) -> IUcdDevice: + return self._dev + + def current_resolution(self) -> tuple[int, int]: + return self._dev.current_resolution() + + +__all__ = [ + "SignalService", + "build_signal_format", + "build_timing", + "solid_rgb_pattern", + "image_pattern", + # 重导出常用域类型方便上层 import 一次到位 + "SignalFormat", + "TimingSpec", + "PatternSpec", + "PatternKind", + "Colorimetry", + "DynamicRange", + "UcdError", +] diff --git a/app/ucd_domain.py b/app/ucd_domain.py new file mode 100644 index 0000000..dd23ebd --- /dev/null +++ b/app/ucd_domain.py @@ -0,0 +1,388 @@ +"""UCD 控制 Domain 层。 + +纯数据 + 纯函数:枚举、值对象、状态机、错误体系、事件总线、 +业务字符串解析与映射。本模块**不**依赖 UniTAP / 任何硬件; +可用纯单测覆盖。 + +文件分区: + §1 枚举与值对象 + §2 状态机 + §3 错误体系 + §4 事件总线 + §5 业务字符串解析 / 映射 +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable + +log = logging.getLogger(__name__) + + +# ─── §1 枚举与值对象 ────────────────────────────────────────────── + + +class Interface(str, Enum): + """UCD 物理输出接口。""" + + HDMI = "HDMI" + DP = "DP" + USBC = "Type-C" + + +class ColorFormat(str, Enum): + """像素颜色格式(与 UI 显示字符串解耦的内部表示)。""" + + RGB = "rgb" + YCBCR_444 = "ycbcr444" + YCBCR_422 = "ycbcr422" + YCBCR_420 = "ycbcr420" + Y_ONLY = "yonly" + IDO_DEFINED = "ido_defined" + RAW = "raw" + DSC = "dsc" + + +class Colorimetry(str, Enum): + """色度空间。""" + + SRGB = "sRGB" + BT709 = "BT.709" + BT601 = "BT.601" + BT2020 = "BT.2020" + DCI_P3 = "DCI-P3" + ADOBE_RGB = "AdobeRGB" + + +class DynamicRange(str, Enum): + FULL = "Full" + LIMITED = "Limited" + + +class TimingStandard(str, Enum): + DMT = "dmt" + CTA = "cta" + CVT = "cvt" + OVT = "ovt" + + +class PatternKind(str, Enum): + """高层图案种类。 + + 与 UCD 内部 VideoPattern 枚举区分:仅暴露业务上真正用到的几种。 + """ + + DISABLED = "disabled" + SOLID = "solidcolor" + SOLID_WHITE = "solidwhite" + SOLID_RED = "solidred" + SOLID_GREEN = "solidgreen" + SOLID_BLUE = "solidblue" + COLOR_BARS = "colorbars" + CHESSBOARD = "chessboard" + WHITE_VSTRIPS = "whitevstrips" + GRADIENT_RGB_STRIPES = "gradientrgbstripes" + COLOR_RAMP = "colorramp" + COLOR_SQUARES = "coloursquares" + MOTION = "motionpattern" + SQUARE_WINDOW = "squarewindow" + IMAGE = "image" # 来自文件路径 + + +@dataclass(frozen=True) +class SignalFormat: + """信号格式(color_info 部分)。""" + + color_format: ColorFormat + colorimetry: Colorimetry + bpc: int # 8 / 10 / 12 + dynamic_range: DynamicRange = DynamicRange.FULL + + +@dataclass(frozen=True) +class TimingSpec: + """显示 Timing 描述。""" + + standard: TimingStandard + width: int + height: int + refresh_hz: float + + def __str__(self) -> str: # 便于日志 + return f"{self.standard.value.upper()} {self.width}x{self.height}@{self.refresh_hz:g}Hz" + + +@dataclass(frozen=True) +class PatternSpec: + """图案描述。 + + 联合字段含义: + SOLID → solid_rgb=(r,g,b) + IMAGE → image_path + 其它预定义图案 → extras 视具体类型 + """ + + kind: PatternKind + solid_rgb: tuple[int, int, int] | None = None + image_path: str | None = None + extras: tuple = field(default_factory=tuple) + + +# ─── §2 状态机 ─────────────────────────────────────────────────── + + +class UcdState(Enum): + CLOSED = 0 + OPENED = 1 # 设备已打开,未配置信号 + CONFIGURED = 2 # SignalFormat + Timing 已写入,未 apply + APPLIED = 3 # 已 apply,硬件正在输出 + + +_ALLOWED: dict[UcdState, set[UcdState]] = { + UcdState.CLOSED: {UcdState.OPENED, UcdState.CLOSED}, + UcdState.OPENED: {UcdState.CONFIGURED, UcdState.CLOSED, UcdState.OPENED}, + UcdState.CONFIGURED: {UcdState.CONFIGURED, UcdState.APPLIED, UcdState.OPENED, UcdState.CLOSED}, + UcdState.APPLIED: {UcdState.CONFIGURED, UcdState.APPLIED, UcdState.OPENED, UcdState.CLOSED}, +} + + +def assert_transition(curr: UcdState, nxt: UcdState) -> None: + """校验状态转移。非法转移抛 :class:`UcdStateError`。""" + if nxt not in _ALLOWED[curr]: + raise UcdStateError(f"非法状态转移: {curr.name} -> {nxt.name}") + + +# ─── §3 错误体系 ───────────────────────────────────────────────── + + +class UcdError(Exception): + """UCD 控制相关错误的基类。""" + + +class UcdNotConnected(UcdError): + """设备未打开/未连接。""" + + +class UcdStateError(UcdError): + """状态机非法转移或前置条件不满足。""" + + +class UcdConfigError(UcdError): + """业务配置参数不合法(如不支持的 color_format、解析失败)。""" + + +class UcdApplyFailed(UcdError): + """SDK ``apply`` 返回失败或超时。""" + + +class UcdSdkError(UcdError): + """UniTAP SDK 内部异常的包装;原始异常保存在 ``__cause__``。""" + + +# ─── §4 事件总线 ───────────────────────────────────────────────── + + +@dataclass(frozen=True) +class UcdEvent: + """事件基类。""" + + +@dataclass(frozen=True) +class ConnectionChanged(UcdEvent): + connected: bool + serial: str | None = None + + +@dataclass(frozen=True) +class SignalApplied(UcdEvent): + signal: SignalFormat + timing: TimingSpec + format_changed: bool # 与上一次 apply 相比 (signal,timing) 是否改变 + + +@dataclass(frozen=True) +class PatternApplied(UcdEvent): + pattern: PatternSpec + + +class EventBus: + """极简同步事件总线(按事件类型分发)。 + + 单线程使用;如需跨线程派发,由订阅者自行 marshal 到 UI 线程。 + """ + + def __init__(self) -> None: + self._subs: dict[type, list[Callable[[Any], None]]] = {} + + def subscribe(self, evt_type: type, cb: Callable[[Any], None]) -> None: + self._subs.setdefault(evt_type, []).append(cb) + + def publish(self, evt: UcdEvent) -> None: + for cb in self._subs.get(type(evt), []): + try: + cb(evt) + except Exception: # noqa: BLE001 - 订阅者错误不应影响发布者 + log.exception("UCD 事件处理器抛出异常: %r", evt) + + +# ─── §5 业务字符串解析 / 映射 ──────────────────────────────────── + + +_OUTPUT_FORMAT_TO_COLOR_FORMAT: dict[str, ColorFormat] = { + "RGB": ColorFormat.RGB, + "YCbCr 4:4:4": ColorFormat.YCBCR_444, + "YCbCr 4:2:2": ColorFormat.YCBCR_422, + "YCbCr 4:2:0": ColorFormat.YCBCR_420, + "Y Only": ColorFormat.Y_ONLY, + "IDO Defined": ColorFormat.IDO_DEFINED, + "RAW": ColorFormat.RAW, + "DSC": ColorFormat.DSC, +} + +_COLOR_SPACE_TO_COLORIMETRY: dict[str, Colorimetry] = { + "sRGB": Colorimetry.SRGB, + "BT.709": Colorimetry.BT709, + "BT.601": Colorimetry.BT601, + "BT.2020": Colorimetry.BT2020, + "DCI-P3": Colorimetry.DCI_P3, + "AdobeRGB": Colorimetry.ADOBE_RGB, +} + +_BIT_DEPTH_STR_TO_BPC: dict[str, int] = { + "8bit": 8, + "10bit": 10, + "12bit": 12, +} + +_DATA_RANGE_TO_DYNAMIC_RANGE: dict[str, DynamicRange] = { + "Full": DynamicRange.FULL, + "Limited": DynamicRange.LIMITED, +} + + +def output_format_to_color_format(s: str) -> ColorFormat: + """显示用 ``"YCbCr 4:4:4"`` → :class:`ColorFormat`。未知值视为 RGB。""" + return _OUTPUT_FORMAT_TO_COLOR_FORMAT.get(s, ColorFormat.RGB) + + +def color_space_to_colorimetry(s: str) -> Colorimetry: + """显示用 ``"BT.709"`` → :class:`Colorimetry`。未知抛 :class:`UcdConfigError`。""" + if s in _COLOR_SPACE_TO_COLORIMETRY: + return _COLOR_SPACE_TO_COLORIMETRY[s] + raise UcdConfigError(f"未知色彩空间: {s!r}") + + +def bit_depth_str_to_bpc(s: str) -> int: + """``"10bit"`` → 10。未知抛 :class:`UcdConfigError`。""" + if s in _BIT_DEPTH_STR_TO_BPC: + return _BIT_DEPTH_STR_TO_BPC[s] + raise UcdConfigError(f"未知位深: {s!r}") + + +def data_range_to_dynamic_range(s: str) -> DynamicRange: + if s in _DATA_RANGE_TO_DYNAMIC_RANGE: + return _DATA_RANGE_TO_DYNAMIC_RANGE[s] + raise UcdConfigError(f"未知数据范围: {s!r}") + + +def is_ycbcr(color_format: ColorFormat | str | None) -> bool: + """判断输出是否为 YCbCr 系列。接受 :class:`ColorFormat` 或 UI 字符串。""" + if color_format is None: + return False + if isinstance(color_format, ColorFormat): + return color_format in { + ColorFormat.YCBCR_444, + ColorFormat.YCBCR_422, + ColorFormat.YCBCR_420, + } + return "YCbCr" in str(color_format) + + +def parse_timing_str(timing_str: str) -> TimingSpec: + """解析 ``"DMT 3840x2160@60Hz"`` 风格的字符串为 :class:`TimingSpec`。 + + 宽容处理空格 / 大小写 / ``Hz`` 后缀大小写。 + 解析失败抛 :class:`UcdConfigError`。 + """ + if not isinstance(timing_str, str): + raise UcdConfigError(f"timing_str 必须是字符串: {timing_str!r}") + + s = " ".join(timing_str.strip().split()) + s = s.replace(" x", "x").replace("x ", "x") + + parts = s.split(" ", 1) + if len(parts) < 2: + raise UcdConfigError(f"无法解析 timing: {timing_str!r}") + type_str, rest = parts[0].strip().upper(), parts[1].strip() + + if "@" not in rest: + raise UcdConfigError(f"无法解析 timing (缺少 '@'): {timing_str!r}") + left, right = (p.strip() for p in rest.split("@", 1)) + + if "x" not in left: + raise UcdConfigError(f"无法解析分辨率 (缺少 'x'): {timing_str!r}") + wh = left.split("x") + if len(wh) != 2: + raise UcdConfigError(f"无法解析分辨率: {timing_str!r}") + try: + width, height = int(wh[0]), int(wh[1]) + except ValueError as exc: + raise UcdConfigError(f"分辨率数字解析失败: {timing_str!r}") from exc + + hz_str = right.replace("Hz", "").replace("HZ", "").strip() + try: + refresh_hz = float(hz_str) + except ValueError as exc: + raise UcdConfigError(f"刷新率解析失败: {timing_str!r}") from exc + + try: + standard = TimingStandard(type_str.lower()) + except ValueError as exc: + raise UcdConfigError(f"未知的分辨率类型: {type_str!r}") from exc + + return TimingSpec( + standard=standard, + width=width, + height=height, + refresh_hz=refresh_hz, + ) + + +__all__ = [ + # §1 + "Interface", + "ColorFormat", + "Colorimetry", + "DynamicRange", + "TimingStandard", + "PatternKind", + "SignalFormat", + "TimingSpec", + "PatternSpec", + # §2 + "UcdState", + "assert_transition", + # §3 + "UcdError", + "UcdNotConnected", + "UcdStateError", + "UcdConfigError", + "UcdApplyFailed", + "UcdSdkError", + # §4 + "UcdEvent", + "ConnectionChanged", + "SignalApplied", + "PatternApplied", + "EventBus", + # §5 + "output_format_to_color_format", + "color_space_to_colorimetry", + "bit_depth_str_to_bpc", + "data_range_to_dynamic_range", + "is_ycbcr", + "parse_timing_str", +] diff --git a/drivers/ucd_driver.py b/drivers/ucd_driver.py new file mode 100644 index 0000000..5f68b17 --- /dev/null +++ b/drivers/ucd_driver.py @@ -0,0 +1,436 @@ +"""UCD 驱动层。 + +唯一暴露给上层的入口为 :class:`IUcdDevice` 抽象接口,以及实现: +:class:`UCD323Device`(生产)和 :class:`FakeUcdDevice`(单测)。 + +Phase 1 实现策略 +----------------- +为保证零行为变更,:class:`UCD323Device` 当前**内部委托**给已有的 +:class:`drivers.UCD323_Function.UCDController`。后续 Phase 2 会将 +SDK 调用直接搬入本模块,届时可删除旧 ``UCDController`` 文件。 + +文件分区: + §1 DeviceInfo / list_devices + §2 IUcdDevice 抽象接口 + §3 UCD323Device 真实实现 + §4 FakeUcdDevice 单测实现 +""" + +from __future__ import annotations + +import logging +import threading +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from app.ucd_domain import ( + ConnectionChanged, + EventBus, + Interface, + PatternApplied, + PatternKind, + PatternSpec, + SignalApplied, + SignalFormat, + TimingSpec, + UcdApplyFailed, + UcdConfigError, + UcdNotConnected, + UcdSdkError, + UcdState, + UcdStateError, + assert_transition, +) + +if TYPE_CHECKING: + from drivers.UCD323_Function import UCDController + +log = logging.getLogger(__name__) + + +# ─── §1 DeviceInfo / list_devices ──────────────────────────────── + + +@dataclass(frozen=True) +class DeviceInfo: + """UCD 设备发现条目。 + + ``display`` 是 SDK 给出的完整字符串(``"0: UCD-323 #12345678"``); + ``index`` / ``serial`` / ``model`` 通过解析得到,解析失败时为 None。 + """ + + display: str + index: int | None = None + serial: str | None = None + model: str | None = None + + @classmethod + def parse(cls, display: str) -> "DeviceInfo": + idx: int | None = None + model: str | None = None + serial: str | None = None + try: + head, rest = display.split(":", 1) + idx = int(head.strip()) + rest = rest.strip() + # 形如 "UCD-323 #12345678" 或 "UCD-323 #12345678 (in use)" + tokens = rest.split() + if tokens: + model = tokens[0] + for tok in tokens[1:]: + if tok.startswith("#") and len(tok) >= 2: + serial = tok.lstrip("#") + break + except Exception: # noqa: BLE001 - 解析失败保留原 display 即可 + pass + return cls(display=display, index=idx, serial=serial, model=model) + + +def list_devices(controller: "UCDController") -> list[DeviceInfo]: + """通过给定的底层 controller 枚举可用 UCD 设备。""" + try: + raw_list = controller.search_device() + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("枚举 UCD 设备失败") from exc + return [DeviceInfo.parse(s) for s in (raw_list or [])] + + +# ─── §2 IUcdDevice 抽象接口 ────────────────────────────────────── + + +class IUcdDevice(ABC): + """UCD 信号发生器抽象设备。 + + 上层(Service / GUI)**只**通过本接口操作硬件,不得穿透到 + UniTAP SDK 或具体实现细节。 + """ + + @property + @abstractmethod + def state(self) -> UcdState: ... + + @property + @abstractmethod + def info(self) -> DeviceInfo | None: ... + + @abstractmethod + def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: + """打开设备并选择接口角色。失败抛 :class:`UcdSdkError` 等。""" + + @abstractmethod + def close(self) -> None: + """关闭设备(幂等)。""" + + @abstractmethod + def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: + """写入信号格式与 timing(未 apply)。返回 ``format_changed``。""" + + @abstractmethod + def set_pattern(self, pattern: PatternSpec) -> None: + """设置当前图案(未 apply)。""" + + @abstractmethod + def apply(self) -> None: + """将已配置的信号格式 + 图案一次性提交给硬件。""" + + @abstractmethod + def current_resolution(self) -> tuple[int, int]: + """读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。""" + + +# ─── §3 UCD323Device 真实实现 ──────────────────────────────────── + + +class UCD323Device(IUcdDevice): + """生产环境实现。内部委托给传统 :class:`UCDController`(Phase 1)。""" + + def __init__(self, bus: EventBus, controller: "UCDController | None" = None): + from drivers.UCD323_Function import UCDController as _UCDController + + self._bus = bus + self._controller: "UCDController" = controller or _UCDController() + self._state: UcdState = UcdState.CLOSED + self._info: DeviceInfo | None = None + self._interface: Interface = Interface.HDMI + self._lock = threading.RLock() + + self._curr_signal: SignalFormat | None = None + self._curr_timing: TimingSpec | None = None + self._curr_pattern: PatternSpec | None = None + self._last_applied: tuple[SignalFormat, TimingSpec] | None = None + + # -- 读访问 -------------------------------------------------- + + @property + def state(self) -> UcdState: + return self._state + + @property + def info(self) -> DeviceInfo | None: + return self._info + + @property + def raw_controller(self) -> "UCDController": + """Phase 1 过渡期:给暂未迁移的旧调用点的逃生通道。 + + 新代码**不**应使用本属性,迁移完成后即可删除。 + """ + return self._controller + + # -- 生命周期 ------------------------------------------------ + + def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: + with self._lock: + assert_transition(self._state, UcdState.OPENED) + if interface is not Interface.HDMI: + # Phase 1:底层 UCDController.open() 写死了 HDMISource。 + raise UcdConfigError( + f"暂不支持接口 {interface.value};当前仅实现 HDMI" + ) + try: + ok = self._controller.open(info.display) + except Exception as exc: # noqa: BLE001 + raise UcdSdkError(f"打开设备失败: {info.display}") from exc + if not ok: + raise UcdSdkError(f"打开设备失败: {info.display}") + self._info = info + self._interface = interface + self._state = UcdState.OPENED + self._bus.publish(ConnectionChanged(True, info.serial)) + + def close(self) -> None: + with self._lock: + if self._state == UcdState.CLOSED: + return + try: + self._controller.close() + except Exception: # noqa: BLE001 + log.exception("关闭 UCD 时发生异常") + self._state = UcdState.CLOSED + self._curr_signal = None + self._curr_timing = None + self._curr_pattern = None + self._last_applied = None + prev_serial = self._info.serial if self._info else None + self._info = None + self._bus.publish(ConnectionChanged(False, prev_serial)) + + # -- 配置 ---------------------------------------------------- + + def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: + with self._lock: + if self._state == UcdState.CLOSED: + raise UcdNotConnected("UCD 未连接,无法 configure") + try: + # 颜色模式(color_format / bpc / colorimetry) + if not self._controller.set_color_mode( + signal.color_format.value, + int(signal.bpc), + _colorimetry_to_legacy_key(signal), + ): + raise UcdConfigError( + f"set_color_mode 失败: {signal!r}" + ) + # dynamic_range 在新接口中是一等公民 + self._apply_dynamic_range(signal) + + # Timing + if not self._controller.set_timing_from_string(str(timing)): + raise UcdConfigError(f"set_timing_from_string 失败: {timing}") + except UcdConfigError: + raise + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("configure 异常") from exc + + self._curr_signal = signal + self._curr_timing = timing + self._state = UcdState.CONFIGURED + return (signal, timing) != self._last_applied + + def set_pattern(self, pattern: PatternSpec) -> None: + with self._lock: + if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED): + raise UcdStateError( + f"set_pattern 需要 CONFIGURED/APPLIED 状态,当前 {self._state.name}" + ) + self._curr_pattern = pattern + # 仅本地暂存,真正写硬件在 apply() + + def apply(self) -> None: + with self._lock: + if self._curr_signal is None or self._curr_timing is None: + raise UcdStateError("apply 前必须先 configure") + if self._curr_pattern is None: + raise UcdStateError("apply 前必须先 set_pattern") + try: + ok = self._apply_pattern_via_controller(self._curr_pattern) + except Exception as exc: # noqa: BLE001 + raise UcdSdkError("apply 异常") from exc + if not ok: + raise UcdApplyFailed( + f"apply 失败: pattern={self._curr_pattern.kind.value}" + ) + + changed = (self._curr_signal, self._curr_timing) != self._last_applied + self._last_applied = (self._curr_signal, self._curr_timing) + self._state = UcdState.APPLIED + self._bus.publish( + SignalApplied(self._curr_signal, self._curr_timing, changed) + ) + self._bus.publish(PatternApplied(self._curr_pattern)) + + # -- 查询 ---------------------------------------------------- + + def current_resolution(self) -> tuple[int, int]: + try: + return self._controller.get_current_resolution((3840, 2160)) + except Exception: # noqa: BLE001 + return (3840, 2160) + + # -- 内部辅助 ------------------------------------------------ + + def _apply_dynamic_range(self, signal: SignalFormat) -> None: + import UniTAP # 局部导入,避免本模块在无 SDK 环境下导入即失败 + + from app.ucd_domain import DynamicRange + + ci = self._controller.color_info + if ci is None: + return + if signal.dynamic_range is DynamicRange.FULL: + ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_VESA + else: + ci.dynamic_range = UniTAP.ColorInfo.DynamicRange.DR_CTA + + def _apply_pattern_via_controller(self, pattern: PatternSpec) -> bool: + """根据 PatternKind 走最合适的旧 controller 路径。""" + if pattern.kind is PatternKind.IMAGE: + if not pattern.image_path: + raise UcdConfigError("IMAGE pattern 必须提供 image_path") + return bool(self._controller.send_image_pattern(pattern.image_path)) + + # 预定义图案路径:复用 controller.set_pattern + run() + from drivers.UCD323_Enum import UCDEnum # 局部导入避免循环 + + video_pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern.kind.value) + if video_pattern is None: + raise UcdConfigError(f"不支持的 PatternKind: {pattern.kind!r}") + self._controller.current_pattern = video_pattern + + params: list[int] | None = None + if pattern.kind is PatternKind.SOLID: + if pattern.solid_rgb is None: + raise UcdConfigError("SOLID pattern 必须提供 solid_rgb") + params = list(pattern.solid_rgb) + elif pattern.extras: + params = list(pattern.extras) + + if not self._controller.set_pattern(video_pattern, params): + raise UcdApplyFailed("controller.set_pattern 返回 False") + return bool(self._controller.run()) + + +def _colorimetry_to_legacy_key(signal: SignalFormat) -> str: + """新 :class:`Colorimetry` → 旧 ``UCDEnum.ColorInfo.get_colorimetry`` 的 key。 + + BT.2020 在 YCbCr / RGB 输出下走不同 SDK 枚举(参考旧 + ``_get_colorimetry_from_color_space`` 的逻辑),这里也做同样的分支。 + """ + from app.ucd_domain import Colorimetry, is_ycbcr + + cm = signal.colorimetry + ycbcr = is_ycbcr(signal.color_format) + + if cm is Colorimetry.BT2020: + return "bt2020ycbcr" if ycbcr else "bt2020rgb" + return { + Colorimetry.SRGB: "srgb", + Colorimetry.BT709: "bt709", + Colorimetry.BT601: "bt601", + Colorimetry.DCI_P3: "dcip3", + Colorimetry.ADOBE_RGB: "adobergb", + }.get(cm, "srgb") + + +# ─── §4 FakeUcdDevice 单测实现 ─────────────────────────────────── + + +class FakeUcdDevice(IUcdDevice): + """无硬件依赖的 Fake 实现;记录调用序列供单测断言。""" + + def __init__(self, bus: EventBus | None = None) -> None: + self._bus = bus or EventBus() + self._state = UcdState.CLOSED + self._info: DeviceInfo | None = None + self._signal: SignalFormat | None = None + self._timing: TimingSpec | None = None + self._pattern: PatternSpec | None = None + self._last_applied: tuple[SignalFormat, TimingSpec] | None = None + self.calls: list[tuple] = [] # ("open", info) / ("configure", ...) ... + + @property + def state(self) -> UcdState: + return self._state + + @property + def info(self) -> DeviceInfo | None: + return self._info + + def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: + assert_transition(self._state, UcdState.OPENED) + self.calls.append(("open", info, interface)) + self._info = info + self._state = UcdState.OPENED + self._bus.publish(ConnectionChanged(True, info.serial)) + + def close(self) -> None: + if self._state == UcdState.CLOSED: + return + self.calls.append(("close",)) + self._state = UcdState.CLOSED + prev = self._info.serial if self._info else None + self._info = None + self._signal = self._timing = self._pattern = None + self._last_applied = None + self._bus.publish(ConnectionChanged(False, prev)) + + def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: + if self._state == UcdState.CLOSED: + raise UcdNotConnected() + self.calls.append(("configure", signal, timing)) + self._signal, self._timing = signal, timing + self._state = UcdState.CONFIGURED + return (signal, timing) != self._last_applied + + def set_pattern(self, pattern: PatternSpec) -> None: + if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED): + raise UcdStateError(f"非法状态 {self._state.name}") + self.calls.append(("set_pattern", pattern)) + self._pattern = pattern + + def apply(self) -> None: + if self._signal is None or self._timing is None: + raise UcdStateError("apply 前必须 configure") + if self._pattern is None: + raise UcdStateError("apply 前必须 set_pattern") + self.calls.append(("apply",)) + changed = (self._signal, self._timing) != self._last_applied + self._last_applied = (self._signal, self._timing) + self._state = UcdState.APPLIED + self._bus.publish(SignalApplied(self._signal, self._timing, changed)) + self._bus.publish(PatternApplied(self._pattern)) + + def current_resolution(self) -> tuple[int, int]: + if self._timing is None: + return (3840, 2160) + return (self._timing.width, self._timing.height) + + +__all__ = [ + "DeviceInfo", + "list_devices", + "IUcdDevice", + "UCD323Device", + "FakeUcdDevice", +] diff --git a/settings/pq_config.json b/settings/pq_config.json index fdd9979..9084d52 100644 --- a/settings/pq_config.json +++ b/settings/pq_config.json @@ -1,5 +1,5 @@ { - "current_test_type": "sdr_movie", + "current_test_type": "screen_module", "test_types": { "screen_module": { "name": "屏模组性能测试", @@ -26,7 +26,7 @@ "test_items": [ "gamut" ], - "timing": "DMT 1920x 1080 @ 60Hz", + "timing": "DMT 1600x 1200 @ 60Hz", "color_format": "RGB", "bpc": 8, "colorimetry": "sRGB",