# NOTE: the lines below are residue from the repository web viewer, kept as comments.
# Files
# pqAutomationApp/app/views/panels/ai_image_panel.py
#
# 1247 lines
# 45 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI 图片对话面板:输入提示 → 生成 → 显示 → 列表切换/保存/删除。"""
from __future__ import annotations
import os
import sys
import threading
import time
import logging
import tkinter as tk
from tkinter import filedialog, messagebox, simpledialog
import ttkbootstrap as ttk
from PIL import Image, ImageTk
from app.services import ai_image as _svc
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pqAutomationApp import PQAutomationApp
logger = logging.getLogger(__name__)
def _theme_colors():
    """Return the active ttkbootstrap theme colours keyed by the roles this panel uses."""
    scheme = ttk.Style().colors
    # (panel role, attribute name on the theme colour scheme)
    role_to_attr = (
        ("bg", "bg"),
        ("fg", "fg"),
        ("muted", "secondary"),
        ("input_bg", "inputbg"),
        ("input_fg", "inputfg"),
        ("select_bg", "selectbg"),
        ("select_fg", "selectfg"),
        ("border", "border"),
    )
    return {role: getattr(scheme, attr) for role, attr in role_to_attr}
def _apply_ai_image_list_style(self: "PQAutomationApp"):
    """Re-apply the ``AIImage.Treeview`` styles from the current theme palette.

    Configures the row style, the heading style and the selected-row colour
    map so the image list follows the active theme.
    """
    colors = _theme_colors()
    tree_style = "AIImage.Treeview"
    font_family = "微软雅黑"
    border = colors["border"]
    input_bg = colors["input_bg"]
    styler = ttk.Style()

    # Rows.
    styler.configure(
        tree_style,
        background=input_bg,
        fieldbackground=input_bg,
        foreground=colors["input_fg"],
        bordercolor=border,
        lightcolor=border,
        darkcolor=border,
        rowheight=24,
        font=(font_family, 9),
    )
    # Column headings.
    styler.configure(
        "AIImage.Treeview.Heading",
        background=colors["bg"],
        foreground=colors["muted"],
        bordercolor=border,
        font=(font_family, 9, "bold"),
    )
    # Selected-row colours.
    styler.map(
        tree_style,
        background=[("selected", colors["select_bg"])],
        foreground=[("selected", colors["select_fg"])],
    )
def _hide_tree_tooltip(self: "PQAutomationApp"):
    """Withdraw the hover tooltip window, if one exists, and forget which row it described."""
    tooltip = getattr(self, "_ai_image_tooltip", None)
    if tooltip is not None:
        try:
            tooltip.withdraw()
        except Exception:
            # Best effort: a failure to withdraw the window is ignored.
            pass
        self._ai_image_tooltip_item = ""
def _show_tree_tooltip(self: "PQAutomationApp", text: str, x_root: int, y_root: int, item_id: str):
    """Show ``text`` in a borderless tooltip window near the given screen position.

    The tooltip window and its label are created on first use and cached on
    ``self``; later calls only update the text and position. An empty
    ``text`` hides the tooltip instead.
    """
    if not text:
        _hide_tree_tooltip(self)
        return

    window = getattr(self, "_ai_image_tooltip", None)
    if window is not None:
        caption = getattr(self, "_ai_image_tooltip_label", None)
        if caption is None:
            return
    else:
        # First use: build the window hidden, then cache it for reuse.
        window = tk.Toplevel(self.root)
        window.withdraw()
        window.overrideredirect(True)
        window.transient(self.root)
        caption = tk.Label(
            window,
            text="",
            justify=tk.LEFT,
            anchor=tk.W,
            bg="#ffffff",
            fg="#1f2937",
            relief=tk.SOLID,
            bd=1,
            padx=8,
            pady=6,
            font=("微软雅黑", 9),
            wraplength=520,
        )
        caption.pack(fill=tk.BOTH, expand=True)
        self._ai_image_tooltip = window
        self._ai_image_tooltip_label = caption

    self._ai_image_tooltip_item = item_id
    caption.configure(text=text)
    # Offset from the pointer position.
    window.geometry(f"+{x_root + 14}+{y_root + 18}")
    window.deiconify()
    window.lift()
def _on_tree_motion(self: "PQAutomationApp", event):
    """Handle pointer motion over the tree: show, move or hide the title tooltip.

    A tooltip with the record's full ``display_name`` is shown while the
    pointer is over the title column of a record row. If the tooltip is
    already showing that row it is only repositioned.
    """
    tree = self.ai_image_tree
    row = tree.identify_row(event.y)
    column = tree.identify_column(event.x)
    region = tree.identify("region", event.x, event.y)

    text = ""
    if row and column == "#0" and region in {"tree", "cell"}:
        index = (getattr(self, "_ai_image_node_map", None) or {}).get(row)
        if index is not None and 0 <= index < len(self.ai_image_records):
            text = (self.ai_image_records[index].display_name or "").strip()
    if not text:
        _hide_tree_tooltip(self)
        return

    # The full title is shown for every record row, without trying to detect
    # whether the visible label was actually truncated.
    if getattr(self, "_ai_image_tooltip_item", "") != row:
        _show_tree_tooltip(self, text, event.x_root, event.y_root, row)
        return

    tooltip = getattr(self, "_ai_image_tooltip", None)
    if tooltip is not None:
        try:
            tooltip.geometry(f"+{event.x_root + 14}+{event.y_root + 18}")
        except Exception:
            pass
