diff --git a/app/device/connection.py b/app/device/connection.py index 9c94ca2..f97528e 100644 --- a/app/device/connection.py +++ b/app/device/connection.py @@ -1,198 +1,280 @@ -"""设备连接(UCD323 / CA410)相关逻辑(Step 4 重构)。 +"""设备连接管理(UCD323 / CA410)。 -从 pqAutomationApp.PQAutomationApp 中搬迁。每个函数第一行 `self = app` -以保留原有 `self.xxx` 属性访问不变。 +重构目标 +--------- +- 用 :class:`ConnectionController` 类封装连接生命周期,替代旧的 + "把模块级函数当类方法装到 PQAutomationApp 上" 的写法。 +- 拆掉 ``check_port_connection(is_ucd)`` 的布尔旗参数反模式, + 分离为 ``connect_ucd`` / ``connect_ca`` 两条独立路径。 +- UCD 这一侧不再直接调用旧 ``UCDController``,而是通过 + :class:`UCD323Device` + :class:`EventBus`;指示器更新由订阅 + :class:`ConnectionChanged` 事件触发,与 GUI 解耦。 + +模块底部仍保留 7 个旧名字函数作为薄兼容层(``check_com_connections`` +等),供 ``pqAutomationApp.PQAutomationApp`` 类体继续以同名属性挂接, +保证调用点(按钮 command、_dispatch_ui 引用)零修改。 """ +from __future__ import annotations + import threading import time from tkinter import messagebox +from typing import TYPE_CHECKING +from app.ucd_domain import ConnectionChanged, UcdError from drivers.caSerail import CASerail +from drivers.ucd_driver import DeviceInfo + +if TYPE_CHECKING: + from app.ucd_domain import EventBus + from drivers.ucd_driver import UCD323Device + + +# ─── ConnectionController ──────────────────────────────────────── + + +class ConnectionController: + """统一管理 CA410 / UCD323 的连接、断开与可用端口枚举。 + + 设计要点: + - 不持有 :mod:`tkinter` 控件;所有 GUI 更新由订阅事件总线后回调完成。 + - UCD 连接经由 :class:`UCD323Device`,自动发布 + :class:`ConnectionChanged` 事件。 + - CA 连接同样发布 :class:`ConnectionChanged` 事件(带 ``serial=None``, + 指示器订阅时只看 ``connected`` 字段)。 + """ + + def __init__(self, app): + self._app = app + self._bus: "EventBus" = app.event_bus + self._device: "UCD323Device" = app.ucd_device + # 旧 GUI 仍用 self.ca 直接访问 CA410 对象,过渡期保留同步赋值。 + if not hasattr(app, "ca") or app.ca is None: + self._app.ca = None + + # -- 端口枚举 ------------------------------------------------ + + def list_ucd_devices(self) -> list[str]: + """返回 SDK 给出的设备显示字符串列表。""" + try: + return self._device.raw_controller.search_device() or [] + except Exception as exc: # noqa: BLE001 + self._log(f"枚举 UCD 设备失败: {exc}", level="error") + return [] + + def list_com_ports(self) -> list[str]: + try: + import serial.tools.list_ports + + return [p.device for p in serial.tools.list_ports.comports()] + except Exception as exc: # noqa: BLE001 + self._log(f"获取COM端口列表出错: {exc}", level="error") + return [] + + # -- UCD 连接 ------------------------------------------------ + + def connect_ucd(self, display: str) -> bool: + """打开指定 UCD 设备。成功返回 True。""" + if self._device.state.name != "CLOSED": + try: + self._device.close() + except Exception: # noqa: BLE001 + pass + try: + self._device.open(DeviceInfo.parse(display)) + return True + except UcdError as exc: + self._log(f"设备 {display} 异常,UCD323 连接失败: {exc}", level="error") + return False + except Exception as exc: # noqa: BLE001 + self._log(f"UCD323 连接异常: {exc}", level="error") + return False + + def disconnect_ucd(self) -> None: + try: + self._device.close() + except Exception: # noqa: BLE001 + pass + # 旧 controller.status 也要清零,兼容仍读取它的代码 + try: + self._app.ucd.status = False + except Exception: # noqa: BLE001 + pass + self._log("UCD连接已断开", level="info") + + # -- CA 连接 ------------------------------------------------- + + def connect_ca(self) -> bool: + """打开 CA410。成功返回 True 并设置 ``app.ca``。""" + if self._app.ca is not None: + try: + self._app.ca.close() + except Exception: # noqa: BLE001 + pass + self._app.ca = None + + try: + ca = CASerail() + ca.open(self._app.config.device_config["ca_com"], 19200, 7, "E", 2) + if not ca.set_all_Display(): + self._log( + f"端口 {self._app.config.device_config['ca_com']} 异常,色温仪连接失败", + level="error", + ) + ca.close() + return False + ca.setSynchMode(3) + ca.setMeasureSpeed(1) + time.sleep(0.5) + ca.setZeroCalibration() + channel_value = self._app.ca_channel_var.get() + ca.setChannel(f"{int(channel_value):02d}") + self._app.ca = ca + self._bus.publish(ConnectionChanged(True, None)) + return True + except Exception as exc: # noqa: BLE001 + self._log(f"CA410 连接失败: {exc}", level="error") + return False + + def disconnect_ca(self) -> None: + if self._app.ca is None: + return + try: + self._app.ca.close() + except Exception: # noqa: BLE001 + pass + self._app.ca = None + self._bus.publish(ConnectionChanged(False, None)) + self._log("CA连接已断开", level="info") + + # -- 一次性入口 ---------------------------------------------- + + def check_all_async(self) -> None: + """异步并联检测 UCD + CA,通过 ``_dispatch_ui`` 回主线程更新 UI。""" + app = self._app + app.check_button.configure(state="disabled") + app.refresh_button.configure(state="disabled") + app.status_var.set("正在检测连接...") + app.root.update() + + def worker(): + try: + ucd_ok = self.connect_ucd(app.ucd_list_var.get()) + app._dispatch_ui( + app.update_connection_indicator, + app.ucd_status_indicator, ucd_ok, + ) + ca_ok = self.connect_ca() + app._dispatch_ui( + app.update_connection_indicator, + app.ca_status_indicator, ca_ok, + ) + app._dispatch_ui(app.status_var.set, "连接检测完成") + app._dispatch_ui(self._enable_widgets) + except Exception as exc: # noqa: BLE001 + app._dispatch_ui(app.log_gui.log, f"连接检测出错: {exc}") + app._dispatch_ui(self._enable_widgets) + + threading.Thread(target=worker, daemon=True).start() + + def disconnect_all(self) -> None: + try: + self.disconnect_ucd() + self.disconnect_ca() + self._enable_widgets() + self._app.ucd_status_indicator.config(bg="gray") + self._app.ca_status_indicator.config(bg="gray") + self._app.status_var.set("串口连接已断开") + except Exception as exc: # noqa: BLE001 + self._log(f"断开连接时发生错误: {exc}", level="info") + messagebox.showerror("错误", f"断开连接失败: {exc}") + + def refresh_ports(self) -> None: + """刷新 UCD + COM 端口下拉框;指示器复位。""" + app = self._app + com_ports = self.list_com_ports() + ucd_list = self.list_ucd_devices() + + if app.ucd_list_var.get() not in ucd_list: + app.ucd_list_var.set(ucd_list[0] if ucd_list else "") + app.ucd_list_combo.config(values=ucd_list) + + if app.ca_com_var.get() not in com_ports: + app.ca_com_var.set( + com_ports[1] if len(com_ports) > 1 else (com_ports[0] if com_ports else "") + ) + app.ca_com_combo.config(values=com_ports) + + if hasattr(app, "ucd_status_indicator"): + app.ucd_status_indicator.config(bg="gray") + if hasattr(app, "ca_status_indicator"): + app.ca_status_indicator.config(bg="gray") + + app.update_config() + + # -- 内部 ---------------------------------------------------- + + def _enable_widgets(self) -> None: + self._app.check_button.configure(state="normal") + self._app.refresh_button.configure(state="normal") + + def _log(self, msg: str, *, level: str = "info") -> None: + log_gui = getattr(self._app, "log_gui", None) + if log_gui is not None: + log_gui.log(msg, level=level) + + +# ─── 旧名字兼容层 ──────────────────────────────────────────────── +# pqAutomationApp 类体仍以同名属性挂接这些函数;它们都委托给 +# ``self.connection``。Phase 3 完成后这些 shim 可以连同类体的属性 +# 挂接一并删除,让 GUI 直接调用 ``self.connection.xxx``。 + def get_available_ucd_ports(self): - """获取可用的UCD端口列表""" - return self.ucd.search_device() + return self.connection.list_ucd_devices() def get_available_com_ports(self): - """获取可用的COM端口列表""" - try: - import serial.tools.list_ports - - ports = serial.tools.list_ports.comports() - return [port.device for port in ports] - except Exception as e: - self.log_gui.log(f"获取COM端口列表出错: {e}", level="error") - return [] + return self.connection.list_com_ports() def refresh_com_ports(self): - """刷新COM端口列表""" - available_ports = self.get_available_com_ports() - available_list = self.get_available_ucd_ports() - - # 更新UCD列表的下拉框选项 - ucd_list_current = self.ucd_list_var.get() - if ucd_list_current not in available_list: - self.ucd_list_var.set(available_list[0] if available_list else "") - self.ucd_list_combo.config(values=available_list) - - # 更新CA端口的下拉框选项 - ca_com_current = self.ca_com_var.get() - if ca_com_current not in available_ports: - self.ca_com_var.set( - available_ports[1] - if len(available_ports) > 1 - else (available_ports[0] if available_ports else "") - ) - self.ca_com_combo.config(values=available_ports) - - # 重置连接状态指示器为灰色 - if hasattr(self, "ucd_status_indicator"): - self.ucd_status_indicator.config(bg="gray") - if hasattr(self, "ca_status_indicator"): - self.ca_status_indicator.config(bg="gray") - - self.update_config() + self.connection.refresh_ports() def check_com_connections(self): - """检测COM端口连接状态""" - # 禁用连接按钮,防止重复点击 - self.check_button.configure(state="disabled") - self.refresh_button.configure(state="disabled") - - # 更新状态栏 - self.status_var.set("正在检测连接...") - self.root.update() - - # 使用线程进行连接检测 - def check_connections(): - try: - # 检测TV连接 - ucd_connected = self.check_port_connection(is_ucd=True) - self._dispatch_ui( - self.update_connection_indicator, - self.ucd_status_indicator, ucd_connected, - ) - - # 检测CA连接 - ca_connected = self.check_port_connection(is_ucd=False) - self._dispatch_ui( - self.update_connection_indicator, - self.ca_status_indicator, ca_connected, - ) - - # 更新状态栏 - self._dispatch_ui(self.status_var.set, "连接检测完成") - - # 重新启用所有控件 - self._dispatch_ui(self.enable_com_widgets) - except Exception as e: - self._dispatch_ui(self.log_gui.log, f"连接检测出错: {e}") - self._dispatch_ui(self.enable_com_widgets) - - # 启动线程 - threading.Thread(target=check_connections, daemon=True).start() + self.connection.check_all_async() def update_connection_indicator(self, indicator, connected): - """更新连接状态指示器颜色""" - if connected: - indicator.config(bg="green") - else: - indicator.config(bg="red") + indicator.config(bg="green" if connected else "red") def check_port_connection(self, is_ucd=True): - """检测指定端口是否可以连接""" - try: - if is_ucd: - if self.ucd.status: - try: - self.ucd.close() - except: - pass - if not self.ucd.open(self.ucd_list_var.get()): - self.log_gui.log( - f"设备 {self.ucd_list_var.get()} 异常,UCD323连接失败" - , level="error") - return False - else: - return True - else: - # 如果CA对象已存在,先关闭 - if self.ca is not None: - try: - self.ca.close() - except: - pass - channel_value = self.ca_channel_var.get() - str_channel = f"{int(channel_value):02d}" - self.ca = CASerail() - self.ca.open(self.config.device_config["ca_com"], 19200, 7, "E", 2) - # data = self.ca.set_xyLv_Display() - data = self.ca.set_all_Display() - if data: - data = self.ca.setSynchMode(3) - data = self.ca.setMeasureSpeed(1) - if True: - time.sleep(0.5) - data = self.ca.setZeroCalibration() - channel_value = self.ca_channel_var.get() - str_channel = f"{int(channel_value):02d}" - data = self.ca.setChannel(str_channel) - return True - else: - self.log_gui.log( - f"端口 {self.config.device_config["ca_com"]} 异常,色温仪连接失败" - , level="error") - self.ca.close() - self.ca = None - return False - except Exception as e: - self.log_gui.log(f"端口连接失败: {e}", level="error") - return False + """[已弃用] 旗参数反模式;保留仅为兼容旧调用点。""" + if is_ucd: + return self.connection.connect_ucd(self.ucd_list_var.get()) + return self.connection.connect_ca() def enable_com_widgets(self): - """重新启用所有控件""" - self.check_button.configure(state="normal") - self.refresh_button.configure(state="normal") + self.connection._enable_widgets() def disconnect_com_connections(self): - """断开所有串口连接""" - try: - # 断开TV连接 - if self.ucd.status: - try: - self.ucd.close() - except: - pass - finally: - self.ucd.status = False - self.log_gui.log("UCD连接已断开", level="info") - - # 断开CA连接 - if self.ca is not None: - try: - self.ca.close() - except: - pass - finally: - self.ca = None - self.log_gui.log("CA连接已断开", level="info") - - # 重新启用相关控件 - self.enable_com_widgets() - self.ucd_status_indicator.config(bg="gray") - self.ca_status_indicator.config(bg="gray") - self.status_var.set("串口连接已断开") - - except Exception as e: - self.log_gui.log(f"断开连接时发生错误: {str(e)}", level="info") - messagebox.showerror("错误", f"断开连接失败: {str(e)}") + self.connection.disconnect_all() +__all__ = [ + "ConnectionController", + # 兼容层 + "get_available_ucd_ports", + "get_available_com_ports", + "refresh_com_ports", + "check_com_connections", + "update_connection_indicator", + "check_port_connection", + "enable_com_widgets", + "disconnect_com_connections", +] diff --git a/app/services/pattern_service.py b/app/services/pattern_service.py index bd3d728..c759097 100644 --- a/app/services/pattern_service.py +++ b/app/services/pattern_service.py @@ -38,7 +38,7 @@ class PatternService: f" Timing: {self.app.config.current_test_types[test_type]['timing']}", "info", ) - self.app.ucd.set_ucd_params(active_config) + self.app.signal_service.apply_config(active_config) elif test_type == "sdr_movie": data_range = self.app.sdr_data_range_var.get() @@ -60,7 +60,7 @@ class PatternService: active_config = self.app.config.get_temp_config_with_converted_params( mode=mode, converted_params=converted_params ) - self.app.ucd.set_ucd_params(active_config) + self.app.signal_service.apply_config(active_config) success = self.app.signal_service.update_signal_format( color_space=self.app.sdr_color_space_var.get(), data_range=data_range, @@ -92,7 +92,7 @@ class PatternService: active_config = self.app.config.get_temp_config_with_converted_params( mode=mode, converted_params=converted_params ) - self.app.ucd.set_ucd_params(active_config) + self.app.signal_service.apply_config(active_config) success = self.app.signal_service.update_signal_format( color_space=self.app.hdr_color_space_var.get(), data_range=data_range, @@ -123,7 +123,7 @@ class PatternService: raise IndexError(f"pattern 索引越界: {index}") pattern_param = session.pattern_params[index] - if not self.app.ucd.send_current_pattern_params(pattern_param): + if not self.app.signal_service.send_pattern_params(pattern_param): raise RuntimeError(f"发送 pattern 失败: {index}") return pattern_param diff --git a/app/services/ucd_service.py b/app/services/ucd_service.py index 4625f67..cce7fc2 100644 --- a/app/services/ucd_service.py +++ b/app/services/ucd_service.py @@ -176,6 +176,49 @@ class SignalService: def current_resolution(self) -> tuple[int, int]: return self._dev.current_resolution() + @property + def is_connected(self) -> bool: + """UCD 设备是否已打开。供 GUI 做前置校验。""" + ctrl = getattr(self._dev, "raw_controller", None) + return bool(ctrl and getattr(ctrl, "status", False)) + + # -- 过渡期 API(Phase 5):config 驱动的写入 ----------------- + # 现有 GUI / Service 通过 ``PQConfig`` 对象描述当次测试参数, + # 由 :class:`UCDController.set_ucd_params` 翻译为色彩/Timing/Pattern。 + # 在配置层重构落地前,这两个方法作为 SignalService 的统一入口, + # 让上层不再直接接触 ``self.app.ucd``。 + + def apply_config(self, config) -> bool: + """按 :class:`PQConfig` 写入色彩 / Timing / 当前 Pattern(不 apply 输出)。""" + ctrl = getattr(self._dev, "raw_controller", None) + if ctrl is None: + raise UcdError("apply_config 暂仅支持 UCD323Device") + with self._lock: + return bool(ctrl.set_ucd_params(config)) + + def send_pattern_params(self, params) -> bool: + """以 ``params`` 更新当前 pattern 的参数并 apply。""" + ctrl = getattr(self._dev, "raw_controller", None) + if ctrl is None: + raise UcdError("send_pattern_params 暂仅支持 UCD323Device") + with self._lock: + return bool(ctrl.send_current_pattern_params(params)) + + def apply_and_run(self, config, pattern_params) -> bool: + """``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。 + + 服务于 custom_template_panel 单步流程。 + """ + ctrl = getattr(self._dev, "raw_controller", None) + if ctrl is None: + raise UcdError("apply_and_run 暂仅支持 UCD323Device") + with self._lock: + if not ctrl.set_ucd_params(config): + return False + if not ctrl.set_pattern(ctrl.current_pattern, pattern_params): + return False + return bool(ctrl.run()) + __all__ = [ "SignalService", diff --git a/app/tests/local_dimming.py b/app/tests/local_dimming.py index 83d1864..1736d30 100644 --- a/app/tests/local_dimming.py +++ b/app/tests/local_dimming.py @@ -1,7 +1,7 @@ """Local Dimming 测试逻辑(应用层)。 整合自原 drivers/local_dimming_test.py:窗口图片生成与测试主循环 -直接落在本模块,UCD 通用操作下沉到 drivers.ucd_helpers。 +直接落在本模块,UCD 通用操作通过 SignalService 完成。 """ import atexit @@ -18,7 +18,6 @@ from tkinter import filedialog, messagebox import numpy as np from PIL import Image -from drivers.ucd_helpers import get_current_resolution # -------------------------------------------------------------------------- @@ -90,7 +89,7 @@ def _ensure_window_image(width, height, percentage): def start_local_dimming_test(self): """开始 Local Dimming 测试。""" - if not self.ca or not self.ucd.status: + if not self.ca or not self.signal_service.is_connected: messagebox.showerror("错误", "请先连接 CA410 和 UCD323") return @@ -111,7 +110,7 @@ def start_local_dimming_test(self): log("开始 Local Dimming 测试", level="info") log("=" * 60, level="separator") - width, height = get_current_resolution(self.ucd) + width, height = self.signal_service.current_resolution() total = len(DEFAULT_WINDOW_PERCENTAGES) log(f" 分辨率: {width}x{height}", level="info") log(f" 测试窗口: {DEFAULT_WINDOW_PERCENTAGES}", level="info") @@ -182,7 +181,7 @@ def stop_local_dimming_test(self): def send_ld_window(self, percentage): """发送指定百分比的白色窗口(手动模式)。""" - if not self.ucd.status: + if not self.signal_service.is_connected: messagebox.showwarning("警告", "请先连接 UCD323 设备") return @@ -190,7 +189,7 @@ def send_ld_window(self, percentage): self.current_ld_percentage = percentage def send(): - width, height = get_current_resolution(self.ucd) + width, height = self.signal_service.current_resolution() try: image_path = _ensure_window_image(width, height, percentage) except Exception as e: diff --git a/app/views/panels/ai_image_panel.py b/app/views/panels/ai_image_panel.py index be6f9e6..fab2396 100644 --- a/app/views/panels/ai_image_panel.py +++ b/app/views/panels/ai_image_panel.py @@ -13,7 +13,8 @@ import ttkbootstrap as ttk from PIL import Image, ImageTk from app.services import ai_image as _svc -from drivers.ucd_helpers import get_current_resolution + + # ---------------- 面板创建 ---------------- @@ -638,7 +639,7 @@ def _send_to_ucd(self): # 发送前检查分辨率:建议与当前 UCD 输出分辨率一致。 try: - target_w, target_h = get_current_resolution(ucd) + target_w, target_h = self.signal_service.current_resolution() except Exception: target_w, target_h = 3840, 2160 try: diff --git a/app/views/panels/custom_template_panel.py b/app/views/panels/custom_template_panel.py index d271275..be084c4 100644 --- a/app/views/panels/custom_template_panel.py +++ b/app/views/panels/custom_template_panel.py @@ -310,10 +310,8 @@ def _run_custom_row_single_step(self, item_id, row_no): self._dispatch_ui(self.status_var.set, "单步测试失败:行号超范围") return - self.ucd.set_ucd_params(temp_config) pattern_param = converted_params[row_no - 1] - self.ucd.set_pattern(self.ucd.current_pattern, pattern_param) - self.ucd.run() + self.signal_service.apply_and_run(temp_config, pattern_param) time.sleep(self.pattern_settle_time) diff --git a/app/views/panels/pantone_baseline_panel.py b/app/views/panels/pantone_baseline_panel.py index 431d1e7..856cc70 100644 --- a/app/views/panels/pantone_baseline_panel.py +++ b/app/views/panels/pantone_baseline_panel.py @@ -205,7 +205,7 @@ def _start_pantone_baseline(self): if self._pantone_running: messagebox.showinfo("提示", "Pantone 任务正在执行") return - if not getattr(self, "ucd", None) or not self.ucd.status: + if not self.signal_service.is_connected: messagebox.showwarning("警告", "请先连接 UCD323") return if not getattr(self, "ca", None): @@ -254,7 +254,7 @@ def _resume_pantone_baseline(self): if not self._pantone_paused: messagebox.showinfo("提示", "当前没有可继续的暂停任务") return - if not getattr(self, "ucd", None) or not self.ucd.status: + if not self.signal_service.is_connected: messagebox.showwarning("警告", "请先连接 UCD323") return if not getattr(self, "ca", None): diff --git a/app/views/panels/single_step_panel.py b/app/views/panels/single_step_panel.py index c23c33b..98eed6f 100644 --- a/app/views/panels/single_step_panel.py +++ b/app/views/panels/single_step_panel.py @@ -13,7 +13,8 @@ from tkinter import filedialog, messagebox import ttkbootstrap as ttk from PIL import Image -from drivers.ucd_helpers import get_current_resolution + + _DEFAULT_SAMPLES = [ @@ -387,9 +388,9 @@ def _format_float(value): def _build_color_patch(self, hex_value): - if not getattr(self, "ucd", None) or not self.ucd.status: + if not self.signal_service.is_connected: raise RuntimeError("请先连接 UCD323 设备") - width, height = get_current_resolution(self.ucd) + width, height = self.signal_service.current_resolution() rgb = tuple(int(hex_value[i:i + 2], 16) for i in (1, 3, 5)) temp_dir = os.path.join(tempfile.gettempdir(), "pq_single_step_patches") os.makedirs(temp_dir, exist_ok=True) diff --git a/drivers/UCD323_Function.py b/drivers/UCD323_Function.py index 1afb5af..33f5077 100644 --- a/drivers/UCD323_Function.py +++ b/drivers/UCD323_Function.py @@ -561,10 +561,6 @@ class UCDController: except Exception: return False - # 向后兼容别名 - set_sdr_format = apply_signal_format - set_hdr_format = apply_signal_format - def _get_colorimetry_from_color_space(self, color_space, color_format=None): """将色彩空间字符串转换为UniTAP.ColorInfo.Colorimetry。 BT.2020 在 YCbCr 输出时使用 CM_ITUR_BT2020_YCbCr,RGB 输出时使用 CM_ITUR_BT2020_RGB。 diff --git a/drivers/ucd_helpers.py b/drivers/ucd_helpers.py deleted file mode 100644 index ff5cc69..0000000 --- a/drivers/ucd_helpers.py +++ /dev/null @@ -1,46 +0,0 @@ -"""通用 UCD323/UCDController 辅助函数。 - -保留为兼容层和薄代理,避免业务模块直接依赖控制器内部实现细节。 -""" - - -def get_tx_modules(ucd): - """根据当前接口返回 (pg, ag) 模块。""" - return ucd.get_tx_modules() - - -def get_current_resolution(ucd, default=(3840, 2160)): - """从 UCD 当前 timing 获取 (width, height),失败时返回 default。""" - return ucd.get_current_resolution(default) - - -def send_image_pattern(ucd, image_path): - """通过 UCDController 发送一张本地图片作为显示 Pattern。""" - if not getattr(ucd, "status", False): - return False - - send_via_controller = getattr(ucd, "send_image_pattern", None) - if not callable(send_via_controller): - return False - return bool(send_via_controller(image_path)) - - -def send_solid_rgb_pattern(ucd, rgb, *, raise_on_error=False): - """通过 UCDController 当前状态发送一组纯色 RGB Pattern。""" - if not getattr(ucd, "status", False): - if raise_on_error: - raise RuntimeError("UCD 未连接,无法发送纯色 Pattern") - return False - - send_via_controller = getattr(ucd, "send_solid_rgb_pattern", None) - if not callable(send_via_controller): - if raise_on_error: - raise RuntimeError("UCDController 未实现 send_solid_rgb_pattern") - return False - - ok = bool(send_via_controller(list(rgb))) - if not ok and raise_on_error: - raise RuntimeError(f"发送纯色 Pattern 失败: {rgb}") - return ok - - diff --git a/pqAutomationApp.py b/pqAutomationApp.py index 7cad30a..3fa8eb8 100644 --- a/pqAutomationApp.py +++ b/pqAutomationApp.py @@ -138,6 +138,12 @@ class PQAutomationApp: self.ucd_device = UCD323Device(self.event_bus, self.ucd) self.signal_service = SignalService(self.ucd_device, self.event_bus) + # 连接控制器:统一管理 CA/UCD 生命周期。 + # 旧的 check_com_connections / disconnect_com_connections 等模块级 + # 函数仍以类属性形式挂在 PQAutomationApp 上,内部全部委托给本对象。 + from app.device.connection import ConnectionController + self.connection = ConnectionController(self) + # 初始化测试状态 self.testing = False self.test_thread = None diff --git a/pqAutomationApp.spec b/pqAutomationApp.spec index c46e4d3..409f435 100644 --- a/pqAutomationApp.spec +++ b/pqAutomationApp.spec @@ -110,7 +110,9 @@ a = Analysis( 'drivers.tvSerail', 'drivers.UCD323_Enum', 'drivers.UCD323_Function', - 'drivers.ucd_helpers', + 'drivers.ucd_driver', + 'app.ucd_domain', + 'app.services.ucd_service', ], hookspath=[], hooksconfig={}, diff --git a/settings/pq_config.json b/settings/pq_config.json index 9084d52..8924277 100644 --- a/settings/pq_config.json +++ b/settings/pq_config.json @@ -1,5 +1,5 @@ { - "current_test_type": "screen_module", + "current_test_type": "sdr_movie", "test_types": { "screen_module": { "name": "屏模组性能测试",