"""AI 图片对话面板:输入提示 → 生成 → 显示 → 列表切换/保存/删除。""" from __future__ import annotations import os import sys import threading import time import logging import tkinter as tk from tkinter import filedialog, messagebox, simpledialog import ttkbootstrap as ttk from PIL import Image, ImageTk from app.services import ai_image as _svc from app.views.modern_styles import apply_tooltip_theme, get_theme_palette from typing import TYPE_CHECKING if TYPE_CHECKING: from pqAutomationApp import PQAutomationApp logger = logging.getLogger(__name__) def _theme_colors(): palette = get_theme_palette() return { "bg": palette["bg"], "fg": palette["fg"], "muted": palette["muted_fg"], "input_bg": palette["input_bg"], "input_fg": palette["input_fg"], "select_bg": palette["select_bg"], "select_fg": palette["select_fg"], "border": palette["border"], "tooltip_bg": palette["tooltip_bg"], "tooltip_fg": palette["tooltip_fg"], "tooltip_border": palette["tooltip_border"], } def _apply_ai_image_list_style(self: "PQAutomationApp"): """刷新 AI 图片列表控件样式,尽量贴合当前主题色板。""" palette = _theme_colors() style = ttk.Style() style.configure( "AIImage.Treeview", background=palette["input_bg"], fieldbackground=palette["input_bg"], foreground=palette["input_fg"], bordercolor=palette["border"], lightcolor=palette["border"], darkcolor=palette["border"], rowheight=24, font=("微软雅黑", 9), ) style.configure( "AIImage.Treeview.Heading", background=palette["bg"], foreground=palette["muted"], bordercolor=palette["border"], font=("微软雅黑", 9, "bold"), ) style.map( "AIImage.Treeview", background=[("selected", palette["select_bg"])], foreground=[("selected", palette["select_fg"])], ) def _hide_tree_tooltip(self: "PQAutomationApp"): tip = getattr(self, "_ai_image_tooltip", None) if tip is None: return try: tip.withdraw() except Exception: pass self._ai_image_tooltip_item = "" def _show_tree_tooltip(self: "PQAutomationApp", text: str, x_root: int, y_root: int, item_id: str): if not text: _hide_tree_tooltip(self) return tip = getattr(self, "_ai_image_tooltip", None) if tip is None: tip = tk.Toplevel(self.root) tip.withdraw() tip.overrideredirect(True) tip.transient(self.root) label = tk.Label( tip, text="", justify=tk.LEFT, anchor=tk.W, relief=tk.SOLID, bd=1, padx=8, pady=6, font=("微软雅黑", 9), wraplength=520, ) apply_tooltip_theme(tip, label) label.pack(fill=tk.BOTH, expand=True) self._ai_image_tooltip = tip self._ai_image_tooltip_label = label else: label = getattr(self, "_ai_image_tooltip_label", None) if label is None: return self._ai_image_tooltip_item = item_id label.configure(text=text) apply_tooltip_theme(tip, label) tip.geometry(f"+{x_root + 14}+{y_root + 18}") tip.deiconify() tip.lift() def _on_tree_motion(self: "PQAutomationApp", event): tree = self.ai_image_tree item_id = tree.identify_row(event.y) col = tree.identify_column(event.x) region = tree.identify("region", event.x, event.y) if not item_id or col != "#0" or region not in {"tree", "cell"}: _hide_tree_tooltip(self) return node_map = getattr(self, "_ai_image_node_map", None) or {} ridx = node_map.get(item_id) if ridx is None or ridx < 0 or ridx >= len(self.ai_image_records): _hide_tree_tooltip(self) return rec = self.ai_image_records[ridx] full_text = (rec.display_name or "").strip() if not full_text: _hide_tree_tooltip(self) return # 悬浮到任意图片标题项时都显示完整描述,避免因截断判定误差导致无响应。 if getattr(self, "_ai_image_tooltip_item", "") == item_id: tip = getattr(self, "_ai_image_tooltip", None) if tip is not None: try: tip.geometry(f"+{event.x_root + 14}+{event.y_root + 18}") except Exception: pass return _show_tree_tooltip(self, full_text, event.x_root, event.y_root, item_id) # ---------------- 面板创建 ---------------- def create_ai_image_panel(self: "PQAutomationApp"): """创建 AI 图片对话面板,并注册到面板管理。""" frame = ttk.Frame(self.content_frame) self.ai_image_frame = frame # 内部状态 self.ai_image_records = [] # list[AIImageRecord] self.ai_image_current = None # AIImageRecord | None self.ai_image_photo = None # 防止 ImageTk.PhotoImage 被 GC self._ai_image_requesting = False self._ai_image_progress_job = None self._ai_image_progress_phase = 0 self._ai_image_cancel_event = None self._ai_image_request_seq = 0 self._ai_image_active_seq = 0 self._ai_image_node_map = {} self._ai_image_session_node_map = {} self._ai_image_all_records = [] self._ai_image_search_var = tk.StringVar(value="") self._ai_image_count_var = tk.StringVar(value="0 张图片") self._ai_image_reloading = False self._ai_image_select_guard = False self._ai_image_list_loaded = False self._ai_image_tooltip = None self._ai_image_tooltip_label = None self._ai_image_tooltip_item = "" # 会话级参考图 URL(图生图模式):session_id -> upload_image_url self._ai_image_session_refs = {} # 本轮发送前手动上传的参考图(覆盖会话级) self._ai_image_pending_ref_url = "" self._ai_image_pending_ref_name = "" self._ai_image_uploading = False self.ai_image_ref_var = tk.StringVar(value="未设置参考图(文生图模式)") container = ttk.Frame(frame, padding=10) container.pack(fill=tk.BOTH, expand=True) # 左列:图片列表 # 使用 grid + 权重,让右侧预览区优先占据剩余空间。 container.columnconfigure(0, weight=0) container.columnconfigure(1, weight=1) container.rowconfigure(0, weight=1) palette = _theme_colors() left = ttk.Frame(container, width=360) left.grid(row=0, column=0, sticky=tk.NS, padx=(0, 10)) left.grid_propagate(False) title_row = ttk.Frame(left) title_row.pack(fill=tk.X, pady=(0, 4)) ttk.Label(title_row, text="历史图片", font=("微软雅黑", 10, "bold")).pack(side=tk.LEFT) ttk.Label( title_row, textvariable=self._ai_image_count_var, foreground=palette["muted"], font=("微软雅黑", 9), ).pack(side=tk.RIGHT) search_row = ttk.Frame(left) search_row.pack(fill=tk.X, pady=(0, 6)) ttk.Label( search_row, text="搜索", foreground=palette["muted"], font=("微软雅黑", 9), ).pack(side=tk.LEFT, padx=(0, 6)) self.ai_image_search_entry = ttk.Entry( search_row, textvariable=self._ai_image_search_var, bootstyle="secondary", ) self.ai_image_search_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) self.ai_image_search_entry.bind("", lambda e: reload_ai_image_list(self, auto_select_first=False)) ttk.Button( search_row, text="清空", width=6, bootstyle="secondary-outline", command=lambda: _clear_list_search(self), ).pack(side=tk.LEFT, padx=(6, 0)) list_wrap = ttk.Frame(left, padding=2) list_wrap.pack(fill=tk.BOTH, expand=True) list_wrap.columnconfigure(0, weight=1) list_wrap.rowconfigure(0, weight=1) _apply_ai_image_list_style(self) scroll = ttk.Scrollbar(list_wrap, orient=tk.VERTICAL) self.ai_image_tree = ttk.Treeview( list_wrap, columns=("time",), show="tree headings", selectmode="browse", style="AIImage.Treeview", yscrollcommand=scroll.set, ) self.ai_image_tree.heading("#0", text="标题") self.ai_image_tree.heading("time", text="时间") self.ai_image_tree.column("#0", width=210, minwidth=140, anchor=tk.W) self.ai_image_tree.column("time", width=105, minwidth=90, stretch=False, anchor=tk.W) scroll.config(command=self.ai_image_tree.yview) self.ai_image_tree.grid(row=0, column=0, sticky=tk.NSEW) scroll.grid(row=0, column=1, sticky=tk.NS) self.ai_image_tree.bind("<>", lambda e: _on_list_select(self)) self.ai_image_tree.bind("", lambda e: _on_tree_double_click(self, e)) self.ai_image_tree.bind("", lambda e: _on_tree_motion(self, e)) self.ai_image_tree.bind("", lambda e: _hide_tree_tooltip(self)) # 右键菜单:发送到 UCD / 重命名 / 另存为 / 删除 # 索引: 0=发送, 1=sep, 2=重命名, 3=另存为, 4=删除 self.ai_image_menu = tk.Menu(self.root, tearoff=0) self.ai_image_menu.add_command( label="发送图片", command=lambda: _send_to_ucd(self), ) self.ai_image_menu.add_separator() self.ai_image_menu.add_command( label="重命名…", command=lambda: _rename_current(self), ) self.ai_image_menu.add_command( label="另存为…", command=lambda: _save_current(self), ) self.ai_image_menu.add_command( label="删除", command=lambda: _delete_current(self), ) self.ai_image_tree.bind( "", lambda e: _show_list_context_menu(self, e), ) btn_row = ttk.Frame(left) btn_row.pack(fill=tk.X, pady=(6, 0)) self.ai_image_send_ucd_btn = ttk.Button( btn_row, text="发送", bootstyle="info-outline", width=8, command=lambda: _send_to_ucd(self), ) self.ai_image_send_ucd_btn.pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="保存", bootstyle="success-outline", width=8, command=lambda: _save_current(self), ).pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="删除", bootstyle="danger-outline", width=8, command=lambda: _delete_current(self), ).pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="刷新", bootstyle="secondary-outline", width=8, command=lambda: reload_ai_image_list(self), ).pack(side=tk.LEFT) # 右列:预览 + 输入 right = ttk.Frame(container) right.grid(row=0, column=1, sticky=tk.NSEW) preview_frame = ttk.LabelFrame(right, text="图片预览", padding=6) preview_frame.pack(fill=tk.BOTH, expand=True) self.ai_image_canvas = tk.Canvas( preview_frame, bg=palette["bg"], highlightthickness=0 ) self.ai_image_canvas.pack(fill=tk.BOTH, expand=True) self.ai_image_canvas.bind("", lambda e: _redraw_preview(self)) meta_row = ttk.Frame(right) meta_row.pack(fill=tk.X, pady=(4, 4)) self.ai_image_meta_var = tk.StringVar(value="未选择图片") ttk.Label( meta_row, textvariable=self.ai_image_meta_var, foreground=palette["muted"], font=("微软雅黑", 9), ).pack(side=tk.LEFT) # 输入区 input_frame = ttk.LabelFrame(right, text="提示输入(Ctrl+Enter 发送)", padding=6) input_frame.pack(fill=tk.X, pady=(4, 0)) # 参考图行(图生图) ref_row = ttk.Frame(input_frame) ref_row.pack(fill=tk.X, pady=(0, 4)) ttk.Label( ref_row, text="参考图:", foreground=palette["muted"], font=("微软雅黑", 9), ).pack(side=tk.LEFT) self.ai_image_ref_label = ttk.Label( ref_row, textvariable=self.ai_image_ref_var, foreground=palette["fg"], font=("微软雅黑", 9), ) self.ai_image_ref_label.pack(side=tk.LEFT, padx=(4, 0), fill=tk.X, expand=True) self.ai_image_clear_ref_btn = ttk.Button( ref_row, text="清除", width=6, bootstyle="secondary-outline", command=lambda: _clear_reference_image(self), ) self.ai_image_clear_ref_btn.pack(side=tk.RIGHT, padx=(4, 0)) self.ai_image_upload_ref_btn = ttk.Button( ref_row, text="上传参考图…", width=12, bootstyle="info-outline", command=lambda: _upload_reference_image(self), ) self.ai_image_upload_ref_btn.pack(side=tk.RIGHT) self.ai_image_input = tk.Text( input_frame, height=3, wrap=tk.WORD, bg=palette["input_bg"], fg=palette["input_fg"], insertbackground=palette["input_fg"], ) self.ai_image_input.pack(fill=tk.X, side=tk.TOP) self.ai_image_input.bind("", lambda e: (_send_prompt(self), "break")) send_row = ttk.Frame(input_frame) send_row.pack(fill=tk.X, pady=(4, 0)) self.ai_image_status_var = tk.StringVar(value="就绪") ttk.Label( send_row, textvariable=self.ai_image_status_var, foreground=palette["muted"], font=("微软雅黑", 9), ).pack(side=tk.LEFT) self.ai_image_progress = ttk.Progressbar( send_row, mode="indeterminate", length=120, bootstyle="info-striped", ) self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0)) self.ai_image_progress.pack_forget() self.ai_image_send_btn = ttk.Button( send_row, text="发送", bootstyle="primary", width=10, command=lambda: _send_prompt(self), ) self.ai_image_send_btn.pack(side=tk.RIGHT) self.ai_image_new_session_btn = ttk.Button( send_row, text="新对话", bootstyle="secondary-outline", width=10, command=lambda: _start_new_session(self), ) self.ai_image_new_session_btn.pack(side=tk.RIGHT, padx=(0, 6)) self.ai_image_stop_btn = ttk.Button( send_row, text="停止", bootstyle="danger-outline", width=10, command=lambda: _stop_request(self), ) self.ai_image_stop_btn.pack(side=tk.RIGHT, padx=(0, 6)) self.ai_image_stop_btn.configure(state=tk.DISABLED) # 注册面板 self.register_panel("ai_image", frame, None, "ai_image_visible") self.ai_image_visible = False self.ai_image_meta_var.set("首次打开 AI 图片面板时加载缓存") def toggle_ai_image_panel(self: "PQAutomationApp"): """切换 AI 图片面板显隐。""" self.show_panel("ai_image") _apply_ai_image_list_style(self) if not getattr(self, "_ai_image_list_loaded", False): logger.info("[AIImagePanel] 首次显示面板,开始加载列表") reload_ai_image_list(self) self._ai_image_list_loaded = True def _get_app_base_dir(self: "PQAutomationApp") -> str: """返回应用根目录(settings 的上一级)。""" if getattr(self, "config_file", None): return os.path.dirname(os.path.dirname(self.config_file)) if getattr(sys, "frozen", False): return os.path.dirname(sys.executable) return os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # ---------------- 列表 / 选中 ---------------- def reload_ai_image_list(self: "PQAutomationApp", auto_select_first=True): """重新扫描缓存并刷新列表。 按 ``session_id`` 分组:每个会话用一行不可选的分隔头(``── 会话 #N · 时间 ──``), 其下列出该轮生成的所有图片。会话按"最近使用"倒序,组内按时间倒序。 auto_select_first: 是否自动选中第一张图片(默认 True)。 """ t0 = time.monotonic() if getattr(self, "_ai_image_reloading", False): logger.debug("[AIImagePanel] 忽略重入 reload 请求") return self._ai_image_reloading = True try: _apply_ai_image_list_style(self) all_records = _svc.list_records(base_dir=_get_app_base_dir(self)) self._ai_image_all_records = list(all_records) self.ai_image_tree.delete(*self.ai_image_tree.get_children()) self._ai_image_node_map = {} self._ai_image_session_node_map = {} keyword = (self._ai_image_search_var.get() or "").strip().lower() records = [] for rec in all_records: if not keyword: records.append(rec) continue hay = "\n".join([ rec.display_name or "", rec.prompt or "", os.path.basename(rec.image_path), ]).lower() if keyword in hay: records.append(rec) self.ai_image_records = records sessions = _svc.group_records_by_session(self.ai_image_records) # \u4f1a\u8bdd\u7ea7\u53c2\u8003\u56fe\u94fe\u8def\uff1a\u4ee5\u6bcf\u4e2a\u4f1a\u8bdd\u6700\u8fd1\u4e00\u5f20\u751f\u6210\u56fe\u7684 imageUrl \u4f5c\u4e3a\u53c2\u8003 refs_map = getattr(self, "_ai_image_session_refs", None) if isinstance(refs_map, dict): for sess in sessions: sid = sess.get("session_id") or "" if not sid or sid in refs_map: continue for r in sess.get("records") or []: src = "" if isinstance(r.extra, dict): src = (r.extra.get("source_url") or "").strip() if src: refs_map[sid] = src break flat = [] current_sid = _svc.get_session_id() for idx, sess in enumerate(sessions, start=1): sid = sess["session_id"] is_current = sid and sid == current_sid header = _format_session_header(idx, sess, is_current=is_current) parent_id = self.ai_image_tree.insert( "", tk.END, text=header, values=("",), open=True, ) self._ai_image_session_node_map[parent_id] = sid for rec in sess["records"]: label = _format_list_label(rec) created = (rec.created_at or "").replace("T", " ")[:16] node_id = self.ai_image_tree.insert( parent_id, tk.END, text=label, values=(created,), ) self._ai_image_node_map[node_id] = len(flat) flat.append(rec) # 替换为按显示顺序展平后的列表,便于其它逻辑(前一张/后一张等) self.ai_image_records = flat total_count = len(all_records) visible_count = len(self.ai_image_records) if keyword: self._ai_image_count_var.set(f"{visible_count}/{total_count} 张") else: self._ai_image_count_var.set(f"{visible_count} 张图片") if self.ai_image_records and auto_select_first: first_id = "" for parent in self.ai_image_tree.get_children(""): children = self.ai_image_tree.get_children(parent) if children: first_id = children[0] break if first_id: _set_tree_selection(self, first_id) else: self.ai_image_current = None self.ai_image_photo = None self.ai_image_canvas.delete("all") self.ai_image_meta_var.set("暂无缓存图片" if not keyword else "当前筛选无结果") logger.info( "[AIImagePanel] reload 完成 total=%d visible=%d sessions=%d keyword=%r elapsed=%.3fs", total_count, visible_count, len(sessions), keyword, time.monotonic() - t0, ) self._ai_image_list_loaded = True finally: self._ai_image_reloading = False try: _refresh_ref_label(self) except Exception: pass def _format_session_header(index: int, sess: dict, is_current: bool) -> str: started = (sess.get("started_at") or "").replace("T", " ")[:16] tag = "(当前)" if is_current else "" count = len(sess.get("records") or []) if sess.get("session_id"): return f"会话 #{index} · {started} · {count} 张 {tag}".strip() return f"未归类 · {started} · {count} 张" def _format_list_label(rec: _svc.AIImageRecord) -> str: # 分辨率前缀:优先从 extra["size"] 取,其次从文件名猜 size_tag = "" extra = rec.extra or {} if isinstance(extra, dict) and extra.get("size"): size_tag = f"[{extra['size']}] " name_line = rec.display_name.splitlines()[0] if rec.display_name else "(未命名)" # Treeview 标题列可变宽,给展示名预留较长空间 max_name = 40 - len(size_tag) if max_name > 4 and len(name_line) > max_name: name_line = name_line[:max_name] + "…" return f"{size_tag}{name_line}" def _clear_list_search(self: "PQAutomationApp"): if (self._ai_image_search_var.get() or "").strip() == "": return self._ai_image_search_var.set("") reload_ai_image_list(self, auto_select_first=False) _hide_tree_tooltip(self) def _set_tree_selection(self: "PQAutomationApp", item_id: str): if not item_id: return _hide_tree_tooltip(self) try: current = self.ai_image_tree.selection() if current and current[0] == item_id: node_map = getattr(self, "_ai_image_node_map", None) or {} ridx = node_map.get(item_id) if ridx is not None and 0 <= ridx < len(self.ai_image_records): _select_record(self, self.ai_image_records[ridx]) return self.ai_image_tree.selection_set(item_id) self.ai_image_tree.focus(item_id) self.ai_image_tree.see(item_id) except Exception: return node_map = getattr(self, "_ai_image_node_map", None) or {} ridx = node_map.get(item_id) if ridx is not None and 0 <= ridx < len(self.ai_image_records): _select_record(self, self.ai_image_records[ridx]) def _find_tree_item_by_record_id(self: "PQAutomationApp", record_id: str) -> str: if not record_id: return "" node_map = getattr(self, "_ai_image_node_map", None) or {} for item_id, ridx in node_map.items(): if ridx is None or ridx >= len(self.ai_image_records): continue rec = self.ai_image_records[ridx] if rec.id == record_id: return item_id return "" def _on_list_select(self: "PQAutomationApp"): if getattr(self, "_ai_image_reloading", False): return if getattr(self, "_ai_image_select_guard", False): logger.debug("[AIImagePanel] 忽略重入选择事件") return sel = self.ai_image_tree.selection() if not sel: return self._ai_image_select_guard = True try: item_id = sel[0] node_map = getattr(self, "_ai_image_node_map", None) or {} ridx = node_map.get(item_id) if ridx is None: session_id = _session_id_for_item(self, item_id) if session_id: logger.info("[AIImagePanel] 选中会话头 sid=%s", session_id[:8]) _switch_to_session(self, session_id, show_message=False, refresh_list=False) return if 0 <= ridx < len(self.ai_image_records): rec = self.ai_image_records[ridx] if rec.session_id: _switch_to_session( self, rec.session_id, show_message=False, refresh_list=False, ) _select_record(self, rec) logger.debug("[AIImagePanel] 选中图片 id=%s sid=%s", rec.id, (rec.session_id or "")[:8]) finally: self._ai_image_select_guard = False def _on_tree_double_click(self: "PQAutomationApp", event): """双击会话头时只切换会话并展开;双击记录保持默认行为。""" item_id = self.ai_image_tree.identify_row(event.y) if not item_id: return node_map = getattr(self, "_ai_image_node_map", None) or {} if item_id in node_map: return sid = _session_id_for_item(self, item_id) if sid: _switch_to_session(self, sid, show_message=False, refresh_list=False) def _select_record(self: "PQAutomationApp", rec: _svc.AIImageRecord): self.ai_image_current = rec self.ai_image_meta_var.set( f"{os.path.basename(rec.image_path)} | {rec.created_at}" ) _redraw_preview(self) # ---------------- 预览绘制 ---------------- def _redraw_preview(self: "PQAutomationApp"): rec = getattr(self, "ai_image_current", None) canvas = self.ai_image_canvas palette = _theme_colors() canvas.delete("all") if rec is None or not os.path.isfile(rec.image_path): return cw = canvas.winfo_width() or 1 ch = canvas.winfo_height() or 1 try: img = Image.open(rec.image_path) img.load() except Exception as exc: canvas.create_text(cw // 2, ch // 2, text=f"加载失败: {exc}", fill=palette["select_bg"]) return iw, ih = img.size scale = min(cw / iw, ch / ih, 1.0) nw, nh = max(1, int(iw * scale)), max(1, int(ih * scale)) img_resized = img.resize((nw, nh), Image.LANCZOS) self.ai_image_photo = ImageTk.PhotoImage(img_resized) canvas.create_image(cw // 2, ch // 2, image=self.ai_image_photo, anchor="center") # ---------------- 发送 / 保存 / 删除 ---------------- def _start_new_session(self: "PQAutomationApp"): """开启新的对话会话,后续生成将使用新的 session_id。""" if getattr(self, "_ai_image_requesting", False): messagebox.showinfo("提示", "请等待当前请求完成") return _svc.reset_session() self.ai_image_status_var.set("已开启新对话") # 新会话清除参考图(未设置时默认为文生图模式) self._ai_image_pending_ref_url = "" self._ai_image_pending_ref_name = "" _refresh_ref_label(self) # 新对话创建后不要自动选中历史图片,否则会立即把 session 切回旧会话。 reload_ai_image_list(self, auto_select_first=False) try: self.ai_image_tree.selection_remove(self.ai_image_tree.selection()) except Exception: pass self.ai_image_current = None self.ai_image_photo = None self.ai_image_canvas.delete("all") self.ai_image_meta_var.set("新对话已开启,等待生成图片") def _resolve_pending_ref_url(self: "PQAutomationApp") -> str: """返回本次发送应使用的 upload_image_url。 优先使用用户在当前会话中手动上传的参考图;其次使用上一轮 imageUrl 自动链路。 """ pending = (getattr(self, "_ai_image_pending_ref_url", "") or "").strip() if pending: return pending sid = _svc.get_session_id() refs = getattr(self, "_ai_image_session_refs", None) or {} return (refs.get(sid) or "").strip() def _refresh_ref_label(self: "PQAutomationApp"): """根据当前 pending 上传 / 会话链路状态刷新参考图提示标签。""" var = getattr(self, "ai_image_ref_var", None) if var is None: return pending = (getattr(self, "_ai_image_pending_ref_url", "") or "").strip() if pending: name = getattr(self, "_ai_image_pending_ref_name", "") or "参考图" var.set(f"[图生图] {name}") return sid = _svc.get_session_id() refs = getattr(self, "_ai_image_session_refs", None) or {} chained = (refs.get(sid) or "").strip() if chained: # 自动链路:显示来源是"上一轮" var.set("[图生图] 沿用上一轮生成图为参考") return var.set("未设置参考图(文生图模式)") def _upload_reference_image(self: "PQAutomationApp"): """选择本地图片并上传到后端,成功后作为本次发送的参考图。""" if getattr(self, "_ai_image_requesting", False): messagebox.showinfo("提示", "请等待当前请求完成") return if getattr(self, "_ai_image_uploading", False): return path = filedialog.askopenfilename( title="选择参考图(PNG/JPG/JPEG,超过限制将自动缩放)", filetypes=[("图片", "*.png;*.jpg;*.jpeg"), ("所有文件", "*.*")], ) if not path: return self._ai_image_uploading = True try: self.ai_image_upload_ref_btn.configure(state=tk.DISABLED, text="上传中…") self.ai_image_clear_ref_btn.configure(state=tk.DISABLED) except Exception: pass self.ai_image_status_var.set("正在上传参考图…") name = os.path.basename(path) def _ok(url: str): self.root.after(0, lambda: _on_upload_done(self, name, url, None)) def _err(exc: Exception): self.root.after(0, lambda: _on_upload_done(self, name, "", exc)) _svc.upload_image_async(path, on_success=_ok, on_error=_err) def _on_upload_done(self: "PQAutomationApp", name: str, url: str, exc): self._ai_image_uploading = False try: self.ai_image_upload_ref_btn.configure(state=tk.NORMAL, text="上传参考图…") self.ai_image_clear_ref_btn.configure(state=tk.NORMAL) except Exception: pass if exc is not None: self.ai_image_status_var.set(f"上传失败: {exc}") messagebox.showerror("上传失败", str(exc)) return self._ai_image_pending_ref_url = url self._ai_image_pending_ref_name = name _refresh_ref_label(self) self.ai_image_status_var.set(f"参考图已上传:{name}") def _clear_reference_image(self: "PQAutomationApp"): """清除手动上传的参考图。 v2.1 规则要求:从第二轮开始应使用最近一次成功生成图作为输入, 因此这里不清除会话级自动链路参考;若需彻底重置,请点“新对话”。 """ if getattr(self, "_ai_image_requesting", False): return self._ai_image_pending_ref_url = "" self._ai_image_pending_ref_name = "" _refresh_ref_label(self) sid = _svc.get_session_id() refs = getattr(self, "_ai_image_session_refs", None) or {} if (refs.get(sid) or "").strip(): self.ai_image_status_var.set("已清除手动参考图,当前会话仍沿用上一轮生成图") else: self.ai_image_status_var.set("已清除参考图,当前为文生图模式") def _session_id_for_item(self: "PQAutomationApp", item_id: str) -> str: session_map = getattr(self, "_ai_image_session_node_map", None) or {} parent = item_id if parent not in session_map: try: parent = self.ai_image_tree.parent(item_id) except Exception: parent = "" return (session_map.get(parent) or "") if parent else "" def _switch_to_session( self: "PQAutomationApp", session_id: str, show_message: bool = True, target_record_id: str = "", refresh_list: bool = True, ): sid = (session_id or "").strip() if not sid: return if sid == _svc.get_session_id(): return _svc.set_session_id(sid) logger.info( "[AIImagePanel] 切换会话 sid=%s refresh=%s target=%s", sid[:8], refresh_list, target_record_id[:8] if target_record_id else "", ) if refresh_list: reload_ai_image_list(self) if target_record_id: item_id = _find_tree_item_by_record_id(self, target_record_id) if item_id: _set_tree_selection(self, item_id) self.ai_image_status_var.set("已切换到历史对话") # 切换会话后刷新参考图标签(pending 仅当前会话有效,故清除) self._ai_image_pending_ref_url = "" self._ai_image_pending_ref_name = "" _refresh_ref_label(self) if show_message: messagebox.showinfo("提示", "已切换到所选历史对话") def _update_request_progress(self: "PQAutomationApp"): if not getattr(self, "_ai_image_requesting", False): self._ai_image_progress_job = None return self.ai_image_status_var.set("正在生成图片…") self._ai_image_progress_phase += 1 self._ai_image_progress_job = self.root.after(900, lambda: _update_request_progress(self)) def _send_prompt(self: "PQAutomationApp"): if getattr(self, "_ai_image_requesting", False): return if getattr(self, "_ai_image_uploading", False): messagebox.showinfo("提示", "参考图上传中,请稍后再发送") return prompt = self.ai_image_input.get("1.0", tk.END).strip() if not prompt: messagebox.showinfo("提示", "请输入内容") return self._ai_image_request_seq += 1 req_seq = self._ai_image_request_seq self._ai_image_active_seq = req_seq self._ai_image_cancel_event = threading.Event() _set_requesting(self, True) is_remote_url = _svc.is_remote_image_url(prompt) self.ai_image_status_var.set("下载中…" if is_remote_url else "后端处理中…") def _success(record): self.root.after(0, lambda: _on_request_done(self, record, None, req_seq)) def _error(exc): self.root.after(0, lambda: _on_request_done(self, None, exc, req_seq)) if is_remote_url: _svc.import_image_from_url_async( prompt, on_success=_success, on_error=_error, base_dir=_get_app_base_dir(self), cancel_event=self._ai_image_cancel_event, ) return ref_url = _resolve_pending_ref_url(self) if ref_url: self.ai_image_status_var.set("后端处理中(图生图)…") _svc.request_image_async( prompt, on_success=_success, on_error=_error, base_dir=_get_app_base_dir(self), cancel_event=self._ai_image_cancel_event, upload_image_url=ref_url or None, ) def _set_requesting(self: "PQAutomationApp", flag: bool): self._ai_image_requesting = flag try: self.ai_image_send_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) self.ai_image_new_session_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) self.ai_image_stop_btn.configure(state=tk.NORMAL if flag else tk.DISABLED) if hasattr(self, "ai_image_upload_ref_btn"): self.ai_image_upload_ref_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) if hasattr(self, "ai_image_clear_ref_btn"): self.ai_image_clear_ref_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) except Exception: pass if flag: self._ai_image_progress_phase = 0 self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0)) self.ai_image_progress.start(10) if self._ai_image_progress_job is not None: self.root.after_cancel(self._ai_image_progress_job) self._ai_image_progress_job = self.root.after(0, lambda: _update_request_progress(self)) else: if self._ai_image_progress_job is not None: self.root.after_cancel(self._ai_image_progress_job) self._ai_image_progress_job = None try: self.ai_image_progress.stop() self.ai_image_progress.pack_forget() except Exception: pass def _on_request_done(self: "PQAutomationApp", record, exc, req_seq): # 旧请求回调(例如用户已点击停止后)直接忽略 if req_seq != getattr(self, "_ai_image_active_seq", 0): return self._ai_image_active_seq = 0 self._ai_image_cancel_event = None _set_requesting(self, False) if exc is not None: self.ai_image_status_var.set(f"失败: {exc}") messagebox.showerror("生成失败", str(exc)) return self.ai_image_status_var.set("完成") self.ai_image_input.delete("1.0", tk.END) # 多轮对话链路:本轮返回的 imageUrl 作为下一轮的参考图 next_ref = "" try: if record is not None and isinstance(record.extra, dict): next_ref = (record.extra.get("source_url") or "").strip() except Exception: next_ref = "" sid = (record.session_id if record is not None else "") or _svc.get_session_id() if next_ref and sid: self._ai_image_session_refs[sid] = next_ref # 手动上传的 pending 参考图只对本次发送生效,发完清除 self._ai_image_pending_ref_url = "" self._ai_image_pending_ref_name = "" _refresh_ref_label(self) reload_ai_image_list(self) if record is not None: logger.info("[AIImagePanel] 生成完成并入列 id=%s sid=%s next_ref=%s", record.id, (record.session_id or "")[:8], next_ref or "-") if record is not None and self.ai_image_records: item_id = _find_tree_item_by_record_id(self, record.id) if item_id: _set_tree_selection(self, item_id) def _stop_request(self: "PQAutomationApp"): """停止当前生成任务(协作取消:屏蔽后续回调并恢复 UI)。""" if not getattr(self, "_ai_image_requesting", False): return event = getattr(self, "_ai_image_cancel_event", None) if event is not None: event.set() self._ai_image_active_seq = 0 _set_requesting(self, False) self.ai_image_status_var.set("已停止生成") def _save_current(self: "PQAutomationApp"): rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return ext = os.path.splitext(rec.image_path)[1] or ".png" dest = filedialog.asksaveasfilename( title="另存为", defaultextension=ext, initialfile=os.path.basename(rec.image_path), filetypes=[("图片", "*.png;*.jpg;*.jpeg;*.bmp;*.webp"), ("所有文件", "*.*")], ) if not dest: return try: _svc.export_record(rec, dest) messagebox.showinfo("成功", f"已保存到:\n{dest}") except Exception as exc: messagebox.showerror("保存失败", str(exc)) def _delete_current(self: "PQAutomationApp"): rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return if not messagebox.askyesno("确认删除", f"确定删除这张缓存图片吗?\n{os.path.basename(rec.image_path)}"): return _svc.delete_record(rec) reload_ai_image_list(self) def _rename_current(self: "PQAutomationApp"): """弹窗让用户修改当前记录的展示标题(保存到侧车 ``title`` 字段,原始 prompt 不变)。""" rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return current = rec.title or rec.display_name new_name = simpledialog.askstring( "重命名", "修改显示标题(留空可恢复使用原始提示词):", initialvalue=current, parent=self.root, ) if new_name is None: # 取消 return new_name = new_name.strip() if new_name == (rec.title or ""): return if not _svc.update_record_title(rec, new_name): messagebox.showerror("保存失败", "无法更新元数据,请检查文件权限。") return target_id = rec.id reload_ai_image_list(self) item_id = _find_tree_item_by_record_id(self, target_id) if item_id: _set_tree_selection(self, item_id) # ---------------- 发送到 UCD ---------------- def _show_list_context_menu(self: "PQAutomationApp", event): """在图片列表上显示右键菜单,并根据状态启用/禁用项。""" _hide_tree_tooltip(self) try: item_id = self.ai_image_tree.identify_row(event.y) except Exception: item_id = "" node_map = getattr(self, "_ai_image_node_map", None) or {} ridx = node_map.get(item_id) if ridx is not None and 0 <= ridx < len(self.ai_image_records): _set_tree_selection(self, item_id) _select_record(self, self.ai_image_records[ridx]) elif item_id: sid = _session_id_for_item(self, item_id) if sid: _switch_to_session(self, sid, show_message=False, refresh_list=False) has_selection = self.ai_image_current is not None ucd = getattr(self, "ucd", None) can_send = ( has_selection and ucd is not None and getattr(ucd, "status", False) ) try: self.ai_image_menu.entryconfigure( 0, state=("normal" if can_send else "disabled") ) self.ai_image_menu.entryconfigure( 2, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.entryconfigure( 3, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.entryconfigure( 4, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.tk_popup(event.x_root, event.y_root) finally: self.ai_image_menu.grab_release() def _send_to_ucd(self: "PQAutomationApp"): """把当前选中的 AI 图片通过 UCD 发送到显示设备。""" rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return if not os.path.isfile(rec.image_path): messagebox.showerror("错误", f"图片文件不存在:\n{rec.image_path}") return ucd = getattr(self, "ucd", None) if ucd is None or not getattr(ucd, "status", False): messagebox.showwarning("警告", "请先连接 UCD323 设备") return image_path = rec.image_path send_path = image_path log = getattr(self, "log_gui", None) # 发送前检查分辨率:建议与当前 UCD 输出分辨率一致。 try: target_w, target_h = self.signal_service.current_resolution() except Exception: target_w, target_h = 3840, 2160 try: with Image.open(image_path) as _img: src_w, src_h = _img.size except Exception as exc: messagebox.showerror("错误", f"无法读取图片尺寸:\n{exc}") return if log is not None: log.log( f"UCD分辨率: {target_w}x{target_h} | 图片分辨率: {src_w}x{src_h}", level="info", ) if (src_w, src_h) != (target_w, target_h): if not messagebox.askyesno( "分辨率不匹配", ( f"当前 UCD 分辨率: {target_w}x{target_h}\n" f"图片分辨率: {src_w}x{src_h}\n\n" "是否自动缩放后再发送?" ), ): if log is not None: log.log("用户取消发送:图片分辨率与 UCD 不一致", level="warning") return try: send_path = _build_ucd_resized_image(image_path, target_w, target_h) if log is not None: log.log( f"已生成匹配分辨率副本: {os.path.basename(send_path)}", level="info", ) except Exception as exc: messagebox.showerror("错误", f"自动缩放失败:\n{exc}") return if log is not None: log.log( f"发送 AI 图片到 UCD: {os.path.basename(send_path)}", level="info", ) if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("发送中…") def _worker(): err = None try: self.signal_service.send_image(send_path) ok = True except Exception as exc: ok = False err = str(exc) def _done(): if ok: if log is not None: log.log( f"图片已发送到 UCD: {os.path.basename(send_path)}", level="success", ) if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("已发送到 UCD") else: msg = f"UCD 发送失败: {err}" if err else "UCD 发送失败" if log is not None: log.log(msg, level="error") if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("UCD 发送失败") try: self.root.after(0, _done) except Exception: pass threading.Thread(target=_worker, daemon=True).start() def _build_ucd_resized_image(image_path: str, target_w: int, target_h: int) -> str: """生成与 UCD 分辨率匹配的临时副本(缓存目录内)。""" base_dir = os.path.dirname(image_path) base_name = os.path.splitext(os.path.basename(image_path))[0] out_name = f"{base_name}_{target_w}x{target_h}_ucd.png" out_path = os.path.join(base_dir, out_name) with Image.open(image_path) as img: resized = img.convert("RGB").resize((target_w, target_h), Image.LANCZOS) resized.save(out_path, format="PNG") return out_path def refresh_ai_image_theme(self: "PQAutomationApp"): """刷新 AI 图片面板中的主题相关控件。""" if hasattr(self, "_apply_ai_image_list_style"): self._apply_ai_image_list_style() tip = getattr(self, "_ai_image_tooltip", None) label = getattr(self, "_ai_image_tooltip_label", None) if tip is not None and label is not None: apply_tooltip_theme(tip, label) class AIImagePanelMixin: """由 tools/refactor_to_mixins.py 自动生成。 把本模块的自由函数挂到 PQAutomationApp 上,便于 F12 跳转与类型推断。 """ create_ai_image_panel = create_ai_image_panel toggle_ai_image_panel = toggle_ai_image_panel _get_app_base_dir = _get_app_base_dir reload_ai_image_list = reload_ai_image_list _on_list_select = _on_list_select _select_record = _select_record _redraw_preview = _redraw_preview _start_new_session = _start_new_session _session_id_for_item = _session_id_for_item _switch_to_session = _switch_to_session _update_request_progress = _update_request_progress _send_prompt = _send_prompt _set_requesting = _set_requesting _on_request_done = _on_request_done _stop_request = _stop_request _save_current = _save_current _delete_current = _delete_current _rename_current = _rename_current _show_list_context_menu = _show_list_context_menu _send_to_ucd = _send_to_ucd _apply_ai_image_list_style = _apply_ai_image_list_style refresh_ai_image_theme = refresh_ai_image_theme