重构UCD模块

This commit is contained in:
xinzhu.yin
2026-06-11 15:53:41 +08:00
parent 38222ff002
commit cc7218411c
11 changed files with 395 additions and 188 deletions

View File

@@ -54,9 +54,10 @@ class UCDController:
self._close_device_object(temp_dev)
raise role_error
pg, _ = self.get_tx_modules()
pg, ag = self.get_tx_modules()
self.timing_manager = pg.timing_manager
self.color_info = UniTAP.ColorInfo()
self._stop_audio_output(ag)
self.status = True
return True
@@ -82,6 +83,10 @@ class UCDController:
"""关闭设备"""
try:
if self.dev:
try:
self._stop_audio_output()
except Exception:
pass
self._close_device_object(self.dev)
self._reset_state()
@@ -152,6 +157,28 @@ class UCDController:
return self.role.dptx.pg, self.role.dptx.ag
raise ValueError(f"不支持的接口类型: {interface}")
def _stop_audio_output(self, ag=None) -> None:
"""关闭 HDMI/DP 音频发生器。PQ 测试仅需视频图案,避免电视持续输出测试音。"""
if not self.status or not self.role:
return
try:
if ag is None:
_, ag = self.get_tx_modules()
ag.stop_generate()
log.info("UCDController._stop_audio_output done")
except Exception:
log.exception("UCDController._stop_audio_output failed")
def _apply_pg_output(self, pg) -> bool:
"""提交 PG 输出,并确保音频发生器处于关闭状态。"""
try:
ok = bool(pg.apply())
except Exception:
log.exception("UCDController._apply_pg_output pg.apply failed")
return False
self._stop_audio_output()
return ok
def _resolve_timing(self, pg=None):
"""优先从 current_timing 读取 timing必要时回退到 TX 模块。"""
if self.current_timing is not None:
@@ -253,9 +280,9 @@ class UCDController:
self.apply_pattern()
pg, _ = self.get_tx_modules()
log.info("UCDController.run calling pg.apply()")
pg.apply()
log.info("UCDController.run done")
return True
ok = self._apply_pg_output(pg)
log.info("UCDController.run done ok=%s", ok)
return ok
def send_image_pattern(self, image_path):
"""发送图片 Pattern依赖当前 timing/color_info 状态)。"""
@@ -264,10 +291,11 @@ class UCDController:
try:
pg, _ = self.get_tx_modules()
self.apply_video_mode()
# 仅切换图案,不重复 set_vm重复 apply video mode 会触发电视 HDMI 重锁发声。
if getattr(self, "_last_sent_config", None) is None:
self.apply_video_mode()
pg.set_pattern(pattern=image_path)
pg.apply()
return True
return self._apply_pg_output(pg)
except Exception:
return False
@@ -323,6 +351,9 @@ class UCDController:
current_dynamic_range = self.color_info.dynamic_range
color_format = UCDEnum.ColorInfo.get_color_format(cf)
if color_format is None:
fmt_key = UCDEnum.SignalFormat.OutputFormat.get_format_key(cf)
color_format = UCDEnum.ColorInfo.get_color_format(fmt_key)
if color_format is None:
return False
@@ -369,12 +400,17 @@ class UCDController:
self.color_info.dynamic_range,
self.color_info.bpc,
)
if not self.format_changed:
log.info("UCDController.set_video_mode skipped pg.set_vm(): config unchanged")
return True
video_mode = UniTAP.VideoMode(
timing=self.current_timing, color_info=self.color_info
)
pg, _ = self.get_tx_modules()
log.info("UCDController.set_video_mode calling pg.set_vm()")
pg.set_vm(vm=video_mode)
self._stop_audio_output()
log.info("UCDController.set_video_mode done")
self._last_sent_config = current_config
return True

View File