# ---------------- 面板创建 ----------------
def create_ai_image_panel(self: "PQAutomationApp"):
    """Build the AI image chat panel and register it with the panel manager.

    Initialises all panel state on ``self``, builds the left column (history
    list with search, context menu and action buttons) and the right column
    (preview canvas, metadata line, reference-image row, prompt input and
    send/stop controls), then registers the frame under the ``"ai_image"``
    panel name. The cached image list itself is not loaded here.
    """
    frame = ttk.Frame(self.content_frame)
    self.ai_image_frame = frame

    # Internal state.
    self.ai_image_records = []  # list[AIImageRecord], in display order
    self.ai_image_current = None  # AIImageRecord | None
    self.ai_image_photo = None  # keeps the ImageTk.PhotoImage from being garbage collected
    self._ai_image_requesting = False
    self._ai_image_progress_job = None
    self._ai_image_progress_phase = 0
    self._ai_image_cancel_event = None
    self._ai_image_request_seq = 0
    self._ai_image_active_seq = 0
    self._ai_image_node_map = {}
    self._ai_image_session_node_map = {}
    self._ai_image_all_records = []
    self._ai_image_search_var = tk.StringVar(value="")
    self._ai_image_count_var = tk.StringVar(value="0 张图片")
    self._ai_image_reloading = False
    self._ai_image_select_guard = False
    self._ai_image_list_loaded = False
    self._ai_image_tooltip = None
    self._ai_image_tooltip_label = None
    self._ai_image_tooltip_item = ""
    # Per-session reference image URL (image-to-image mode): session_id -> upload_image_url.
    self._ai_image_session_refs = {}
    # Reference image uploaded manually before the next send (overrides the session one).
    self._ai_image_pending_ref_url = ""
    self._ai_image_pending_ref_name = ""
    self._ai_image_uploading = False
    self.ai_image_ref_var = tk.StringVar(value="未设置参考图(文生图模式)")

    container = ttk.Frame(frame, padding=10)
    container.pack(fill=tk.BOTH, expand=True)

    # Left column: image list.
    # Grid weights let the preview column on the right take the remaining space.
    container.columnconfigure(0, weight=0)
    container.columnconfigure(1, weight=1)
    container.rowconfigure(0, weight=1)
    palette = _theme_colors()

    left = ttk.Frame(container, width=360)
    left.grid(row=0, column=0, sticky=tk.NS, padx=(0, 10))
    left.grid_propagate(False)

    title_row = ttk.Frame(left)
    title_row.pack(fill=tk.X, pady=(0, 4))
    ttk.Label(title_row, text="历史图片", font=("微软雅黑", 10, "bold")).pack(side=tk.LEFT)
    ttk.Label(
        title_row,
        textvariable=self._ai_image_count_var,
        foreground=palette["muted"],
        font=("微软雅黑", 9),
    ).pack(side=tk.RIGHT)

    search_row = ttk.Frame(left)
    search_row.pack(fill=tk.X, pady=(0, 6))
    ttk.Label(
        search_row,
        text="搜索",
        foreground=palette["muted"],
        font=("微软雅黑", 9),
    ).pack(side=tk.LEFT, padx=(0, 6))
    self.ai_image_search_entry = ttk.Entry(
        search_row,
        textvariable=self._ai_image_search_var,
        bootstyle="secondary",
    )
    self.ai_image_search_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
    # Re-filter the list on every key release in the search box.
    self.ai_image_search_entry.bind("<KeyRelease>", lambda e: reload_ai_image_list(self, auto_select_first=False))
    ttk.Button(
        search_row,
        text="清空",
        width=6,
        bootstyle="secondary-outline",
        command=lambda: _clear_list_search(self),
    ).pack(side=tk.LEFT, padx=(6, 0))

    list_wrap = ttk.Frame(left, padding=2)
    list_wrap.pack(fill=tk.BOTH, expand=True)
    list_wrap.columnconfigure(0, weight=1)
    list_wrap.rowconfigure(0, weight=1)
    _apply_ai_image_list_style(self)

    scroll = ttk.Scrollbar(list_wrap, orient=tk.VERTICAL)
    self.ai_image_tree = ttk.Treeview(
        list_wrap,
        columns=("time",),
        show="tree headings",
        selectmode="browse",
        style="AIImage.Treeview",
        yscrollcommand=scroll.set,
    )
    self.ai_image_tree.heading("#0", text="标题")
    self.ai_image_tree.heading("time", text="时间")
    self.ai_image_tree.column("#0", width=210, minwidth=140, anchor=tk.W)
    self.ai_image_tree.column("time", width=105, minwidth=90, stretch=False, anchor=tk.W)
    scroll.config(command=self.ai_image_tree.yview)
    self.ai_image_tree.grid(row=0, column=0, sticky=tk.NSEW)
    scroll.grid(row=0, column=1, sticky=tk.NS)
    self.ai_image_tree.bind("<<TreeviewSelect>>", lambda e: _on_list_select(self))
    self.ai_image_tree.bind("<Double-1>", lambda e: _on_tree_double_click(self, e))
    self.ai_image_tree.bind("<Motion>", lambda e: _on_tree_motion(self, e))
    self.ai_image_tree.bind("<Leave>", lambda e: _hide_tree_tooltip(self))

    # Context menu: send to UCD / rename / save as / delete.
    # Entry indexes: 0=send, 1=separator, 2=rename, 3=save as, 4=delete.
    self.ai_image_menu = tk.Menu(self.root, tearoff=0)
    self.ai_image_menu.add_command(
        label="发送图片",
        command=lambda: _send_to_ucd(self),
    )
    self.ai_image_menu.add_separator()
    self.ai_image_menu.add_command(
        label="重命名…",
        command=lambda: _rename_current(self),
    )
    self.ai_image_menu.add_command(
        label="另存为…",
        command=lambda: _save_current(self),
    )
    self.ai_image_menu.add_command(
        label="删除",
        command=lambda: _delete_current(self),
    )
    self.ai_image_tree.bind(
        "<Button-3>",
        lambda e: _show_list_context_menu(self, e),
    )

    btn_row = ttk.Frame(left)
    btn_row.pack(fill=tk.X, pady=(6, 0))
    self.ai_image_send_ucd_btn = ttk.Button(
        btn_row, text="发送", bootstyle="info-outline", width=8,
        command=lambda: _send_to_ucd(self),
    )
    self.ai_image_send_ucd_btn.pack(side=tk.LEFT, padx=(0, 4))
    ttk.Button(
        btn_row, text="保存", bootstyle="success-outline", width=8,
        command=lambda: _save_current(self),
    ).pack(side=tk.LEFT, padx=(0, 4))
    ttk.Button(
        btn_row, text="删除", bootstyle="danger-outline", width=8,
        command=lambda: _delete_current(self),
    ).pack(side=tk.LEFT, padx=(0, 4))
    ttk.Button(
        btn_row, text="刷新", bootstyle="secondary-outline", width=8,
        command=lambda: reload_ai_image_list(self),
    ).pack(side=tk.LEFT)

    # Right column: preview + input.
    right = ttk.Frame(container)
    right.grid(row=0, column=1, sticky=tk.NSEW)

    preview_frame = ttk.LabelFrame(right, text="图片预览", padding=6)
    preview_frame.pack(fill=tk.BOTH, expand=True)
    self.ai_image_canvas = tk.Canvas(
        preview_frame, bg=palette["bg"], highlightthickness=0
    )
    self.ai_image_canvas.pack(fill=tk.BOTH, expand=True)
    # Redraw the preview whenever the canvas is resized.
    self.ai_image_canvas.bind("<Configure>", lambda e: _redraw_preview(self))

    meta_row = ttk.Frame(right)
    meta_row.pack(fill=tk.X, pady=(4, 4))
    self.ai_image_meta_var = tk.StringVar(value="未选择图片")
    ttk.Label(
        meta_row, textvariable=self.ai_image_meta_var,
        foreground=palette["muted"], font=("微软雅黑", 9),
    ).pack(side=tk.LEFT)

    # Input area. The caption's opening parenthesis was missing; restored here.
    input_frame = ttk.LabelFrame(right, text="提示输入(Ctrl+Enter 发送)", padding=6)
    input_frame.pack(fill=tk.X, pady=(4, 0))

    # Reference image row (image-to-image).
    ref_row = ttk.Frame(input_frame)
    ref_row.pack(fill=tk.X, pady=(0, 4))
    ttk.Label(
        ref_row, text="参考图:",
        foreground=palette["muted"], font=("微软雅黑", 9),
    ).pack(side=tk.LEFT)
    self.ai_image_ref_label = ttk.Label(
        ref_row, textvariable=self.ai_image_ref_var,
        foreground=palette["fg"], font=("微软雅黑", 9),
    )
    self.ai_image_ref_label.pack(side=tk.LEFT, padx=(4, 0), fill=tk.X, expand=True)
    self.ai_image_clear_ref_btn = ttk.Button(
        ref_row, text="清除", width=6, bootstyle="secondary-outline",
        command=lambda: _clear_reference_image(self),
    )
    self.ai_image_clear_ref_btn.pack(side=tk.RIGHT, padx=(4, 0))
    self.ai_image_upload_ref_btn = ttk.Button(
        ref_row, text="上传参考图…", width=12, bootstyle="info-outline",
        command=lambda: _upload_reference_image(self),
    )
    self.ai_image_upload_ref_btn.pack(side=tk.RIGHT)

    self.ai_image_input = tk.Text(
        input_frame,
        height=3,
        wrap=tk.WORD,
        bg=palette["input_bg"],
        fg=palette["input_fg"],
        insertbackground=palette["input_fg"],
    )
    self.ai_image_input.pack(fill=tk.X, side=tk.TOP)

    def _on_ctrl_return(_event):
        """Send the prompt and stop Tk from also running the default Return binding."""
        _send_prompt(self)
        # Tk only stops further bindings when the callback result is exactly
        # the string "break"; a tuple such as (None, "break") does not count,
        # which let the Text widget insert a newline after sending.
        return "break"

    self.ai_image_input.bind("<Control-Return>", _on_ctrl_return)

    send_row = ttk.Frame(input_frame)
    send_row.pack(fill=tk.X, pady=(4, 0))
    self.ai_image_status_var = tk.StringVar(value="就绪")
    ttk.Label(
        send_row, textvariable=self.ai_image_status_var,
        foreground=palette["muted"], font=("微软雅黑", 9),
    ).pack(side=tk.LEFT)
    self.ai_image_progress = ttk.Progressbar(
        send_row,
        mode="indeterminate",
        length=120,
        bootstyle="info-striped",
    )
    # Packed once and hidden again; it is shown only while a request is running.
    self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0))
    self.ai_image_progress.pack_forget()
    self.ai_image_send_btn = ttk.Button(
        send_row, text="发送", bootstyle="primary", width=10,
        command=lambda: _send_prompt(self),
    )
    self.ai_image_send_btn.pack(side=tk.RIGHT)
    self.ai_image_new_session_btn = ttk.Button(
        send_row, text="新对话", bootstyle="secondary-outline", width=10,
        command=lambda: _start_new_session(self),
    )
    self.ai_image_new_session_btn.pack(side=tk.RIGHT, padx=(0, 6))
    self.ai_image_stop_btn = ttk.Button(
        send_row, text="停止", bootstyle="danger-outline", width=10,
        command=lambda: _stop_request(self),
    )
    self.ai_image_stop_btn.pack(side=tk.RIGHT, padx=(0, 6))
    self.ai_image_stop_btn.configure(state=tk.DISABLED)

    # Register the panel.
    self.register_panel("ai_image", frame, None, "ai_image_visible")
    self.ai_image_visible = False
    self.ai_image_meta_var.set("首次打开 AI 图片面板时加载缓存")
