"""AI 图片对话面板:输入提示 → 生成 → 显示 → 列表切换/保存/删除。""" from __future__ import annotations import os import threading import tkinter as tk from tkinter import filedialog, messagebox, simpledialog import ttkbootstrap as ttk from PIL import Image, ImageTk from app.services import ai_image as _svc from drivers.ucd_helpers import get_current_resolution, send_image_pattern # ---------------- 面板创建 ---------------- def create_ai_image_panel(self): """创建 AI 图片对话面板,并注册到面板管理。""" frame = ttk.Frame(self.content_frame) self.ai_image_frame = frame # 内部状态 self.ai_image_records = [] # list[AIImageRecord] self.ai_image_current = None # AIImageRecord | None self.ai_image_photo = None # 防止 ImageTk.PhotoImage 被 GC self._ai_image_requesting = False self._ai_image_progress_job = None self._ai_image_progress_phase = 0 container = ttk.Frame(frame, padding=10) container.pack(fill=tk.BOTH, expand=True) # 左列:图片列表 # 使用 grid + 权重,让右侧预览区优先占据剩余空间。 container.columnconfigure(0, weight=0) container.columnconfigure(1, weight=1) container.rowconfigure(0, weight=1) left = ttk.Frame(container, width=360) left.grid(row=0, column=0, sticky=tk.NS, padx=(0, 10)) left.grid_propagate(False) ttk.Label(left, text="历史图片", font=("微软雅黑", 10, "bold")).pack( anchor=tk.W, pady=(0, 4) ) list_wrap = ttk.Frame(left, padding=2) list_wrap.pack(fill=tk.BOTH, expand=True) scroll = ttk.Scrollbar(list_wrap, orient=tk.VERTICAL) self.ai_image_listbox = tk.Listbox( list_wrap, width=34, height=1, # 由 pack fill/expand 撑满,height 仅为最小保底 activestyle="none", font=("微软雅黑", 9), bd=1, relief=tk.FLAT, highlightthickness=1, highlightbackground="#d8d8d8", highlightcolor="#4a90e2", selectbackground="#2b6cb0", selectforeground="#ffffff", yscrollcommand=scroll.set, ) scroll.config(command=self.ai_image_listbox.yview) self.ai_image_listbox.pack(side=tk.LEFT, fill=tk.Y) scroll.pack(side=tk.RIGHT, fill=tk.Y) self.ai_image_listbox.bind("<>", lambda e: _on_list_select(self)) # 右键菜单:发送到 UCD / 重命名 / 另存为 / 删除 # 索引: 0=发送, 1=sep, 2=重命名, 3=另存为, 4=删除 self.ai_image_menu = tk.Menu(self.root, tearoff=0) self.ai_image_menu.add_command( label="发送图片", command=lambda: _send_to_ucd(self), ) self.ai_image_menu.add_separator() self.ai_image_menu.add_command( label="重命名…", command=lambda: _rename_current(self), ) self.ai_image_menu.add_command( label="另存为…", command=lambda: _save_current(self), ) self.ai_image_menu.add_command( label="删除", command=lambda: _delete_current(self), ) self.ai_image_listbox.bind( "", lambda e: _show_list_context_menu(self, e), ) btn_row = ttk.Frame(left) btn_row.pack(fill=tk.X, pady=(6, 0)) self.ai_image_send_ucd_btn = ttk.Button( btn_row, text="发送", bootstyle="info-outline", width=8, command=lambda: _send_to_ucd(self), ) self.ai_image_send_ucd_btn.pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="保存", bootstyle="success-outline", width=8, command=lambda: _save_current(self), ).pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="删除", bootstyle="danger-outline", width=8, command=lambda: _delete_current(self), ).pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="刷新", bootstyle="secondary-outline", width=8, command=lambda: reload_ai_image_list(self), ).pack(side=tk.LEFT) # 右列:预览 + 输入 right = ttk.Frame(container) right.grid(row=0, column=1, sticky=tk.NSEW) preview_frame = ttk.LabelFrame(right, text="图片预览", padding=6) preview_frame.pack(fill=tk.BOTH, expand=True) self.ai_image_canvas = tk.Canvas( preview_frame, bg="#1e1e1e", highlightthickness=0 ) self.ai_image_canvas.pack(fill=tk.BOTH, expand=True) self.ai_image_canvas.bind("", lambda e: _redraw_preview(self)) meta_row = ttk.Frame(right) meta_row.pack(fill=tk.X, pady=(4, 4)) self.ai_image_meta_var = tk.StringVar(value="未选择图片") ttk.Label( meta_row, textvariable=self.ai_image_meta_var, foreground="#666", font=("微软雅黑", 9), ).pack(side=tk.LEFT) # 输入区 input_frame = ttk.LabelFrame(right, text="提示输入(Ctrl+Enter 发送)", padding=6) input_frame.pack(fill=tk.X, pady=(4, 0)) self.ai_image_input = tk.Text(input_frame, height=3, wrap=tk.WORD) self.ai_image_input.pack(fill=tk.X, side=tk.TOP) self.ai_image_input.bind("", lambda e: (_send_prompt(self), "break")) send_row = ttk.Frame(input_frame) send_row.pack(fill=tk.X, pady=(4, 0)) self.ai_image_status_var = tk.StringVar(value="就绪") ttk.Label( send_row, textvariable=self.ai_image_status_var, foreground="#888", font=("微软雅黑", 9), ).pack(side=tk.LEFT) self.ai_image_progress = ttk.Progressbar( send_row, mode="indeterminate", length=120, bootstyle="info-striped", ) self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0)) self.ai_image_progress.pack_forget() self.ai_image_send_btn = ttk.Button( send_row, text="发送", bootstyle="primary", width=10, command=lambda: _send_prompt(self), ) self.ai_image_send_btn.pack(side=tk.RIGHT) self.ai_image_new_session_btn = ttk.Button( send_row, text="新对话", bootstyle="secondary-outline", width=10, command=lambda: _start_new_session(self), ) self.ai_image_new_session_btn.pack(side=tk.RIGHT, padx=(0, 6)) # 注册面板 self.register_panel("ai_image", frame, None, "ai_image_visible") self.ai_image_visible = False # 初次加载缓存 reload_ai_image_list(self) def toggle_ai_image_panel(self): """切换 AI 图片面板显隐。""" self.show_panel("ai_image") # ---------------- 列表 / 选中 ---------------- def reload_ai_image_list(self, auto_select_first=True): """重新扫描缓存并刷新列表。 按 ``session_id`` 分组:每个会话用一行不可选的分隔头(``── 会话 #N · 时间 ──``), 其下列出该轮生成的所有图片。会话按"最近使用"倒序,组内按时间倒序。 auto_select_first: 是否自动选中第一张图片(默认 True)。 """ self.ai_image_records = _svc.list_records() self.ai_image_listbox.delete(0, tk.END) # 维护行号 → 记录索引的映射;分隔头处为 None self._ai_image_row_map = [] self._ai_image_row_session_map = [] sessions = _svc.group_records_by_session(self.ai_image_records) flat = [] current_sid = _svc.get_session_id() for idx, sess in enumerate(sessions, start=1): sid = sess["session_id"] is_current = sid and sid == current_sid header = _format_session_header(idx, sess, is_current=is_current) self.ai_image_listbox.insert(tk.END, header) # 头部行:禁用选中(视觉上变灰) last = self.ai_image_listbox.size() - 1 self.ai_image_listbox.itemconfig( last, foreground="#888", selectforeground="#888", background="#f5f5f5", selectbackground="#f5f5f5", ) self._ai_image_row_map.append(None) self._ai_image_row_session_map.append(sid) for rec in sess["records"]: label = " " + _format_list_label(rec) self.ai_image_listbox.insert(tk.END, label) self._ai_image_row_map.append(len(flat)) self._ai_image_row_session_map.append(rec.session_id) flat.append(rec) # 替换为按显示顺序展平后的列表,便于其它逻辑(前一张/后一张等) self.ai_image_records = flat if self.ai_image_records and auto_select_first: # 选中第一张实际记录 for row, ridx in enumerate(self._ai_image_row_map): if ridx is not None: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) _select_record(self, self.ai_image_records[ridx]) break else: self.ai_image_current = None self.ai_image_photo = None self.ai_image_canvas.delete("all") self.ai_image_meta_var.set("暂无缓存图片") def _format_session_header(index: int, sess: dict, is_current: bool) -> str: started = (sess.get("started_at") or "").replace("T", " ")[:16] tag = "(当前)" if is_current else "" if sess.get("session_id"): return f"── 会话 #{index} · {started} {tag}──" return f"── 未归类 · {started} ──" def _format_list_label(rec: _svc.AIImageRecord) -> str: # 分辨率前缀:优先从 extra["size"] 取,其次从文件名猜 size_tag = "" extra = rec.extra or {} if isinstance(extra, dict) and extra.get("size"): size_tag = f"[{extra['size']}] " else: try: from PIL import Image as _Im with _Im.open(rec.image_path) as _im: size_tag = f"[{_im.width}×{_im.height}] " except Exception: pass name_line = rec.display_name.splitlines()[0] if rec.display_name else "(未命名)" # 列表宽度 width=34,需要扣除两格缩进 + size_tag max_name = 34 - 2 - len(size_tag) - 2 if max_name > 4 and len(name_line) > max_name: name_line = name_line[:max_name] + "…" return f"{size_tag}{name_line}" def _on_list_select(self): sel = self.ai_image_listbox.curselection() if not sel: return row = sel[0] row_map = getattr(self, "_ai_image_row_map", None) or [] if row >= len(row_map): return ridx = row_map[row] if ridx is None: session_id = _session_id_for_row(self, row) if session_id: _switch_to_session(self, session_id, show_message=False) self.ai_image_listbox.selection_clear(row) return if 0 <= ridx < len(self.ai_image_records): rec = self.ai_image_records[ridx] if rec.session_id: _switch_to_session(self, rec.session_id, show_message=False, target_record_id=rec.id) _select_record(self, rec) def _select_record(self, rec: _svc.AIImageRecord): self.ai_image_current = rec self.ai_image_meta_var.set( f"{os.path.basename(rec.image_path)} | {rec.created_at}" ) _redraw_preview(self) # ---------------- 预览绘制 ---------------- def _redraw_preview(self): rec = getattr(self, "ai_image_current", None) canvas = self.ai_image_canvas canvas.delete("all") if rec is None or not os.path.isfile(rec.image_path): return cw = canvas.winfo_width() or 1 ch = canvas.winfo_height() or 1 try: img = Image.open(rec.image_path) img.load() except Exception as exc: canvas.create_text(cw // 2, ch // 2, text=f"加载失败: {exc}", fill="#f66") return iw, ih = img.size scale = min(cw / iw, ch / ih, 1.0) nw, nh = max(1, int(iw * scale)), max(1, int(ih * scale)) img_resized = img.resize((nw, nh), Image.LANCZOS) self.ai_image_photo = ImageTk.PhotoImage(img_resized) canvas.create_image(cw // 2, ch // 2, image=self.ai_image_photo, anchor="center") # ---------------- 发送 / 保存 / 删除 ---------------- def _start_new_session(self): """开启新的对话会话,后续生成将使用新的 session_id。""" if getattr(self, "_ai_image_requesting", False): messagebox.showinfo("提示", "请等待当前请求完成") return _svc.reset_session() self.ai_image_status_var.set("已开启新对话") reload_ai_image_list(self, auto_select_first=False) def _session_id_for_row(self, row: int) -> str: session_map = getattr(self, "_ai_image_row_session_map", None) or [] if row < 0 or row >= len(session_map): return "" return session_map[row] or "" def _switch_to_session(self, session_id: str, show_message: bool = True, target_record_id: str = ""): sid = (session_id or "").strip() if not sid: return if sid == _svc.get_session_id(): return _svc.set_session_id(sid) reload_ai_image_list(self) if target_record_id: for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []): if ridx is None: continue rec = self.ai_image_records[ridx] if rec.id == target_record_id: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) self.ai_image_listbox.see(row) _select_record(self, rec) break self.ai_image_status_var.set("已切换到历史对话") if show_message: messagebox.showinfo("提示", "已切换到所选历史对话") def _update_request_progress(self): if not getattr(self, "_ai_image_requesting", False): self._ai_image_progress_job = None return self.ai_image_status_var.set("正在生成图片…") self._ai_image_progress_phase += 1 self._ai_image_progress_job = self.root.after(900, lambda: _update_request_progress(self)) def _send_prompt(self): if getattr(self, "_ai_image_requesting", False): return prompt = self.ai_image_input.get("1.0", tk.END).strip() if not prompt: messagebox.showinfo("提示", "请输入内容") return _set_requesting(self, True) is_remote_url = _svc.is_remote_image_url(prompt) self.ai_image_status_var.set("下载中…" if is_remote_url else "后端处理中…") def _success(record): self.root.after(0, lambda: _on_request_done(self, record, None)) def _error(exc): self.root.after(0, lambda: _on_request_done(self, None, exc)) if is_remote_url: _svc.import_image_from_url_async( prompt, on_success=_success, on_error=_error, ) return _svc.request_image_async(prompt, on_success=_success, on_error=_error) def _set_requesting(self, flag: bool): self._ai_image_requesting = flag try: self.ai_image_send_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) self.ai_image_new_session_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) except Exception: pass if flag: self._ai_image_progress_phase = 0 self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0)) self.ai_image_progress.start(10) if self._ai_image_progress_job is not None: self.root.after_cancel(self._ai_image_progress_job) self._ai_image_progress_job = self.root.after(0, lambda: _update_request_progress(self)) else: if self._ai_image_progress_job is not None: self.root.after_cancel(self._ai_image_progress_job) self._ai_image_progress_job = None try: self.ai_image_progress.stop() self.ai_image_progress.pack_forget() except Exception: pass def _on_request_done(self, record, exc): _set_requesting(self, False) if exc is not None: self.ai_image_status_var.set(f"失败: {exc}") messagebox.showerror("生成失败", str(exc)) return self.ai_image_status_var.set("完成") self.ai_image_input.delete("1.0", tk.END) reload_ai_image_list(self) if record is not None and self.ai_image_records: for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []): if ridx is None: continue r = self.ai_image_records[ridx] if r.id == record.id: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) self.ai_image_listbox.see(row) _select_record(self, r) break def _save_current(self): rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return ext = os.path.splitext(rec.image_path)[1] or ".png" dest = filedialog.asksaveasfilename( title="另存为", defaultextension=ext, initialfile=os.path.basename(rec.image_path), filetypes=[("图片", "*.png;*.jpg;*.jpeg;*.bmp;*.webp"), ("所有文件", "*.*")], ) if not dest: return try: _svc.export_record(rec, dest) messagebox.showinfo("成功", f"已保存到:\n{dest}") except Exception as exc: messagebox.showerror("保存失败", str(exc)) def _delete_current(self): rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return if not messagebox.askyesno("确认删除", f"确定删除这张缓存图片吗?\n{os.path.basename(rec.image_path)}"): return _svc.delete_record(rec) reload_ai_image_list(self) def _rename_current(self): """弹窗让用户修改当前记录的展示标题(保存到侧车 ``title`` 字段,原始 prompt 不变)。""" rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return current = rec.title or rec.display_name new_name = simpledialog.askstring( "重命名", "修改显示标题(留空可恢复使用原始提示词):", initialvalue=current, parent=self.root, ) if new_name is None: # 取消 return new_name = new_name.strip() if new_name == (rec.title or ""): return if not _svc.update_record_title(rec, new_name): messagebox.showerror("保存失败", "无法更新元数据,请检查文件权限。") return target_id = rec.id reload_ai_image_list(self) # 重新定位 for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []): if ridx is None: continue r = self.ai_image_records[ridx] if r.id == target_id: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) self.ai_image_listbox.see(row) _select_record(self, r) break # ---------------- 发送到 UCD ---------------- def _show_list_context_menu(self, event): """在图片列表上显示右键菜单,并根据状态启用/禁用项。""" try: row = self.ai_image_listbox.nearest(event.y) except Exception: row = -1 row_map = getattr(self, "_ai_image_row_map", None) or [] ridx = row_map[row] if 0 <= row < len(row_map) else None if ridx is not None and 0 <= ridx < len(self.ai_image_records): self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) _select_record(self, self.ai_image_records[ridx]) has_selection = self.ai_image_current is not None ucd = getattr(self, "ucd", None) can_send = ( has_selection and ucd is not None and getattr(ucd, "status", False) ) try: self.ai_image_menu.entryconfigure( 0, state=("normal" if can_send else "disabled") ) self.ai_image_menu.entryconfigure( 2, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.entryconfigure( 3, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.entryconfigure( 4, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.tk_popup(event.x_root, event.y_root) finally: self.ai_image_menu.grab_release() def _send_to_ucd(self): """把当前选中的 AI 图片通过 UCD 发送到显示设备。""" rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return if not os.path.isfile(rec.image_path): messagebox.showerror("错误", f"图片文件不存在:\n{rec.image_path}") return ucd = getattr(self, "ucd", None) if ucd is None or not getattr(ucd, "status", False): messagebox.showwarning("警告", "请先连接 UCD323 设备") return image_path = rec.image_path send_path = image_path log = getattr(self, "log_gui", None) # 发送前检查分辨率:建议与当前 UCD 输出分辨率一致。 try: target_w, target_h = get_current_resolution(ucd) except Exception: target_w, target_h = 3840, 2160 try: with Image.open(image_path) as _img: src_w, src_h = _img.size except Exception as exc: messagebox.showerror("错误", f"无法读取图片尺寸:\n{exc}") return if log is not None: log.log( f"UCD分辨率: {target_w}x{target_h} | 图片分辨率: {src_w}x{src_h}", level="info", ) if (src_w, src_h) != (target_w, target_h): if not messagebox.askyesno( "分辨率不匹配", ( f"当前 UCD 分辨率: {target_w}x{target_h}\n" f"图片分辨率: {src_w}x{src_h}\n\n" "是否自动缩放后再发送?" ), ): if log is not None: log.log("用户取消发送:图片分辨率与 UCD 不一致", level="warning") return try: send_path = _build_ucd_resized_image(image_path, target_w, target_h) if log is not None: log.log( f"已生成匹配分辨率副本: {os.path.basename(send_path)}", level="info", ) except Exception as exc: messagebox.showerror("错误", f"自动缩放失败:\n{exc}") return if log is not None: log.log( f"发送 AI 图片到 UCD: {os.path.basename(send_path)}", level="info", ) if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("发送中…") def _worker(): err = None try: ok = send_image_pattern(ucd, send_path) except Exception as exc: ok = False err = str(exc) def _done(): if ok: if log is not None: log.log( f"图片已发送到 UCD: {os.path.basename(send_path)}", level="success", ) if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("已发送到 UCD") else: msg = f"UCD 发送失败: {err}" if err else "UCD 发送失败" if log is not None: log.log(msg, level="error") if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("UCD 发送失败") try: self.root.after(0, _done) except Exception: pass threading.Thread(target=_worker, daemon=True).start() def _build_ucd_resized_image(image_path: str, target_w: int, target_h: int) -> str: """生成与 UCD 分辨率匹配的临时副本(缓存目录内)。""" base_dir = os.path.dirname(image_path) base_name = os.path.splitext(os.path.basename(image_path))[0] out_name = f"{base_name}_{target_w}x{target_h}_ucd.png" out_path = os.path.join(base_dir, out_name) with Image.open(image_path) as img: resized = img.convert("RGB").resize((target_w, target_h), Image.LANCZOS) resized.save(out_path, format="PNG") return out_path