"""设备连接管理(UCD323 / CA410)。 重构目标 --------- - 用 :class:`ConnectionController` 类封装连接生命周期,替代旧的 "把模块级函数当类方法装到 PQAutomationApp 上" 的写法。 - 拆掉 ``check_port_connection(is_ucd)`` 的布尔旗参数反模式, 分离为 ``connect_ucd`` / ``connect_ca`` 两条独立路径。 - UCD 这一侧不再直接调用旧 ``UCDController``,而是通过 :class:`UCD323Device` + :class:`EventBus`;指示器更新由订阅 :class:`ConnectionChanged` 事件触发,与 GUI 解耦。 模块底部仍保留 7 个旧名字函数作为薄兼容层(``check_com_connections`` 等),供 ``pqAutomationApp.PQAutomationApp`` 类体继续以同名属性挂接, 保证调用点(按钮 command、_dispatch_ui 引用)零修改。 """ from __future__ import annotations import threading import time from tkinter import messagebox from typing import TYPE_CHECKING from app.ucd_domain import ConnectionChanged, UcdError from drivers.caSerail import CASerail from drivers.ucd_driver import DeviceInfo from typing import TYPE_CHECKING if TYPE_CHECKING: from pqAutomationApp import PQAutomationApp if TYPE_CHECKING: from app.ucd_domain import EventBus from drivers.ucd_driver import UCD323Device # ─── ConnectionController ──────────────────────────────────────── class ConnectionController: """统一管理 CA410 / UCD323 的连接、断开与可用端口枚举。 设计要点: - 不持有 :mod:`tkinter` 控件;所有 GUI 更新由订阅事件总线后回调完成。 - UCD 连接经由 :class:`UCD323Device`,自动发布 :class:`ConnectionChanged` 事件。 - CA 连接同样发布 :class:`ConnectionChanged` 事件(带 ``serial=None``, 指示器订阅时只看 ``connected`` 字段)。 """ def __init__(self, app): self._app = app self._bus: "EventBus" = app.event_bus self._device: "UCD323Device" = app.ucd_device # 旧 GUI 仍用 self.ca 直接访问 CA410 对象,过渡期保留同步赋值。 if not hasattr(app, "ca") or app.ca is None: self._app.ca = None # -- 端口枚举 ------------------------------------------------ def list_ucd_devices(self) -> list[str]: """返回 SDK 给出的设备显示字符串列表。""" try: return self._device.raw_controller.search_device() or [] except Exception as exc: # noqa: BLE001 self._log(f"枚举 UCD 设备失败: {exc}", level="error") return [] def list_com_ports(self) -> list[str]: try: import serial.tools.list_ports return [p.device for p in serial.tools.list_ports.comports()] except Exception as exc: # noqa: BLE001 self._log(f"获取COM端口列表出错: {exc}", level="error") return [] # -- UCD 连接 ------------------------------------------------ def connect_ucd(self, display: str) -> bool: """打开指定 UCD 设备。成功返回 True。""" if self._device.state.name != "CLOSED": try: self._device.close() except Exception: # noqa: BLE001 pass try: self._device.open(DeviceInfo.parse(display)) return True except UcdError as exc: self._log(f"设备 {display} 异常,UCD323 连接失败: {exc}", level="error") return False except Exception as exc: # noqa: BLE001 self._log(f"UCD323 连接异常: {exc}", level="error") return False def disconnect_ucd(self) -> None: try: self._device.close() except Exception: # noqa: BLE001 pass # 旧 controller.status 也要清零,兼容仍读取它的代码 try: self._app.ucd.status = False except Exception: # noqa: BLE001 pass self._log("UCD连接已断开", level="info") # -- CA 连接 ------------------------------------------------- def connect_ca(self) -> bool: """打开 CA410。成功返回 True 并设置 ``app.ca``。""" if self._app.ca is not None: try: self._app.ca.close() except Exception: # noqa: BLE001 pass self._app.ca = None try: ca = CASerail() ca.open(self._app.config.device_config["ca_com"], 19200, 7, "E", 2) if not ca.set_all_Display(): self._log( f"端口 {self._app.config.device_config['ca_com']} 异常,色温仪连接失败", level="error", ) ca.close() return False ca.setSynchMode(3) ca.setMeasureSpeed(1) time.sleep(0.5) ca.setZeroCalibration() channel_value = self._app.ca_channel_var.get() ca.setChannel(f"{int(channel_value):02d}") self._app.ca = ca self._bus.publish(ConnectionChanged(True, None)) return True except Exception as exc: # noqa: BLE001 self._log(f"CA410 连接失败: {exc}", level="error") return False def disconnect_ca(self) -> None: if self._app.ca is None: return try: self._app.ca.close() except Exception: # noqa: BLE001 pass self._app.ca = None self._bus.publish(ConnectionChanged(False, None)) self._log("CA连接已断开", level="info") # -- 一次性入口 ---------------------------------------------- def check_all_async(self) -> None: """异步并联检测 UCD + CA,通过 ``_dispatch_ui`` 回主线程更新 UI。""" app = self._app app.check_button.configure(state="disabled") app.refresh_button.configure(state="disabled") app.status_var.set("正在检测连接...") app.root.update() def worker(): try: ucd_ok = self.connect_ucd(app.ucd_list_var.get()) app._dispatch_ui( app.update_connection_indicator, app.ucd_status_indicator, ucd_ok, ) ca_ok = self.connect_ca() app._dispatch_ui( app.update_connection_indicator, app.ca_status_indicator, ca_ok, ) app._dispatch_ui(app.status_var.set, "连接检测完成") app._dispatch_ui(self._enable_widgets) except Exception as exc: # noqa: BLE001 app._dispatch_ui(app.log_gui.log, f"连接检测出错: {exc}") app._dispatch_ui(self._enable_widgets) threading.Thread(target=worker, daemon=True).start() def disconnect_all(self) -> None: try: self.disconnect_ucd() self.disconnect_ca() self._enable_widgets() self._app.ucd_status_indicator.config(bg="gray") self._app.ca_status_indicator.config(bg="gray") self._app.status_var.set("串口连接已断开") except Exception as exc: # noqa: BLE001 self._log(f"断开连接时发生错误: {exc}", level="info") messagebox.showerror("错误", f"断开连接失败: {exc}") def refresh_ports(self) -> None: """刷新 UCD + COM 端口下拉框;指示器复位。""" app = self._app com_ports = self.list_com_ports() ucd_list = self.list_ucd_devices() if app.ucd_list_var.get() not in ucd_list: app.ucd_list_var.set(ucd_list[0] if ucd_list else "") app.ucd_list_combo.config(values=ucd_list) if app.ca_com_var.get() not in com_ports: app.ca_com_var.set( com_ports[1] if len(com_ports) > 1 else (com_ports[0] if com_ports else "") ) app.ca_com_combo.config(values=com_ports) if hasattr(app, "ucd_status_indicator"): app.ucd_status_indicator.config(bg="gray") if hasattr(app, "ca_status_indicator"): app.ca_status_indicator.config(bg="gray") app.update_config() # -- 内部 ---------------------------------------------------- def _enable_widgets(self) -> None: self._app.check_button.configure(state="normal") self._app.refresh_button.configure(state="normal") def _log(self, msg: str, *, level: str = "info") -> None: log_gui = getattr(self._app, "log_gui", None) if log_gui is not None: log_gui.log(msg, level=level) # ─── 旧名字兼容层 ──────────────────────────────────────────────── # pqAutomationApp 类体仍以同名属性挂接这些函数;它们都委托给 # ``self.connection``。Phase 3 完成后这些 shim 可以连同类体的属性 # 挂接一并删除,让 GUI 直接调用 ``self.connection.xxx``。 def get_available_ucd_ports(self: "PQAutomationApp"): return self.connection.list_ucd_devices() def get_available_com_ports(self: "PQAutomationApp"): return self.connection.list_com_ports() def refresh_com_ports(self: "PQAutomationApp"): self.connection.refresh_ports() def check_com_connections(self: "PQAutomationApp"): self.connection.check_all_async() def update_connection_indicator(self: "PQAutomationApp", indicator, connected): indicator.config(bg="green" if connected else "red") def check_port_connection(self: "PQAutomationApp", is_ucd=True): """[已弃用] 旗参数反模式;保留仅为兼容旧调用点。""" if is_ucd: return self.connection.connect_ucd(self.ucd_list_var.get()) return self.connection.connect_ca() def enable_com_widgets(self: "PQAutomationApp"): self.connection._enable_widgets() def disconnect_com_connections(self: "PQAutomationApp"): self.connection.disconnect_all() __all__ = [ "ConnectionController", # 兼容层 "get_available_ucd_ports", "get_available_com_ports", "refresh_com_ports", "check_com_connections", "update_connection_indicator", "check_port_connection", "enable_com_widgets", "disconnect_com_connections", ] class DeviceConnectionMixin: """由 tools/refactor_to_mixins.py 自动生成。 把本模块的自由函数挂到 PQAutomationApp 上,便于 F12 跳转与类型推断。 """ get_available_ucd_ports = get_available_ucd_ports get_available_com_ports = get_available_com_ports refresh_com_ports = refresh_com_ports check_com_connections = check_com_connections update_connection_indicator = update_connection_indicator check_port_connection = check_port_connection enable_com_widgets = enable_com_widgets disconnect_com_connections = disconnect_com_connections