def toggle_ai_image_panel(self: "PQAutomationApp"):
    """Show the AI image panel; on its first display also load the cached image list."""
    self.show_panel("ai_image")
    _apply_ai_image_list_style(self)
    already_loaded = getattr(self, "_ai_image_list_loaded", False)
    if already_loaded:
        return
    logger.info("[AIImagePanel] 首次显示面板,开始加载列表")
    reload_ai_image_list(self)
    self._ai_image_list_loaded = True
def _get_app_base_dir(self: "PQAutomationApp") -> str:
    """Return the application root directory.

    Resolution order: two levels above ``self.config_file`` when it is set;
    the executable's directory when running frozen; otherwise four levels
    above this source file.
    """
    config_path = getattr(self, "config_file", None)
    if config_path:
        return os.path.dirname(os.path.dirname(config_path))
    if getattr(sys, "frozen", False):
        return os.path.dirname(sys.executable)
    location = os.path.abspath(__file__)
    for _ in range(4):
        location = os.path.dirname(location)
    return location
# ---------------- 列表 / 选中 ----------------
def reload_ai_image_list(self: "PQAutomationApp", auto_select_first=True):
    """Rescan the image cache and rebuild the session-grouped tree.

    Records come from ``_svc.list_records`` and are filtered by the search
    box keyword (case-insensitive substring match against display name,
    prompt and file name). The remaining records are grouped with
    ``_svc.group_records_by_session``; each session becomes a parent row and
    its records become child rows. Ordering is whatever the service returns.

    Side effects: ``self.ai_image_records`` is replaced by the visible
    records in display order, ``self._ai_image_node_map`` maps record row ids
    to indexes in that list, and ``self._ai_image_session_node_map`` maps
    session row ids to session ids. A call made while another reload is
    running is ignored.

    Args:
        auto_select_first: When true and at least one record is visible, the
            first record row is selected. Otherwise the current selection and
            the preview are cleared.
    """
    t0 = time.monotonic()
    if getattr(self, "_ai_image_reloading", False):
        logger.debug("[AIImagePanel] 忽略重入 reload 请求")
        return
    self._ai_image_reloading = True
    try:
        _apply_ai_image_list_style(self)
        all_records = _svc.list_records(base_dir=_get_app_base_dir(self))
        self._ai_image_all_records = list(all_records)
        self.ai_image_tree.delete(*self.ai_image_tree.get_children())
        self._ai_image_node_map = {}
        self._ai_image_session_node_map = {}

        keyword = (self._ai_image_search_var.get() or "").strip().lower()
        records = []
        for rec in all_records:
            if not keyword:
                records.append(rec)
                continue
            hay = "\n".join([
                rec.display_name or "",
                rec.prompt or "",
                os.path.basename(rec.image_path),
            ]).lower()
            if keyword in hay:
                records.append(rec)
        self.ai_image_records = records
        sessions = _svc.group_records_by_session(self.ai_image_records)

        # Session-level reference chain: for sessions without a stored
        # reference, use the source_url of the first record in that session
        # that has one (presumably the most recent image; verify against the
        # service's ordering).
        refs_map = getattr(self, "_ai_image_session_refs", None)
        if isinstance(refs_map, dict):
            for sess in sessions:
                sid = sess.get("session_id") or ""
                if not sid or sid in refs_map:
                    continue
                for r in sess.get("records") or []:
                    src = ""
                    if isinstance(r.extra, dict):
                        src = (r.extra.get("source_url") or "").strip()
                    if src:
                        refs_map[sid] = src
                        break

        flat = []
        current_sid = _svc.get_session_id()
        for idx, sess in enumerate(sessions, start=1):
            sid = sess["session_id"]
            is_current = sid and sid == current_sid
            header = _format_session_header(idx, sess, is_current=is_current)
            parent_id = self.ai_image_tree.insert(
                "",
                tk.END,
                text=header,
                values=("",),
                open=True,
            )
            self._ai_image_session_node_map[parent_id] = sid
            for rec in sess["records"]:
                label = _format_list_label(rec)
                created = (rec.created_at or "").replace("T", " ")[:16]
                node_id = self.ai_image_tree.insert(
                    parent_id,
                    tk.END,
                    text=label,
                    values=(created,),
                )
                self._ai_image_node_map[node_id] = len(flat)
                flat.append(rec)
        # Replace with the list flattened in display order so index-based
        # lookups match what the tree shows.
        self.ai_image_records = flat

        total_count = len(all_records)
        visible_count = len(self.ai_image_records)
        if keyword:
            self._ai_image_count_var.set(f"{visible_count}/{total_count}")
        else:
            self._ai_image_count_var.set(f"{visible_count} 张图片")

        if self.ai_image_records and auto_select_first:
            first_id = ""
            for parent in self.ai_image_tree.get_children(""):
                children = self.ai_image_tree.get_children(parent)
                if children:
                    first_id = children[0]
                    break
            if first_id:
                _set_tree_selection(self, first_id)
        else:
            self.ai_image_current = None
            self.ai_image_photo = None
            self.ai_image_canvas.delete("all")
            if self.ai_image_records:
                # Records are listed but none is selected: do not claim the
                # cache or the filter result is empty.
                self.ai_image_meta_var.set("未选择图片")
            else:
                self.ai_image_meta_var.set("暂无缓存图片" if not keyword else "当前筛选无结果")

        logger.info(
            "[AIImagePanel] reload 完成 total=%d visible=%d sessions=%d keyword=%r elapsed=%.3fs",
            total_count,
            visible_count,
            len(sessions),
            keyword,
            time.monotonic() - t0,
        )
        self._ai_image_list_loaded = True
    finally:
        self._ai_image_reloading = False
        # Best effort: a failure to refresh the reference label must not mask
        # the outcome of the reload itself.
        try:
            _refresh_ref_label(self)
        except Exception:
            pass
