Files
pqAutomationApp/app/device/connection.py

211 lines
6.4 KiB
Python
Raw Normal View History

2026-04-20 10:54:47 +08:00
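"""Device connection logic for the UCD323 and CA410 (step 4 of the refactor).

These functions were moved out of pqAutomationApp.PQAutomationApp. Each one
receives the application instance as `app`; where a body begins with
`self = app`, that alias exists only so the original `self.xxx` attribute
access could be kept unchanged.
"""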
import threading
import time
from tkinter import messagebox
from utils.caSerail import CASerail
def get_available_ucd_ports(app):
    """Return the UCD devices reported by the driver's device search.

    Delegates to ``app.ucd.search_device()`` and hands its result back
    unchanged.
    """
    ucd_driver = app.ucd
    return ucd_driver.search_device()
def get_available_com_ports(app):
    """Return the device names of the serial ports visible to the system.

    Any failure (including a failed import of the ``serial`` package) is
    written to ``app.log_gui`` and an empty list is returned instead.
    """
    try:
        # The import sits inside the try so that an import failure is
        # logged like any other error instead of propagating.
        from serial.tools import list_ports

        return [info.device for info in list_ports.comports()]
    except Exception as e:
        app.log_gui.log(f"获取COM端口列表出错: {e}")
        return []
def refresh_com_ports(app):
    """Re-scan the COM ports and UCD devices and refresh both drop-downs.

    A current selection that is no longer available is replaced: the UCD
    box falls back to the first device found, the CA box to the second COM
    port when at least two exist, otherwise the first, otherwise an empty
    string. Both status indicators (when the app has them) are reset to
    gray, and ``app.update_config()`` is called at the end.
    """
    com_ports = app.get_available_com_ports()
    ucd_devices = app.get_available_ucd_ports()

    # UCD device drop-down.
    if app.ucd_list_var.get() not in ucd_devices:
        fallback_ucd = ucd_devices[0] if ucd_devices else ""
        app.ucd_list_var.set(fallback_ucd)
    app.ucd_list_combo.config(values=ucd_devices)

    # CA port drop-down.
    if app.ca_com_var.get() not in com_ports:
        if len(com_ports) > 1:
            fallback_port = com_ports[1]
        elif com_ports:
            fallback_port = com_ports[0]
        else:
            fallback_port = ""
        app.ca_com_var.set(fallback_port)
    app.ca_com_combo.config(values=com_ports)

    # The indicators may not exist on the app, hence the hasattr guard.
    for indicator_name in ("ucd_status_indicator", "ca_status_indicator"):
        if hasattr(app, indicator_name):
            getattr(app, indicator_name).config(bg="gray")

    app.update_config()
def check_com_connections(app):
    """Probe the UCD and CA connections on a worker thread and update the UI.

    The check and refresh buttons are disabled and the status bar is set
    before the worker starts. The worker calls
    ``app.check_port_connection`` for the UCD and then for the CA, and
    schedules every UI change through ``app.root.after(0, ...)``. The
    buttons are re-enabled via ``app.enable_com_widgets`` on both the
    success and the error path.

    Args:
        app: Application object providing the widgets, ``root``,
            ``status_var``, ``log_gui`` and the helper methods used below.
    """
    self = app
    # Disable the buttons so the check cannot be started twice.
    self.check_button.configure(state="disabled")
    self.refresh_button.configure(state="disabled")
    self.status_var.set("正在检测连接...")
    self.root.update()

    def check_connections():
        try:
            # UCD first.
            ucd_connected = self.check_port_connection(is_ucd=True)
            self.root.after(
                0,
                lambda: self.update_connection_indicator(
                    self.ucd_status_indicator, ucd_connected
                ),
            )
            # Then the CA colorimeter.
            ca_connected = self.check_port_connection(is_ucd=False)
            self.root.after(
                0,
                lambda: self.update_connection_indicator(
                    self.ca_status_indicator, ca_connected
                ),
            )
            self.root.after(0, lambda: self.status_var.set("连接检测完成"))
            self.root.after(0, self.enable_com_widgets)
        except Exception as e:
            # Build the message now: Python unbinds `e` when this handler
            # exits, so a callback that formats it later would fail with
            # NameError instead of logging.
            message = f"连接检测出错: {e}"
            self.root.after(0, lambda: self.log_gui.log(message))
            self.root.after(0, self.enable_com_widgets)

    threading.Thread(target=check_connections, daemon=True).start()
def update_connection_indicator(app, indicator, connected):
    """Color ``indicator`` green when ``connected`` is truthy, red otherwise.

    ``app`` is accepted for signature compatibility and is not used.
    """
    indicator.config(bg="green" if connected else "red")
def check_port_connection(app, is_ucd=True):
    """Try to (re)open one instrument and report whether it is usable.

    Args:
        app: Application object. The UCD path uses ``app.ucd`` and
            ``app.ucd_list_var``; the CA path uses ``app.ca``,
            ``app.ca_channel_var`` and
            ``app.config.device_config["ca_com"]``. Both paths log through
            ``app.log_gui``.
        is_ucd: True to probe the UCD device, False to probe the CA
            colorimeter.

    Returns:
        True when the UCD opened, or when the CA opened and
        ``set_all_Display()`` returned a truthy value; False otherwise.
        Exceptions are logged and reported as False, never raised.
    """
    try:
        if is_ucd:
            return _probe_ucd(app)
        return _probe_ca(app)
    except Exception as e:
        app.log_gui.log(f"端口连接失败: {e}")
        return False


def _probe_ucd(app):
    """Close any open UCD session, then open the currently selected device."""
    if app.ucd.status:
        _close_best_effort(app, app.ucd, "UCD")
    device_name = app.ucd_list_var.get()
    if app.ucd.open(device_name):
        return True
    app.log_gui.log(f"设备 {device_name} 异常UCD323连接失败")
    return False


def _probe_ca(app):
    """Recreate the CA serial link and send its measurement settings."""
    if app.ca is not None:
        _close_best_effort(app, app.ca, "CA")
        # Do not keep a closed handle around if a later step fails.
        app.ca = None

    # Validate the channel before touching the port; a bad value raises
    # ValueError, which the caller logs.
    channel = f"{int(app.ca_channel_var.get()):02d}"
    # Read into a local so the log line below needs no nested quotes inside
    # an f-string (a SyntaxError before Python 3.12).
    com_port = app.config.device_config["ca_com"]

    app.ca = CASerail()
    try:
        # NOTE(review): the arguments look like baud rate, data bits,
        # parity and stop bits — confirm against CASerail.open.
        app.ca.open(com_port, 19200, 7, "E", 2)
        if app.ca.set_all_Display():
            app.ca.setSynchMode(3)
            app.ca.setMeasureSpeed(1)
            # NOTE(review): presumably lets the instrument settle before
            # zero calibration — confirm the required delay.
            time.sleep(0.5)
            app.ca.setZeroCalibration()
            app.ca.setChannel(channel)
            return True
        app.log_gui.log(f"端口 {com_port} 异常,色温仪连接失败")
    except Exception:
        # Release the half-initialised handle so app.ca never points at an
        # unusable connection, then let the caller log the error.
        _release_ca(app)
        raise
    _release_ca(app)
    return False


def _release_ca(app):
    """Close the current CA handle (best effort) and reset ``app.ca`` to None."""
    if app.ca is not None:
        _close_best_effort(app, app.ca, "CA")
    app.ca = None


def _close_best_effort(app, device, label):
    """Close ``device``; log a failure instead of raising it."""
    try:
        device.close()
    except Exception as e:
        app.log_gui.log(f"关闭{label}连接时出错: {e}")
def enable_com_widgets(app):
    """Put the check and refresh buttons back into the ``normal`` state."""
    for button_name in ("check_button", "refresh_button"):
        getattr(app, button_name).configure(state="normal")
def disconnect_com_connections(app):
    """Close the UCD and CA connections and reset the related UI state.

    A failing ``close()`` is logged and does not stop the teardown:
    ``app.ucd.status`` is still forced to False and ``app.ca`` is still
    reset to None. Afterwards the buttons are re-enabled, both indicators
    turn gray and the status bar is updated. Any other error is logged and
    shown in an error dialog.

    Args:
        app: Application object providing ``ucd``, ``ca``, ``log_gui``,
            ``status_var``, the two status indicators and
            ``enable_com_widgets``.
    """
    self = app
    try:
        # UCD connection.
        if self.ucd.status:
            try:
                self.ucd.close()
            except Exception as e:
                # Best effort: report the failure and keep tearing down.
                self.log_gui.log(f"关闭UCD连接时出错: {e}")
            finally:
                self.ucd.status = False
            self.log_gui.log("UCD连接已断开")

        # CA connection.
        if self.ca is not None:
            try:
                self.ca.close()
            except Exception as e:
                self.log_gui.log(f"关闭CA连接时出错: {e}")
            finally:
                self.ca = None
            self.log_gui.log("CA连接已断开")

        # Restore the controls and indicators.
        self.enable_com_widgets()
        self.ucd_status_indicator.config(bg="gray")
        self.ca_status_indicator.config(bg="gray")
        self.status_var.set("串口连接已断开")
    except Exception as e:
        self.log_gui.log(f"断开连接时发生错误: {e}")
        messagebox.showerror("错误", f"断开连接失败: {e}")