diff --git a/app/logging_setup.py b/app/logging_setup.py index 465bbb2..98b9a9c 100644 --- a/app/logging_setup.py +++ b/app/logging_setup.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging import os +import threading from datetime import datetime from typing import Optional @@ -93,6 +94,11 @@ class TkLogHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: # noqa: D401 if getattr(record, _FROM_GUI_FLAG, False): return + # Tkinter widgets are not thread-safe. Forwarding background-thread logs + # into GUI controls may block/hang the worker thread. Keep those logs in + # file handlers only, and only mirror main-thread logs to GUI. + if threading.current_thread() is not threading.main_thread(): + return try: message = self.format(record) except Exception: diff --git a/app/plots/gamut_background.py b/app/plots/gamut_background.py index e53c871..97c811c 100644 --- a/app/plots/gamut_background.py +++ b/app/plots/gamut_background.py @@ -104,6 +104,7 @@ def _render_chromaticity(kind: str, bbox: _BBox) -> np.ndarray: fig.canvas.draw() # 从 canvas 抓取 RGBA 数组 buf = np.asarray(fig.canvas.buffer_rgba()).copy() + buf = np.flipud(buf) plt.close(fig) try: diff --git a/app/plots/plot_accuracy.py b/app/plots/plot_accuracy.py index 6ffb9d1..1ea2994 100644 --- a/app/plots/plot_accuracy.py +++ b/app/plots/plot_accuracy.py @@ -9,9 +9,12 @@ from typing import TYPE_CHECKING from matplotlib.patches import Rectangle from matplotlib.lines import Line2D +from matplotlib.patches import Circle from app.plots.gamut_background import get_cie1976_background from app.tests.color_accuracy import get_accuracy_color_standards +from app.pq.color_patch_map import get_patch_color +from app.pq.color_patch_map import get_patch_color_from_xy if TYPE_CHECKING: from pqAutomationApp import PQAutomationApp @@ -55,14 +58,6 @@ _COLOR_MAP = { } -def _grade_color(delta_e: float) -> str: - if delta_e < 3: - return "#1FAE45" # 绿 - if delta_e < 5: - return "#E08A00" # 橙 - return "#D81B1B" # 红 - - def _xy_to_uv(x: float, y: float): """CIE 1931 xy → CIE 1976 u'v'""" denom = -2.0 * x + 12.0 * y + 3.0 @@ -76,7 +71,7 @@ def _xy_to_uv(x: float, y: float): # ============================================================ def _draw_left_panel(ax, color_patches, delta_e_values, font_scale=1.0, dark_mode=False): - """左侧仅保留大条形图。""" + """左侧仅保留大条形图""" ax.clear() n = len(color_patches) @@ -86,15 +81,14 @@ def _draw_left_panel(ax, color_patches, delta_e_values, font_scale=1.0, dark_mod y_pos = list(range(n)) bar_colors = [_COLOR_MAP.get(name, "#888888") for name in color_patches] - edge_colors = [_grade_color(dE) for dE in delta_e_values] ax.barh( y_pos, delta_e_values, height=0.72, color=bar_colors, - edgecolor=edge_colors, - linewidth=1.0, + edgecolor="#202020", + linewidth=0.5, zorder=3, ) @@ -129,9 +123,12 @@ def _draw_uv_diagram(ax, color_patches, measurements, standards, font_scale=1.0, bg, bbox = get_cie1976_background() xmin, xmax, ymin, ymax = bbox ax.imshow( - bg, extent=(xmin, xmax, ymin, ymax), - origin="upper", interpolation="bicubic", - zorder=0, aspect="auto", + bg, + extent=(xmin, xmax, ymin, ymax), + origin="lower", + interpolation="bicubic", + zorder=0, + aspect="auto", ) ax.set_xlim(xmin, xmax) ax.set_ylim(ymin, ymax) @@ -145,73 +142,122 @@ def _draw_uv_diagram(ax, color_patches, measurements, standards, font_scale=1.0, legend_label_color = "#FFF" if dark_mode else "#111" legend_bg = "#111" if dark_mode else "#FFFFFF" legend_edge = "#FFF" if dark_mode else "#333" - outer_edge = "#FFFFFF" if dark_mode else "#333333" + outer_edge = "#FFFFFF" if dark_mode else "#222222" ax.set_facecolor("#000" if dark_mode else "#FFFFFF") ax.set_aspect("equal", adjustable="box") - ax.set_title("CIE 1976 u'v'", fontsize=max(8, 11 * font_scale), fontweight="bold", - color=text_color, pad=4) + + ax.set_title( + "CIE 1976 u'v'", + fontsize=max(8, 11 * font_scale), + fontweight="bold", + color=text_color, + pad=4, + ) + ax.set_xlabel("u'", fontsize=max(7, 9 * font_scale), color=sub_text_color, labelpad=1) ax.set_ylabel("v'", fontsize=max(7, 9 * font_scale), color=sub_text_color, labelpad=1) + ax.tick_params(axis="both", labelsize=max(6, 8 * font_scale), colors=tick_color) + for sp in ax.spines.values(): sp.set_color(outer_edge) sp.set_linewidth(0.9) for name, meas in zip(color_patches, measurements): + if meas is None or len(meas) < 2: continue + mx, my = meas[0], meas[1] sxy = standards.get(name) + if sxy is None: continue + sx, sy = sxy m_u, m_v = _xy_to_uv(mx, my) s_u, s_v = _xy_to_uv(sx, sy) - face = _COLOR_MAP.get(name, "#FFFFFF") + # face = get_patch_color_from_xy(name, (sx, sy)).strip().upper() + face = _COLOR_MAP.get(name, "#888888") + print(name, face) - # 目标点:仅空心方框(不填充标准颜色) + # 目标点(Target) 空心方框 ax.scatter( - [s_u], [s_v], - s=56, marker="s", - facecolors="none", edgecolors=outer_edge, - linewidths=1.25, zorder=18, + s_u, + s_v, + s=90, + marker="s", + facecolors="none", + edgecolors=outer_edge, + linewidths=1.6, + zorder=18, ) - # 实测点:白色外圈 + 内层圆点 + + # 实测点(Actual) 彩色实心 + 白色描边 ax.scatter( - [m_u], [m_v], - s=52, marker="o", - facecolors="none", edgecolors=outer_edge, - linewidths=1.0, zorder=19, - ) - ax.scatter( - [m_u], [m_v], - s=24, marker="o", - facecolors=face, edgecolors="#111111", - linewidths=0.85, zorder=20, + [m_u], + [m_v], + s=80, + marker="o", + color=face, + edgecolors=outer_edge, + linewidths=1.2, + zorder=20, ) + # # Δu'v' 偏差连线 + # ax.plot( + # [s_u, m_u], + # [s_v, m_v], + # color=face, + # linewidth=1.0, + # alpha=0.8, + # zorder=15, + # ) + legend_handles = [ - Line2D([0], [0], marker="s", linestyle="none", - markerfacecolor="#CCCCCC", markeredgecolor=outer_edge, - markersize=7, label="目标 (Target)"), - Line2D([0], [0], marker="o", linestyle="none", - markerfacecolor="#CCCCCC", markeredgecolor="#000000", - markersize=7, label="实测 (Actual)"), + Line2D( + [0], + [0], + marker="s", + linestyle="none", + markerfacecolor="none", + markeredgecolor=outer_edge, + markersize=9, + markeredgewidth=1.4, + label="目标 (Target)", + ), + Line2D( + [0], + [0], + marker="o", + linestyle="none", + markerfacecolor="#AAAAAA", + markeredgecolor=outer_edge, + markersize=9, + markeredgewidth=1.2, + label="实测 (Actual)", + ), ] + leg = ax.legend( handles=legend_handles, - loc="lower right", fontsize=max(6, 8 * font_scale), - framealpha=0.88, labelcolor=legend_label_color, + loc="lower right", + fontsize=max(6, 8 * font_scale), + framealpha=0.9, + labelcolor=legend_label_color, ) - if leg is not None: + + if leg: leg.get_frame().set_facecolor(legend_bg) leg.get_frame().set_edgecolor(legend_edge) leg.set_zorder(50) + def _draw_result_judgement(ax, accuracy_data, font_scale=1.0, dark_mode=False): """底部结果条""" ax.clear() @@ -353,8 +399,11 @@ def plot_accuracy(self: "PQAutomationApp", accuracy_data, test_type): except Exception: pass - self.accuracy_canvas.draw() + # Select the tab first so the canvas is visible and winfo_width/height + # return real pixel dimensions before the figure is rendered. self.chart_notebook.select(self.accuracy_chart_frame) + self.accuracy_canvas.get_tk_widget().update_idletasks() + self.accuracy_canvas.draw() class PlotAccuracyMixin: diff --git a/app/pq/color_patch_map.py b/app/pq/color_patch_map.py new file mode 100644 index 0000000..9f68f89 --- /dev/null +++ b/app/pq/color_patch_map.py @@ -0,0 +1,101 @@ +import re +import numpy as np + +# ============================================================ +# 标准 ColorChecker / TV Patch 颜色 +# ============================================================ +COLOR_PATCH_MAP = { +"White": "#FFFFFF", +"Gray 80": "#E6E6E6", +"Gray 65": "#D1D1D1", +"Gray 50": "#BABABA", +"Gray 35": "#9E9E9E", +"Black": "#000000", + +"Dark Skin": "#735242", +"Light Skin": "#C29682", +"Blue Sky": "#5E7A9C", +"Foliage": "#596B42", +"Blue Flower": "#8280B0", +"Bluish Green": "#63BDA8", +"Orange": "#D97829", +"Purplish Blue": "#4A5CA3", +"Moderate Red": "#C25461", +"Purple": "#5C3D6B", +"Yellow Green": "#9EBA40", +"Orange Yellow": "#E6A12E", + +"Blue (Legacy)": "#333D96", +"Green (Legacy)": "#479447", +"Red (Legacy)": "#B0303B", +"Yellow (Legacy)": "#EDC721", +"Magenta (Legacy)": "#BA5491", +"Cyan (Legacy)": "#0085A3", + +"100% Red": "#FF0000", +"100% Green": "#00FF00", +"100% Blue": "#0000FF", +"100% Cyan": "#00FFFF", +"100% Magenta": "#FF00FF", +"100% Yellow": "#FFFF00", + +} + +# ============================================================ +# 名称标准化 +# ============================================================ +def normalize_patch_name(name: str) -> str: + if not isinstance(name, str): + return "" + name = name.strip().lower() + # collapse spaces + name = " ".join(name.split()) + return name + + +# ============================================================ +# 获取 patch 颜色 +# ============================================================ +def get_patch_color(name: str): + norm = normalize_patch_name(name) + for key, color in COLOR_PATCH_MAP.items(): + + if normalize_patch_name(key) == norm: + return color + + return None + +# ============================================================ +# xy → sRGB fallback +# ============================================================ +def xy_to_srgb(x, y, Y=1.0): + if y == 0: + return "#AAAAAA" + + X = (x * Y) / y + Z = (1 - x - y) * Y / y + + rgb = np.dot( + [[3.2406, -1.5372, -0.4986], + [-0.9689, 1.8758, 0.0415], + [0.0557, -0.2040, 1.0570]], + [X, Y, Z] + ) + + rgb = np.clip(rgb, 0, 1) + rgb = (rgb ** (1 / 2.2)) * 255 + + r, g, b = rgb.astype(int) + return "#{:02X}{:02X}{:02X}".format(r, g, b) + +def get_patch_color_from_xy(name, xy=None): + color = get_patch_color(name) + + if color: + return color + + if xy is not None: + x, y = xy + return xy_to_srgb(x, y) + + return "#AAAAAA" \ No newline at end of file diff --git a/app/services/ai_image.py b/app/services/ai_image.py index a47da28..424394a 100644 --- a/app/services/ai_image.py +++ b/app/services/ai_image.py @@ -293,62 +293,107 @@ def _call_pqtest_upload(file_path: str, timeout: float = API_UPLOAD_TIMEOUT, aut iw, ih = img.size except Exception as exc: raise ValueError(f"无法读取图片: {exc}") from exc - - # 检查大小,如需则缩放 + size = os.path.getsize(file_path) needs_resize = (iw > UPLOAD_MAX_PIXELS or ih > UPLOAD_MAX_PIXELS or size > UPLOAD_MAX_BYTES) - + file_stem = os.path.splitext(os.path.basename(file_path))[0] + upload_ext = ext + mime = mimetypes.guess_type(file_path)[0] or "application/octet-stream" + if needs_resize: if not auto_resize: if iw > UPLOAD_MAX_PIXELS or ih > UPLOAD_MAX_PIXELS: raise ValueError(f"分辨率超过 4096×4096(当前 {iw}×{ih})") - else: - raise ValueError(f"图片超过 10MB 限制(当前 {size/1024/1024:.2f}MB)") - - # 自动缩放:等比例缩放至 4096×4096 以内 - logger.info("[AIImage][UPLOAD] 自动缩放 %dx%d (%.1fMB) 至 ≤4096×4096", - iw, ih, size/1024/1024) - scale = min(UPLOAD_MAX_PIXELS / iw, UPLOAD_MAX_PIXELS / ih, 1.0) - new_w, new_h = max(1, int(iw * scale)), max(1, int(ih * scale)) - + raise ValueError(f"图片超过 10MB 限制(当前 {size/1024/1024:.2f}MB)") + + logger.info( + "[AIImage][UPLOAD] 自动处理超限图片 %dx%d (%.2fMB)", + iw, + ih, + size / 1024 / 1024, + ) + with Image.open(file_path) as img: - img_resized = img.resize((new_w, new_h), Image.LANCZOS) - # 重压至 10MB 以下 - # 首先尝试原格式 - tmp_io = BytesIO() - fmt = "PNG" if ext == ".png" else "JPEG" - save_kw = {"format": fmt} - img_resized.save(tmp_io, **save_kw) - tmp_bytes = tmp_io.getvalue() - - if len(tmp_bytes) <= UPLOAD_MAX_BYTES: - file_bytes = tmp_bytes + working = img.copy() + + # 先做一次分辨率约束,避免后续压缩开销过大。 + scale = min(UPLOAD_MAX_PIXELS / max(1, working.width), UPLOAD_MAX_PIXELS / max(1, working.height), 1.0) + if scale < 1.0: + working = working.resize( + (max(1, int(working.width * scale)), max(1, int(working.height * scale))), + Image.LANCZOS, + ) + + best_bytes = b"" + best_mime = mime + best_ext = upload_ext + + # 第一优先:保持原格式。 + try: + raw_io = BytesIO() + if ext == ".png": + working.save(raw_io, format="PNG", optimize=True) + raw_mime, raw_ext = "image/png", ".png" else: - # 原格式太大,转换为 JPEG 并压缩 - logger.info("[AIImage][UPLOAD] 原格式超过限制,转为 JPEG 并压缩") - quality = 95 - while quality >= 50: - tmp_io = BytesIO() - img_resized.save(tmp_io, format="JPEG", quality=quality, optimize=True) - tmp_bytes = tmp_io.getvalue() - if len(tmp_bytes) <= UPLOAD_MAX_BYTES: - file_bytes = tmp_bytes + rgb = working.convert("RGB") if working.mode not in {"RGB", "L"} else working + rgb.save(raw_io, format="JPEG", quality=95, optimize=True) + raw_mime, raw_ext = "image/jpeg", ".jpg" + best_bytes = raw_io.getvalue() + best_mime = raw_mime + best_ext = raw_ext + except Exception as exc: + logger.warning("[AIImage][UPLOAD] 原格式编码失败,准备转 JPEG: %s", exc) + + # 仍超限时,转 JPEG + 渐进压缩;如仍超限则继续降分辨率。 + if len(best_bytes) > UPLOAD_MAX_BYTES: + if best_ext != ".jpg": + logger.info("[AIImage][UPLOAD] 原格式仍超限,切换 JPEG 压缩") + working_jpg = working.convert("RGB") if working.mode != "RGB" else working + while True: + compressed = b"" + for q in (95, 90, 85, 80, 75, 70, 65, 60, 55, 50): + tmp = BytesIO() + working_jpg.save(tmp, format="JPEG", quality=q, optimize=True) + data = tmp.getvalue() + compressed = data + if len(data) <= UPLOAD_MAX_BYTES: break - quality -= 5 - else: - # 即使 quality=50 还是太大,也用这个版本上传(后端会处理) - file_bytes = tmp_bytes - - logger.info("[AIImage][UPLOAD] 缩放完成 %dx%d (%.1fMB)", - new_w, new_h, len(file_bytes)/1024/1024) - iw, ih = new_w, new_h + best_bytes = compressed + best_mime = "image/jpeg" + best_ext = ".jpg" + if len(best_bytes) <= UPLOAD_MAX_BYTES: + break + + next_w = max(256, int(working_jpg.width * 0.9)) + next_h = max(256, int(working_jpg.height * 0.9)) + if next_w == working_jpg.width and next_h == working_jpg.height: + break + if next_w <= 256 or next_h <= 256: + break + working_jpg = working_jpg.resize((next_w, next_h), Image.LANCZOS) + + if len(best_bytes) > UPLOAD_MAX_BYTES: + raise ValueError( + f"自动压缩后仍超过 10MB(当前 {len(best_bytes)/1024/1024:.2f}MB),请更换图片" + ) + + file_bytes = best_bytes + mime = best_mime + upload_ext = best_ext + iw, ih = working.width, working.height + logger.info( + "[AIImage][UPLOAD] 自动处理完成 %dx%d %.2fMB (%s)", + iw, + ih, + len(file_bytes) / 1024 / 1024, + mime, + ) else: with open(file_path, "rb") as f: file_bytes = f.read() - mime = mimetypes.guess_type(file_path)[0] or "application/octet-stream" + filename = f"{file_stem}{upload_ext}" boundary = "----pqAuto" + uuid.uuid4().hex - filename = os.path.basename(file_path) crlf = b"\r\n" body = b"".join([ b"--", boundary.encode("ascii"), crlf, diff --git a/app/services/pattern_service.py b/app/services/pattern_service.py index 81ef958..3b7de95 100644 --- a/app/services/pattern_service.py +++ b/app/services/pattern_service.py @@ -21,8 +21,23 @@ class PatternService: def __init__(self, app): self.app = app + def _build_apply_config_error(self, test_type): + timing = self.app.config.current_test_types.get(test_type, {}).get("timing", "-") + detail = "" + try: + ctrl = getattr(self.app.signal_service.device, "raw_controller", None) + if ctrl is not None: + d = getattr(ctrl, "last_error", None) + if d: + detail = f", detail={d}" + except Exception: + pass + return f"UCD profile apply_config failed for {test_type}, timing={timing}{detail}" + def prepare_session(self, mode, *, test_type=None, log_details=False): test_type = test_type or self.app.config.current_test_type + if hasattr(self.app.config, "set_current_test_type"): + self.app.config.set_current_test_type(test_type) if not self.app.config.set_current_pattern(mode): raise ValueError(f"未知的图案模式: {mode}") @@ -64,7 +79,8 @@ class PatternService: ("Timing", self.app.config.current_test_types[test_type]["timing"]), ]: self._log(f" {label}: {value}", "info") - self.app.signal_service.apply_config(active_config) + if not self.app.signal_service.apply_config(active_config): + raise RuntimeError(self._build_apply_config_error(test_type)) success = self.app.signal_service.update_signal_format( color_space=color_space, data_range=data_range, @@ -97,7 +113,10 @@ class PatternService: active_config = self.app.config.get_temp_config_with_converted_params( mode=mode, converted_params=converted_params ) - self.app.signal_service.apply_config(active_config) + if hasattr(active_config, "set_current_test_type"): + active_config.set_current_test_type(test_type) + if not self.app.signal_service.apply_config(active_config): + raise RuntimeError(self._build_apply_config_error(test_type)) success = self.app.signal_service.update_signal_format( color_space=self.app.sdr_color_space_var.get(), data_range=data_range, @@ -129,7 +148,10 @@ class PatternService: active_config = self.app.config.get_temp_config_with_converted_params( mode=mode, converted_params=converted_params ) - self.app.signal_service.apply_config(active_config) + if hasattr(active_config, "set_current_test_type"): + active_config.set_current_test_type(test_type) + if not self.app.signal_service.apply_config(active_config): + raise RuntimeError(self._build_apply_config_error(test_type)) success = self.app.signal_service.update_signal_format( color_space=self.app.hdr_color_space_var.get(), data_range=data_range, diff --git a/app/services/ucd_service.py b/app/services/ucd_service.py index cce7fc2..a333feb 100644 --- a/app/services/ucd_service.py +++ b/app/services/ucd_service.py @@ -12,7 +12,9 @@ from __future__ import annotations +from contextlib import contextmanager import logging +import sys import threading from app.ucd_domain import ( @@ -34,6 +36,10 @@ from drivers.ucd_driver import IUcdDevice log = logging.getLogger(__name__) +_LOCK_TIMEOUT_SECONDS = 8.0 +_DEBUG_LOCK_TIMEOUT_SECONDS = 0.3 +_PATTERN_LOCK_TIMEOUT_SECONDS = 0.8 + # ─── 视图字符串 → 值对象 转换工具 ──────────────────────────────── @@ -85,6 +91,53 @@ class SignalService: self._dev = device self._bus = bus self._lock = threading.RLock() + self._lock_owner_tid: int | None = None + self._lock_owner_name: str | None = None + + def _effective_lock_timeout(self, timeout_override: float | None = None) -> float: + """调试模式下缩短锁等待,避免单步时表现为 UI 长时间无响应。""" + if timeout_override is not None: + return timeout_override + if sys.gettrace() is not None: + return _DEBUG_LOCK_TIMEOUT_SECONDS + return _LOCK_TIMEOUT_SECONDS + + @contextmanager + def _acquire_service_lock(self, op_name: str, timeout_override: float | None = None): + timeout = self._effective_lock_timeout(timeout_override) + current = threading.current_thread() + log.info( + "SignalService.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s", + op_name, + timeout, + threading.get_ident(), + current.name, + self._lock_owner_tid, + self._lock_owner_name, + ) + acquired = self._lock.acquire(timeout=timeout) + if not acquired: + raise UcdError( + "UCD busy: lock timeout in " + f"SignalService.{op_name} ({timeout:.1f}s), " + f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}" + ) + prev_owner_tid = self._lock_owner_tid + prev_owner_name = self._lock_owner_name + self._lock_owner_tid = threading.get_ident() + self._lock_owner_name = current.name + log.info( + "SignalService.%s lock acquired tid=%s thread=%s", + op_name, + self._lock_owner_tid, + self._lock_owner_name, + ) + try: + yield + finally: + self._lock_owner_tid = prev_owner_tid + self._lock_owner_name = prev_owner_name + self._lock.release() # -- 高层接口 ------------------------------------------------ @@ -100,7 +153,7 @@ class SignalService: Returns: ``format_changed``——本次相对上一次 :meth:`apply` 是否变化。 """ - with self._lock: + with self._acquire_service_lock("apply"): log.info( "SignalService.apply signal=%s timing=%s pattern=%s", signal, @@ -114,7 +167,7 @@ class SignalService: def send_pattern(self, pattern: PatternSpec) -> None: """在已 configure 的信号上仅更新图案后 apply。""" - with self._lock: + with self._acquire_service_lock("send_pattern", _PATTERN_LOCK_TIMEOUT_SECONDS): log.info("SignalService.send_pattern pattern=%s", pattern.kind.value) self._dev.set_pattern(pattern) self._dev.apply() @@ -155,7 +208,7 @@ class SignalService: ctrl = getattr(self._dev, "raw_controller", None) if ctrl is None: raise UcdError("update_signal_format 暂仅支持 UCD323Device") - with self._lock: + with self._acquire_service_lock("update_signal_format"): return bool( ctrl.apply_signal_format( color_space=color_space, @@ -193,7 +246,7 @@ class SignalService: ctrl = getattr(self._dev, "raw_controller", None) if ctrl is None: raise UcdError("apply_config 暂仅支持 UCD323Device") - with self._lock: + with self._acquire_service_lock("apply_config"): return bool(ctrl.set_ucd_params(config)) def send_pattern_params(self, params) -> bool: @@ -201,7 +254,7 @@ class SignalService: ctrl = getattr(self._dev, "raw_controller", None) if ctrl is None: raise UcdError("send_pattern_params 暂仅支持 UCD323Device") - with self._lock: + with self._acquire_service_lock("send_pattern_params"): return bool(ctrl.send_current_pattern_params(params)) def apply_and_run(self, config, pattern_params) -> bool: @@ -212,7 +265,7 @@ class SignalService: ctrl = getattr(self._dev, "raw_controller", None) if ctrl is None: raise UcdError("apply_and_run 暂仅支持 UCD323Device") - with self._lock: + with self._acquire_service_lock("apply_and_run"): if not ctrl.set_ucd_params(config): return False if not ctrl.set_pattern(ctrl.current_pattern, pattern_params): diff --git a/app/views/panels/ai_image_panel.py b/app/views/panels/ai_image_panel.py index e013777..bb5f749 100644 --- a/app/views/panels/ai_image_panel.py +++ b/app/views/panels/ai_image_panel.py @@ -803,17 +803,22 @@ def _on_upload_done(self: "PQAutomationApp", name: str, url: str, exc): def _clear_reference_image(self: "PQAutomationApp"): - """清除手动上传的参考图,同时清除当前会话的自动链路参考。""" + """清除手动上传的参考图。 + + v2.1 规则要求:从第二轮开始应使用最近一次成功生成图作为输入, + 因此这里不清除会话级自动链路参考;若需彻底重置,请点“新对话”。 + """ if getattr(self, "_ai_image_requesting", False): return self._ai_image_pending_ref_url = "" self._ai_image_pending_ref_name = "" - sid = _svc.get_session_id() - refs = getattr(self, "_ai_image_session_refs", None) - if isinstance(refs, dict): - refs.pop(sid, None) _refresh_ref_label(self) - self.ai_image_status_var.set("已清除参考图,切换为文生图模式") + sid = _svc.get_session_id() + refs = getattr(self, "_ai_image_session_refs", None) or {} + if (refs.get(sid) or "").strip(): + self.ai_image_status_var.set("已清除手动参考图,当前会话仍沿用上一轮生成图") + else: + self.ai_image_status_var.set("已清除参考图,当前为文生图模式") def _session_id_for_item(self: "PQAutomationApp", item_id: str) -> str: diff --git a/app/views/panels/calman_panel.py b/app/views/panels/calman_panel.py index 060e630..a75a5b7 100644 --- a/app/views/panels/calman_panel.py +++ b/app/views/panels/calman_panel.py @@ -232,6 +232,62 @@ def _apply_calman_tree_style(self: "PQAutomationApp") -> None: self.calman_data_tree.configure(style="Calman.Treeview") +def _calman_log(self: "PQAutomationApp", message: str, level: str = "info") -> None: + """统一输出 Calman 面板日志。""" + logger = getattr(self, "log_gui", None) + if logger is None: + return + self._dispatch_ui(self.log_gui.log, f"CALMAN: {message}", level) + + +def _build_calman_config_summary(self: "PQAutomationApp") -> str: + """生成顶部配置摘要,跟随当前测试类型展示 UCD 参数。""" + cfg = getattr(self, "config", None) + test_type = getattr(cfg, "current_test_type", "screen_module") + test_cfg = {} + if cfg is not None: + test_cfg = getattr(cfg, "current_test_types", {}).get(test_type, {}) + + if test_type == "screen_module": + color_space = getattr(getattr(self, "screen_module_color_space_var", None), "get", lambda: test_cfg.get("colorimetry", "-"))() + output_format = getattr(getattr(self, "screen_module_output_format_var", None), "get", lambda: test_cfg.get("color_format", "-"))() + bit_depth = getattr(getattr(self, "screen_module_bit_depth_var", None), "get", lambda: f"{int(test_cfg.get('bpc', 8))}bit")() + data_range = getattr(getattr(self, "screen_module_data_range_var", None), "get", lambda: test_cfg.get("data_range", "Full"))() + timing = test_cfg.get("timing", "-") + profile_name = "Screen" + elif test_type == "sdr_movie": + color_space = getattr(getattr(self, "sdr_color_space_var", None), "get", lambda: test_cfg.get("colorimetry", "-"))() + output_format = getattr(getattr(self, "sdr_output_format_var", None), "get", lambda: test_cfg.get("color_format", "-"))() + bit_depth = getattr(getattr(self, "sdr_bit_depth_var", None), "get", lambda: f"{int(test_cfg.get('bpc', 8))}bit")() + data_range = getattr(getattr(self, "sdr_data_range_var", None), "get", lambda: test_cfg.get("data_range", "Full"))() + timing = test_cfg.get("timing", "-") + profile_name = "SDR" + elif test_type == "hdr_movie": + color_space = getattr(getattr(self, "hdr_color_space_var", None), "get", lambda: test_cfg.get("colorimetry", "-"))() + output_format = getattr(getattr(self, "hdr_output_format_var", None), "get", lambda: test_cfg.get("color_format", "-"))() + bit_depth = getattr(getattr(self, "hdr_bit_depth_var", None), "get", lambda: f"{int(test_cfg.get('bpc', 10))}bit")() + data_range = getattr(getattr(self, "hdr_data_range_var", None), "get", lambda: test_cfg.get("data_range", "Limited"))() + timing = test_cfg.get("timing", "-") + profile_name = "HDR" + else: + color_space = test_cfg.get("colorimetry", "-") + output_format = test_cfg.get("color_format", "-") + bit_depth = test_cfg.get("bpc", "-") + data_range = test_cfg.get("data_range", "-") + timing = test_cfg.get("timing", "-") + profile_name = test_type + + return ( + f"Profile: {profile_name} | Timing: {timing} | CS: {color_space} | " + f"Fmt: {output_format} | Depth: {bit_depth} | Range: {data_range}" + ) + + +def _refresh_calman_config_summary(self: "PQAutomationApp") -> None: + if hasattr(self, "calman_config_summary_var"): + self.calman_config_summary_var.set(_build_calman_config_summary(self)) + + def create_calman_panel(self: "PQAutomationApp") -> None: """创建 CALMAN 风格灰阶测试面板,注册到 panel_manager。""" palette = _get_calman_palette() @@ -242,6 +298,7 @@ def create_calman_panel(self: "PQAutomationApp") -> None: self.calman_results = {} self.calman_stop_event = threading.Event() self.calman_running = False + self.calman_patch_send_busy = False self.calman_current_level = None self.calman_last_record = None self.calman_last_step_seconds = None @@ -298,6 +355,15 @@ def create_calman_panel(self: "PQAutomationApp") -> None: ) self.calman_elapsed_label.pack(side=tk.LEFT) + self.calman_config_summary_var = tk.StringVar(value="") + self.calman_config_summary_label = ttk.Label( + control_row, + textvariable=self.calman_config_summary_var, + foreground=palette["status_fg"], + anchor=tk.W, + ) + self.calman_config_summary_label.pack(side=tk.LEFT, padx=(12, 0), fill=tk.X, expand=True) + metrics_row = ttk.Frame(chart_frame) metrics_row.grid(row=2, column=0, sticky=tk.EW, pady=(2, 0)) metrics_row.columnconfigure((0, 1, 2, 3), weight=1) @@ -478,7 +544,13 @@ def create_calman_panel(self: "PQAutomationApp") -> None: ) def _bind_click(widget, p=pct): - widget.bind("", lambda _e, pp=p: send_patch(self, pp)) + def _on_click(_e, pp=p): + send_patch(self, pp) + # Prevent event bubbling from canvas -> parent cell, which would + # otherwise trigger duplicated sends for a single click. + return "break" + + widget.bind("", _on_click) for w in (cell, target_canvas): _bind_click(w) @@ -581,6 +653,7 @@ def create_calman_panel(self: "PQAutomationApp") -> None: right.bind("", lambda _e: _adaptive_matrix_columns(self)) _refresh_metric_table(self) + _refresh_calman_config_summary(self) _update_target_strip(self) _update_actual_strip(self) _redraw_calman_charts(self) @@ -592,6 +665,7 @@ def create_calman_panel(self: "PQAutomationApp") -> None: def toggle_calman_panel(self: "PQAutomationApp") -> None: """切换 CALMAN 灰阶面板显示。""" self.show_panel("calman") + _refresh_calman_config_summary(self) # --------------------------------------------------------------------------- @@ -604,23 +678,43 @@ def send_patch(self: "PQAutomationApp", pct: int) -> None: if not self.signal_service.is_connected: messagebox.showwarning("提示", "请先连接 UCD323 设备") return + if getattr(self, "calman_patch_send_busy", False): + _calman_log(self, f"send busy, ignore click pct={pct}", "warning") + self.calman_status_var.set("发送进行中,请稍候...") + return rgb_val = int(round(pct * 255 / 100)) self.calman_current_level = pct self.calman_status_var.set(f"发送 {pct}%(RGB={rgb_val})...") _highlight_patch(self, pct) + _refresh_calman_config_summary(self) + _calman_log(self, f"click patch pct={pct}, rgb=({rgb_val}, {rgb_val}, {rgb_val})") + self.calman_patch_send_busy = True def worker(): try: - self.signal_service.send_solid_rgb((rgb_val, rgb_val, rgb_val)) + _calman_log(self, f"send_solid_rgb start pct={pct}") + test_type = getattr(self.config, "current_test_type", "screen_module") + if hasattr(self, "pattern_service") and self.pattern_service is not None: + self.pattern_service.send_rgb( + (rgb_val, rgb_val, rgb_val), + test_type=test_type, + ) + else: + self.signal_service.send_solid_rgb((rgb_val, rgb_val, rgb_val)) + _calman_log(self, f"send_solid_rgb success pct={pct}") + _calman_log(self, f"ucd profile applied test_type={test_type}") self._dispatch_ui( self.log_gui.log, f"CALMAN: 已发送 {pct}% 灰阶 (RGB={rgb_val})", "info", ) self._dispatch_ui(self.calman_status_var.set, f"{pct}% 已发送") except Exception as exc: + _calman_log(self, f"send_solid_rgb failed pct={pct}: {exc}", "error") self._dispatch_ui(self.log_gui.log, f"发送失败: {exc}", "error") self._dispatch_ui(self.calman_status_var.set, f"发送失败: {exc}") + finally: + self.calman_patch_send_busy = False threading.Thread(target=worker, daemon=True).start() @@ -693,14 +787,32 @@ def measure_current_patch(self: "PQAutomationApp") -> None: def worker(): t0 = time.perf_counter() + _calman_log(self, f"measure start pct={pct}") self._dispatch_ui(self.calman_status_var.set, f"采集 {pct}% 中...") rec = _measure_once(self, pct) if rec is None: + _calman_log(self, f"measure failed pct={pct}", "error") self._dispatch_ui(self.calman_status_var.set, "采集失败") return step_s = time.perf_counter() - t0 self.calman_last_step_seconds = step_s self.calman_results[pct] = rec + _calman_log( + self, + ( + "measure success pct={pct}, x={x:.4f}, y={y:.4f}, Y={Y:.3f}, " + "cct={cct:.0f}, gamma={gamma:.3f}, de2000={de:.3f}, step={step:.2f}s" + ).format( + pct=pct, + x=rec["x"], + y=rec["y"], + Y=rec["Y"], + cct=rec["cct"] if rec["cct"] == rec["cct"] else float("nan"), + gamma=rec["gamma"] if rec["gamma"] == rec["gamma"] else float("nan"), + de=rec["de2000"] if rec["de2000"] == rec["de2000"] else float("nan"), + step=step_s, + ), + ) self._dispatch_ui(_apply_record_to_ui, self, rec) self._dispatch_ui( self.calman_status_var.set, f"{pct}% 采集完成 ({step_s:.2f}s)" @@ -726,15 +838,28 @@ def start_sequence_test(self: "PQAutomationApp") -> None: settle = float(getattr(self, "pattern_settle_time", 0.4)) self.calman_progress["value"] = 0 self.calman_progress_var.set("0 / 0") + _refresh_calman_config_summary(self) + _calman_log(self, f"sequence start levels={len(self.calman_levels)}, settle={settle:.2f}s") def worker(): seq_t0 = time.perf_counter() try: + test_type = getattr(self.config, "current_test_type", "screen_module") + rgb_session = None + if hasattr(self, "pattern_service") and self.pattern_service is not None: + rgb_session = self.pattern_service.prepare_session( + "rgb", + test_type=test_type, + log_details=False, + ) + _calman_log(self, f"sequence ucd profile applied test_type={test_type}") + order = sorted(self.calman_levels, reverse=True) total = len(order) self._dispatch_ui(self.calman_progress_var.set, f"0 / {total}") for i, pct in enumerate(order, 1): if self.calman_stop_event.is_set(): + _calman_log(self, f"sequence stop requested at step={i-1}/{total}", "warning") self._dispatch_ui(self.log_gui.log, "已停止连续测试", "warning") break step_t0 = time.perf_counter() @@ -742,12 +867,20 @@ def start_sequence_test(self: "PQAutomationApp") -> None: self._dispatch_ui( self.calman_status_var.set, f"[{i}/{total}] 发送 {pct}%" ) + _calman_log(self, f"sequence send step={i}/{total}, pct={pct}, rgb={rgb_val}") self._dispatch_ui(_highlight_patch, self, pct) try: - self.signal_service.send_solid_rgb( - (rgb_val, rgb_val, rgb_val) - ) + if rgb_session is not None: + self.pattern_service.send_rgb( + (rgb_val, rgb_val, rgb_val), + session=rgb_session, + ) + else: + self.signal_service.send_solid_rgb( + (rgb_val, rgb_val, rgb_val) + ) except Exception as exc: + _calman_log(self, f"sequence send failed step={i}/{total}, pct={pct}: {exc}", "error") self._dispatch_ui( self.log_gui.log, f"发送 {pct}% 失败: {exc}", "error" ) @@ -755,11 +888,30 @@ def start_sequence_test(self: "PQAutomationApp") -> None: self.calman_current_level = pct # 等待稳定,停止事件触发时尽快退出 if self.calman_stop_event.wait(settle): + _calman_log(self, f"sequence interrupted during settle step={i}/{total}, pct={pct}", "warning") break rec = _measure_once(self, pct) if rec is None: + _calman_log(self, f"sequence measure failed step={i}/{total}, pct={pct}", "error") continue self.calman_results[pct] = rec + _calman_log( + self, + ( + "sequence measure step={i}/{total}, pct={pct}, x={x:.4f}, y={y:.4f}, Y={Y:.3f}, " + "cct={cct:.0f}, gamma={gamma:.3f}, de2000={de:.3f}" + ).format( + i=i, + total=total, + pct=pct, + x=rec["x"], + y=rec["y"], + Y=rec["Y"], + cct=rec["cct"] if rec["cct"] == rec["cct"] else float("nan"), + gamma=rec["gamma"] if rec["gamma"] == rec["gamma"] else float("nan"), + de=rec["de2000"] if rec["de2000"] == rec["de2000"] else float("nan"), + ), + ) self._dispatch_ui(_apply_record_to_ui, self, rec) step_s = time.perf_counter() - step_t0 total_s = time.perf_counter() - seq_t0 @@ -772,9 +924,11 @@ def start_sequence_test(self: "PQAutomationApp") -> None: total_s, ) else: + _calman_log(self, f"sequence complete total={total}") self._dispatch_ui(self.calman_status_var.set, "连续测试完成") self._dispatch_ui(self.log_gui.log, "CALMAN: 连续测试完成", "success") return + _calman_log(self, "sequence stopped", "warning") self._dispatch_ui(self.calman_status_var.set, "已停止") finally: self.calman_running = False @@ -785,14 +939,17 @@ def start_sequence_test(self: "PQAutomationApp") -> None: def stop_sequence_test(self: "PQAutomationApp") -> None: """请求停止连续测试。""" if self.calman_running: + _calman_log(self, "stop requested", "warning") self.calman_stop_event.set() self.calman_status_var.set("正在停止...") else: + _calman_log(self, "stop requested but no sequence is running", "warning") self.calman_status_var.set("当前没有运行中的连续测试") def clear_results(self: "PQAutomationApp") -> None: """清空结果表和图表。""" + _calman_log(self, "clear results") self.calman_results.clear() self.calman_last_record = None self.calman_reading_var.set( @@ -1135,6 +1292,8 @@ def refresh_calman_theme(self: "PQAutomationApp") -> None: if hasattr(self, "calman_elapsed_label"): self.calman_elapsed_label.configure(foreground=palette["status_fg"]) + if hasattr(self, "calman_config_summary_label"): + self.calman_config_summary_label.configure(foreground=palette["status_fg"]) if hasattr(self, "calman_status_label"): self.calman_status_label.configure(foreground=palette["status_fg"]) if hasattr(self, "calman_reading_summary_label"): @@ -1151,6 +1310,7 @@ def refresh_calman_theme(self: "PQAutomationApp") -> None: ) _refresh_metric_table(self) + _refresh_calman_config_summary(self) _redraw_calman_charts(self) diff --git a/app/views/panels/main_layout.py b/app/views/panels/main_layout.py index e1160bf..d941437 100644 --- a/app/views/panels/main_layout.py +++ b/app/views/panels/main_layout.py @@ -831,6 +831,18 @@ def _on_toggle_theme(self: "PQAutomationApp") -> None: self.update_sidebar_selection() except Exception: pass + # 以新的 dark_mode 值重绘当前测试类型的所有图表 + if hasattr(self, "_chart_snapshots") and hasattr(self, "config"): + test_type = getattr(self.config, "current_test_type", None) + if test_type: + snapshots = self._chart_snapshots.get(test_type, {}) + for chart_name, args in snapshots.items(): + plot_fn = getattr(self, f"plot_{chart_name}", None) + if plot_fn: + try: + plot_fn(*args) + except Exception: + pass def update_config_info_display(self: "PQAutomationApp"): diff --git a/cache/pq_ai_api_v21_extracted.txt b/cache/pq_ai_api_v21_extracted.txt new file mode 100644 index 0000000..17ec34a --- /dev/null +++ b/cache/pq_ai_api_v21_extracted.txt @@ -0,0 +1,131 @@ + +===== PAGE 1 ===== +接口文档 +域名地址 +测试环境: +http://10.201.44.70:9008/ai-agent/ +生产环境: +https://r d-mokadisplay .tcl.com/ai-agent/ +一、上传图片接口: +1. 接口明细 +接口路径:api/v1/pqt est/uplo ad +Cont ent-Type:multip art/form-data +请求方式:POST +2. 请求参数说明 +参数名 参数类型 是否必填 参数描述 +file File(二进制文件)是待上传图片的二进制文件。请求体格式必须为multipart/form-data ,且包含一个名 +为file 的文件字段(如:@"D:\Desktop\PQtest\3-O.png" ) +备注:仅支持 PNG/JP G/JPEG 格式图片,大小不超过 10MB,分辨率最大为 4096×4096 p x +3. 请求及响应示例 +上传图片请求示例 +curl -X POST "https://rd-mokadisplay.tcl.com/ai-agent/api/v1/pqtest/upload" \ + -F "file=@D:\Desktop\PQtest\3-O.png"1 / 5 + +===== PAGE 2 ===== + +上传图片响应示例 +{ + "code": 200, + "message": "", + "data": { + "upload_image_url": "https://ai.file.qhmoka.com/prod-ai-portal/pq- +image/input/2026-05-28/a2264f477d4d487493058306701cdb44/3-O.png" + } +} +报错响应示例 +{ + "code":400, + "message":"不支持的图片格式,仅支持 PNG/JPG/JPEG", + "data":{"upload_image_url": ""} +} +二、生图接口 +1. 接口明细 +接口路径:api/v1/pqt est/generat e +Cont ent-Type:application/json +请求方式:POST +2. 请求参数说明 +参数名 参数类型是否必填 参数描述 +user_message String 是 用户的自然语言需求/指令文本,作为本次生成 PQ 测试图的输入内容2 / 5 + +===== PAGE 3 ===== +参数名 参数类型是否必填 参数描述 +session_id String 是会话标识,用于把多次请求归到同一会话(便于复用/关联上下文)。在同一会话窗口 +下请使用同一session_id。使用 uuid 生成唯一字符串 +upload_image_urlString 否 上传的参考图片地址,来自上传接口返回的 URL +备注1:相较于上一版本,本版本新增了 uplo ad_image _url 字段。未传入时,默认为“文生图”模 +式(与上版本一致);传入时,则启用“图生图”模式。 +备注2:在多轮对话的“图生图”模式下,需将上一轮对话返回的 imageUrl 作为第二轮对话的 +upload_image _url 传入。 +3. 请求及响应示例 +单轮对话生图请求示例 +{ + "user_message":"请复刻这张图,生成标准 PQ 测试图 ", + "session_id":"48f1a351-67f2-40db-a57d-20d66249bc93", + "upload_image_url":"https://test.file.qhmoka.com/test-ai-portal/pq- +image/input/2026-05-21/681136ecf35549bcb7969905fb728991/05213.png" +} + +单轮对话生图响应示例 +{ + "code":200, + "message":"", + "data":{"imageUrl":"https://test.file.qhmoka.com/test-ai-portal/pq- +image/2026-05-22/19/05be786be62eb9aa.png"} +}3 / 5 + +===== PAGE 4 ===== +多轮对话生图请求示例 +# 第一轮请求 +{ + "user_message":"请复刻这张图,生成标准 PQ 测试图 ", + "session_id":"48f1a351-67f2-40db-a57d-20d66249bc93", + "upload_image_url":"https://test.file.qhmoka.com/test-ai-portal/pq- +image/input/2026-05-21/681136ecf35549bcb7969905fb728991/05213.png" +} +# 第二轮请求 +{ + "user_message":"把图片上部分的白和红圆圈对换位置,把图片下部分蓝和绿圆圈对换位置,同时把 +背景颜色换为黄色 ", + "session_id":"48f1a351-67f2-40db-a57d-20d66249bc93", # 确保两轮 session_id +相同 + "upload_image_url":"https://test.file.qhmoka.com/test-ai-portal/pq- +image/2026-05-22/19/05be786be62eb9aa.png" # 这里是第一轮响应返回的 imageUrl +} +多轮对话生图响应示例 +# 第一轮响应 +{ + "code":200, + "message":"", + "data":{"imageUrl":"https://test.file.qhmoka.com/test-ai-portal/pq- +image/2026-05-22/19/05be786be62eb9aa.png"} +} +# 第二轮响应 +{ + "code":200, + "message":"", + "data":{"imageUrl":"https://test.file.qhmoka.com/test-ai-portal/pq- +image/2026-05-22/19/703f0896ffcc8c57.png"} +} +报错响应示例 +# 大模型调用超时,超过120s +{ + "code":500, + "message":"生成失败 ",4 / 5 + +===== PAGE 5 ===== + "data":{"imageUrl":""} +} + +三、关于多轮对话(图片修改)的图片传参逻辑补充说明 +核心规则 +从第二轮请求开始,每一轮都必须把"最近一次成功返回的图片"作为本轮请求的输入图片传入。 +详细说明 +1.首轮请求 +•用户可传图、也可不传图(无强制要求)。 +2.后续每一轮请求(第 2 轮及以后) +•必须带上 上一轮成功返回的图片 作为输入。 +•与首轮用户是否传过图无关,只要是后续轮次就要带。 +3.异常情况处理 +•如果上一轮请求失败 / 没有返回图片,则向前回溯,使用 最近一次成功返回图片的那一 +轮的结果作为本轮输入。 +•简单说:始终使用"最近一张成功生成的图片"。5 / 5 \ No newline at end of file diff --git a/cache/pq_ai_api_v21_summary.txt b/cache/pq_ai_api_v21_summary.txt new file mode 100644 index 0000000..afb476d --- /dev/null +++ b/cache/pq_ai_api_v21_summary.txt @@ -0,0 +1,17 @@ +[upload] 示例 上传图片请求示例 curl -X POST "https://rd-mokadisplay.tcl.com/ai-agent/api/v1/pqtest/upload" \ -F "file=@D:\Desktop\PQtest\3-O.png"1 / 5 ===== PAGE 2 ===== 上传图片响应示例 { "code": 200, "message": "", "data": { "upload_image_url": "https://a + +[session_id] 是 用户的自然语言需求/指令文本,作为本次生成 PQ 测试图的输入内容2 / 5 ===== PAGE 3 ===== 参数名 参数类型是否必填 参数描述 session_id String 是会话标识,用于把多次请求归到同一会话(便于复用/关联上下文)。在同一会话窗口 下请使用同一session_id。使用 uuid 生成唯一字符串 upload_image_urlString 否 上传的参考图片地址,来自上传接口返回的 URL 备注1:相较于上一版本,本版本新增了 uplo ad_ima + +[upload_image_url] === PAGE 2 ===== 上传图片响应示例 { "code": 200, "message": "", "data": { "upload_image_url": "https://ai.file.qhmoka.com/prod-ai-portal/pq- image/input/2026-05-28/a2264f477d4d487493058306701cdb44/3-O.png" } } 报错响应示例 { "code":400, "message": + +[imageUrl] _url 字段。未传入时,默认为“文生图”模 式(与上版本一致);传入时,则启用“图生图”模式。 备注2:在多轮对话的“图生图”模式下,需将上一轮对话返回的 imageUrl 作为第二轮对话的 upload_image _url 传入。 3. 请求及响应示例 单轮对话生图请求示例 { "user_message":"请复刻这张图,生成标准 PQ 测试图 ", "session_id":"48f1a351-67f2-40db-a57d-20d66249bc93", + +[请求示例] png" ) 备注:仅支持 PNG/JP G/JPEG 格式图片,大小不超过 10MB,分辨率最大为 4096×4096 p x 3. 请求及响应示例 上传图片请求示例 curl -X POST "https://rd-mokadisplay.tcl.com/ai-agent/api/v1/pqtest/upload" \ -F "file=@D:\Desktop\PQtest\3-O.png"1 / 5 ===== PAGE 2 ===== 上传图片响应示例 { " + +[响应示例] test\3-O.png" ) 备注:仅支持 PNG/JP G/JPEG 格式图片,大小不超过 10MB,分辨率最大为 4096×4096 p x 3. 请求及响应示例 上传图片请求示例 curl -X POST "https://rd-mokadisplay.tcl.com/ai-agent/api/v1/pqtest/upload" \ -F "file=@D:\Desktop\PQtest\3-O.png"1 / 5 ===== PAGE 2 ===== 上传图片响 + +[code] -F "file=@D:\Desktop\PQtest\3-O.png"1 / 5 ===== PAGE 2 ===== 上传图片响应示例 { "code": 200, "message": "", "data": { "upload_image_url": "https://ai.file.qhmoka.com/prod-ai-portal/pq- image/input/2026-05-28/a2264f477d4d487493058306701cd + +[message] esktop\PQtest\3-O.png"1 / 5 ===== PAGE 2 ===== 上传图片响应示例 { "code": 200, "message": "", "data": { "upload_image_url": "https://ai.file.qhmoka.com/prod-ai-portal/pq- image/input/2026-05-28/a2264f477d4d487493058306701cdb44/3-O.png" } } + +[data] nt/ 一、上传图片接口: 1. 接口明细 接口路径:api/v1/pqt est/uplo ad Cont ent-Type:multip art/form-data 请求方式:POST 2. 请求参数说明 参数名 参数类型 是否必填 参数描述 file File(二进制文件)是待上传图片的二进制文件。请求体格式必须为multipart/form-data ,且包含一个名 为file 的文件字段(如:@"D:\Desktop\PQtest\3-O.png" ) 备注:仅支持 PNG \ No newline at end of file diff --git a/docs/PQ生图后端接口文档v2.pdf b/docs/PQ生图后端接口文档v2.pdf deleted file mode 100644 index aa079fb..0000000 Binary files a/docs/PQ生图后端接口文档v2.pdf and /dev/null differ diff --git a/docs/PQ生图接口文档v2.1.pdf b/docs/PQ生图接口文档v2.1.pdf new file mode 100644 index 0000000..87741f7 Binary files /dev/null and b/docs/PQ生图接口文档v2.1.pdf differ diff --git a/drivers/UCD323_Enum.py b/drivers/UCD323_Enum.py index 6fdee11..18d17bb 100644 --- a/drivers/UCD323_Enum.py +++ b/drivers/UCD323_Enum.py @@ -110,7 +110,16 @@ class UCDEnum: } if not colorimetry_str: return None - return colorimetry_map.get(colorimetry_str.lower(), None) + # Normalize: strip hyphens, spaces, dots, underscores so that + # "DCI-P3" → "dcip3", "BT.709" → "bt709", "BT.2020 YCbCr" → "bt2020ycbcr" + normalized = ( + colorimetry_str.lower() + .replace("-", "") + .replace(" ", "") + .replace(".", "") + .replace("_", "") + ) + return colorimetry_map.get(normalized, colorimetry_map.get(colorimetry_str.lower(), None)) class VideoPatternInfo: class VideoPattern(IntEnum): diff --git a/drivers/UCD323_Function.py b/drivers/UCD323_Function.py index 33f5077..746a320 100644 --- a/drivers/UCD323_Function.py +++ b/drivers/UCD323_Function.py @@ -1,10 +1,14 @@ # -*- coding: UTF-8 -*- +import logging import UniTAP import time import gc from drivers.UCD323_Enum import UCDEnum +log = logging.getLogger(__name__) + + class UCDController: """UCD323信号发生器控制类""" @@ -23,6 +27,7 @@ class UCDController: self.current_pattern_param = None self.current_pattern_params = None self.current_pattern_index = 0 + self.last_error = None def search_device(self): """搜索可用设备""" @@ -140,6 +145,7 @@ class UCDController: raise RuntimeError("UCD 未打开,无法获取 TX 模块") interface = getattr(self, "current_interface", None) + log.info("UCDController.get_tx_modules interface=%s", interface) if interface in (None, "HDMI"): return self.role.hdtx.pg, self.role.hdtx.ag if interface in ("DP", "Type-C"): @@ -189,6 +195,7 @@ class UCDController: def set_ucd_params(self, config): """设置UCD323参数""" + self.last_error = None self.config = config test_type = self.config.current_test_type @@ -200,16 +207,34 @@ class UCDController: colorimetry = self.config.current_test_types[test_type]["colorimetry"] if not self.set_color_mode(color_format, bpc, colorimetry): + self.last_error = ( + f"set_color_mode failed: color_format={color_format}, bpc={bpc}, colorimetry={colorimetry}" + ) + log.error( + "UCDController.set_ucd_params set_color_mode failed test_type=%s color_format=%s bpc=%s colorimetry=%s", + test_type, + color_format, + bpc, + colorimetry, + ) return False timing_str = self.config.current_test_types[test_type]["timing"] - self.set_timing_from_string(timing_str) + if not self.set_timing_from_string(timing_str): + self.last_error = f"set_timing_from_string failed: timing={timing_str}" + log.error( + "UCDController.set_ucd_params set_timing_from_string failed test_type=%s timing=%s", + test_type, + timing_str, + ) + return False self.current_pattern_index = 0 pattern_mode = self.config.current_pattern["pattern_mode"] pattern = UCDEnum.VideoPatternInfo.get_video_pattern(pattern_mode) if pattern is None: + self.last_error = f"get_video_pattern failed: pattern_mode={pattern_mode}" return False self.current_pattern = pattern @@ -219,10 +244,17 @@ class UCDController: def run(self): """运行设备""" + log.info( + "UCDController.run start current_pattern=%s has_pattern_param=%s", + getattr(self.current_pattern, "name", self.current_pattern), + self.current_pattern_param is not None, + ) self.apply_video_mode() self.apply_pattern() pg, _ = self.get_tx_modules() + log.info("UCDController.run calling pg.apply()") pg.apply() + log.info("UCDController.run done") return True def send_image_pattern(self, image_path): @@ -245,12 +277,15 @@ class UCDController: return False try: + log.info("UCDController.send_solid_rgb_pattern rgb=%s", rgb) self.current_pattern = UCDEnum.VideoPatternInfo.get_video_pattern("solidcolor") if self.current_pattern is None: + log.error("UCDController.send_solid_rgb_pattern failed: solidcolor pattern not found") return False return self.send_current_pattern_params(list(rgb)) except Exception: + log.exception("UCDController.send_solid_rgb_pattern exception") return False def send_current_pattern_params(self, pattern_params): @@ -260,17 +295,27 @@ class UCDController: try: if self.current_pattern is None: + log.error("UCDController.send_current_pattern_params failed: current_pattern is None") return False + log.info( + "UCDController.send_current_pattern_params pattern=%s params=%s", + getattr(self.current_pattern, "name", self.current_pattern), + pattern_params, + ) if pattern_params is not None and not self.set_pattern( self.current_pattern, pattern_params, ): + log.error("UCDController.send_current_pattern_params failed: set_pattern returned False") return False + log.info("UCDController.send_current_pattern_params calling run()") self.run() + log.info("UCDController.send_current_pattern_params done") return True except Exception: + log.exception("UCDController.send_current_pattern_params exception") return False def set_color_mode(self, cf, bpc, cm): @@ -298,8 +343,11 @@ class UCDController: def apply_video_mode(self): """应用当前 color_info 和 timing""" if self.current_timing: + log.info("UCDController.apply_video_mode start timing=%s", self.current_timing) self.set_video_mode() + log.info("UCDController.apply_video_mode done") return True + log.warning("UCDController.apply_video_mode skipped: current_timing is None") return False def set_video_mode(self): @@ -313,28 +361,48 @@ class UCDController: self.color_info.bpc, ) self.format_changed = (current_config != getattr(self, "_last_sent_config", None)) + log.info( + "UCDController.set_video_mode format_changed=%s color_format=%s colorimetry=%s dynamic_range=%s bpc=%s", + self.format_changed, + self.color_info.color_format, + self.color_info.colorimetry, + self.color_info.dynamic_range, + self.color_info.bpc, + ) video_mode = UniTAP.VideoMode( timing=self.current_timing, color_info=self.color_info ) pg, _ = self.get_tx_modules() + log.info("UCDController.set_video_mode calling pg.set_vm()") pg.set_vm(vm=video_mode) + log.info("UCDController.set_video_mode done") self._last_sent_config = current_config return True def set_pattern(self, pattern, pattern_params=None): """设置pattern""" - if self.current_timing: - needs_params = { - UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, - UCDEnum.VideoPatternInfo.VideoPatternParams.WhiteVStrips, - UCDEnum.VideoPatternInfo.VideoPatternParams.GradientRGBStripes, - UCDEnum.VideoPatternInfo.VideoPatternParams.MotionPattern, - UCDEnum.VideoPatternInfo.VideoPatternParams.SquareWindow, - } - if pattern in needs_params and pattern_params: - self.set_pattern_params(pattern, pattern_params) - return True - return False + if self.current_timing is None: + # Pattern-only updates (e.g. Calman patch click) can still be applied on + # an already active output mode. Missing timing should not block pattern staging. + log.warning("UCDController.set_pattern current_timing is None; continue with pattern-only apply") + + needs_params = { + UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, + UCDEnum.VideoPatternInfo.VideoPattern.SolidColor, + UCDEnum.VideoPatternInfo.VideoPatternParams.WhiteVStrips, + UCDEnum.VideoPatternInfo.VideoPatternParams.GradientRGBStripes, + UCDEnum.VideoPatternInfo.VideoPatternParams.MotionPattern, + UCDEnum.VideoPatternInfo.VideoPatternParams.SquareWindow, + } + log.info( + "UCDController.set_pattern pattern=%s pattern_params=%s needs_params=%s", + getattr(pattern, "name", pattern), + pattern_params, + pattern in needs_params, + ) + if pattern in needs_params and pattern_params is not None: + self.set_pattern_params(pattern, pattern_params) + return True def set_next_pattern(self): """设置下一个pattern""" @@ -350,25 +418,40 @@ class UCDController: def set_pattern_params(self, pattern, pattern_params): """设置pattern参数""" - if pattern: - if pattern == UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor: + if pattern is not None: + solid_color_patterns = { + UCDEnum.VideoPatternInfo.VideoPatternParams.SolidColor, + UCDEnum.VideoPatternInfo.VideoPattern.SolidColor, + } + if pattern in solid_color_patterns: + log.info("UCDController.set_pattern_params solid_color rgb=%s", pattern_params) self.current_pattern_param = UniTAP.SolidColorParams( first=pattern_params[0], second=pattern_params[1], third=pattern_params[2], ) return True + log.warning("UCDController.set_pattern_params unsupported pattern=%s", getattr(pattern, "name", pattern)) return False def apply_pattern(self): """应用当前pattern""" - if self.current_pattern: + if self.current_pattern is not None: + log.info( + "UCDController.apply_pattern start pattern=%s has_params=%s", + getattr(self.current_pattern, "name", self.current_pattern), + self.current_pattern_param is not None, + ) pg, _ = self.get_tx_modules() + log.info("UCDController.apply_pattern calling pg.set_pattern()") pg.set_pattern(self.current_pattern) - if self.current_pattern_param: + if self.current_pattern_param is not None: + log.info("UCDController.apply_pattern calling pg.set_pattern_params()") pg.set_pattern_params(self.current_pattern_param) + log.info("UCDController.apply_pattern done") return True + log.warning("UCDController.apply_pattern skipped: current_pattern is None") return False def search_timing(self, width, height, refresh_rate, resolution_type=None): @@ -383,15 +466,30 @@ class UCDController: elif resolution_type == "cvt": standard = UniTAP.common.timing.Timing.Standard.SD_CVT - timing = self.timing_manager.search( - h_active=width, - v_active=height, - f_rate=int(refresh_rate) * 1000, - standard=standard, - ) + rr = float(refresh_rate) + # Try both exact and NTSC-compatible rates (e.g. 120000 / 119880). + f_rate_candidates = [ + int(round(rr * 1000)), + int(rr * 1000), + int(round((rr * 1000.0) * 1000.0 / 1001.0)), + ] + # 去重并保持顺序 + f_rate_candidates = list(dict.fromkeys(f_rate_candidates)) - if timing: - return timing + standards = [standard] + if standard is not None: + standards.append(None) + + for std in standards: + for f_rate in f_rate_candidates: + timing = self.timing_manager.search( + h_active=width, + v_active=height, + f_rate=f_rate, + standard=std, + ) + if timing: + return timing else: for res_type in ["dmt", "cta", "cvt", "ovt"]: result = self.search_timing(width, height, refresh_rate, res_type) @@ -492,18 +590,54 @@ class UCDController: def set_timing_from_string(self, timing_str): """根据格式化timing字符串设置设备timing""" - spec = self.parse_formatted_timing(timing_str) + try: + spec = self.parse_formatted_timing(timing_str) + except Exception: + log.exception("UCDController.set_timing_from_string parse failed timing=%s", timing_str) + return False + rtype = spec["resolution_type"] + rid = spec.get("resolution_id") width = spec["width"] height = spec["height"] fr = spec["refresh_rate"] - if rtype != "ovt": - timing = self.search_timing(width, height, fr, rtype) - if timing: - self.current_timing = timing - return True + if rid is not None and self.set_timing_from_id(rtype, rid): + log.info( + "UCDController.set_timing_from_string success by id timing=%s parsed=(%s id=%s)", + timing_str, + rtype, + rid, + ) + return True + # Respect selected timing family first (DMT/CTA/CVT/OVT). + timing = self.search_timing(width, height, fr, rtype) + if timing is None: + # Fallback only for robustness: some SDKs may not classify a timing + # exactly as requested family even though width/height/fps matches. + timing = self.search_timing(width, height, fr, None) + + if timing: + self.current_timing = timing + log.info( + "UCDController.set_timing_from_string success timing=%s parsed=(%s %sx%s@%s)", + timing_str, + rtype, + width, + height, + fr, + ) + return True + + log.error( + "UCDController.set_timing_from_string no timing matched timing=%s parsed=(%s %sx%s@%s)", + timing_str, + rtype, + width, + height, + fr, + ) return False def set_timing_from_id(self, rtype, rid): @@ -515,6 +649,12 @@ class UCDController: timing = self.timing_manager.get_cta(rid) elif rtype.lower() == "cvt": timing = self.timing_manager.get_cvt(rid) + elif rtype.lower() == "ovt": + get_ovt = getattr(self.timing_manager, "get_ovt", None) + if callable(get_ovt): + timing = get_ovt(rid) + else: + return False else: raise ValueError(f"不支持的分辨率类型: {rtype}") diff --git a/drivers/ucd_driver.py b/drivers/ucd_driver.py index 7f09bb4..43a6efe 100644 --- a/drivers/ucd_driver.py +++ b/drivers/ucd_driver.py @@ -18,6 +18,7 @@ SDK 调用直接搬入本模块,届时可删除旧 ``UCDController`` 文件。 from __future__ import annotations +from contextlib import contextmanager import logging import threading from abc import ABC, abstractmethod @@ -48,6 +49,8 @@ if TYPE_CHECKING: log = logging.getLogger(__name__) +_DEVICE_LOCK_TIMEOUT_SECONDS = 8.0 + # ─── §1 DeviceInfo / list_devices ──────────────────────────────── @@ -154,12 +157,50 @@ class UCD323Device(IUcdDevice): self._info: DeviceInfo | None = None self._interface: Interface = Interface.HDMI self._lock = threading.RLock() + self._lock_owner_tid: int | None = None + self._lock_owner_name: str | None = None self._curr_signal: SignalFormat | None = None self._curr_timing: TimingSpec | None = None self._curr_pattern: PatternSpec | None = None self._last_applied: tuple[SignalFormat, TimingSpec] | None = None + @contextmanager + def _acquire_device_lock(self, op_name: str): + current = threading.current_thread() + log.info( + "UCD323Device.%s acquiring lock timeout=%.1fs tid=%s thread=%s owner_tid=%s owner_thread=%s", + op_name, + _DEVICE_LOCK_TIMEOUT_SECONDS, + threading.get_ident(), + current.name, + self._lock_owner_tid, + self._lock_owner_name, + ) + acquired = self._lock.acquire(timeout=_DEVICE_LOCK_TIMEOUT_SECONDS) + if not acquired: + raise UcdStateError( + "UCD device busy: lock timeout in " + f"UCD323Device.{op_name}, " + f"owner_tid={self._lock_owner_tid}, owner_thread={self._lock_owner_name}" + ) + prev_owner_tid = self._lock_owner_tid + prev_owner_name = self._lock_owner_name + self._lock_owner_tid = threading.get_ident() + self._lock_owner_name = current.name + log.info( + "UCD323Device.%s lock acquired tid=%s thread=%s", + op_name, + self._lock_owner_tid, + self._lock_owner_name, + ) + try: + yield + finally: + self._lock_owner_tid = prev_owner_tid + self._lock_owner_name = prev_owner_name + self._lock.release() + # -- 读访问 -------------------------------------------------- @property @@ -181,7 +222,7 @@ class UCD323Device(IUcdDevice): # -- 生命周期 ------------------------------------------------ def open(self, info: DeviceInfo, *, interface: Interface = Interface.HDMI) -> None: - with self._lock: + with self._acquire_device_lock("open"): assert_transition(self._state, UcdState.OPENED) if interface is not Interface.HDMI: # Phase 1:底层 UCDController.open() 写死了 HDMISource。 @@ -200,7 +241,7 @@ class UCD323Device(IUcdDevice): self._bus.publish(ConnectionChanged(True, info.serial)) def close(self) -> None: - with self._lock: + with self._acquire_device_lock("close"): if self._state == UcdState.CLOSED: return try: @@ -219,7 +260,7 @@ class UCD323Device(IUcdDevice): # -- 配置 ---------------------------------------------------- def configure(self, signal: SignalFormat, timing: TimingSpec) -> bool: - with self._lock: + with self._acquire_device_lock("configure"): if self._state == UcdState.CLOSED: raise UcdNotConnected("UCD 未连接,无法 configure") try: @@ -249,7 +290,7 @@ class UCD323Device(IUcdDevice): return (signal, timing) != self._last_applied def set_pattern(self, pattern: PatternSpec) -> None: - with self._lock: + with self._acquire_device_lock("set_pattern"): # Phase 2 过渡:允许从 OPENED 直接 set_pattern——遗留路径 # (test_runner 等)通过旧 controller.apply_signal_format 写入 # 信号格式,未经过本设备的 configure。此时 self._state 仍为 @@ -260,7 +301,7 @@ class UCD323Device(IUcdDevice): # 仅本地暂存,真正写硬件在 apply() def apply(self) -> None: - with self._lock: + with self._acquire_device_lock("apply"): if self._state == UcdState.CLOSED: raise UcdNotConnected("UCD 未连接,无法 apply") if self._curr_pattern is None: @@ -268,7 +309,9 @@ class UCD323Device(IUcdDevice): try: ok = self._apply_pattern_via_controller(self._curr_pattern) except Exception as exc: # noqa: BLE001 - raise UcdSdkError("apply 异常") from exc + raise UcdSdkError( + f"apply 异常: {type(exc).__name__}: {exc}" + ) from exc if not ok: raise UcdApplyFailed( f"apply 失败: pattern={self._curr_pattern.kind.value}" @@ -333,7 +376,21 @@ class UCD323Device(IUcdDevice): if not self._controller.set_pattern(video_pattern, params): raise UcdApplyFailed("controller.set_pattern 返回 False") - return bool(self._controller.run()) + # Skip apply_video_mode() (i.e. pg.set_vm) – the video format is already + # configured by the main signal panel and re-applying it blocks until the + # device re-locks, causing an apparent UI freeze for pattern-only sends. + if not self._controller.apply_pattern(): + raise UcdApplyFailed("controller.apply_pattern 返回 False") + if getattr(self._controller, "current_timing", None) is None: + raise UcdConfigError( + "current_timing is None; please apply selected test profile/timing before sending pattern" + ) + try: + pg, _ = self._controller.get_tx_modules() + pg.apply() + except Exception as exc: + raise UcdSdkError("pg.apply() 失败") from exc + return True def _colorimetry_to_legacy_key(signal: SignalFormat) -> str: diff --git a/pqAutomationApp.py b/pqAutomationApp.py index dd4b150..0229ad8 100644 --- a/pqAutomationApp.py +++ b/pqAutomationApp.py @@ -394,6 +394,15 @@ class PQAutomationApp( return try: + def _safe_insert_tab(frame, text, target_pos=1): + """安全插入 Tab:有目标位置则插入,否则追加到末尾。""" + tabs = list(self.chart_notebook.tabs()) + if not tabs or target_pos >= len(tabs): + self.chart_notebook.add(frame, text=text) + return + before_tab_id = tabs[target_pos] + self.chart_notebook.insert(before_tab_id, frame, text=text) + current_tabs = list(self.chart_notebook.tabs()) gamma_tab_id = str(self.gamma_chart_frame) eotf_tab_id = str(self.eotf_chart_frame) @@ -402,14 +411,12 @@ class PQAutomationApp( if gamma_tab_id in current_tabs: self.chart_notebook.forget(self.gamma_chart_frame) if eotf_tab_id not in current_tabs: - insert_pos = min(1, len(self.chart_notebook.tabs())) - self.chart_notebook.insert(insert_pos, self.eotf_chart_frame, text="EOTF 曲线") + _safe_insert_tab(self.eotf_chart_frame, "EOTF 曲线", target_pos=1) else: if eotf_tab_id in current_tabs: self.chart_notebook.forget(self.eotf_chart_frame) if gamma_tab_id not in current_tabs: - insert_pos = min(1, len(self.chart_notebook.tabs())) - self.chart_notebook.insert(insert_pos, self.gamma_chart_frame, text="Gamma 曲线") + _safe_insert_tab(self.gamma_chart_frame, "Gamma 曲线", target_pos=1) custom_tab_id = str(self.custom_template_tab_frame) current_tabs = list(self.chart_notebook.tabs()) diff --git a/settings/pq_config.json b/settings/pq_config.json index eb26e0b..a92d57f 100644 --- a/settings/pq_config.json +++ b/settings/pq_config.json @@ -1,13 +1,10 @@ { - "current_test_type": "screen_module", + "current_test_type": "sdr_movie", "test_types": { "screen_module": { "name": "屏模组性能测试", "test_items": [ - "gamut", - "gamma", - "cct", - "contrast" + "gamma" ], "timing": "OVT 1280x 720 @ 120Hz", "data_range": "Full", @@ -31,10 +28,9 @@ "sdr_movie": { "name": "SDR Movie测试", "test_items": [ - "gamut", "accuracy" ], - "timing": "OVT 1280x 720 @ 120Hz", + "timing": "DMT 1680x 1050 @ 60Hz", "data_range": "Full", "color_format": "RGB", "bpc": 8,