def _format_session_header(index: int, sess: dict, is_current: bool) -> str:
started = (sess.get("started_at") or "").replace("T", " ")[:16]
tag = "(当前)" if is_current else ""
count = len(sess.get("records") or [])
if sess.get("session_id"):
return f"会话 #{index} · {started} · {count}{tag}".strip()
return f"未归类 · {started} · {count}"
def _format_list_label(rec: _svc.AIImageRecord) -> str:
    """Build the one-line tree label for a record.

    The label is the first line of ``rec.display_name`` (or a placeholder
    when it is empty), prefixed with ``[size] `` when ``rec.extra["size"]``
    is set. Prefix and name share a 40-character budget; a name that is cut
    gets a trailing ellipsis so the truncation is visible.
    """
    # Resolution prefix, taken from extra["size"] when present.
    size_tag = ""
    extra = rec.extra or {}
    if isinstance(extra, dict) and extra.get("size"):
        size_tag = f"[{extra['size']}] "
    name_line = rec.display_name.splitlines()[0] if rec.display_name else "(未命名)"
    max_name = 40 - len(size_tag)
    # Skip truncation when the prefix leaves almost no room for the name.
    if max_name > 4 and len(name_line) > max_name:
        # Previously an empty string was appended, leaving no truncation marker.
        name_line = name_line[:max_name] + "…"
    return f"{size_tag}{name_line}"
def _clear_list_search(self: "PQAutomationApp"):
    """Empty the search box and rebuild the list; do nothing if it is already blank."""
    current_text = self._ai_image_search_var.get() or ""
    if current_text.strip():
        self._ai_image_search_var.set("")
        reload_ai_image_list(self, auto_select_first=False)
        _hide_tree_tooltip(self)
def _set_tree_selection(self: "PQAutomationApp", item_id: str):
    """Select ``item_id`` in the tree, scroll it into view and show its record.

    If the row is already the selected one, only the record is re-shown.
    Any error while talking to the tree aborts silently.
    """
    if not item_id:
        return
    _hide_tree_tooltip(self)

    def _activate_record():
        # Show the record mapped to this row, if the row is a record row.
        index = (getattr(self, "_ai_image_node_map", None) or {}).get(item_id)
        if index is not None and 0 <= index < len(self.ai_image_records):
            _select_record(self, self.ai_image_records[index])

    try:
        selected = self.ai_image_tree.selection()
        if selected and selected[0] == item_id:
            _activate_record()
            return
        self.ai_image_tree.selection_set(item_id)
        self.ai_image_tree.focus(item_id)
        self.ai_image_tree.see(item_id)
    except Exception:
        return
    _activate_record()
def _find_tree_item_by_record_id(self: "PQAutomationApp", record_id: str) -> str:
    """Return the tree row id whose record has ``record_id``, or ``""`` if there is none."""
    if not record_id:
        return ""
    node_map = getattr(self, "_ai_image_node_map", None) or {}
    matches = (
        row_id
        for row_id, index in node_map.items()
        if index is not None
        and index < len(self.ai_image_records)
        and self.ai_image_records[index].id == record_id
    )
    return next(matches, "")
