Files
pqAutomationApp/app/ucd_domain.py
2026-05-24 10:49:28 +08:00

389 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""UCD 控制 Domain 层。
纯数据 + 纯函数:枚举、值对象、状态机、错误体系、事件总线、
业务字符串解析与映射。本模块**不**依赖 UniTAP / 任何硬件;
可用纯单测覆盖。
文件分区:
§1 枚举与值对象
§2 状态机
§3 错误体系
§4 事件总线
§5 业务字符串解析 / 映射
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
log = logging.getLogger(__name__)
# ─── §1 枚举与值对象 ──────────────────────────────────────────────
class Interface(str, Enum):
    """Physical output interface of the UCD.

    Members are also ``str`` instances, so each one compares equal to its
    string value.
    """

    HDMI = "HDMI"
    DP = "DP"
    USBC = "Type-C"
class ColorFormat(str, Enum):
    """Pixel colour format.

    This is the internal representation; it is decoupled from the strings
    shown in the UI.
    """

    RGB = "rgb"
    YCBCR_444 = "ycbcr444"
    YCBCR_422 = "ycbcr422"
    YCBCR_420 = "ycbcr420"
    Y_ONLY = "yonly"
    IDO_DEFINED = "ido_defined"
    RAW = "raw"
    DSC = "dsc"
class Colorimetry(str, Enum):
    """Colorimetry (colour space) identifiers."""

    SRGB = "sRGB"
    BT709 = "BT.709"
    BT601 = "BT.601"
    BT2020 = "BT.2020"
    DCI_P3 = "DCI-P3"
    ADOBE_RGB = "AdobeRGB"
class DynamicRange(str, Enum):
    """Dynamic range selection: full or limited."""

    FULL = "Full"
    LIMITED = "Limited"
class TimingStandard(str, Enum):
    """Timing standard family; each value is the lowercase form of its name."""

    DMT = "dmt"
    CTA = "cta"
    CVT = "cvt"
    OVT = "ovt"
class PatternKind(str, Enum):
    """High-level pattern kinds.

    Kept separate from the UCD-internal VideoPattern enum: only the kinds
    the business logic actually uses are exposed here.
    """

    DISABLED = "disabled"

    # Solid fills.
    SOLID = "solidcolor"
    SOLID_WHITE = "solidwhite"
    SOLID_RED = "solidred"
    SOLID_GREEN = "solidgreen"
    SOLID_BLUE = "solidblue"

    # Predefined test patterns.
    COLOR_BARS = "colorbars"
    CHESSBOARD = "chessboard"
    WHITE_VSTRIPS = "whitevstrips"
    GRADIENT_RGB_STRIPES = "gradientrgbstripes"
    COLOR_RAMP = "colorramp"
    COLOR_SQUARES = "coloursquares"
    MOTION = "motionpattern"
    SQUARE_WINDOW = "squarewindow"

    IMAGE = "image"  # sourced from a file path
@dataclass(frozen=True)
class SignalFormat:
    """Signal format (the ``color_info`` part). Immutable value object."""

    color_format: ColorFormat
    colorimetry: Colorimetry
    bpc: int  # bit depth: 8 / 10 / 12
    dynamic_range: DynamicRange = DynamicRange.FULL
@dataclass(frozen=True)
class TimingSpec:
    """Description of a display timing. Immutable value object."""

    standard: TimingStandard
    width: int
    height: int
    refresh_hz: float

    def __str__(self) -> str:
        """Return a compact form such as ``"DMT 3840x2160@60Hz"`` for logging."""
        label = self.standard.value.upper()
        # ``:g`` drops a trailing ".0", so 60.0 renders as "60".
        return f"{label} {self.width}x{self.height}@{self.refresh_hz:g}Hz"
@dataclass(frozen=True)
class PatternSpec:
    """Description of a pattern. Immutable value object.

    Which optional field carries data depends on ``kind``:
        SOLID                     -> ``solid_rgb`` = (r, g, b)
        IMAGE                     -> ``image_path``
        other predefined patterns -> ``extras``, depending on the kind
    """

    kind: PatternKind
    solid_rgb: tuple[int, int, int] | None = None
    image_path: str | None = None
    extras: tuple = field(default_factory=tuple)
