删除所有外部旧引用

This commit is contained in:
xinzhu.yin
2026-05-24 11:21:30 +08:00
parent 1b66fff35b
commit 29f7d39fe9
13 changed files with 319 additions and 237 deletions

View File

@@ -1,198 +1,280 @@
"""设备连接UCD323 / CA410相关逻辑Step 4 重构)
"""设备连接管理UCD323 / CA410
从 pqAutomationApp.PQAutomationApp 中搬迁。每个函数第一行 `self = app`
以保留原有 `self.xxx` 属性访问不变。
重构目标
---------
- 用 :class:`ConnectionController` 类封装连接生命周期,替代旧的
"把模块级函数当类方法装到 PQAutomationApp 上" 的写法。
- 拆掉 ``check_port_connection(is_ucd)`` 的布尔旗参数反模式,
分离为 ``connect_ucd`` / ``connect_ca`` 两条独立路径。
- UCD 这一侧不再直接调用旧 ``UCDController``,而是通过
:class:`UCD323Device` + :class:`EventBus`;指示器更新由订阅
:class:`ConnectionChanged` 事件触发,与 GUI 解耦。
模块底部仍保留 7 个旧名字函数作为薄兼容层(``check_com_connections``
等),供 ``pqAutomationApp.PQAutomationApp`` 类体继续以同名属性挂接,
保证调用点(按钮 command、_dispatch_ui 引用)零修改。
"""
from __future__ import annotations
import threading
import time
from tkinter import messagebox
from typing import TYPE_CHECKING
from app.ucd_domain import ConnectionChanged, UcdError
from drivers.caSerail import CASerail
from drivers.ucd_driver import DeviceInfo
if TYPE_CHECKING:
from app.ucd_domain import EventBus
from drivers.ucd_driver import UCD323Device
# ─── ConnectionController ────────────────────────────────────────
class ConnectionController:
"""统一管理 CA410 / UCD323 的连接、断开与可用端口枚举。
设计要点:
- 不持有 :mod:`tkinter` 控件;所有 GUI 更新由订阅事件总线后回调完成。
- UCD 连接经由 :class:`UCD323Device`,自动发布
:class:`ConnectionChanged` 事件。
- CA 连接同样发布 :class:`ConnectionChanged` 事件(带 ``serial=None``
指示器订阅时只看 ``connected`` 字段)。
"""
def __init__(self, app):
self._app = app
self._bus: "EventBus" = app.event_bus
self._device: "UCD323Device" = app.ucd_device
# 旧 GUI 仍用 self.ca 直接访问 CA410 对象,过渡期保留同步赋值。
if not hasattr(app, "ca") or app.ca is None:
self._app.ca = None
# -- 端口枚举 ------------------------------------------------
def list_ucd_devices(self) -> list[str]:
"""返回 SDK 给出的设备显示字符串列表。"""
try:
return self._device.raw_controller.search_device() or []
except Exception as exc: # noqa: BLE001
self._log(f"枚举 UCD 设备失败: {exc}", level="error")
return []
def list_com_ports(self) -> list[str]:
try:
import serial.tools.list_ports
return [p.device for p in serial.tools.list_ports.comports()]
except Exception as exc: # noqa: BLE001
self._log(f"获取COM端口列表出错: {exc}", level="error")
return []
# -- UCD 连接 ------------------------------------------------
def connect_ucd(self, display: str) -> bool:
"""打开指定 UCD 设备。成功返回 True。"""
if self._device.state.name != "CLOSED":
try:
self._device.close()
except Exception: # noqa: BLE001
pass
try:
self._device.open(DeviceInfo.parse(display))
return True
except UcdError as exc:
self._log(f"设备 {display} 异常UCD323 连接失败: {exc}", level="error")
return False
except Exception as exc: # noqa: BLE001
self._log(f"UCD323 连接异常: {exc}", level="error")
return False
def disconnect_ucd(self) -> None:
try:
self._device.close()
except Exception: # noqa: BLE001
pass
# 旧 controller.status 也要清零,兼容仍读取它的代码
try:
self._app.ucd.status = False
except Exception: # noqa: BLE001
pass
self._log("UCD连接已断开", level="info")
# -- CA 连接 -------------------------------------------------
def connect_ca(self) -> bool:
"""打开 CA410。成功返回 True 并设置 ``app.ca``。"""
if self._app.ca is not None:
try:
self._app.ca.close()
except Exception: # noqa: BLE001
pass
self._app.ca = None
try:
ca = CASerail()
ca.open(self._app.config.device_config["ca_com"], 19200, 7, "E", 2)
if not ca.set_all_Display():
self._log(
f"端口 {self._app.config.device_config['ca_com']} 异常,色温仪连接失败",
level="error",
)
ca.close()
return False
ca.setSynchMode(3)
ca.setMeasureSpeed(1)
time.sleep(0.5)
ca.setZeroCalibration()
channel_value = self._app.ca_channel_var.get()
ca.setChannel(f"{int(channel_value):02d}")
self._app.ca = ca
self._bus.publish(ConnectionChanged(True, None))
return True
except Exception as exc: # noqa: BLE001
self._log(f"CA410 连接失败: {exc}", level="error")
return False
def disconnect_ca(self) -> None:
if self._app.ca is None:
return
try:
self._app.ca.close()
except Exception: # noqa: BLE001
pass
self._app.ca = None
self._bus.publish(ConnectionChanged(False, None))
self._log("CA连接已断开", level="info")
# -- 一次性入口 ----------------------------------------------
def check_all_async(self) -> None:
"""异步并联检测 UCD + CA通过 ``_dispatch_ui`` 回主线程更新 UI。"""
app = self._app
app.check_button.configure(state="disabled")
app.refresh_button.configure(state="disabled")
app.status_var.set("正在检测连接...")
app.root.update()
def worker():
try:
ucd_ok = self.connect_ucd(app.ucd_list_var.get())
app._dispatch_ui(
app.update_connection_indicator,
app.ucd_status_indicator, ucd_ok,
)
ca_ok = self.connect_ca()
app._dispatch_ui(
app.update_connection_indicator,
app.ca_status_indicator, ca_ok,
)
app._dispatch_ui(app.status_var.set, "连接检测完成")
app._dispatch_ui(self._enable_widgets)
except Exception as exc: # noqa: BLE001
app._dispatch_ui(app.log_gui.log, f"连接检测出错: {exc}")
app._dispatch_ui(self._enable_widgets)
threading.Thread(target=worker, daemon=True).start()
def disconnect_all(self) -> None:
try:
self.disconnect_ucd()
self.disconnect_ca()
self._enable_widgets()
self._app.ucd_status_indicator.config(bg="gray")
self._app.ca_status_indicator.config(bg="gray")
self._app.status_var.set("串口连接已断开")
except Exception as exc: # noqa: BLE001
self._log(f"断开连接时发生错误: {exc}", level="info")
messagebox.showerror("错误", f"断开连接失败: {exc}")
def refresh_ports(self) -> None:
"""刷新 UCD + COM 端口下拉框;指示器复位。"""
app = self._app
com_ports = self.list_com_ports()
ucd_list = self.list_ucd_devices()
if app.ucd_list_var.get() not in ucd_list:
app.ucd_list_var.set(ucd_list[0] if ucd_list else "")
app.ucd_list_combo.config(values=ucd_list)
if app.ca_com_var.get() not in com_ports:
app.ca_com_var.set(
com_ports[1] if len(com_ports) > 1 else (com_ports[0] if com_ports else "")
)
app.ca_com_combo.config(values=com_ports)
if hasattr(app, "ucd_status_indicator"):
app.ucd_status_indicator.config(bg="gray")
if hasattr(app, "ca_status_indicator"):
app.ca_status_indicator.config(bg="gray")
app.update_config()
# -- 内部 ----------------------------------------------------
def _enable_widgets(self) -> None:
self._app.check_button.configure(state="normal")
self._app.refresh_button.configure(state="normal")
def _log(self, msg: str, *, level: str = "info") -> None:
log_gui = getattr(self._app, "log_gui", None)
if log_gui is not None:
log_gui.log(msg, level=level)
# ─── 旧名字兼容层 ────────────────────────────────────────────────
# pqAutomationApp 类体仍以同名属性挂接这些函数;它们都委托给
# ``self.connection``。Phase 3 完成后这些 shim 可以连同类体的属性
# 挂接一并删除,让 GUI 直接调用 ``self.connection.xxx``。
def get_available_ucd_ports(self):
"""获取可用的UCD端口列表"""
return self.ucd.search_device()
return self.connection.list_ucd_devices()
def get_available_com_ports(self):
"""获取可用的COM端口列表"""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
return [port.device for port in ports]
except Exception as e:
self.log_gui.log(f"获取COM端口列表出错: {e}", level="error")
return []
return self.connection.list_com_ports()
def refresh_com_ports(self):
"""刷新COM端口列表"""
available_ports = self.get_available_com_ports()
available_list = self.get_available_ucd_ports()
# 更新UCD列表的下拉框选项
ucd_list_current = self.ucd_list_var.get()
if ucd_list_current not in available_list:
self.ucd_list_var.set(available_list[0] if available_list else "")
self.ucd_list_combo.config(values=available_list)
# 更新CA端口的下拉框选项
ca_com_current = self.ca_com_var.get()
if ca_com_current not in available_ports:
self.ca_com_var.set(
available_ports[1]
if len(available_ports) > 1
else (available_ports[0] if available_ports else "")
)
self.ca_com_combo.config(values=available_ports)
# 重置连接状态指示器为灰色
if hasattr(self, "ucd_status_indicator"):
self.ucd_status_indicator.config(bg="gray")
if hasattr(self, "ca_status_indicator"):
self.ca_status_indicator.config(bg="gray")
self.update_config()
self.connection.refresh_ports()
def check_com_connections(self):
"""检测COM端口连接状态"""
# 禁用连接按钮,防止重复点击
self.check_button.configure(state="disabled")
self.refresh_button.configure(state="disabled")
# 更新状态栏
self.status_var.set("正在检测连接...")
self.root.update()
# 使用线程进行连接检测
def check_connections():
try:
# 检测TV连接
ucd_connected = self.check_port_connection(is_ucd=True)
self._dispatch_ui(
self.update_connection_indicator,
self.ucd_status_indicator, ucd_connected,
)
# 检测CA连接
ca_connected = self.check_port_connection(is_ucd=False)
self._dispatch_ui(
self.update_connection_indicator,
self.ca_status_indicator, ca_connected,
)
# 更新状态栏
self._dispatch_ui(self.status_var.set, "连接检测完成")
# 重新启用所有控件
self._dispatch_ui(self.enable_com_widgets)
except Exception as e:
self._dispatch_ui(self.log_gui.log, f"连接检测出错: {e}")
self._dispatch_ui(self.enable_com_widgets)
# 启动线程
threading.Thread(target=check_connections, daemon=True).start()
self.connection.check_all_async()
def update_connection_indicator(self, indicator, connected):
"""更新连接状态指示器颜色"""
if connected:
indicator.config(bg="green")
else:
indicator.config(bg="red")
indicator.config(bg="green" if connected else "red")
def check_port_connection(self, is_ucd=True):
"""检测指定端口是否可以连接"""
try:
if is_ucd:
if self.ucd.status:
try:
self.ucd.close()
except:
pass
if not self.ucd.open(self.ucd_list_var.get()):
self.log_gui.log(
f"设备 {self.ucd_list_var.get()} 异常UCD323连接失败"
, level="error")
return False
else:
return True
else:
# 如果CA对象已存在先关闭
if self.ca is not None:
try:
self.ca.close()
except:
pass
channel_value = self.ca_channel_var.get()
str_channel = f"{int(channel_value):02d}"
self.ca = CASerail()
self.ca.open(self.config.device_config["ca_com"], 19200, 7, "E", 2)
# data = self.ca.set_xyLv_Display()
data = self.ca.set_all_Display()
if data:
data = self.ca.setSynchMode(3)
data = self.ca.setMeasureSpeed(1)
if True:
time.sleep(0.5)
data = self.ca.setZeroCalibration()
channel_value = self.ca_channel_var.get()
str_channel = f"{int(channel_value):02d}"
data = self.ca.setChannel(str_channel)
return True
else:
self.log_gui.log(
f"端口 {self.config.device_config["ca_com"]} 异常,色温仪连接失败"
, level="error")
self.ca.close()
self.ca = None
return False
except Exception as e:
self.log_gui.log(f"端口连接失败: {e}", level="error")
return False
"""[已弃用] 旗参数反模式;保留仅为兼容旧调用点。"""
if is_ucd:
return self.connection.connect_ucd(self.ucd_list_var.get())
return self.connection.connect_ca()
def enable_com_widgets(self):
"""重新启用所有控件"""
self.check_button.configure(state="normal")
self.refresh_button.configure(state="normal")
self.connection._enable_widgets()
def disconnect_com_connections(self):
"""断开所有串口连接"""
try:
# 断开TV连接
if self.ucd.status:
try:
self.ucd.close()
except:
pass
finally:
self.ucd.status = False
self.log_gui.log("UCD连接已断开", level="info")
# 断开CA连接
if self.ca is not None:
try:
self.ca.close()
except:
pass
finally:
self.ca = None
self.log_gui.log("CA连接已断开", level="info")
# 重新启用相关控件
self.enable_com_widgets()
self.ucd_status_indicator.config(bg="gray")
self.ca_status_indicator.config(bg="gray")
self.status_var.set("串口连接已断开")
except Exception as e:
self.log_gui.log(f"断开连接时发生错误: {str(e)}", level="info")
messagebox.showerror("错误", f"断开连接失败: {str(e)}")
self.connection.disconnect_all()
__all__ = [
"ConnectionController",
# 兼容层
"get_available_ucd_ports",
"get_available_com_ports",
"refresh_com_ports",
"check_com_connections",
"update_connection_indicator",
"check_port_connection",
"enable_com_widgets",
"disconnect_com_connections",
]

View File

@@ -38,7 +38,7 @@ class PatternService:
f" Timing: {self.app.config.current_test_types[test_type]['timing']}",
"info",
)
self.app.ucd.set_ucd_params(active_config)
self.app.signal_service.apply_config(active_config)
elif test_type == "sdr_movie":
data_range = self.app.sdr_data_range_var.get()
@@ -60,7 +60,7 @@ class PatternService:
active_config = self.app.config.get_temp_config_with_converted_params(
mode=mode, converted_params=converted_params
)
self.app.ucd.set_ucd_params(active_config)
self.app.signal_service.apply_config(active_config)
success = self.app.signal_service.update_signal_format(
color_space=self.app.sdr_color_space_var.get(),
data_range=data_range,
@@ -92,7 +92,7 @@ class PatternService:
active_config = self.app.config.get_temp_config_with_converted_params(
mode=mode, converted_params=converted_params
)
self.app.ucd.set_ucd_params(active_config)
self.app.signal_service.apply_config(active_config)
success = self.app.signal_service.update_signal_format(
color_space=self.app.hdr_color_space_var.get(),
data_range=data_range,
@@ -123,7 +123,7 @@ class PatternService:
raise IndexError(f"pattern 索引越界: {index}")
pattern_param = session.pattern_params[index]
if not self.app.ucd.send_current_pattern_params(pattern_param):
if not self.app.signal_service.send_pattern_params(pattern_param):
raise RuntimeError(f"发送 pattern 失败: {index}")
return pattern_param

View File

@@ -176,6 +176,49 @@ class SignalService:
def current_resolution(self) -> tuple[int, int]:
return self._dev.current_resolution()
@property
def is_connected(self) -> bool:
"""UCD 设备是否已打开。供 GUI 做前置校验。"""
ctrl = getattr(self._dev, "raw_controller", None)
return bool(ctrl and getattr(ctrl, "status", False))
# -- 过渡期 APIPhase 5config 驱动的写入 -----------------
# 现有 GUI / Service 通过 ``PQConfig`` 对象描述当次测试参数,
# 由 :class:`UCDController.set_ucd_params` 翻译为色彩/Timing/Pattern。
# 在配置层重构落地前,这两个方法作为 SignalService 的统一入口,
# 让上层不再直接接触 ``self.app.ucd``。
def apply_config(self, config) -> bool:
"""按 :class:`PQConfig` 写入色彩 / Timing / 当前 Pattern不 apply 输出)。"""
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("apply_config 暂仅支持 UCD323Device")
with self._lock:
return bool(ctrl.set_ucd_params(config))
def send_pattern_params(self, params) -> bool:
"""以 ``params`` 更新当前 pattern 的参数并 apply。"""
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("send_pattern_params 暂仅支持 UCD323Device")
with self._lock:
return bool(ctrl.send_current_pattern_params(params))
def apply_and_run(self, config, pattern_params) -> bool:
"""``set_ucd_params`` + ``set_pattern`` + ``run`` 的复合操作。
服务于 custom_template_panel 单步流程。
"""
ctrl = getattr(self._dev, "raw_controller", None)
if ctrl is None:
raise UcdError("apply_and_run 暂仅支持 UCD323Device")
with self._lock:
if not ctrl.set_ucd_params(config):
return False
if not ctrl.set_pattern(ctrl.current_pattern, pattern_params):
return False
return bool(ctrl.run())
__all__ = [
"SignalService",

View File

@@ -1,7 +1,7 @@
"""Local Dimming 测试逻辑(应用层)。
整合自原 drivers/local_dimming_test.py窗口图片生成与测试主循环
直接落在本模块UCD 通用操作下沉到 drivers.ucd_helpers
直接落在本模块UCD 通用操作通过 SignalService 完成
"""
import atexit
@@ -18,7 +18,6 @@ from tkinter import filedialog, messagebox
import numpy as np
from PIL import Image
from drivers.ucd_helpers import get_current_resolution
# --------------------------------------------------------------------------
@@ -90,7 +89,7 @@ def _ensure_window_image(width, height, percentage):
def start_local_dimming_test(self):
"""开始 Local Dimming 测试。"""
if not self.ca or not self.ucd.status:
if not self.ca or not self.signal_service.is_connected:
messagebox.showerror("错误", "请先连接 CA410 和 UCD323")
return
@@ -111,7 +110,7 @@ def start_local_dimming_test(self):
log("开始 Local Dimming 测试", level="info")
log("=" * 60, level="separator")
width, height = get_current_resolution(self.ucd)
width, height = self.signal_service.current_resolution()
total = len(DEFAULT_WINDOW_PERCENTAGES)
log(f" 分辨率: {width}x{height}", level="info")
log(f" 测试窗口: {DEFAULT_WINDOW_PERCENTAGES}", level="info")
@@ -182,7 +181,7 @@ def stop_local_dimming_test(self):
def send_ld_window(self, percentage):
"""发送指定百分比的白色窗口(手动模式)。"""
if not self.ucd.status:
if not self.signal_service.is_connected:
messagebox.showwarning("警告", "请先连接 UCD323 设备")
return
@@ -190,7 +189,7 @@ def send_ld_window(self, percentage):
self.current_ld_percentage = percentage
def send():
width, height = get_current_resolution(self.ucd)
width, height = self.signal_service.current_resolution()
try:
image_path = _ensure_window_image(width, height, percentage)
except Exception as e:

View File

@@ -13,7 +13,8 @@ import ttkbootstrap as ttk
from PIL import Image, ImageTk
from app.services import ai_image as _svc
from drivers.ucd_helpers import get_current_resolution
# ---------------- 面板创建 ----------------
@@ -638,7 +639,7 @@ def _send_to_ucd(self):
# 发送前检查分辨率:建议与当前 UCD 输出分辨率一致。
try:
target_w, target_h = get_current_resolution(ucd)
target_w, target_h = self.signal_service.current_resolution()
except Exception:
target_w, target_h = 3840, 2160
try:

View File

@@ -310,10 +310,8 @@ def _run_custom_row_single_step(self, item_id, row_no):
self._dispatch_ui(self.status_var.set, "单步测试失败:行号超范围")
return
self.ucd.set_ucd_params(temp_config)
pattern_param = converted_params[row_no - 1]
self.ucd.set_pattern(self.ucd.current_pattern, pattern_param)
self.ucd.run()
self.signal_service.apply_and_run(temp_config, pattern_param)
time.sleep(self.pattern_settle_time)

View File

@@ -205,7 +205,7 @@ def _start_pantone_baseline(self):
if self._pantone_running:
messagebox.showinfo("提示", "Pantone 任务正在执行")
return
if not getattr(self, "ucd", None) or not self.ucd.status:
if not self.signal_service.is_connected:
messagebox.showwarning("警告", "请先连接 UCD323")
return
if not getattr(self, "ca", None):
@@ -254,7 +254,7 @@ def _resume_pantone_baseline(self):
if not self._pantone_paused:
messagebox.showinfo("提示", "当前没有可继续的暂停任务")
return
if not getattr(self, "ucd", None) or not self.ucd.status:
if not self.signal_service.is_connected:
messagebox.showwarning("警告", "请先连接 UCD323")
return
if not getattr(self, "ca", None):

View File

@@ -13,7 +13,8 @@ from tkinter import filedialog, messagebox
import ttkbootstrap as ttk
from PIL import Image
from drivers.ucd_helpers import get_current_resolution
_DEFAULT_SAMPLES = [
@@ -387,9 +388,9 @@ def _format_float(value):
def _build_color_patch(self, hex_value):
if not getattr(self, "ucd", None) or not self.ucd.status:
if not self.signal_service.is_connected:
raise RuntimeError("请先连接 UCD323 设备")
width, height = get_current_resolution(self.ucd)
width, height = self.signal_service.current_resolution()
rgb = tuple(int(hex_value[i:i + 2], 16) for i in (1, 3, 5))
temp_dir = os.path.join(tempfile.gettempdir(), "pq_single_step_patches")
os.makedirs(temp_dir, exist_ok=True)