def _on_list_select(self: "PQAutomationApp"):
    """React to a tree selection change.

    Selecting a session header switches the active session. Selecting a
    record switches to the record's session (when it has one) and shows the
    record. Events fired during a reload or re-entrantly are ignored.
    """
    if getattr(self, "_ai_image_reloading", False):
        return
    if getattr(self, "_ai_image_select_guard", False):
        logger.debug("[AIImagePanel] 忽略重入选择事件")
        return
    selection = self.ai_image_tree.selection()
    if not selection:
        return

    self._ai_image_select_guard = True
    try:
        row_id = selection[0]
        index = (getattr(self, "_ai_image_node_map", None) or {}).get(row_id)
        if index is None:
            # Not a record row: treat it as a session header.
            header_sid = _session_id_for_item(self, row_id)
            if header_sid:
                logger.info("[AIImagePanel] 选中会话头 sid=%s", header_sid[:8])
                _switch_to_session(self, header_sid, show_message=False, refresh_list=False)
        elif 0 <= index < len(self.ai_image_records):
            rec = self.ai_image_records[index]
            if rec.session_id:
                _switch_to_session(
                    self,
                    rec.session_id,
                    show_message=False,
                    refresh_list=False,
                )
            _select_record(self, rec)
            logger.debug("[AIImagePanel] 选中图片 id=%s sid=%s", rec.id, (rec.session_id or "")[:8])
    finally:
        self._ai_image_select_guard = False
def _on_tree_double_click(self: "PQAutomationApp", event):
    """On double-click of a session header, switch to that session; record rows are left alone."""
    row_id = self.ai_image_tree.identify_row(event.y)
    if not row_id:
        return
    record_rows = getattr(self, "_ai_image_node_map", None) or {}
    if row_id not in record_rows:
        session_id = _session_id_for_item(self, row_id)
        if session_id:
            _switch_to_session(self, session_id, show_message=False, refresh_list=False)
def _select_record(self: "PQAutomationApp", rec: _svc.AIImageRecord):
    """Make ``rec`` the current record, update the metadata line and redraw the preview."""
    self.ai_image_current = rec
    file_name = os.path.basename(rec.image_path)
    self.ai_image_meta_var.set(f"{file_name} | {rec.created_at}")
    _redraw_preview(self)