# ─── §2 状态机 ───────────────────────────────────────────────────
class UcdState(Enum):
    """States of the UCD control state machine."""

    CLOSED = 0
    OPENED = 1  # device is open, signal not configured yet
    CONFIGURED = 2  # SignalFormat + Timing written, not applied yet
    APPLIED = 3  # applied; the hardware is outputting
# Transition table: current state -> set of states it may move to.
# Every state may stay where it is and may go to CLOSED; OPENED cannot jump
# straight to APPLIED.
_ALLOWED: dict[UcdState, set[UcdState]] = {
    UcdState.CLOSED: {
        UcdState.CLOSED,
        UcdState.OPENED,
    },
    UcdState.OPENED: {
        UcdState.CLOSED,
        UcdState.OPENED,
        UcdState.CONFIGURED,
    },
    UcdState.CONFIGURED: {
        UcdState.CLOSED,
        UcdState.OPENED,
        UcdState.CONFIGURED,
        UcdState.APPLIED,
    },
    UcdState.APPLIED: {
        UcdState.CLOSED,
        UcdState.OPENED,
        UcdState.CONFIGURED,
        UcdState.APPLIED,
    },
}
def assert_transition(curr: UcdState, nxt: UcdState) -> None:
    """Validate a state transition.

    Args:
        curr: The state being left.
        nxt: The state being entered.

    Raises:
        UcdStateError: If ``nxt`` is not among the states allowed from ``curr``.
    """
    permitted = _ALLOWED[curr]
    if nxt in permitted:
        return
    raise UcdStateError(f"非法状态转移: {curr.name} -> {nxt.name}")
# ─── §3 错误体系 ─────────────────────────────────────────────────
class UcdError(Exception):
    """Base class for every UCD-control error."""


class UcdNotConnected(UcdError):
    """The device is not open / not connected."""


class UcdStateError(UcdError):
    """Illegal state-machine transition, or a precondition is not met."""


class UcdConfigError(UcdError):
    """Invalid business configuration (e.g. unsupported color_format, parse failure)."""


class UcdApplyFailed(UcdError):
    """The SDK ``apply`` call reported failure or timed out."""


class UcdSdkError(UcdError):
    """Wrapper for an internal UniTAP SDK exception; the original is kept in ``__cause__``."""
# ─── §4 事件总线 ─────────────────────────────────────────────────
@dataclass(frozen=True)
class UcdEvent:
    """Base class for all events."""


@dataclass(frozen=True)
class ConnectionChanged(UcdEvent):
    """Event carrying a connection flag and an optional serial string."""

    connected: bool
    serial: str | None = None


@dataclass(frozen=True)
class SignalApplied(UcdEvent):
    """Event carrying the signal format and timing of an apply."""

    signal: SignalFormat
    timing: TimingSpec
    # Whether (signal, timing) differs from the previous apply.
    format_changed: bool


@dataclass(frozen=True)
class PatternApplied(UcdEvent):
    """Event carrying an applied pattern."""

    pattern: PatternSpec
class EventBus:
    """Minimal synchronous event bus that dispatches by exact event type.

    Handlers run on the thread that calls :meth:`publish`. The bus is meant
    for single-threaded use; a subscriber that needs another thread (e.g. the
    UI thread) must marshal the call itself.
    """

    def __init__(self) -> None:
        # Event type -> handlers, in subscription order.
        self._subs: dict[type, list[Callable[[Any], None]]] = {}

    def subscribe(self, evt_type: type, cb: Callable[[Any], None]) -> None:
        """Register ``cb`` to be called for events whose type is exactly ``evt_type``.

        Args:
            evt_type: The event class to listen for.
            cb: Callable invoked with the event instance.
        """
        self._subs.setdefault(evt_type, []).append(cb)

    def publish(self, evt: UcdEvent) -> None:
        """Deliver ``evt`` to every handler subscribed to ``type(evt)``.

        Handlers are called in subscription order. An ``Exception`` raised by
        a handler is logged and swallowed so that it affects neither the
        publisher nor the remaining handlers.

        Args:
            evt: The event instance to dispatch.
        """
        # Iterate over a snapshot: a handler may subscribe to the same event
        # type while we dispatch, and iterating the live list would call the
        # newcomer immediately (or never finish if it keeps re-subscribing).
        handlers = tuple(self._subs.get(type(evt), ()))
        for cb in handlers:
            try:
                cb(evt)
            except Exception:  # noqa: BLE001 - a subscriber error must not affect the publisher
                log.exception("UCD 事件处理器抛出异常: %r", evt)
