Files

199 lines
6.3 KiB
Python
Raw Permalink Normal View History

2026-04-21 15:31:48 +08:00
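"""Device connection logic for the UCD323 and CA410 (Step 4 refactor).

These functions were moved out of pqAutomationApp.PQAutomationApp. Each one
receives the application object as ``self`` so that the original ``self.xxx``
attribute access keeps working unchanged.
"""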
import threading
import time
from tkinter import messagebox
2026-04-20 11:48:38 +08:00
from drivers.caSerail import CASerail
2026-04-20 10:54:47 +08:00
def get_available_ucd_ports(self):
    """Return whatever ``self.ucd.search_device()`` reports as available."""
    ucd_devices = self.ucd.search_device()
    return ucd_devices
def get_available_com_ports(self):
    """Return the device names of the serial ports pyserial can enumerate.

    Any failure, including pyserial not being importable, is logged at error
    level through ``self.log_gui`` and an empty list is returned instead.
    """
    try:
        # Imported lazily so a missing pyserial is reported through the log
        # instead of breaking the import of this module.
        import serial.tools.list_ports

        device_names = []
        for port_info in serial.tools.list_ports.comports():
            device_names.append(port_info.device)
        return device_names
    except Exception as err:
        self.log_gui.log(f"获取COM端口列表出错: {err}", level="error")
        return []
def refresh_com_ports(self):
    """Re-scan devices and refresh both port drop-downs.

    A current selection that is still available is kept. Otherwise the UCD
    selection falls back to the first device found, and the CA selection to
    the second COM port when more than one exists (else the first, else an
    empty string). Any existing status indicators are reset to gray and
    ``self.update_config()`` is called at the end.
    """
    com_ports = self.get_available_com_ports()
    ucd_devices = self.get_available_ucd_ports()

    # UCD drop-down: replace a selection that is no longer available.
    if self.ucd_list_var.get() not in ucd_devices:
        if ucd_devices:
            self.ucd_list_var.set(ucd_devices[0])
        else:
            self.ucd_list_var.set("")
    self.ucd_list_combo.config(values=ucd_devices)

    # CA drop-down: same idea, but the second COM port is preferred.
    if self.ca_com_var.get() not in com_ports:
        if len(com_ports) > 1:
            fallback_port = com_ports[1]
        elif com_ports:
            fallback_port = com_ports[0]
        else:
            fallback_port = ""
        self.ca_com_var.set(fallback_port)
    self.ca_com_combo.config(values=com_ports)

    # The indicators may not exist yet, so only reset the ones that do.
    for indicator_name in ("ucd_status_indicator", "ca_status_indicator"):
        if hasattr(self, indicator_name):
            getattr(self, indicator_name).config(bg="gray")

    self.update_config()
def check_com_connections(self):
    """Probe the UCD and CA connections on a background thread.

    The check and refresh buttons are disabled and the status text is set
    before a daemon thread is started. The worker probes the UCD first and
    the CA second through ``self.check_port_connection`` and forwards every
    UI change through ``self._dispatch_ui`` (presumably marshalling onto the
    UI thread -- confirm against its definition).

    A failure inside the worker is logged at error level. The buttons are
    re-enabled whether or not the probe succeeded.

    Returns:
        None. The function returns as soon as the worker has been started.
    """
    # Disable the buttons so a second check cannot be started meanwhile.
    self.check_button.configure(state="disabled")
    self.refresh_button.configure(state="disabled")

    self.status_var.set("正在检测连接...")
    # Process pending UI work so the status text shows before the probe runs.
    self.root.update()

    def check_connections():
        try:
            ucd_connected = self.check_port_connection(is_ucd=True)
            self._dispatch_ui(
                self.update_connection_indicator,
                self.ucd_status_indicator,
                ucd_connected,
            )

            ca_connected = self.check_port_connection(is_ucd=False)
            self._dispatch_ui(
                self.update_connection_indicator,
                self.ca_status_indicator,
                ca_connected,
            )

            self._dispatch_ui(self.status_var.set, "连接检测完成")
        except Exception as e:
            # Build the text now: ``e`` is unbound once this clause ends, but
            # the dispatched callable may run later.
            message = f"连接检测出错: {e}"
            self._dispatch_ui(lambda: self.log_gui.log(message, level="error"))
        finally:
            # One exit point for re-enabling the buttons on both paths.
            self._dispatch_ui(self.enable_com_widgets)

    threading.Thread(target=check_connections, daemon=True).start()
def update_connection_indicator(self, indicator, connected):
    """Colour *indicator* green when *connected* is truthy, red otherwise."""
    colour = "green" if connected else "red"
    indicator.config(bg=colour)
def check_port_connection(self, is_ucd=True):
    """Try to (re)connect one device and report whether it worked.

    Args:
        is_ucd: ``True`` probes the UCD device selected in
            ``self.ucd_list_var``; ``False`` probes the CA instrument on the
            port stored in ``self.config.device_config["ca_com"]``.

    Returns:
        ``True`` when the device answered, ``False`` otherwise. Failures are
        logged at error level; exceptions are never propagated to the caller.
    """
    try:
        if is_ucd:
            return _connect_ucd(self)
        return _connect_ca(self)
    except Exception as e:
        self.log_gui.log(f"端口连接失败: {e}", level="error")
        if not is_ucd:
            # Match the explicit failure branch: never leave a half-open CA
            # object behind that later code would take for a live connection.
            _release_ca(self)
        return False


def _connect_ucd(self):
    """Reopen the UCD selected in ``self.ucd_list_var``; return success."""
    if self.ucd.status:
        try:
            self.ucd.close()
        except Exception:
            # Best effort: a stale handle must not block the reconnect.
            pass

    device = self.ucd_list_var.get()
    if self.ucd.open(device):
        return True

    self.log_gui.log(f"设备 {device} 异常UCD323连接失败", level="error")
    return False


def _connect_ca(self):
    """Open a fresh ``CASerail`` link and configure it; return success."""
    if self.ca is not None:
        try:
            self.ca.close()
        except Exception:
            # Best effort: the old link is being replaced anyway.
            pass

    # Parse the channel first so a bad value fails before the port is opened.
    channel = f"{int(self.ca_channel_var.get()):02d}"
    port = self.config.device_config["ca_com"]

    self.ca = CASerail()
    # Presumably baud rate, data bits, parity, stop bits -- confirm against
    # CASerail.open.
    self.ca.open(port, 19200, 7, "E", 2)

    if not self.ca.set_all_Display():
        # ``port`` is a local so the f-string also parses before Python 3.12.
        self.log_gui.log(f"端口 {port} 异常,色温仪连接失败", level="error")
        self.ca.close()
        self.ca = None
        return False

    self.ca.setSynchMode(3)
    self.ca.setMeasureSpeed(1)
    # Pause kept from the original sequence before zero calibration.
    time.sleep(0.5)
    self.ca.setZeroCalibration()
    self.ca.setChannel(channel)
    return True


def _release_ca(self):
    """Close ``self.ca`` on a best-effort basis and always reset it to None."""
    if self.ca is None:
        return
    try:
        self.ca.close()
    except Exception:
        # The port may never have opened; there is nothing more to undo.
        pass
    finally:
        self.ca = None
def enable_com_widgets(self):
    """Put the check and refresh buttons back into the ``normal`` state."""
    for button_name in ("check_button", "refresh_button"):
        getattr(self, button_name).configure(state="normal")
def disconnect_com_connections(self):
    """Close the UCD and CA connections and reset the connection UI.

    Errors raised by either ``close()`` call are ignored on purpose, so the
    state is always cleared: ``self.ucd.status`` becomes ``False`` and
    ``self.ca`` becomes ``None``. Afterwards the buttons are re-enabled, both
    indicators turn gray and the status text is updated.

    Any other failure is logged at error level and shown in an error dialog;
    nothing is raised to the caller.
    """
    try:
        if self.ucd.status:
            try:
                self.ucd.close()
            except Exception:
                # Best effort: the link is treated as gone either way.
                pass
            finally:
                self.ucd.status = False
            self.log_gui.log("UCD连接已断开", level="info")

        if self.ca is not None:
            try:
                self.ca.close()
            except Exception:
                # Best effort: the reference is dropped regardless.
                pass
            finally:
                self.ca = None
            self.log_gui.log("CA连接已断开", level="info")

        self.enable_com_widgets()
        self.ucd_status_indicator.config(bg="gray")
        self.ca_status_indicator.config(bg="gray")
        self.status_var.set("串口连接已断开")
    except Exception as e:
        # This is a failure report, so it is logged at error level.
        self.log_gui.log(f"断开连接时发生错误: {e}", level="error")
        messagebox.showerror("错误", f"断开连接失败: {e}")