# ---------------- 预览绘制 ----------------
def _redraw_preview(self: "PQAutomationApp"):
    """Redraw the preview canvas for the current record.

    The canvas is cleared first. If there is a current record whose file
    exists, the image is scaled down (never up) to fit the canvas and drawn
    centred. A failure to open, decode or scale the image is reported as
    text on the canvas instead of raising out of the Tk callback.
    """
    rec = getattr(self, "ai_image_current", None)
    canvas = self.ai_image_canvas
    canvas.delete("all")
    if rec is None or not os.path.isfile(rec.image_path):
        return
    # winfo_* can report 0 before the widget is laid out; avoid a zero-size target.
    cw = canvas.winfo_width() or 1
    ch = canvas.winfo_height() or 1
    try:
        # The context manager closes the file handle even when decoding fails;
        # the resized copy stays usable after the source image is closed.
        with Image.open(rec.image_path) as img:
            iw, ih = img.size
            scale = min(cw / iw, ch / ih, 1.0)
            nw, nh = max(1, int(iw * scale)), max(1, int(ih * scale))
            img_resized = img.resize((nw, nh), Image.LANCZOS)
    except Exception as exc:
        # The theme palette is only needed for the error text colour.
        palette = _theme_colors()
        canvas.create_text(cw // 2, ch // 2, text=f"加载失败: {exc}", fill=palette["select_bg"])
        return
    # Keep a reference on self so the PhotoImage is not garbage collected.
    self.ai_image_photo = ImageTk.PhotoImage(img_resized)
    canvas.create_image(cw // 2, ch // 2, image=self.ai_image_photo, anchor="center")
# ---------------- 发送 / 保存 / 删除 ----------------
def _start_new_session(self: "PQAutomationApp"):
    """Start a fresh conversation session and reset selection, preview and reference image.

    Refused with an info dialog while a request is in flight.
    """
    if getattr(self, "_ai_image_requesting", False):
        messagebox.showinfo("提示", "请等待当前请求完成")
        return

    _svc.reset_session()
    self.ai_image_status_var.set("已开启新对话")

    # A new session starts without a manually uploaded reference image.
    self._ai_image_pending_ref_url = self._ai_image_pending_ref_name = ""
    _refresh_ref_label(self)

    # Do not auto-select a history image here: selecting one would switch
    # the session straight back to that image's old session.
    reload_ai_image_list(self, auto_select_first=False)
    try:
        tree = self.ai_image_tree
        tree.selection_remove(tree.selection())
    except Exception:
        pass

    self.ai_image_current = None
    self.ai_image_photo = None
    self.ai_image_canvas.delete("all")
    self.ai_image_meta_var.set("新对话已开启,等待生成图片")
def _resolve_pending_ref_url(self: "PQAutomationApp") -> str:
    """Return the reference image URL to send with the next request.

    A manually uploaded (pending) reference wins; otherwise the URL stored
    for the current session is used. Returns ``""`` when neither exists.
    """
    manual = (getattr(self, "_ai_image_pending_ref_url", "") or "").strip()
    if not manual:
        current_sid = _svc.get_session_id()
        session_refs = getattr(self, "_ai_image_session_refs", None) or {}
        return (session_refs.get(current_sid) or "").strip()
    return manual
def _refresh_ref_label(self: "PQAutomationApp"):
    """Update the reference-image hint from the pending upload / session reference state."""
    label_var = getattr(self, "ai_image_ref_var", None)
    if label_var is None:
        return

    manual_url = (getattr(self, "_ai_image_pending_ref_url", "") or "").strip()
    if manual_url:
        shown_name = getattr(self, "_ai_image_pending_ref_name", "") or "参考图"
        label_var.set(f"[图生图] {shown_name}")
        return

    current_sid = _svc.get_session_id()
    session_refs = getattr(self, "_ai_image_session_refs", None) or {}
    if (session_refs.get(current_sid) or "").strip():
        # Automatic chain: the reference comes from the session's earlier output.
        label_var.set("[图生图] 沿用上一轮生成图为参考")
    else:
        label_var.set("未设置参考图(文生图模式)")
def _upload_reference_image(self: "PQAutomationApp"):
    """Let the user pick a local image and upload it as the next request's reference.

    Refused while a generation request is running; ignored while another
    upload is in progress. The upload is started through
    ``_svc.upload_image_async`` and its callbacks are marshalled to the Tk
    loop with ``root.after``. If starting the upload itself fails, the UI is
    restored immediately through the normal completion handler.
    """
    if getattr(self, "_ai_image_requesting", False):
        messagebox.showinfo("提示", "请等待当前请求完成")
        return
    if getattr(self, "_ai_image_uploading", False):
        return
    path = filedialog.askopenfilename(
        # Punctuation in the title had been dropped; restored here.
        title="选择参考图(PNG/JPG/JPEG,超过限制将自动缩放)",
        filetypes=[("图片", "*.png;*.jpg;*.jpeg"), ("所有文件", "*.*")],
    )
    if not path:
        return
    self._ai_image_uploading = True
    try:
        self.ai_image_upload_ref_btn.configure(state=tk.DISABLED, text="上传中…")
        self.ai_image_clear_ref_btn.configure(state=tk.DISABLED)
    except Exception:
        pass
    self.ai_image_status_var.set("正在上传参考图…")
    name = os.path.basename(path)

    def _ok(url: str):
        self.root.after(0, lambda: _on_upload_done(self, name, url, None))

    def _err(exc: Exception):
        self.root.after(0, lambda: _on_upload_done(self, name, "", exc))

    try:
        _svc.upload_image_async(path, on_success=_ok, on_error=_err)
    except Exception as exc:
        # The upload never started, so no callback will arrive to clear the
        # "uploading" flag and re-enable the buttons; do it here.
        logger.exception("[AIImagePanel] failed to start reference image upload")
        _on_upload_done(self, name, "", exc)
def _on_upload_done(self: "PQAutomationApp", name: str, url: str, exc):
    """Finish a reference-image upload: restore the buttons, then report failure or store the URL."""
    self._ai_image_uploading = False
    try:
        self.ai_image_upload_ref_btn.configure(state=tk.NORMAL, text="上传参考图…")
        self.ai_image_clear_ref_btn.configure(state=tk.NORMAL)
    except Exception:
        pass

    failed = exc is not None
    if failed:
        self.ai_image_status_var.set(f"上传失败: {exc}")
        messagebox.showerror("上传失败", str(exc))
        return

    # Success: remember the uploaded image as the pending reference.
    self._ai_image_pending_ref_url = url
    self._ai_image_pending_ref_name = name
    _refresh_ref_label(self)
    self.ai_image_status_var.set(f"参考图已上传:{name}")
def _clear_reference_image(self: "PQAutomationApp"):
    """Drop the pending reference image and the current session's stored reference.

    Does nothing while a request is in flight.
    """
    if getattr(self, "_ai_image_requesting", False):
        return
    self._ai_image_pending_ref_url = ""
    self._ai_image_pending_ref_name = ""
    current_sid = _svc.get_session_id()
    session_refs = getattr(self, "_ai_image_session_refs", None)
    if isinstance(session_refs, dict):
        session_refs.pop(current_sid, None)
    _refresh_ref_label(self)
    self.ai_image_status_var.set("已清除参考图,切换为文生图模式")
def _session_id_for_item(self: "PQAutomationApp", item_id: str) -> str:
    """Return the session id for a tree row.

    ``item_id`` may be a session header row or a record row; for a record
    row the parent row is looked up. Returns ``""`` when no session is found.
    """
    header_rows = getattr(self, "_ai_image_session_node_map", None) or {}
    if item_id in header_rows:
        owner = item_id
    else:
        try:
            owner = self.ai_image_tree.parent(item_id)
        except Exception:
            owner = ""
    if not owner:
        return ""
    return header_rows.get(owner) or ""
def _switch_to_session(
    self: "PQAutomationApp",
    session_id: str,
    show_message: bool = True,
    target_record_id: str = "",
    refresh_list: bool = True,
):
    """Make ``session_id`` the active session in the service.

    Does nothing for a blank id or when that session is already active.
    Optionally reloads the list, selects ``target_record_id``, and shows a
    confirmation dialog. The pending reference image is always cleared.
    """
    new_sid = (session_id or "").strip()
    if not new_sid or new_sid == _svc.get_session_id():
        return

    _svc.set_session_id(new_sid)
    logger.info(
        "[AIImagePanel] 切换会话 sid=%s refresh=%s target=%s",
        new_sid[:8],
        refresh_list,
        target_record_id[:8] if target_record_id else "",
    )

    if refresh_list:
        reload_ai_image_list(self)
    if target_record_id:
        row_id = _find_tree_item_by_record_id(self, target_record_id)
        if row_id:
            _set_tree_selection(self, row_id)

    self.ai_image_status_var.set("已切换到历史对话")
    # A pending upload only applies to the session it was made in, so it is
    # dropped on a switch before the hint label is refreshed.
    self._ai_image_pending_ref_url = ""
    self._ai_image_pending_ref_name = ""
    _refresh_ref_label(self)

    if show_message:
        messagebox.showinfo("提示", "已切换到所选历史对话")
def _update_request_progress(self: "PQAutomationApp"):
    """Refresh the "generating" status text and reschedule itself every 900 ms while a request runs."""
    still_running = getattr(self, "_ai_image_requesting", False)
    if still_running:
        self.ai_image_status_var.set("正在生成图片…")
        self._ai_image_progress_phase += 1
        self._ai_image_progress_job = self.root.after(900, lambda: _update_request_progress(self))
    else:
        # No request in flight: stop rescheduling.
        self._ai_image_progress_job = None
def _send_prompt(self: "PQAutomationApp"):
    """Send the text in the input box.

    A remote image URL is imported via ``_svc.import_image_from_url_async``;
    anything else is sent to ``_svc.request_image_async``, together with the
    resolved reference image URL when one is available. Each request gets a
    sequence number so that callbacks from superseded requests can be told
    apart in ``_on_request_done``.
    """
    if getattr(self, "_ai_image_requesting", False):
        return
    if getattr(self, "_ai_image_uploading", False):
        messagebox.showinfo("提示", "参考图上传中,请稍后再发送")
        return
    text = self.ai_image_input.get("1.0", tk.END).strip()
    if not text:
        messagebox.showinfo("提示", "请输入内容")
        return

    self._ai_image_request_seq += 1
    seq = self._ai_image_request_seq
    self._ai_image_active_seq = seq
    self._ai_image_cancel_event = threading.Event()
    _set_requesting(self, True)

    from_url = _svc.is_remote_image_url(text)
    self.ai_image_status_var.set("下载中…" if from_url else "后端处理中…")

    # Service callbacks are forwarded to the Tk loop via root.after.
    def _on_ok(record):
        self.root.after(0, lambda: _on_request_done(self, record, None, seq))

    def _on_fail(exc):
        self.root.after(0, lambda: _on_request_done(self, None, exc, seq))

    if from_url:
        _svc.import_image_from_url_async(
            text,
            on_success=_on_ok,
            on_error=_on_fail,
            base_dir=_get_app_base_dir(self),
            cancel_event=self._ai_image_cancel_event,
        )
    else:
        reference = _resolve_pending_ref_url(self)
        if reference:
            self.ai_image_status_var.set("后端处理中(图生图)…")
        _svc.request_image_async(
            text,
            on_success=_on_ok,
            on_error=_on_fail,
            base_dir=_get_app_base_dir(self),
            cancel_event=self._ai_image_cancel_event,
            upload_image_url=reference or None,
        )
def _set_requesting(self: "PQAutomationApp", flag: bool):
    """Switch the UI between the idle and "request running" states.

    Toggles the send / new-session / stop / reference buttons, shows or
    hides the progress bar, and starts or cancels the periodic status job.
    """
    self._ai_image_requesting = flag
    input_state = tk.DISABLED if flag else tk.NORMAL
    try:
        self.ai_image_send_btn.configure(state=input_state)
        self.ai_image_new_session_btn.configure(state=input_state)
        # The stop button is the only control enabled while a request runs.
        self.ai_image_stop_btn.configure(state=tk.NORMAL if flag else tk.DISABLED)
        for attr_name in ("ai_image_upload_ref_btn", "ai_image_clear_ref_btn"):
            if hasattr(self, attr_name):
                getattr(self, attr_name).configure(state=input_state)
    except Exception:
        pass

    if not flag:
        if self._ai_image_progress_job is not None:
            self.root.after_cancel(self._ai_image_progress_job)
            self._ai_image_progress_job = None
        try:
            self.ai_image_progress.stop()
            self.ai_image_progress.pack_forget()
        except Exception:
            pass
        return

    self._ai_image_progress_phase = 0
    self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0))
    self.ai_image_progress.start(10)
    if self._ai_image_progress_job is not None:
        self.root.after_cancel(self._ai_image_progress_job)
    self._ai_image_progress_job = self.root.after(0, lambda: _update_request_progress(self))
