Files
pqAutomationApp/app/device/connection.py

388 lines
14 KiB
Python
Raw Normal View History

"""设备连接管理UCD323 / CA410
2026-05-24 11:21:30 +08:00
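Refactoring goals
-----------------
- Wrap the connection lifecycle in :class:`ConnectionController`, replacing the
  old approach of attaching module-level functions to ``PQAutomationApp`` as if
  they were methods.
- Remove the boolean flag-parameter anti-pattern of
  ``check_port_connection(is_ucd)`` by splitting it into two independent paths,
  ``connect_ucd`` / ``connect_ca``.
- The UCD side no longer calls the legacy ``UCDController`` directly; it goes
  through :class:`UCD323Device` + :class:`EventBus`, and indicator updates are
  triggered by subscribing to :class:`ConnectionChanged` events, decoupled from
  the GUI.

The bottom of the module still keeps the legacy-named functions as a thin
compatibility layer (``check_com_connections`` etc.). The
``pqAutomationApp.PQAutomationApp`` class body keeps attaching them as
same-named attributes, so call sites (button ``command`` callbacks,
``_dispatch_ui`` references) need no changes.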
2026-04-20 10:54:47 +08:00
"""
2026-05-24 11:21:30 +08:00
from __future__ import annotations
2026-04-20 10:54:47 +08:00
import threading
import time
from tkinter import messagebox
2026-05-24 11:21:30 +08:00
from typing import TYPE_CHECKING
2026-04-20 10:54:47 +08:00
2026-05-24 11:21:30 +08:00
from app.ucd_domain import ConnectionChanged, UcdError
2026-04-20 11:48:38 +08:00
from drivers.caSerail import CASerail
2026-05-24 11:21:30 +08:00
from drivers.ucd_driver import DeviceInfo
from app.views.modern_styles import get_theme_palette
2026-04-20 10:54:47 +08:00
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pqAutomationApp import PQAutomationApp
2026-05-24 11:21:30 +08:00
if TYPE_CHECKING:
from app.ucd_domain import EventBus
from drivers.ucd_driver import UCD323Device
2026-04-20 10:54:47 +08:00
2026-05-24 11:21:30 +08:00
# ─── ConnectionController ────────────────────────────────────────
2026-04-20 10:54:47 +08:00
2026-05-24 11:21:30 +08:00
class ConnectionController:
    """Manage connecting, disconnecting and port enumeration for CA410 / UCD323.

    Design notes:

    - The controller stores only the ``app`` reference, ``app.event_bus`` and
      ``app.ucd_device``. GUI widgets are reached through ``app``; from the
      worker thread they are only touched via ``app._dispatch_ui``.
    - UCD connections go through the ``UCD323Device`` object. The original
      notes state that the device publishes :class:`ConnectionChanged` itself;
      that is not visible in this module.
    - CA connections publish ``ConnectionChanged(connected, None)`` on the
      event bus from :meth:`connect_ca` / :meth:`disconnect_ca`.
    """

    def __init__(self, app: "PQAutomationApp") -> None:
        """Bind the controller to *app* and make sure ``app.ca`` exists."""
        self._app = app
        self._bus: "EventBus" = app.event_bus
        self._device: "UCD323Device" = app.ucd_device
        # Legacy GUI code still reads ``app.ca`` directly, so the attribute
        # must exist (as ``None``) even before the first CA410 connection.
        if getattr(app, "ca", None) is None:
            app.ca = None

    # -- Port enumeration ---------------------------------------------

    def list_ucd_devices(self) -> list[str]:
        """Return the device display strings reported by the UCD controller.

        Any failure is logged and reported as an empty list.
        """
        try:
            return self._device.raw_controller.search_device() or []
        except Exception as exc:  # noqa: BLE001
            self._log(f"枚举 UCD 设备失败: {exc}", level="error")
            return []

    def list_com_ports(self) -> list[str]:
        """Return the device names of the serial ports found by pyserial.

        Any failure (including a missing ``serial`` package) is logged and
        reported as an empty list.
        """
        try:
            import serial.tools.list_ports

            return [p.device for p in serial.tools.list_ports.comports()]
        except Exception as exc:  # noqa: BLE001
            self._log(f"获取COM端口列表出错: {exc}", level="error")
            return []

    # -- UCD connection -----------------------------------------------

    def connect_ucd(self, display: str) -> bool:
        """Open the UCD device described by *display*.

        A device that is not in the ``CLOSED`` state is closed first.
        Returns ``True`` on success; failures are logged and yield ``False``.
        """
        if self._device.state.name != "CLOSED":
            try:
                self._device.close()
            except Exception:  # noqa: BLE001
                # Best effort: a failed close must not prevent the reopen.
                pass
        try:
            self._device.open(DeviceInfo.parse(display))
            return True
        except UcdError as exc:
            self._log(f"设备 {display} 异常UCD323 连接失败: {exc}", level="error")
            return False
        except Exception as exc:  # noqa: BLE001
            self._log(f"UCD323 连接异常: {exc}", level="error")
            return False

    def disconnect_ucd(self) -> None:
        """Close the UCD device and clear the legacy ``app.ucd.status`` flag."""
        try:
            self._device.close()
        except Exception:  # noqa: BLE001
            # Best effort: disconnecting must never raise.
            pass
        # The legacy controller's ``status`` flag is reset as well, for code
        # that still reads it (e.g. the indicator refresh).
        try:
            self._app.ucd.status = False
        except Exception:  # noqa: BLE001
            pass
        self._log("UCD连接已断开", level="info")

    # -- CA connection ------------------------------------------------

    def connect_ca(self) -> bool:
        """Open and initialise the CA410, storing the handle in ``app.ca``.

        Returns ``True`` on success. On any failure the error is logged,
        ``False`` is returned, and a handle that was opened but never handed
        over to ``app.ca`` is closed so the serial port is not left open.
        """
        app = self._app
        if app.ca is not None:
            try:
                app.ca.close()
            except Exception:  # noqa: BLE001
                # Best effort: the stale handle is dropped either way.
                pass
            app.ca = None

        ca = None
        try:
            port = app.config.device_config["ca_com"]
            ca = CASerail()
            ca.open(port, 19200, 7, "E", 2)
            if not ca.set_all_Display():
                self._log(f"端口 {port} 异常,色温仪连接失败", level="error")
                ca.close()
                return False
            ca.setSynchMode(3)
            ca.setMeasureSpeed(1)
            time.sleep(0.5)
            ca.setZeroCalibration()
            # NOTE(review): presumably a Tk variable; check_all_async calls
            # this method from a worker thread. The read stays here so that a
            # bad value keeps being reported as a connection failure.
            channel_value = app.ca_channel_var.get()
            ca.setChannel(f"{int(channel_value):02d}")
            app.ca = ca
            self._bus.publish(ConnectionChanged(True, None))
            return True
        except Exception as exc:  # noqa: BLE001
            self._log(f"CA410 连接失败: {exc}", level="error")
            # Release the port if setup failed before the handle was handed
            # over; a handle already stored in ``app.ca`` is left untouched.
            if ca is not None and app.ca is not ca:
                try:
                    ca.close()
                except Exception:  # noqa: BLE001
                    pass
            return False

    def disconnect_ca(self) -> None:
        """Close the CA410 (if connected) and publish the disconnect event."""
        if self._app.ca is None:
            return
        try:
            self._app.ca.close()
        except Exception:  # noqa: BLE001
            # Best effort: the handle is dropped regardless.
            pass
        self._app.ca = None
        self._bus.publish(ConnectionChanged(False, None))
        self._log("CA连接已断开", level="info")

    # -- One-shot entry points ----------------------------------------

    def check_all_async(self) -> None:
        """Probe UCD and CA on a daemon thread; update the UI via ``_dispatch_ui``.

        The check/refresh buttons are disabled while the probe runs and are
        re-enabled once it finishes, whether it succeeded or failed.
        """
        app = self._app
        # Read the selected device on the calling thread: every other UI
        # access made by the worker goes through ``_dispatch_ui``.
        ucd_display = app.ucd_list_var.get()
        app.check_button.configure(state="disabled")
        app.refresh_button.configure(state="disabled")
        app.status_var.set("正在检测连接...")
        app.root.update()

        def worker() -> None:
            try:
                ucd_ok = self.connect_ucd(ucd_display)
                app._dispatch_ui(
                    app.update_connection_indicator,
                    app.ucd_status_indicator, ucd_ok,
                )
                ca_ok = self.connect_ca()
                app._dispatch_ui(
                    app.update_connection_indicator,
                    app.ca_status_indicator, ca_ok,
                )
                app._dispatch_ui(app.status_var.set, "连接检测完成")
            except Exception as exc:  # noqa: BLE001
                app._dispatch_ui(app.log_gui.log, f"连接检测出错: {exc}")
            finally:
                # Always hand the buttons back, on success and on failure.
                app._dispatch_ui(self._enable_widgets)

        threading.Thread(target=worker, daemon=True).start()

    def disconnect_all(self) -> None:
        """Disconnect UCD and CA, re-enable the buttons and refresh the UI.

        Any failure is logged as an error and shown in a message box.
        """
        try:
            self.disconnect_ucd()
            self.disconnect_ca()
            self._enable_widgets()
            self._app.refresh_connection_indicators()
            self._app.status_var.set("串口连接已断开")
        except Exception as exc:  # noqa: BLE001
            self._log(f"断开连接时发生错误: {exc}", level="error")
            messagebox.showerror("错误", f"断开连接失败: {exc}")

    def refresh_ports(self) -> None:
        """Reload the UCD / COM drop-downs and redraw the indicators."""
        app = self._app
        com_ports = self.list_com_ports()
        ucd_list = self.list_ucd_devices()

        # Keep the current selection when it is still valid; otherwise fall
        # back to the first device (or an empty string).
        if app.ucd_list_var.get() not in ucd_list:
            app.ucd_list_var.set(ucd_list[0] if ucd_list else "")
        app.ucd_list_combo.config(values=ucd_list)

        if app.ca_com_var.get() not in com_ports:
            # The fallback prefers the *second* enumerated port when there is
            # more than one. NOTE(review): reason not visible here — confirm.
            app.ca_com_var.set(
                com_ports[1] if len(com_ports) > 1 else (com_ports[0] if com_ports else "")
            )
        app.ca_com_combo.config(values=com_ports)

        app.refresh_connection_indicators()
        app.update_config()

    # -- Internals ----------------------------------------------------

    def _enable_widgets(self) -> None:
        """Put the check and refresh buttons back into the ``normal`` state."""
        self._app.check_button.configure(state="normal")
        self._app.refresh_button.configure(state="normal")

    def _log(self, msg: str, *, level: str = "info") -> None:
        """Forward *msg* to ``app.log_gui`` when the app has one; else drop it."""
        log_gui = getattr(self._app, "log_gui", None)
        if log_gui is not None:
            log_gui.log(msg, level=level)
2026-04-20 10:54:47 +08:00
2026-05-24 11:21:30 +08:00
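# ─── Legacy-name compatibility layer ─────────────────────────────
# The PQAutomationApp class body still attaches these functions as same-named
# attributes. The connection-related ones delegate to ``self.connection``; the
# indicator and CA410 read helpers are implemented here directly. Once Phase 3
# is complete, the delegating shims can be removed together with the class-body
# attribute hooks so the GUI calls ``self.connection.xxx`` directly.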
2026-04-20 10:54:47 +08:00
def get_available_ucd_ports(self: "PQAutomationApp"):
    """Legacy shim: return ``self.connection.list_ucd_devices()``."""
    controller = self.connection
    return controller.list_ucd_devices()
2026-04-20 10:54:47 +08:00
def get_available_com_ports(self: "PQAutomationApp"):
    """Legacy shim: return ``self.connection.list_com_ports()``."""
    controller = self.connection
    return controller.list_com_ports()
2026-04-20 10:54:47 +08:00
def refresh_com_ports(self: "PQAutomationApp"):
    """Legacy shim: run ``self.connection.refresh_ports()``; returns nothing."""
    controller = self.connection
    controller.refresh_ports()
2026-04-20 10:54:47 +08:00
2026-05-24 11:21:30 +08:00
def check_com_connections(self: "PQAutomationApp"):
    """Legacy shim: run ``self.connection.check_all_async()``; returns nothing."""
    controller = self.connection
    controller.check_all_async()
def update_connection_indicator(self: "PQAutomationApp", indicator, connected):
    """Paint *indicator* green when *connected* is truthy, red otherwise."""
    if connected:
        lamp_state = "green"
    else:
        lamp_state = "red"
    _draw_connection_indicator(indicator, lamp_state)
def refresh_connection_indicators(self: "PQAutomationApp"):
    """Redraw the UCD / CA indicator lamps from the current device state.

    Each lamp is only touched when the corresponding indicator attribute
    exists on ``self``; a connected device is drawn green, otherwise gray.
    """
    if hasattr(self, "ucd_status_indicator"):
        # UCD state comes from the legacy controller's ``status`` flag.
        ucd_state = "green" if getattr(self.ucd, "status", False) else "gray"
        _draw_connection_indicator(self.ucd_status_indicator, ucd_state)

    if hasattr(self, "ca_status_indicator"):
        # CA counts as connected whenever ``self.ca`` holds a handle.
        ca_state = "gray" if getattr(self, "ca", None) is None else "green"
        _draw_connection_indicator(self.ca_status_indicator, ca_state)
def _draw_connection_indicator(canvas, state: str) -> None:
    """Repaint *canvas* as a square status lamp.

    *state* is ``"green"``, ``"red"`` or ``"gray"``; any other value is passed
    through unchanged as the fill colour. If the canvas drawing calls fail,
    the function falls back to setting only the widget background, and a
    failure of that fallback is ignored.
    """
    theme = get_theme_palette()
    named_colors = {"green": "#2ECC71", "red": "#E74C3C", "gray": "#9AA3AD"}
    lamp_color = named_colors.get(state, state)
    outline_color = theme["border"]
    background = theme["card_bg"]

    try:
        canvas.configure(
            bg=background,
            highlightbackground=outline_color,
            highlightcolor=outline_color,
        )
        canvas.delete("all")
        # Keep the established look: a square lamp (red / green / gray).
        canvas.create_rectangle(
            0, 0, 15, 15, fill=lamp_color, outline=outline_color, width=1
        )
        return
    except Exception:
        pass

    # Fallback for widgets that do not support the canvas calls above.
    try:
        canvas.config(bg=lamp_color)
    except Exception:
        pass
2026-05-24 11:21:30 +08:00
def check_port_connection(self: "PQAutomationApp", is_ucd=True):
    """[Deprecated] Flag-parameter shim kept only for legacy call sites.

    A truthy *is_ucd* connects the UCD device currently selected in
    ``self.ucd_list_var``; a falsy one connects the CA410. The result of the
    underlying ``self.connection`` call is returned.
    """
    controller = self.connection
    if not is_ucd:
        return controller.connect_ca()
    return controller.connect_ucd(self.ucd_list_var.get())
def enable_com_widgets(self: "PQAutomationApp"):
    """Legacy shim: run ``self.connection._enable_widgets()``; returns nothing."""
    controller = self.connection
    controller._enable_widgets()
2026-04-20 10:54:47 +08:00
def disconnect_com_connections(self: "PQAutomationApp"):
    """Legacy shim: run ``self.connection.disconnect_all()``; returns nothing."""
    controller = self.connection
    controller.disconnect_all()
def _get_ca_measure_lock(self: "PQAutomationApp"):
lock = getattr(self, "_ca_measure_lock", None)
if lock is None:
lock = threading.RLock()
self._ca_measure_lock = lock
return lock
def _read_ca_display(self: "PQAutomationApp", mode: int):
    """Switch the CA410 to Display *mode* and read it while holding the lock.

    Switching and reading happen under the measurement lock so that another
    reader cannot change the display mode in between.

    Raises:
        RuntimeError: if ``self.ca`` is missing or ``None``.
    """
    if getattr(self, "ca", None) is None:
        raise RuntimeError("请先连接 CA410 色度计")
    measure_lock = _get_ca_measure_lock(self)
    with measure_lock:
        self.ca.set_Display(mode)
        reading = self.ca.readAllDisplay()
    return reading
def read_ca_xyLv(self: "PQAutomationApp"):
    """Read xy / Lv / XYZ from the CA410 (Display mode 0)."""
    display_mode = 0
    return _read_ca_display(self, display_mode)
def read_ca_tcp_duv(self: "PQAutomationApp"):
    """Read Tcp / duv / Lv / XYZ from the CA410 (Display mode 1)."""
    display_mode = 1
    return _read_ca_display(self, display_mode)
def read_ca_uvLv(self: "PQAutomationApp"):
    """Read u' / v' / Lv / XYZ from the CA410 (Display mode 5)."""
    display_mode = 5
    return _read_ca_display(self, display_mode)
def read_ca_xyz(self: "PQAutomationApp"):
    """Read XYZ from the CA410 (Display mode 7)."""
    display_mode = 7
    return _read_ca_display(self, display_mode)
def read_ca_lambda_pe(self: "PQAutomationApp"):
    """Read λd / Pe / Lv / XYZ from the CA410 (Display mode 8)."""
    display_mode = 8
    return _read_ca_display(self, display_mode)
2026-05-24 11:21:30 +08:00
# Public API of this module. The mixin and the CA410 read helpers are public
# names defined in this file, so they are exported alongside the controller
# and the legacy shims.
__all__ = [
    "ConnectionController",
    # Legacy compatibility layer
    "get_available_ucd_ports",
    "get_available_com_ports",
    "refresh_com_ports",
    "check_com_connections",
    "update_connection_indicator",
    "refresh_connection_indicators",
    "check_port_connection",
    "enable_com_widgets",
    "disconnect_com_connections",
    # CA410 read helpers
    "read_ca_xyLv",
    "read_ca_tcp_duv",
    "read_ca_uvLv",
    "read_ca_xyz",
    "read_ca_lambda_pe",
    # Mixin that bundles the functions above
    "DeviceConnectionMixin",
]
class DeviceConnectionMixin:
    """Generated by ``tools/refactor_to_mixins.py``.

    Attaches this module's free functions to ``PQAutomationApp`` as methods,
    which makes go-to-definition (F12) and type inference work for them.
    """

    # Port enumeration
    get_available_ucd_ports = get_available_ucd_ports
    get_available_com_ports = get_available_com_ports
    refresh_com_ports = refresh_com_ports

    # Connect / disconnect
    check_com_connections = check_com_connections
    check_port_connection = check_port_connection
    disconnect_com_connections = disconnect_com_connections
    enable_com_widgets = enable_com_widgets

    # Indicator lamps
    update_connection_indicator = update_connection_indicator
    refresh_connection_indicators = refresh_connection_indicators

    # CA410 measurement helpers
    _get_ca_measure_lock = _get_ca_measure_lock
    _read_ca_display = _read_ca_display
    read_ca_xyLv = read_ca_xyLv
    read_ca_tcp_duv = read_ca_tcp_duv
    read_ca_uvLv = read_ca_uvLv
    read_ca_xyz = read_ca_xyz
    read_ca_lambda_pe = read_ca_lambda_pe