Files
pqAutomationApp/app/device/connection.py
2026-04-21 15:31:48 +08:00

199 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""设备连接UCD323 / CA410相关逻辑Step 4 重构)。
从 pqAutomationApp.PQAutomationApp 中搬迁。每个函数第一行 `self = app`
以保留原有 `self.xxx` 属性访问不变。
"""
import threading
import time
from tkinter import messagebox
from drivers.caSerail import CASerail
def get_available_ucd_ports(self):
"""获取可用的UCD端口列表"""
return self.ucd.search_device()
def get_available_com_ports(self):
"""获取可用的COM端口列表"""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
return [port.device for port in ports]
except Exception as e:
self.log_gui.log(f"获取COM端口列表出错: {e}", level="error")
return []
def refresh_com_ports(self):
"""刷新COM端口列表"""
available_ports = self.get_available_com_ports()
available_list = self.get_available_ucd_ports()
# 更新UCD列表的下拉框选项
ucd_list_current = self.ucd_list_var.get()
if ucd_list_current not in available_list:
self.ucd_list_var.set(available_list[0] if available_list else "")
self.ucd_list_combo.config(values=available_list)
# 更新CA端口的下拉框选项
ca_com_current = self.ca_com_var.get()
if ca_com_current not in available_ports:
self.ca_com_var.set(
available_ports[1]
if len(available_ports) > 1
else (available_ports[0] if available_ports else "")
)
self.ca_com_combo.config(values=available_ports)
# 重置连接状态指示器为灰色
if hasattr(self, "ucd_status_indicator"):
self.ucd_status_indicator.config(bg="gray")
if hasattr(self, "ca_status_indicator"):
self.ca_status_indicator.config(bg="gray")
self.update_config()
def check_com_connections(self):
"""检测COM端口连接状态"""
# 禁用连接按钮,防止重复点击
self.check_button.configure(state="disabled")
self.refresh_button.configure(state="disabled")
# 更新状态栏
self.status_var.set("正在检测连接...")
self.root.update()
# 使用线程进行连接检测
def check_connections():
try:
# 检测TV连接
ucd_connected = self.check_port_connection(is_ucd=True)
self._dispatch_ui(
self.update_connection_indicator,
self.ucd_status_indicator, ucd_connected,
)
# 检测CA连接
ca_connected = self.check_port_connection(is_ucd=False)
self._dispatch_ui(
self.update_connection_indicator,
self.ca_status_indicator, ca_connected,
)
# 更新状态栏
self._dispatch_ui(self.status_var.set, "连接检测完成")
# 重新启用所有控件
self._dispatch_ui(self.enable_com_widgets)
except Exception as e:
self._dispatch_ui(self.log_gui.log, f"连接检测出错: {e}")
self._dispatch_ui(self.enable_com_widgets)
# 启动线程
threading.Thread(target=check_connections, daemon=True).start()
def update_connection_indicator(self, indicator, connected):
"""更新连接状态指示器颜色"""
if connected:
indicator.config(bg="green")
else:
indicator.config(bg="red")
def check_port_connection(self, is_ucd=True):
"""检测指定端口是否可以连接"""
try:
if is_ucd:
if self.ucd.status:
try:
self.ucd.close()
except:
pass
if not self.ucd.open(self.ucd_list_var.get()):
self.log_gui.log(
f"设备 {self.ucd_list_var.get()} 异常UCD323连接失败"
, level="error")
return False
else:
return True
else:
# 如果CA对象已存在先关闭
if self.ca is not None:
try:
self.ca.close()
except:
pass
channel_value = self.ca_channel_var.get()
str_channel = f"{int(channel_value):02d}"
self.ca = CASerail()
self.ca.open(self.config.device_config["ca_com"], 19200, 7, "E", 2)
# data = self.ca.set_xyLv_Display()
data = self.ca.set_all_Display()
if data:
data = self.ca.setSynchMode(3)
data = self.ca.setMeasureSpeed(1)
if True:
time.sleep(0.5)
data = self.ca.setZeroCalibration()
channel_value = self.ca_channel_var.get()
str_channel = f"{int(channel_value):02d}"
data = self.ca.setChannel(str_channel)
return True
else:
self.log_gui.log(
f"端口 {self.config.device_config["ca_com"]} 异常,色温仪连接失败"
, level="error")
self.ca.close()
self.ca = None
return False
except Exception as e:
self.log_gui.log(f"端口连接失败: {e}", level="error")
return False
def enable_com_widgets(self):
"""重新启用所有控件"""
self.check_button.configure(state="normal")
self.refresh_button.configure(state="normal")
def disconnect_com_connections(self):
"""断开所有串口连接"""
try:
# 断开TV连接
if self.ucd.status:
try:
self.ucd.close()
except:
pass
finally:
self.ucd.status = False
self.log_gui.log("UCD连接已断开", level="info")
# 断开CA连接
if self.ca is not None:
try:
self.ca.close()
except:
pass
finally:
self.ca = None
self.log_gui.log("CA连接已断开", level="info")
# 重新启用相关控件
self.enable_com_widgets()
self.ucd_status_indicator.config(bg="gray")
self.ca_status_indicator.config(bg="gray")
self.status_var.set("串口连接已断开")
except Exception as e:
self.log_gui.log(f"断开连接时发生错误: {str(e)}", level="info")
messagebox.showerror("错误", f"断开连接失败: {str(e)}")