# ─── §5 业务字符串解析 / 映射 ────────────────────────────────────
# UI "output format" label -> internal colour format.
_OUTPUT_FORMAT_TO_COLOR_FORMAT: dict[str, ColorFormat] = {
    "RGB": ColorFormat.RGB,
    "YCbCr 4:4:4": ColorFormat.YCBCR_444,
    "YCbCr 4:2:2": ColorFormat.YCBCR_422,
    "YCbCr 4:2:0": ColorFormat.YCBCR_420,
    "Y Only": ColorFormat.Y_ONLY,
    "IDO Defined": ColorFormat.IDO_DEFINED,
    "RAW": ColorFormat.RAW,
    "DSC": ColorFormat.DSC,
}

# UI "colour space" label -> colorimetry.
_COLOR_SPACE_TO_COLORIMETRY: dict[str, Colorimetry] = {
    "sRGB": Colorimetry.SRGB,
    "BT.709": Colorimetry.BT709,
    "BT.601": Colorimetry.BT601,
    "BT.2020": Colorimetry.BT2020,
    "DCI-P3": Colorimetry.DCI_P3,
    "AdobeRGB": Colorimetry.ADOBE_RGB,
}

# UI bit-depth label -> integer bit depth.
_BIT_DEPTH_STR_TO_BPC: dict[str, int] = {
    "8bit": 8,
    "10bit": 10,
    "12bit": 12,
}

# UI "data range" label -> dynamic range.
_DATA_RANGE_TO_DYNAMIC_RANGE: dict[str, DynamicRange] = {
    "Full": DynamicRange.FULL,
    "Limited": DynamicRange.LIMITED,
}
def output_format_to_color_format(s: str) -> ColorFormat:
    """Map a display string such as ``"YCbCr 4:4:4"`` to a :class:`ColorFormat`.

    Args:
        s: The display string to look up.

    Returns:
        The matching colour format; any unrecognised string yields
        ``ColorFormat.RGB`` rather than an error.
    """
    if s in _OUTPUT_FORMAT_TO_COLOR_FORMAT:
        return _OUTPUT_FORMAT_TO_COLOR_FORMAT[s]
    return ColorFormat.RGB
def color_space_to_colorimetry(s: str) -> Colorimetry:
    """Map a display string such as ``"BT.709"`` to a :class:`Colorimetry`.

    Args:
        s: The display string to look up.

    Returns:
        The matching colorimetry.

    Raises:
        UcdConfigError: If ``s`` is not a known colour space.
    """
    found = _COLOR_SPACE_TO_COLORIMETRY.get(s)
    if found is None:
        raise UcdConfigError(f"未知色彩空间: {s!r}")
    return found
def bit_depth_str_to_bpc(s: str) -> int:
    """Map a bit-depth label to its integer value, e.g. ``"10bit"`` -> 10.

    Args:
        s: The bit-depth label to look up.

    Returns:
        The bit depth as an integer.

    Raises:
        UcdConfigError: If ``s`` is not a known bit-depth label.
    """
    bpc = _BIT_DEPTH_STR_TO_BPC.get(s)
    if bpc is None:
        raise UcdConfigError(f"未知位深: {s!r}")
    return bpc
def data_range_to_dynamic_range(s: str) -> DynamicRange:
    """Map a data-range label (``"Full"`` / ``"Limited"``) to a :class:`DynamicRange`.

    Args:
        s: The data-range label to look up.

    Returns:
        The matching dynamic range.

    Raises:
        UcdConfigError: If ``s`` is not a known data-range label.
    """
    found = _DATA_RANGE_TO_DYNAMIC_RANGE.get(s)
    if found is None:
        raise UcdConfigError(f"未知数据范围: {s!r}")
    return found
def is_ycbcr(color_format: ColorFormat | str | None) -> bool:
    """Tell whether an output format belongs to the YCbCr family.

    Args:
        color_format: A :class:`ColorFormat`, a string (UI label such as
            ``"YCbCr 4:2:2"`` or a plain value such as ``"ycbcr422"``), or
            ``None``.

    Returns:
        ``True`` for the three YCbCr members of :class:`ColorFormat` and for
        any other value whose text contains ``"ycbcr"`` in any letter case;
        ``False`` otherwise, including for ``None``.
    """
    if color_format is None:
        return False
    # ColorFormat is a str subclass, so it must be handled before the
    # generic text branch below.
    if isinstance(color_format, ColorFormat):
        return color_format in {
            ColorFormat.YCBCR_444,
            ColorFormat.YCBCR_422,
            ColorFormat.YCBCR_420,
        }
    # Case-insensitive so that "YCBCR 4:2:0" and the enum's own plain-string
    # values ("ycbcr420") are recognised, not only the exact UI spelling.
    return "ycbcr" in str(color_format).lower()
