199 lines
6.2 KiB
Python
199 lines
6.2 KiB
Python
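"""Device connection logic for the UCD323 and CA410 (Step 4 refactor).

Moved out of pqAutomationApp.PQAutomationApp. Every function takes the
application instance as its `self` parameter, so the original `self.xxx`
attribute access is left unchanged.
"""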
|
||
|
||
import threading
|
||
import time
|
||
from tkinter import messagebox
|
||
|
||
from drivers.caSerail import CASerail
|
||
|
||
def get_available_ucd_ports(self):
    """Return the UCD devices currently reported by the UCD driver.

    Delegates to ``self.ucd.search_device()`` and hands its result back
    unchanged.
    """
    ucd_driver = self.ucd
    return ucd_driver.search_device()
|
||
|
||
|
||
def get_available_com_ports(self):
    """Return the device names of the serial (COM) ports found on this machine.

    Enumeration is delegated to pyserial's ``serial.tools.list_ports``.

    Returns:
        list: The ``device`` attribute of every port reported by
        ``comports()``, or an empty list when enumeration fails.
    """
    try:
        # The import sits inside the try block, so a missing pyserial is
        # logged and yields [] exactly like any other enumeration failure.
        from serial.tools import list_ports

        detected = list_ports.comports()
        return [entry.device for entry in detected]
    except Exception as e:
        self.log_gui.log(f"获取COM端口列表出错: {e}")
        return []
|
||
|
||
|
||
def refresh_com_ports(self):
    """Re-scan the UCD devices and COM ports and refresh both dropdowns.

    A current selection that is no longer available is replaced: the UCD
    dropdown falls back to the first device found, the CA dropdown to the
    second COM port when there is more than one, otherwise to the only port.
    Either falls back to an empty string when nothing was found. Both status
    indicators are reset to gray and ``self.update_config()`` is called last.
    """
    com_ports = self.get_available_com_ports()
    ucd_devices = self.get_available_ucd_ports()

    # UCD dropdown: keep the current choice only while it is still present.
    if self.ucd_list_var.get() not in ucd_devices:
        if ucd_devices:
            self.ucd_list_var.set(ucd_devices[0])
        else:
            self.ucd_list_var.set("")
    self.ucd_list_combo.config(values=ucd_devices)

    # CA dropdown: same rule, but the preferred fallback is the second port.
    if self.ca_com_var.get() not in com_ports:
        if len(com_ports) > 1:
            fallback_port = com_ports[1]
        elif com_ports:
            fallback_port = com_ports[0]
        else:
            fallback_port = ""
        self.ca_com_var.set(fallback_port)
    self.ca_com_combo.config(values=com_ports)

    # Reset whichever indicators exist to gray; a missing one is skipped.
    for indicator_name in ("ucd_status_indicator", "ca_status_indicator"):
        if hasattr(self, indicator_name):
            getattr(self, indicator_name).config(bg="gray")

    self.update_config()
|
||
|
||
|
||
def check_com_connections(self):
    """Probe the UCD and CA devices on a background thread and show the result.

    The check and refresh buttons are disabled and the status bar is updated
    before a daemon worker thread is started. The worker calls
    ``self.check_port_connection`` for the UCD device and then for the CA
    device, and routes every widget update through ``self._dispatch_ui``.
    The buttons are re-enabled when the worker finishes, whether or not the
    probe raised.

    Returns:
        None. The call returns as soon as the worker thread has been started.
    """
    # Block repeated clicks while a probe is in flight.
    self.check_button.configure(state="disabled")
    self.refresh_button.configure(state="disabled")

    self.status_var.set("正在检测连接...")
    # Flush pending UI work before the worker starts, presumably so the
    # disabled buttons and new status text are visible (assumes ``root`` is
    # the Tk root window; confirm).
    self.root.update()

    def probe_devices():
        try:
            # UCD first, then CA; each result goes straight to its indicator.
            ucd_connected = self.check_port_connection(is_ucd=True)
            self._dispatch_ui(
                self.update_connection_indicator,
                self.ucd_status_indicator, ucd_connected,
            )

            ca_connected = self.check_port_connection(is_ucd=False)
            self._dispatch_ui(
                self.update_connection_indicator,
                self.ca_status_indicator, ca_connected,
            )

            self._dispatch_ui(self.status_var.set, "连接检测完成")
        except Exception as e:
            # NOTE(review): on this path the status bar keeps showing the
            # in-progress text; only the log records the failure.
            self._dispatch_ui(self.log_gui.log, f"连接检测出错: {e}")
        finally:
            # Always hand the buttons back, even if reporting the error above
            # failed, so the UI cannot be left permanently disabled.
            self._dispatch_ui(self.enable_com_widgets)

    threading.Thread(target=probe_devices, daemon=True).start()
|
||
|
||
|
||
def update_connection_indicator(self, indicator, connected):
    """Colour a status indicator according to a connection result.

    Args:
        indicator: Widget whose ``config(bg=...)`` is called.
        connected: Truthy paints the indicator green, falsy paints it red.
    """
    indicator.config(bg="green" if connected else "red")
|
||
|
||
|
||
def check_port_connection(self, is_ucd=True):
    """Open the selected device and report whether the connection works.

    Args:
        is_ucd: When true, (re)open the UCD device chosen in
            ``self.ucd_list_var``. Otherwise create a fresh ``CASerail`` on
            the port stored in ``self.config.device_config["ca_com"]`` and
            run its setup sequence.

    Returns:
        bool: ``True`` when the device opened (and, for the CA, was
        configured); ``False`` otherwise. Exceptions raised while connecting
        are logged to ``self.log_gui`` and reported as ``False``.

    After a failed CA attempt ``self.ca`` is ``None``, so a half-configured
    handle is never mistaken for a live connection.
    """
    # NOTE(review): check_com_connections calls this from a worker thread,
    # yet it reads the UI variables and writes to log_gui directly; confirm
    # those objects are safe to use off the UI thread.
    try:
        if is_ucd:
            # Drop any existing session so open() starts from a clean state.
            if self.ucd.status:
                try:
                    self.ucd.close()
                except Exception:
                    # Best effort: a failed close must not block reconnecting.
                    pass

            device = self.ucd_list_var.get()
            if self.ucd.open(device):
                return True
            self.log_gui.log(f"设备 {device} 异常,UCD323连接失败")
            return False

        # Close and forget any previous CA handle before opening a new one.
        if self.ca is not None:
            try:
                self.ca.close()
            except Exception:
                # Best effort: the stale handle is discarded either way.
                pass
            self.ca = None

        # Resolve the inputs before touching the port, so a bad channel value
        # or a missing config key fails without opening anything.
        str_channel = f"{int(self.ca_channel_var.get()):02d}"
        ca_port = self.config.device_config["ca_com"]

        self.ca = CASerail()
        # Presumably 19200 baud, 7 data bits, even parity, 2 stop bits;
        # confirm against CASerail.open.
        self.ca.open(ca_port, 19200, 7, "E", 2)

        # The reply to the first command decides whether the meter is there.
        if not self.ca.set_all_Display():
            self.log_gui.log(f"端口 {ca_port} 异常,色温仪连接失败")
            self.ca.close()
            self.ca = None
            return False

        self.ca.setSynchMode(3)
        self.ca.setMeasureSpeed(1)
        # Short pause before zero calibration, presumably to let the
        # instrument settle (TODO confirm).
        time.sleep(0.5)
        self.ca.setZeroCalibration()
        self.ca.setChannel(str_channel)
        return True

    except Exception as e:
        self.log_gui.log(f"端口连接失败: {e}")
        if not is_ucd and getattr(self, "ca", None) is not None:
            # Setup raised part-way through: release the port and clear the
            # handle, matching the explicit failure path above.
            try:
                self.ca.close()
            except Exception:
                pass
            self.ca = None
        return False
|
||
|
||
|
||
def enable_com_widgets(self):
    """Put the check and refresh buttons back into the ``normal`` state."""
    for button_name in ("check_button", "refresh_button"):
        getattr(self, button_name).configure(state="normal")
|
||
|
||
|
||
def disconnect_com_connections(self):
    """Close the UCD and CA connections and reset the connection UI.

    Each device is closed on a best-effort basis: an error raised by
    ``close()`` is ignored, and the device is still marked as disconnected
    (``self.ucd.status = False`` / ``self.ca = None``) and logged. Afterwards
    the buttons are re-enabled, both indicators turn gray and the status bar
    is updated.

    Any other failure is logged and shown to the user in an error dialog; it
    is not raised to the caller.
    """
    try:
        # UCD side: only touched when the driver reports an open session.
        if self.ucd.status:
            try:
                self.ucd.close()
            except Exception:
                # Best effort: the state is reset below regardless.
                pass
            finally:
                self.ucd.status = False
                self.log_gui.log("UCD连接已断开")

        # CA side: a non-None handle is treated as an open connection.
        if self.ca is not None:
            try:
                self.ca.close()
            except Exception:
                # Best effort: the handle is dropped below regardless.
                pass
            finally:
                self.ca = None
                self.log_gui.log("CA连接已断开")

        # Hand the controls back and show the neutral "not checked" state.
        self.enable_com_widgets()
        self.ucd_status_indicator.config(bg="gray")
        self.ca_status_indicator.config(bg="gray")
        self.status_var.set("串口连接已断开")

    except Exception as e:
        self.log_gui.log(f"断开连接时发生错误: {str(e)}")
        messagebox.showerror("错误", f"断开连接失败: {str(e)}")
|
||
|
||
|