def _on_request_done(self: "PQAutomationApp", record, exc, req_seq):
    """Handle the result of a generation / import request on the Tk thread.

    Callbacks whose ``req_seq`` is not the active one (for example after the
    user pressed stop) are ignored. On failure the error is shown. On
    success the input is cleared, the record's ``source_url`` is stored as
    the session's next reference, the list is reloaded and the new record is
    selected.
    """
    if getattr(self, "_ai_image_active_seq", 0) != req_seq:
        return
    self._ai_image_active_seq = 0
    self._ai_image_cancel_event = None
    _set_requesting(self, False)

    if exc is not None:
        self.ai_image_status_var.set(f"失败: {exc}")
        messagebox.showerror("生成失败", str(exc))
        return

    self.ai_image_status_var.set("完成")
    self.ai_image_input.delete("1.0", tk.END)

    have_record = record is not None
    # Multi-turn chain: this round's image URL becomes the next round's reference.
    next_ref = ""
    try:
        if have_record and isinstance(record.extra, dict):
            next_ref = (record.extra.get("source_url") or "").strip()
    except Exception:
        next_ref = ""
    sid = (record.session_id if have_record else "") or _svc.get_session_id()
    if next_ref and sid:
        self._ai_image_session_refs[sid] = next_ref

    # A manually uploaded reference applies to one send only.
    self._ai_image_pending_ref_url = ""
    self._ai_image_pending_ref_name = ""
    _refresh_ref_label(self)
    reload_ai_image_list(self)

    if have_record:
        logger.info("[AIImagePanel] 生成完成并入列 id=%s sid=%s next_ref=%s",
                    record.id, (record.session_id or "")[:8], next_ref or "-")
    if have_record and self.ai_image_records:
        row_id = _find_tree_item_by_record_id(self, record.id)
        if row_id:
            _set_tree_selection(self, row_id)
def _stop_request(self: "PQAutomationApp"):
    """Stop the running request: set the cancel event, ignore later callbacks, restore the UI."""
    if not getattr(self, "_ai_image_requesting", False):
        return
    cancel_event = getattr(self, "_ai_image_cancel_event", None)
    if cancel_event is not None:
        cancel_event.set()
    # With no active sequence number, a late callback is treated as stale.
    self._ai_image_active_seq = 0
    _set_requesting(self, False)
    self.ai_image_status_var.set("已停止生成")
def _save_current(self: "PQAutomationApp"):
    """Ask for a destination and export the current record's image there."""
    rec = getattr(self, "ai_image_current", None)
    if rec is None:
        messagebox.showinfo("提示", "请先选择一张图片")
        return

    source_name = os.path.basename(rec.image_path)
    # Fall back to .png when the cached file has no extension.
    extension = os.path.splitext(rec.image_path)[1] or ".png"
    dest = filedialog.asksaveasfilename(
        title="另存为",
        defaultextension=extension,
        initialfile=source_name,
        filetypes=[("图片", "*.png;*.jpg;*.jpeg;*.bmp;*.webp"), ("所有文件", "*.*")],
    )
    if not dest:
        return

    try:
        _svc.export_record(rec, dest)
        messagebox.showinfo("成功", f"已保存到:\n{dest}")
    except Exception as exc:
        messagebox.showerror("保存失败", str(exc))
def _delete_current(self: "PQAutomationApp"):
    """Delete the current record after confirmation, then reload the list."""
    rec = getattr(self, "ai_image_current", None)
    if rec is None:
        messagebox.showinfo("提示", "请先选择一张图片")
        return
    file_name = os.path.basename(rec.image_path)
    confirmed = messagebox.askyesno("确认删除", f"确定删除这张缓存图片吗?\n{file_name}")
    if confirmed:
        _svc.delete_record(rec)
        reload_ai_image_list(self)
def _rename_current(self: "PQAutomationApp"):
    """Prompt for a new display title for the current record and save it.

    The title is stored through ``_svc.update_record_title``. After a
    successful change the list is reloaded and the same record is selected
    again.
    """
    rec = getattr(self, "ai_image_current", None)
    if rec is None:
        messagebox.showinfo("提示", "请先选择一张图片")
        return

    answer = simpledialog.askstring(
        "重命名",
        "修改显示标题(留空可恢复使用原始提示词):",
        initialvalue=rec.title or rec.display_name,
        parent=self.root,
    )
    if answer is None:  # dialog cancelled
        return
    answer = answer.strip()
    if answer == (rec.title or ""):
        # Nothing changed.
        return

    if not _svc.update_record_title(rec, answer):
        messagebox.showerror("保存失败", "无法更新元数据,请检查文件权限。")
        return

    renamed_id = rec.id
    reload_ai_image_list(self)
    row_id = _find_tree_item_by_record_id(self, renamed_id)
    if row_id:
        _set_tree_selection(self, row_id)
