"""UCD 控制 Domain 层。 纯数据 + 纯函数:枚举、值对象、状态机、错误体系、事件总线、 业务字符串解析与映射。本模块**不**依赖 UniTAP / 任何硬件; 可用纯单测覆盖。 文件分区: §1 枚举与值对象 §2 状态机 §3 错误体系 §4 事件总线 §5 业务字符串解析 / 映射 """ from __future__ import annotations import logging from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable log = logging.getLogger(__name__) # ─── §1 枚举与值对象 ────────────────────────────────────────────── class Interface(str, Enum): """UCD 物理输出接口。""" HDMI = "HDMI" DP = "DP" USBC = "Type-C" class ColorFormat(str, Enum): """像素颜色格式(与 UI 显示字符串解耦的内部表示)。""" RGB = "rgb" YCBCR_444 = "ycbcr444" YCBCR_422 = "ycbcr422" YCBCR_420 = "ycbcr420" Y_ONLY = "yonly" IDO_DEFINED = "ido_defined" RAW = "raw" DSC = "dsc" class Colorimetry(str, Enum): """色度空间。""" SRGB = "sRGB" BT709 = "BT.709" BT601 = "BT.601" BT2020 = "BT.2020" DCI_P3 = "DCI-P3" ADOBE_RGB = "AdobeRGB" class DynamicRange(str, Enum): FULL = "Full" LIMITED = "Limited" class TimingStandard(str, Enum): DMT = "dmt" CTA = "cta" CVT = "cvt" OVT = "ovt" class PatternKind(str, Enum): """高层图案种类。 与 UCD 内部 VideoPattern 枚举区分:仅暴露业务上真正用到的几种。 """ DISABLED = "disabled" SOLID = "solidcolor" SOLID_WHITE = "solidwhite" SOLID_RED = "solidred" SOLID_GREEN = "solidgreen" SOLID_BLUE = "solidblue" COLOR_BARS = "colorbars" CHESSBOARD = "chessboard" WHITE_VSTRIPS = "whitevstrips" GRADIENT_RGB_STRIPES = "gradientrgbstripes" COLOR_RAMP = "colorramp" COLOR_SQUARES = "coloursquares" MOTION = "motionpattern" SQUARE_WINDOW = "squarewindow" IMAGE = "image" # 来自文件路径 @dataclass(frozen=True) class SignalFormat: """信号格式(color_info 部分)。""" color_format: ColorFormat colorimetry: Colorimetry bpc: int # 8 / 10 / 12 dynamic_range: DynamicRange = DynamicRange.FULL @dataclass(frozen=True) class TimingSpec: """显示 Timing 描述。""" standard: TimingStandard width: int height: int refresh_hz: float def __str__(self) -> str: # 便于日志 return f"{self.standard.value.upper()} {self.width}x{self.height}@{self.refresh_hz:g}Hz" @dataclass(frozen=True) class PatternSpec: """图案描述。 联合字段含义: SOLID → solid_rgb=(r,g,b) IMAGE → image_path 其它预定义图案 → extras 视具体类型 """ kind: PatternKind solid_rgb: tuple[int, int, int] | None = None image_path: str | None = None extras: tuple = field(default_factory=tuple) # ─── §2 状态机 ─────────────────────────────────────────────────── class UcdState(Enum): CLOSED = 0 OPENED = 1 # 设备已打开,未配置信号 CONFIGURED = 2 # SignalFormat + Timing 已写入,未 apply APPLIED = 3 # 已 apply,硬件正在输出 _ALLOWED: dict[UcdState, set[UcdState]] = { UcdState.CLOSED: {UcdState.OPENED, UcdState.CLOSED}, UcdState.OPENED: {UcdState.CONFIGURED, UcdState.CLOSED, UcdState.OPENED}, UcdState.CONFIGURED: {UcdState.CONFIGURED, UcdState.APPLIED, UcdState.OPENED, UcdState.CLOSED}, UcdState.APPLIED: {UcdState.CONFIGURED, UcdState.APPLIED, UcdState.OPENED, UcdState.CLOSED}, } def assert_transition(curr: UcdState, nxt: UcdState) -> None: """校验状态转移。非法转移抛 :class:`UcdStateError`。""" if nxt not in _ALLOWED[curr]: raise UcdStateError(f"非法状态转移: {curr.name} -> {nxt.name}") # ─── §3 错误体系 ───────────────────────────────────────────────── class UcdError(Exception): """UCD 控制相关错误的基类。""" class UcdNotConnected(UcdError): """设备未打开/未连接。""" class UcdStateError(UcdError): """状态机非法转移或前置条件不满足。""" class UcdConfigError(UcdError): """业务配置参数不合法(如不支持的 color_format、解析失败)。""" class UcdApplyFailed(UcdError): """SDK ``apply`` 返回失败或超时。""" class UcdSdkError(UcdError): """UniTAP SDK 内部异常的包装;原始异常保存在 ``__cause__``。""" # ─── §4 事件总线 ───────────────────────────────────────────────── @dataclass(frozen=True) class UcdEvent: """事件基类。""" @dataclass(frozen=True) class ConnectionChanged(UcdEvent): connected: bool serial: str | None = None @dataclass(frozen=True) class SignalApplied(UcdEvent): signal: SignalFormat timing: TimingSpec format_changed: bool # 与上一次 apply 相比 (signal,timing) 是否改变 @dataclass(frozen=True) class PatternApplied(UcdEvent): pattern: PatternSpec class EventBus: """极简同步事件总线(按事件类型分发)。 单线程使用;如需跨线程派发,由订阅者自行 marshal 到 UI 线程。 """ def __init__(self) -> None: self._subs: dict[type, list[Callable[[Any], None]]] = {} def subscribe(self, evt_type: type, cb: Callable[[Any], None]) -> None: self._subs.setdefault(evt_type, []).append(cb) def publish(self, evt: UcdEvent) -> None: for cb in self._subs.get(type(evt), []): try: cb(evt) except Exception: # noqa: BLE001 - 订阅者错误不应影响发布者 log.exception("UCD 事件处理器抛出异常: %r", evt) # ─── §5 业务字符串解析 / 映射 ──────────────────────────────────── _OUTPUT_FORMAT_TO_COLOR_FORMAT: dict[str, ColorFormat] = { "RGB": ColorFormat.RGB, "YCbCr 4:4:4": ColorFormat.YCBCR_444, "YCbCr 4:2:2": ColorFormat.YCBCR_422, "YCbCr 4:2:0": ColorFormat.YCBCR_420, "Y Only": ColorFormat.Y_ONLY, "IDO Defined": ColorFormat.IDO_DEFINED, "RAW": ColorFormat.RAW, "DSC": ColorFormat.DSC, } _COLOR_SPACE_TO_COLORIMETRY: dict[str, Colorimetry] = { "sRGB": Colorimetry.SRGB, "BT.709": Colorimetry.BT709, "BT.601": Colorimetry.BT601, "BT.2020": Colorimetry.BT2020, "DCI-P3": Colorimetry.DCI_P3, "AdobeRGB": Colorimetry.ADOBE_RGB, } _BIT_DEPTH_STR_TO_BPC: dict[str, int] = { "8bit": 8, "10bit": 10, "12bit": 12, } _DATA_RANGE_TO_DYNAMIC_RANGE: dict[str, DynamicRange] = { "Full": DynamicRange.FULL, "Limited": DynamicRange.LIMITED, } def output_format_to_color_format(s: str) -> ColorFormat: """显示用 ``"YCbCr 4:4:4"`` → :class:`ColorFormat`。未知值视为 RGB。""" return _OUTPUT_FORMAT_TO_COLOR_FORMAT.get(s, ColorFormat.RGB) def color_space_to_colorimetry(s: str) -> Colorimetry: """显示用 ``"BT.709"`` → :class:`Colorimetry`。未知抛 :class:`UcdConfigError`。""" if s in _COLOR_SPACE_TO_COLORIMETRY: return _COLOR_SPACE_TO_COLORIMETRY[s] raise UcdConfigError(f"未知色彩空间: {s!r}") def bit_depth_str_to_bpc(s: str) -> int: """``"10bit"`` → 10。未知抛 :class:`UcdConfigError`。""" if s in _BIT_DEPTH_STR_TO_BPC: return _BIT_DEPTH_STR_TO_BPC[s] raise UcdConfigError(f"未知位深: {s!r}") def data_range_to_dynamic_range(s: str) -> DynamicRange: if s in _DATA_RANGE_TO_DYNAMIC_RANGE: return _DATA_RANGE_TO_DYNAMIC_RANGE[s] raise UcdConfigError(f"未知数据范围: {s!r}") def is_ycbcr(color_format: ColorFormat | str | None) -> bool: """判断输出是否为 YCbCr 系列。接受 :class:`ColorFormat` 或 UI 字符串。""" if color_format is None: return False if isinstance(color_format, ColorFormat): return color_format in { ColorFormat.YCBCR_444, ColorFormat.YCBCR_422, ColorFormat.YCBCR_420, } return "YCbCr" in str(color_format) def parse_timing_str(timing_str: str) -> TimingSpec: """解析 ``"DMT 3840x2160@60Hz"`` 风格的字符串为 :class:`TimingSpec`。 宽容处理空格 / 大小写 / ``Hz`` 后缀大小写。 解析失败抛 :class:`UcdConfigError`。 """ if not isinstance(timing_str, str): raise UcdConfigError(f"timing_str 必须是字符串: {timing_str!r}") s = " ".join(timing_str.strip().split()) s = s.replace(" x", "x").replace("x ", "x") parts = s.split(" ", 1) if len(parts) < 2: raise UcdConfigError(f"无法解析 timing: {timing_str!r}") type_str, rest = parts[0].strip().upper(), parts[1].strip() if "@" not in rest: raise UcdConfigError(f"无法解析 timing (缺少 '@'): {timing_str!r}") left, right = (p.strip() for p in rest.split("@", 1)) if "x" not in left: raise UcdConfigError(f"无法解析分辨率 (缺少 'x'): {timing_str!r}") wh = left.split("x") if len(wh) != 2: raise UcdConfigError(f"无法解析分辨率: {timing_str!r}") try: width, height = int(wh[0]), int(wh[1]) except ValueError as exc: raise UcdConfigError(f"分辨率数字解析失败: {timing_str!r}") from exc hz_str = right.replace("Hz", "").replace("HZ", "").strip() try: refresh_hz = float(hz_str) except ValueError as exc: raise UcdConfigError(f"刷新率解析失败: {timing_str!r}") from exc try: standard = TimingStandard(type_str.lower()) except ValueError as exc: raise UcdConfigError(f"未知的分辨率类型: {type_str!r}") from exc return TimingSpec( standard=standard, width=width, height=height, refresh_hz=refresh_hz, ) __all__ = [ # §1 "Interface", "ColorFormat", "Colorimetry", "DynamicRange", "TimingStandard", "PatternKind", "SignalFormat", "TimingSpec", "PatternSpec", # §2 "UcdState", "assert_transition", # §3 "UcdError", "UcdNotConnected", "UcdStateError", "UcdConfigError", "UcdApplyFailed", "UcdSdkError", # §4 "UcdEvent", "ConnectionChanged", "SignalApplied", "PatternApplied", "EventBus", # §5 "output_format_to_color_format", "color_space_to_colorimetry", "bit_depth_str_to_bpc", "data_range_to_dynamic_range", "is_ycbcr", "parse_timing_str", ]