def parse_timing_str(timing_str: str) -> TimingSpec:
    """Parse a ``"DMT 3840x2160@60Hz"``-style string into a :class:`TimingSpec`.

    The expected shape is ``<standard> <width>x<height>@<refresh>[Hz]``.
    Parsing is case-insensitive (standard name, ``x`` separator, ``Hz``
    suffix) and tolerates extra whitespace around the tokens.

    Args:
        timing_str: The text to parse.

    Returns:
        The parsed timing.

    Raises:
        UcdConfigError: If ``timing_str`` is not a string, does not have the
            expected shape, names an unknown timing standard, or carries a
            non-positive dimension or a non-positive / non-finite refresh rate.
    """
    if not isinstance(timing_str, str):
        raise UcdConfigError(f"timing_str 必须是字符串: {timing_str!r}")

    # Collapse whitespace runs and fold case so that "X", "hz", "HZ", ... all
    # parse the same way as the canonical spelling.
    s = " ".join(timing_str.split()).lower()
    # Glue "3840 x 2160" into "3840x2160" so that the first remaining space
    # separates the standard from the rest.
    s = s.replace(" x", "x").replace("x ", "x")

    parts = s.split(" ", 1)
    if len(parts) < 2:
        raise UcdConfigError(f"无法解析 timing: {timing_str!r}")
    # Upper-cased only for the error message below.
    type_str, rest = parts[0].strip().upper(), parts[1].strip()

    if "@" not in rest:
        raise UcdConfigError(f"无法解析 timing (缺少 '@'): {timing_str!r}")
    left, right = (p.strip() for p in rest.split("@", 1))

    if "x" not in left:
        raise UcdConfigError(f"无法解析分辨率 (缺少 'x'): {timing_str!r}")
    wh = left.split("x")
    if len(wh) != 2:
        raise UcdConfigError(f"无法解析分辨率: {timing_str!r}")
    try:
        width, height = int(wh[0]), int(wh[1])
    except ValueError as exc:
        raise UcdConfigError(f"分辨率数字解析失败: {timing_str!r}") from exc
    if width <= 0 or height <= 0:
        raise UcdConfigError(f"分辨率必须为正数: {timing_str!r}")

    hz_str = right.replace("hz", "").strip()
    try:
        refresh_hz = float(hz_str)
    except ValueError as exc:
        raise UcdConfigError(f"刷新率解析失败: {timing_str!r}") from exc
    # float() accepts "nan" / "inf"; the chained comparison rejects both as
    # well as zero and negative values.
    if not 0 < refresh_hz < float("inf"):
        raise UcdConfigError(f"刷新率必须为有限正数: {timing_str!r}")

    try:
        standard = TimingStandard(type_str.lower())
    except ValueError as exc:
        raise UcdConfigError(f"未知的分辨率类型: {type_str!r}") from exc

    return TimingSpec(
        standard=standard,
        width=width,
        height=height,
        refresh_hz=refresh_hz,
    )
# Public API of this module, grouped by file section.
__all__ = [
    # §1 enums and value objects
    "Interface",
    "ColorFormat",
    "Colorimetry",
    "DynamicRange",
    "TimingStandard",
    "PatternKind",
    "SignalFormat",
    "TimingSpec",
    "PatternSpec",
    # §2 state machine
    "UcdState",
    "assert_transition",
    # §3 errors
    "UcdError",
    "UcdNotConnected",
    "UcdStateError",
    "UcdConfigError",
    "UcdApplyFailed",
    "UcdSdkError",
    # §4 event bus
    "UcdEvent",
    "ConnectionChanged",
    "SignalApplied",
    "PatternApplied",
    "EventBus",
    # §5 string parsing / mapping
    "output_format_to_color_format",
    "color_space_to_colorimetry",
    "bit_depth_str_to_bpc",
    "data_range_to_dynamic_range",
    "is_ycbcr",
    "parse_timing_str",
]