# ---------------- 发送到 UCD ----------------
def _show_list_context_menu(self: "PQAutomationApp", event):
    """Pop up the list context menu with entries enabled according to the current state.

    The row under the pointer is selected first (record row) or its session
    is activated (session header). "Send" needs a selected record and a UCD
    object with a truthy ``status``; the other entries need a selected record.
    """
    _hide_tree_tooltip(self)
    try:
        row_id = self.ai_image_tree.identify_row(event.y)
    except Exception:
        row_id = ""

    index = (getattr(self, "_ai_image_node_map", None) or {}).get(row_id)
    if index is not None and 0 <= index < len(self.ai_image_records):
        _set_tree_selection(self, row_id)
        _select_record(self, self.ai_image_records[index])
    elif row_id:
        header_sid = _session_id_for_item(self, row_id)
        if header_sid:
            _switch_to_session(self, header_sid, show_message=False, refresh_list=False)

    has_selection = self.ai_image_current is not None
    ucd = getattr(self, "ucd", None)
    can_send = (
        has_selection
        and ucd is not None
        and getattr(ucd, "status", False)
    )
    send_state = "normal" if can_send else "disabled"
    record_state = "normal" if has_selection else "disabled"

    try:
        # Entry 0 is "send"; entries 2-4 are rename / save as / delete.
        self.ai_image_menu.entryconfigure(0, state=send_state)
        for entry_index in (2, 3, 4):
            self.ai_image_menu.entryconfigure(entry_index, state=record_state)
        self.ai_image_menu.tk_popup(event.x_root, event.y_root)
    finally:
        self.ai_image_menu.grab_release()
def _send_to_ucd(self: "PQAutomationApp"):
    """Send the currently selected AI image to the display through the UCD device.

    Checks that a record is selected, its file exists and the UCD reports a
    truthy ``status``. If the image size differs from the current output
    resolution, the user is asked whether to send a resized copy. The send
    itself runs in a daemon thread and its outcome is reported back through
    ``root.after``.
    """
    rec = getattr(self, "ai_image_current", None)
    if rec is None:
        messagebox.showinfo("提示", "请先选择一张图片")
        return
    if not os.path.isfile(rec.image_path):
        messagebox.showerror("错误", f"图片文件不存在:\n{rec.image_path}")
        return
    ucd = getattr(self, "ucd", None)
    if ucd is None or not getattr(ucd, "status", False):
        messagebox.showwarning("警告", "请先连接 UCD323 设备")
        return

    image_path = rec.image_path
    send_path = image_path
    log = getattr(self, "log_gui", None)

    def _log(message, level):
        # GUI logging is optional: skip it when no log widget is attached.
        if log is not None:
            log.log(message, level=level)

    def _set_status(text):
        if hasattr(self, "ai_image_status_var"):
            self.ai_image_status_var.set(text)

    # Check the resolution before sending; fall back to 3840x2160 when the
    # current output resolution cannot be read.
    try:
        target_w, target_h = self.signal_service.current_resolution()
    except Exception:
        target_w, target_h = 3840, 2160
    try:
        with Image.open(image_path) as _img:
            src_w, src_h = _img.size
    except Exception as exc:
        messagebox.showerror("错误", f"无法读取图片尺寸:\n{exc}")
        return

    _log(f"UCD分辨率: {target_w}x{target_h} | 图片分辨率: {src_w}x{src_h}", "info")

    if (src_w, src_h) != (target_w, target_h):
        wants_resize = messagebox.askyesno(
            "分辨率不匹配",
            (
                f"当前 UCD 分辨率: {target_w}x{target_h}\n"
                f"图片分辨率: {src_w}x{src_h}\n\n"
                "是否自动缩放后再发送?"
            ),
        )
        if not wants_resize:
            _log("用户取消发送:图片分辨率与 UCD 不一致", "warning")
            return
        try:
            send_path = _build_ucd_resized_image(image_path, target_w, target_h)
            _log(f"已生成匹配分辨率副本: {os.path.basename(send_path)}", "info")
        except Exception as exc:
            messagebox.showerror("错误", f"自动缩放失败:\n{exc}")
            return

    _log(f"发送 AI 图片到 UCD: {os.path.basename(send_path)}", "info")
    _set_status("发送中…")

    def _worker():
        failure = None
        try:
            self.signal_service.send_image(send_path)
            sent = True
        except Exception as exc:
            sent = False
            failure = str(exc)

        def _report():
            if sent:
                _log(f"图片已发送到 UCD: {os.path.basename(send_path)}", "success")
                _set_status("已发送到 UCD")
                return
            message = f"UCD 发送失败: {failure}" if failure else "UCD 发送失败"
            _log(message, "error")
            _set_status("UCD 发送失败")

        # Hand the result back to the Tk loop; scheduling errors are ignored.
        try:
            self.root.after(0, _report)
        except Exception:
            pass

    threading.Thread(target=_worker, daemon=True).start()
def _build_ucd_resized_image(image_path: str, target_w: int, target_h: int) -> str:
    """Write an RGB PNG copy of the image resized to ``target_w`` x ``target_h``.

    The copy is saved next to the source as ``<stem>_<w>x<h>_ucd.png`` and
    its path is returned. The aspect ratio is not preserved.
    """
    folder, file_name = os.path.split(image_path)
    stem = os.path.splitext(file_name)[0]
    out_path = os.path.join(folder, f"{stem}_{target_w}x{target_h}_ucd.png")
    with Image.open(image_path) as source:
        rgb = source.convert("RGB")
        scaled = rgb.resize((target_w, target_h), Image.LANCZOS)
        scaled.save(out_path, format="PNG")
    return out_path
class AIImagePanelMixin:
    """Generated automatically by tools/refactor_to_mixins.py.

    Attaches this module's free functions to PQAutomationApp as methods, to
    help go-to-definition and type inference.
    """
    # Each attribute below aliases the module-level function of the same name.
    create_ai_image_panel = create_ai_image_panel
    toggle_ai_image_panel = toggle_ai_image_panel
    _get_app_base_dir = _get_app_base_dir
    reload_ai_image_list = reload_ai_image_list
    _on_list_select = _on_list_select
    _select_record = _select_record
    _redraw_preview = _redraw_preview
    _start_new_session = _start_new_session
    _session_id_for_item = _session_id_for_item
    _switch_to_session = _switch_to_session
    _update_request_progress = _update_request_progress
    _send_prompt = _send_prompt
    _set_requesting = _set_requesting
    _on_request_done = _on_request_done
    _stop_request = _stop_request
    _save_current = _save_current
    _delete_current = _delete_current
    _rename_current = _rename_current
    _show_list_context_menu = _show_list_context_menu
    _send_to_ucd = _send_to_ucd