@@ -3,11 +3,12 @@
唯一暴露给上层的入口为 :class:`IUcdDevice` 抽象接口,以及实现:
:class:`UCD323Device`(生产)和 :class:`FakeUcdDevice`(单测)。
Phase 1 实现策略
-----------------
为保证零行为变更,:class:`UCD323Device` 当前**内部委托**给已有的
:class:`drivers.UCD323_Function.UCDController`。后续 Phase 2 会将
SDK 调用直接搬入本模块,届时可删除旧 ``UCDController`` 文件
实现策略
--------
:class:`UCD323Device` 对外暴露完整的 :class:`IUcdDevice` 接口SDK 调用
当前仍委托给 :class:`drivers.UCD323_Function.UCDController`。
上层Service / GUI**不得**直接访问 ``UCDController``。
后续可将 SDK 调用逐步迁入本模块并删除旧文件。
文件分区:
§1 DeviceInfo / list_devices
@@ -141,6 +142,45 @@ class IUcdDevice(ABC):
def current_resolution(self) -> tuple[int, int]:
"""读取当前 timing 的 (width, height);未连接时返回默认 (3840, 2160)。"""
@abstractmethod
def search_devices(self) -> list[str]:
"""枚举可用设备的 SDK 显示字符串列表。"""
@property
@abstractmethod
def format_changed(self) -> bool:
"""最近一次视频模式提交是否相对上次发生变化。"""
@property
@abstractmethod
def last_error(self) -> str | None:
"""最近一次配置/应用失败时的错误描述。"""
@abstractmethod
def apply_signal_format(
self,
*,
color_space: str | None = None,
data_range: str | None = None,
bit_depth: str | None = None,
color_format: str | None = None,
max_cll: int | None = None,
max_fall: int | None = None,
) -> bool:
"""仅更新信号格式(沿用当前 timing不切换图案。"""
@abstractmethod
def set_ucd_params(self, config) -> bool:
"""按 PQConfig stage 色彩 / Timing / Pattern 类型(不 apply 输出)。"""
@abstractmethod
def send_current_pattern_params(self, pattern_params) -> bool:
"""更新当前 pattern 参数并 apply 到硬件。"""
@abstractmethod
def apply_config_and_run(self, config, pattern_params) -> bool:
"""set_ucd_params + set_pattern + run 复合操作。"""
# ─── §3 UCD323Device 真实实现 ────────────────────────────────────
@@ -211,14 +251,6 @@ class UCD323Device(IUcdDevice):
def info(self) -> DeviceInfo | None:
return self._info
@property
def raw_controller(self) -> "UCDController":
"""Phase 1 过渡期:给暂未迁移的旧调用点的逃生通道。
新代码**不**应使用本属性,迁移完成后即可删除。
"""
return self._controller
# -- 生命周期 ------------------------------------------------
def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None:
@@ -336,6 +368,74 @@ class UCD323Device(IUcdDevice):
except Exception: # noqa: BLE001
return (3840, 2160)
def search_devices(self) -> list[str]:
try:
return self._controller.search_device() or []
except Exception as exc: # noqa: BLE001
raise UcdSdkError("枚举 UCD 设备失败") from exc
@property
def format_changed(self) -> bool:
return bool(getattr(self._controller, "format_changed", True))
@property
def last_error(self) -> str | None:
err = getattr(self._controller, "last_error", None)
return str(err) if err else None
def apply_signal_format(
self,
*,
color_space: str | None = None,
data_range: str | None = None,
bit_depth: str | None = None,
color_format: str | None = None,
max_cll: int | None = None,
max_fall: int | None = None,
) -> bool:
with self._acquire_device_lock("apply_signal_format"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 apply_signal_format")
return bool(
self._controller.apply_signal_format(
color_space=color_space,
data_range=data_range,
bit_depth=bit_depth,
color_format=color_format,
max_cll=max_cll,
max_fall=max_fall,
)
)
def set_ucd_params(self, config) -> bool:
with self._acquire_device_lock("set_ucd_params"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 set_ucd_params")
return bool(self._controller.set_ucd_params(config))
def send_current_pattern_params(self, pattern_params) -> bool:
with self._acquire_device_lock("send_current_pattern_params"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 send_current_pattern_params")
ok = bool(self._controller.send_current_pattern_params(pattern_params))
if ok:
self._state = UcdState.APPLIED
return ok
def apply_config_and_run(self, config, pattern_params) -> bool:
with self._acquire_device_lock("apply_config_and_run"):
if self._state == UcdState.CLOSED:
raise UcdNotConnected("UCD 未连接,无法 apply_config_and_run")
ctrl = self._controller
if not ctrl.set_ucd_params(config):
return False
if not ctrl.set_pattern(ctrl.current_pattern, pattern_params):
return False
ok = bool(ctrl.run())
if ok:
self._state = UcdState.APPLIED
return ok
# -- 内部辅助 ------------------------------------------------
def _apply_dynamic_range(self, signal: SignalFormat) -> None:
@@ -387,7 +487,10 @@ class UCD323Device(IUcdDevice):
)
try:
pg, _ = self._controller.get_tx_modules()
pg.apply()
if not self._controller._apply_pg_output(pg):
raise UcdApplyFailed("controller.apply_pg_output 返回 False")
except UcdApplyFailed:
raise
except Exception as exc:
raise UcdSdkError("pg.apply() 失败") from exc
return True
@@ -466,8 +569,8 @@ class FakeUcdDevice(IUcdDevice):
return (signal, timing) != self._last_applied
def set_pattern(self, pattern: PatternSpec) -> None:
if self._state not in (UcdState.CONFIGURED, UcdState.APPLIED):
raise UcdStateError(f"非法状态 {self._state.name}")
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("set_pattern", pattern))
self._pattern = pattern
@@ -488,6 +591,44 @@ class FakeUcdDevice(IUcdDevice):
return (3840, 2160)
return (self._timing.width, self._timing.height)
def search_devices(self) -> list[str]:
return []
@property
def format_changed(self) -> bool:
return self._last_applied is None
@property
def last_error(self) -> str | None:
return None
def apply_signal_format(self, **kwargs) -> bool:
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("apply_signal_format", kwargs))
return True
def set_ucd_params(self, config) -> bool:
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("set_ucd_params", config))
self._state = UcdState.OPENED
return True
def send_current_pattern_params(self, pattern_params) -> bool:
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("send_current_pattern_params", pattern_params))
self._state = UcdState.APPLIED
return True
def apply_config_and_run(self, config, pattern_params) -> bool:
if self._state == UcdState.CLOSED:
raise UcdNotConnected()
self.calls.append(("apply_config_and_run", config, pattern_params))
self._state = UcdState.APPLIED
return True
__all__ = [
"DeviceInfo",