Files
pqAutomationApp/app/device/connection.py

301 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""设备连接管理UCD323 / CA410
重构目标
---------
- 用 :class:`ConnectionController` 类封装连接生命周期,替代旧的
"把模块级函数当类方法装到 PQAutomationApp 上" 的写法。
- 拆掉 ``check_port_connection(is_ucd)`` 的布尔旗参数反模式,
分离为 ``connect_ucd`` / ``connect_ca`` 两条独立路径。
- UCD 这一侧不再直接调用旧 ``UCDController``,而是通过
:class:`UCD323Device` + :class:`EventBus`;指示器更新由订阅
:class:`ConnectionChanged` 事件触发,与 GUI 解耦。
模块底部仍保留 7 个旧名字函数作为薄兼容层(``check_com_connections``
等),供 ``pqAutomationApp.PQAutomationApp`` 类体继续以同名属性挂接,
保证调用点(按钮 command、_dispatch_ui 引用)零修改。
"""
from __future__ import annotations
import threading
import time
from tkinter import messagebox
from typing import TYPE_CHECKING
from app.ucd_domain import ConnectionChanged, UcdError
from drivers.caSerail import CASerail
from drivers.ucd_driver import DeviceInfo
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pqAutomationApp import PQAutomationApp
if TYPE_CHECKING:
from app.ucd_domain import EventBus
from drivers.ucd_driver import UCD323Device
# ─── ConnectionController ────────────────────────────────────────
class ConnectionController:
"""统一管理 CA410 / UCD323 的连接、断开与可用端口枚举。
设计要点:
- 不持有 :mod:`tkinter` 控件;所有 GUI 更新由订阅事件总线后回调完成。
- UCD 连接经由 :class:`UCD323Device`,自动发布
:class:`ConnectionChanged` 事件。
- CA 连接同样发布 :class:`ConnectionChanged` 事件(带 ``serial=None``
指示器订阅时只看 ``connected`` 字段)。
"""
def __init__(self, app):
self._app = app
self._bus: "EventBus" = app.event_bus
self._device: "UCD323Device" = app.ucd_device
# 旧 GUI 仍用 self.ca 直接访问 CA410 对象,过渡期保留同步赋值。
if not hasattr(app, "ca") or app.ca is None:
self._app.ca = None
# -- 端口枚举 ------------------------------------------------
def list_ucd_devices(self) -> list[str]:
"""返回 SDK 给出的设备显示字符串列表。"""
try:
return self._device.raw_controller.search_device() or []
except Exception as exc: # noqa: BLE001
self._log(f"枚举 UCD 设备失败: {exc}", level="error")
return []
def list_com_ports(self) -> list[str]:
try:
import serial.tools.list_ports
return [p.device for p in serial.tools.list_ports.comports()]
except Exception as exc: # noqa: BLE001
self._log(f"获取COM端口列表出错: {exc}", level="error")
return []
# -- UCD 连接 ------------------------------------------------
def connect_ucd(self, display: str) -> bool:
"""打开指定 UCD 设备。成功返回 True。"""
if self._device.state.name != "CLOSED":
try:
self._device.close()
except Exception: # noqa: BLE001
pass
try:
self._device.open(DeviceInfo.parse(display))
return True
except UcdError as exc:
self._log(f"设备 {display} 异常UCD323 连接失败: {exc}", level="error")
return False
except Exception as exc: # noqa: BLE001
self._log(f"UCD323 连接异常: {exc}", level="error")
return False
def disconnect_ucd(self) -> None:
try:
self._device.close()
except Exception: # noqa: BLE001
pass
# 旧 controller.status 也要清零,兼容仍读取它的代码
try:
self._app.ucd.status = False
except Exception: # noqa: BLE001
pass
self._log("UCD连接已断开", level="info")
# -- CA 连接 -------------------------------------------------
def connect_ca(self) -> bool:
"""打开 CA410。成功返回 True 并设置 ``app.ca``。"""
if self._app.ca is not None:
try:
self._app.ca.close()
except Exception: # noqa: BLE001
pass
self._app.ca = None
try:
ca = CASerail()
ca.open(self._app.config.device_config["ca_com"], 19200, 7, "E", 2)
if not ca.set_all_Display():
self._log(
f"端口 {self._app.config.device_config['ca_com']} 异常,色温仪连接失败",
level="error",
)
ca.close()
return False
ca.setSynchMode(3)
ca.setMeasureSpeed(1)
time.sleep(0.5)
ca.setZeroCalibration()
channel_value = self._app.ca_channel_var.get()
ca.setChannel(f"{int(channel_value):02d}")
self._app.ca = ca
self._bus.publish(ConnectionChanged(True, None))
return True
except Exception as exc: # noqa: BLE001
self._log(f"CA410 连接失败: {exc}", level="error")
return False
def disconnect_ca(self) -> None:
if self._app.ca is None:
return
try:
self._app.ca.close()
except Exception: # noqa: BLE001
pass
self._app.ca = None
self._bus.publish(ConnectionChanged(False, None))
self._log("CA连接已断开", level="info")
# -- 一次性入口 ----------------------------------------------
def check_all_async(self) -> None:
"""异步并联检测 UCD + CA通过 ``_dispatch_ui`` 回主线程更新 UI。"""
app = self._app
app.check_button.configure(state="disabled")
app.refresh_button.configure(state="disabled")
app.status_var.set("正在检测连接...")
app.root.update()
def worker():
try:
ucd_ok = self.connect_ucd(app.ucd_list_var.get())
app._dispatch_ui(
app.update_connection_indicator,
app.ucd_status_indicator, ucd_ok,
)
ca_ok = self.connect_ca()
app._dispatch_ui(
app.update_connection_indicator,
app.ca_status_indicator, ca_ok,
)
app._dispatch_ui(app.status_var.set, "连接检测完成")
app._dispatch_ui(self._enable_widgets)
except Exception as exc: # noqa: BLE001
app._dispatch_ui(app.log_gui.log, f"连接检测出错: {exc}")
app._dispatch_ui(self._enable_widgets)
threading.Thread(target=worker, daemon=True).start()
def disconnect_all(self) -> None:
try:
self.disconnect_ucd()
self.disconnect_ca()
self._enable_widgets()
self._app.ucd_status_indicator.config(bg="gray")
self._app.ca_status_indicator.config(bg="gray")
self._app.status_var.set("串口连接已断开")
except Exception as exc: # noqa: BLE001
self._log(f"断开连接时发生错误: {exc}", level="info")
messagebox.showerror("错误", f"断开连接失败: {exc}")
def refresh_ports(self) -> None:
"""刷新 UCD + COM 端口下拉框;指示器复位。"""
app = self._app
com_ports = self.list_com_ports()
ucd_list = self.list_ucd_devices()
if app.ucd_list_var.get() not in ucd_list:
app.ucd_list_var.set(ucd_list[0] if ucd_list else "")
app.ucd_list_combo.config(values=ucd_list)
if app.ca_com_var.get() not in com_ports:
app.ca_com_var.set(
com_ports[1] if len(com_ports) > 1 else (com_ports[0] if com_ports else "")
)
app.ca_com_combo.config(values=com_ports)
if hasattr(app, "ucd_status_indicator"):
app.ucd_status_indicator.config(bg="gray")
if hasattr(app, "ca_status_indicator"):
app.ca_status_indicator.config(bg="gray")
app.update_config()
# -- 内部 ----------------------------------------------------
def _enable_widgets(self) -> None:
self._app.check_button.configure(state="normal")
self._app.refresh_button.configure(state="normal")
def _log(self, msg: str, *, level: str = "info") -> None:
log_gui = getattr(self._app, "log_gui", None)
if log_gui is not None:
log_gui.log(msg, level=level)
# ─── 旧名字兼容层 ────────────────────────────────────────────────
# pqAutomationApp 类体仍以同名属性挂接这些函数;它们都委托给
# ``self.connection``。Phase 3 完成后这些 shim 可以连同类体的属性
# 挂接一并删除,让 GUI 直接调用 ``self.connection.xxx``。
def get_available_ucd_ports(self: "PQAutomationApp"):
return self.connection.list_ucd_devices()
def get_available_com_ports(self: "PQAutomationApp"):
return self.connection.list_com_ports()
def refresh_com_ports(self: "PQAutomationApp"):
self.connection.refresh_ports()
def check_com_connections(self: "PQAutomationApp"):
self.connection.check_all_async()
def update_connection_indicator(self: "PQAutomationApp", indicator, connected):
indicator.config(bg="green" if connected else "red")
def check_port_connection(self: "PQAutomationApp", is_ucd=True):
"""[已弃用] 旗参数反模式;保留仅为兼容旧调用点。"""
if is_ucd:
return self.connection.connect_ucd(self.ucd_list_var.get())
return self.connection.connect_ca()
def enable_com_widgets(self: "PQAutomationApp"):
self.connection._enable_widgets()
def disconnect_com_connections(self: "PQAutomationApp"):
self.connection.disconnect_all()
__all__ = [
"ConnectionController",
# 兼容层
"get_available_ucd_ports",
"get_available_com_ports",
"refresh_com_ports",
"check_com_connections",
"update_connection_indicator",
"check_port_connection",
"enable_com_widgets",
"disconnect_com_connections",
]
class DeviceConnectionMixin:
"""由 tools/refactor_to_mixins.py 自动生成。
把本模块的自由函数挂到 PQAutomationApp 上,便于 F12 跳转与类型推断。
"""
get_available_ucd_ports = get_available_ucd_ports
get_available_com_ports = get_available_com_ports
refresh_com_ports = refresh_com_ports
check_com_connections = check_com_connections
update_connection_indicator = update_connection_indicator
check_port_connection = check_port_connection
enable_com_widgets = enable_com_widgets
disconnect_com_connections = disconnect_com_connections