"""AI 图片对话面板:输入提示 → 生成 → 显示 → 列表切换/保存/删除。""" from __future__ import annotations import os import sys import threading import tkinter as tk from tkinter import filedialog, messagebox, simpledialog import ttkbootstrap as ttk from PIL import Image, ImageTk from app.services import ai_image as _svc from drivers.ucd_helpers import get_current_resolution, send_image_pattern # ---------------- 面板创建 ---------------- def create_ai_image_panel(self): """创建 AI 图片对话面板,并注册到面板管理。""" frame = ttk.Frame(self.content_frame) self.ai_image_frame = frame # 内部状态 self.ai_image_records = [] # list[AIImageRecord] self.ai_image_current = None # AIImageRecord | None self.ai_image_photo = None # 防止 ImageTk.PhotoImage 被 GC self._ai_image_requesting = False self._ai_image_progress_job = None self._ai_image_progress_phase = 0 self._ai_image_cancel_event = None self._ai_image_request_seq = 0 self._ai_image_active_seq = 0 container = ttk.Frame(frame, padding=10) container.pack(fill=tk.BOTH, expand=True) # 左列:图片列表 # 使用 grid + 权重,让右侧预览区优先占据剩余空间。 container.columnconfigure(0, weight=0) container.columnconfigure(1, weight=1) container.rowconfigure(0, weight=1) left = ttk.Frame(container, width=360) left.grid(row=0, column=0, sticky=tk.NS, padx=(0, 10)) left.grid_propagate(False) ttk.Label(left, text="历史图片", font=("微软雅黑", 10, "bold")).pack( anchor=tk.W, pady=(0, 4) ) list_wrap = ttk.Frame(left, padding=2) list_wrap.pack(fill=tk.BOTH, expand=True) scroll = ttk.Scrollbar(list_wrap, orient=tk.VERTICAL) self.ai_image_listbox = tk.Listbox( list_wrap, width=34, height=1, # 由 pack fill/expand 撑满,height 仅为最小保底 activestyle="none", font=("微软雅黑", 9), bd=1, relief=tk.FLAT, highlightthickness=1, highlightbackground="#d8d8d8", highlightcolor="#4a90e2", selectbackground="#2b6cb0", selectforeground="#ffffff", yscrollcommand=scroll.set, ) scroll.config(command=self.ai_image_listbox.yview) self.ai_image_listbox.pack(side=tk.LEFT, fill=tk.Y) scroll.pack(side=tk.RIGHT, fill=tk.Y) self.ai_image_listbox.bind("<>", lambda e: _on_list_select(self)) # 右键菜单:发送到 UCD / 重命名 / 另存为 / 删除 # 索引: 0=发送, 1=sep, 2=重命名, 3=另存为, 4=删除 self.ai_image_menu = tk.Menu(self.root, tearoff=0) self.ai_image_menu.add_command( label="发送图片", command=lambda: _send_to_ucd(self), ) self.ai_image_menu.add_separator() self.ai_image_menu.add_command( label="重命名…", command=lambda: _rename_current(self), ) self.ai_image_menu.add_command( label="另存为…", command=lambda: _save_current(self), ) self.ai_image_menu.add_command( label="删除", command=lambda: _delete_current(self), ) self.ai_image_listbox.bind( "", lambda e: _show_list_context_menu(self, e), ) btn_row = ttk.Frame(left) btn_row.pack(fill=tk.X, pady=(6, 0)) self.ai_image_send_ucd_btn = ttk.Button( btn_row, text="发送", bootstyle="info-outline", width=8, command=lambda: _send_to_ucd(self), ) self.ai_image_send_ucd_btn.pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="保存", bootstyle="success-outline", width=8, command=lambda: _save_current(self), ).pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="删除", bootstyle="danger-outline", width=8, command=lambda: _delete_current(self), ).pack(side=tk.LEFT, padx=(0, 4)) ttk.Button( btn_row, text="刷新", bootstyle="secondary-outline", width=8, command=lambda: reload_ai_image_list(self), ).pack(side=tk.LEFT) # 右列:预览 + 输入 right = ttk.Frame(container) right.grid(row=0, column=1, sticky=tk.NSEW) preview_frame = ttk.LabelFrame(right, text="图片预览", padding=6) preview_frame.pack(fill=tk.BOTH, expand=True) self.ai_image_canvas = tk.Canvas( preview_frame, bg="#1e1e1e", highlightthickness=0 ) self.ai_image_canvas.pack(fill=tk.BOTH, expand=True) self.ai_image_canvas.bind("", lambda e: _redraw_preview(self)) meta_row = ttk.Frame(right) meta_row.pack(fill=tk.X, pady=(4, 4)) self.ai_image_meta_var = tk.StringVar(value="未选择图片") ttk.Label( meta_row, textvariable=self.ai_image_meta_var, foreground="#666", font=("微软雅黑", 9), ).pack(side=tk.LEFT) # 输入区 input_frame = ttk.LabelFrame(right, text="提示输入(Ctrl+Enter 发送)", padding=6) input_frame.pack(fill=tk.X, pady=(4, 0)) self.ai_image_input = tk.Text(input_frame, height=3, wrap=tk.WORD) self.ai_image_input.pack(fill=tk.X, side=tk.TOP) self.ai_image_input.bind("", lambda e: (_send_prompt(self), "break")) send_row = ttk.Frame(input_frame) send_row.pack(fill=tk.X, pady=(4, 0)) self.ai_image_status_var = tk.StringVar(value="就绪") ttk.Label( send_row, textvariable=self.ai_image_status_var, foreground="#888", font=("微软雅黑", 9), ).pack(side=tk.LEFT) self.ai_image_progress = ttk.Progressbar( send_row, mode="indeterminate", length=120, bootstyle="info-striped", ) self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0)) self.ai_image_progress.pack_forget() self.ai_image_send_btn = ttk.Button( send_row, text="发送", bootstyle="primary", width=10, command=lambda: _send_prompt(self), ) self.ai_image_send_btn.pack(side=tk.RIGHT) self.ai_image_new_session_btn = ttk.Button( send_row, text="新对话", bootstyle="secondary-outline", width=10, command=lambda: _start_new_session(self), ) self.ai_image_new_session_btn.pack(side=tk.RIGHT, padx=(0, 6)) self.ai_image_stop_btn = ttk.Button( send_row, text="停止", bootstyle="danger-outline", width=10, command=lambda: _stop_request(self), ) self.ai_image_stop_btn.pack(side=tk.RIGHT, padx=(0, 6)) self.ai_image_stop_btn.configure(state=tk.DISABLED) # 注册面板 self.register_panel("ai_image", frame, None, "ai_image_visible") self.ai_image_visible = False # 初次加载缓存 reload_ai_image_list(self) def toggle_ai_image_panel(self): """切换 AI 图片面板显隐。""" self.show_panel("ai_image") def _get_app_base_dir(self) -> str: """返回应用根目录(settings 的上一级)。""" if getattr(self, "config_file", None): return os.path.dirname(os.path.dirname(self.config_file)) if getattr(sys, "frozen", False): return os.path.dirname(sys.executable) return os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # ---------------- 列表 / 选中 ---------------- def reload_ai_image_list(self, auto_select_first=True): """重新扫描缓存并刷新列表。 按 ``session_id`` 分组:每个会话用一行不可选的分隔头(``── 会话 #N · 时间 ──``), 其下列出该轮生成的所有图片。会话按"最近使用"倒序,组内按时间倒序。 auto_select_first: 是否自动选中第一张图片(默认 True)。 """ self.ai_image_records = _svc.list_records(base_dir=_get_app_base_dir(self)) self.ai_image_listbox.delete(0, tk.END) # 维护行号 → 记录索引的映射;分隔头处为 None self._ai_image_row_map = [] self._ai_image_row_session_map = [] sessions = _svc.group_records_by_session(self.ai_image_records) flat = [] current_sid = _svc.get_session_id() for idx, sess in enumerate(sessions, start=1): sid = sess["session_id"] is_current = sid and sid == current_sid header = _format_session_header(idx, sess, is_current=is_current) self.ai_image_listbox.insert(tk.END, header) # 头部行:禁用选中(视觉上变灰) last = self.ai_image_listbox.size() - 1 self.ai_image_listbox.itemconfig( last, foreground="#888", selectforeground="#888", background="#f5f5f5", selectbackground="#f5f5f5", ) self._ai_image_row_map.append(None) self._ai_image_row_session_map.append(sid) for rec in sess["records"]: label = " " + _format_list_label(rec) self.ai_image_listbox.insert(tk.END, label) self._ai_image_row_map.append(len(flat)) self._ai_image_row_session_map.append(rec.session_id) flat.append(rec) # 替换为按显示顺序展平后的列表,便于其它逻辑(前一张/后一张等) self.ai_image_records = flat if self.ai_image_records and auto_select_first: # 选中第一张实际记录 for row, ridx in enumerate(self._ai_image_row_map): if ridx is not None: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) _select_record(self, self.ai_image_records[ridx]) break else: self.ai_image_current = None self.ai_image_photo = None self.ai_image_canvas.delete("all") self.ai_image_meta_var.set("暂无缓存图片") def _format_session_header(index: int, sess: dict, is_current: bool) -> str: started = (sess.get("started_at") or "").replace("T", " ")[:16] tag = "(当前)" if is_current else "" if sess.get("session_id"): return f"── 会话 #{index} · {started} {tag}──" return f"── 未归类 · {started} ──" def _format_list_label(rec: _svc.AIImageRecord) -> str: # 分辨率前缀:优先从 extra["size"] 取,其次从文件名猜 size_tag = "" extra = rec.extra or {} if isinstance(extra, dict) and extra.get("size"): size_tag = f"[{extra['size']}] " else: try: from PIL import Image as _Im with _Im.open(rec.image_path) as _im: size_tag = f"[{_im.width}×{_im.height}] " except Exception: pass name_line = rec.display_name.splitlines()[0] if rec.display_name else "(未命名)" # 列表宽度 width=34,需要扣除两格缩进 + size_tag max_name = 34 - 2 - len(size_tag) - 2 if max_name > 4 and len(name_line) > max_name: name_line = name_line[:max_name] + "…" return f"{size_tag}{name_line}" def _on_list_select(self): sel = self.ai_image_listbox.curselection() if not sel: return row = sel[0] row_map = getattr(self, "_ai_image_row_map", None) or [] if row >= len(row_map): return ridx = row_map[row] if ridx is None: session_id = _session_id_for_row(self, row) if session_id: _switch_to_session(self, session_id, show_message=False) self.ai_image_listbox.selection_clear(row) return if 0 <= ridx < len(self.ai_image_records): rec = self.ai_image_records[ridx] if rec.session_id: _switch_to_session(self, rec.session_id, show_message=False, target_record_id=rec.id) _select_record(self, rec) def _select_record(self, rec: _svc.AIImageRecord): self.ai_image_current = rec self.ai_image_meta_var.set( f"{os.path.basename(rec.image_path)} | {rec.created_at}" ) _redraw_preview(self) # ---------------- 预览绘制 ---------------- def _redraw_preview(self): rec = getattr(self, "ai_image_current", None) canvas = self.ai_image_canvas canvas.delete("all") if rec is None or not os.path.isfile(rec.image_path): return cw = canvas.winfo_width() or 1 ch = canvas.winfo_height() or 1 try: img = Image.open(rec.image_path) img.load() except Exception as exc: canvas.create_text(cw // 2, ch // 2, text=f"加载失败: {exc}", fill="#f66") return iw, ih = img.size scale = min(cw / iw, ch / ih, 1.0) nw, nh = max(1, int(iw * scale)), max(1, int(ih * scale)) img_resized = img.resize((nw, nh), Image.LANCZOS) self.ai_image_photo = ImageTk.PhotoImage(img_resized) canvas.create_image(cw // 2, ch // 2, image=self.ai_image_photo, anchor="center") # ---------------- 发送 / 保存 / 删除 ---------------- def _start_new_session(self): """开启新的对话会话,后续生成将使用新的 session_id。""" if getattr(self, "_ai_image_requesting", False): messagebox.showinfo("提示", "请等待当前请求完成") return _svc.reset_session() self.ai_image_status_var.set("已开启新对话") reload_ai_image_list(self, auto_select_first=False) def _session_id_for_row(self, row: int) -> str: session_map = getattr(self, "_ai_image_row_session_map", None) or [] if row < 0 or row >= len(session_map): return "" return session_map[row] or "" def _switch_to_session(self, session_id: str, show_message: bool = True, target_record_id: str = ""): sid = (session_id or "").strip() if not sid: return if sid == _svc.get_session_id(): return _svc.set_session_id(sid) reload_ai_image_list(self) if target_record_id: for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []): if ridx is None: continue rec = self.ai_image_records[ridx] if rec.id == target_record_id: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) self.ai_image_listbox.see(row) _select_record(self, rec) break self.ai_image_status_var.set("已切换到历史对话") if show_message: messagebox.showinfo("提示", "已切换到所选历史对话") def _update_request_progress(self): if not getattr(self, "_ai_image_requesting", False): self._ai_image_progress_job = None return self.ai_image_status_var.set("正在生成图片…") self._ai_image_progress_phase += 1 self._ai_image_progress_job = self.root.after(900, lambda: _update_request_progress(self)) def _send_prompt(self): if getattr(self, "_ai_image_requesting", False): return prompt = self.ai_image_input.get("1.0", tk.END).strip() if not prompt: messagebox.showinfo("提示", "请输入内容") return self._ai_image_request_seq += 1 req_seq = self._ai_image_request_seq self._ai_image_active_seq = req_seq self._ai_image_cancel_event = threading.Event() _set_requesting(self, True) is_remote_url = _svc.is_remote_image_url(prompt) self.ai_image_status_var.set("下载中…" if is_remote_url else "后端处理中…") def _success(record): self.root.after(0, lambda: _on_request_done(self, record, None, req_seq)) def _error(exc): self.root.after(0, lambda: _on_request_done(self, None, exc, req_seq)) if is_remote_url: _svc.import_image_from_url_async( prompt, on_success=_success, on_error=_error, base_dir=_get_app_base_dir(self), cancel_event=self._ai_image_cancel_event, ) return _svc.request_image_async( prompt, on_success=_success, on_error=_error, base_dir=_get_app_base_dir(self), cancel_event=self._ai_image_cancel_event, ) def _set_requesting(self, flag: bool): self._ai_image_requesting = flag try: self.ai_image_send_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) self.ai_image_new_session_btn.configure(state=tk.DISABLED if flag else tk.NORMAL) self.ai_image_stop_btn.configure(state=tk.NORMAL if flag else tk.DISABLED) except Exception: pass if flag: self._ai_image_progress_phase = 0 self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0)) self.ai_image_progress.start(10) if self._ai_image_progress_job is not None: self.root.after_cancel(self._ai_image_progress_job) self._ai_image_progress_job = self.root.after(0, lambda: _update_request_progress(self)) else: if self._ai_image_progress_job is not None: self.root.after_cancel(self._ai_image_progress_job) self._ai_image_progress_job = None try: self.ai_image_progress.stop() self.ai_image_progress.pack_forget() except Exception: pass def _on_request_done(self, record, exc, req_seq): # 旧请求回调(例如用户已点击停止后)直接忽略 if req_seq != getattr(self, "_ai_image_active_seq", 0): return self._ai_image_active_seq = 0 self._ai_image_cancel_event = None _set_requesting(self, False) if exc is not None: self.ai_image_status_var.set(f"失败: {exc}") messagebox.showerror("生成失败", str(exc)) return self.ai_image_status_var.set("完成") self.ai_image_input.delete("1.0", tk.END) reload_ai_image_list(self) if record is not None and self.ai_image_records: for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []): if ridx is None: continue r = self.ai_image_records[ridx] if r.id == record.id: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) self.ai_image_listbox.see(row) _select_record(self, r) break def _stop_request(self): """停止当前生成任务(协作取消:屏蔽后续回调并恢复 UI)。""" if not getattr(self, "_ai_image_requesting", False): return event = getattr(self, "_ai_image_cancel_event", None) if event is not None: event.set() self._ai_image_active_seq = 0 _set_requesting(self, False) self.ai_image_status_var.set("已停止生成") def _save_current(self): rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return ext = os.path.splitext(rec.image_path)[1] or ".png" dest = filedialog.asksaveasfilename( title="另存为", defaultextension=ext, initialfile=os.path.basename(rec.image_path), filetypes=[("图片", "*.png;*.jpg;*.jpeg;*.bmp;*.webp"), ("所有文件", "*.*")], ) if not dest: return try: _svc.export_record(rec, dest) messagebox.showinfo("成功", f"已保存到:\n{dest}") except Exception as exc: messagebox.showerror("保存失败", str(exc)) def _delete_current(self): rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return if not messagebox.askyesno("确认删除", f"确定删除这张缓存图片吗?\n{os.path.basename(rec.image_path)}"): return _svc.delete_record(rec) reload_ai_image_list(self) def _rename_current(self): """弹窗让用户修改当前记录的展示标题(保存到侧车 ``title`` 字段,原始 prompt 不变)。""" rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return current = rec.title or rec.display_name new_name = simpledialog.askstring( "重命名", "修改显示标题(留空可恢复使用原始提示词):", initialvalue=current, parent=self.root, ) if new_name is None: # 取消 return new_name = new_name.strip() if new_name == (rec.title or ""): return if not _svc.update_record_title(rec, new_name): messagebox.showerror("保存失败", "无法更新元数据,请检查文件权限。") return target_id = rec.id reload_ai_image_list(self) # 重新定位 for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []): if ridx is None: continue r = self.ai_image_records[ridx] if r.id == target_id: self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) self.ai_image_listbox.see(row) _select_record(self, r) break # ---------------- 发送到 UCD ---------------- def _show_list_context_menu(self, event): """在图片列表上显示右键菜单,并根据状态启用/禁用项。""" try: row = self.ai_image_listbox.nearest(event.y) except Exception: row = -1 row_map = getattr(self, "_ai_image_row_map", None) or [] ridx = row_map[row] if 0 <= row < len(row_map) else None if ridx is not None and 0 <= ridx < len(self.ai_image_records): self.ai_image_listbox.selection_clear(0, tk.END) self.ai_image_listbox.selection_set(row) self.ai_image_listbox.activate(row) _select_record(self, self.ai_image_records[ridx]) has_selection = self.ai_image_current is not None ucd = getattr(self, "ucd", None) can_send = ( has_selection and ucd is not None and getattr(ucd, "status", False) ) try: self.ai_image_menu.entryconfigure( 0, state=("normal" if can_send else "disabled") ) self.ai_image_menu.entryconfigure( 2, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.entryconfigure( 3, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.entryconfigure( 4, state=("normal" if has_selection else "disabled") ) self.ai_image_menu.tk_popup(event.x_root, event.y_root) finally: self.ai_image_menu.grab_release() def _send_to_ucd(self): """把当前选中的 AI 图片通过 UCD 发送到显示设备。""" rec = getattr(self, "ai_image_current", None) if rec is None: messagebox.showinfo("提示", "请先选择一张图片") return if not os.path.isfile(rec.image_path): messagebox.showerror("错误", f"图片文件不存在:\n{rec.image_path}") return ucd = getattr(self, "ucd", None) if ucd is None or not getattr(ucd, "status", False): messagebox.showwarning("警告", "请先连接 UCD323 设备") return image_path = rec.image_path send_path = image_path log = getattr(self, "log_gui", None) # 发送前检查分辨率:建议与当前 UCD 输出分辨率一致。 try: target_w, target_h = get_current_resolution(ucd) except Exception: target_w, target_h = 3840, 2160 try: with Image.open(image_path) as _img: src_w, src_h = _img.size except Exception as exc: messagebox.showerror("错误", f"无法读取图片尺寸:\n{exc}") return if log is not None: log.log( f"UCD分辨率: {target_w}x{target_h} | 图片分辨率: {src_w}x{src_h}", level="info", ) if (src_w, src_h) != (target_w, target_h): if not messagebox.askyesno( "分辨率不匹配", ( f"当前 UCD 分辨率: {target_w}x{target_h}\n" f"图片分辨率: {src_w}x{src_h}\n\n" "是否自动缩放后再发送?" ), ): if log is not None: log.log("用户取消发送:图片分辨率与 UCD 不一致", level="warning") return try: send_path = _build_ucd_resized_image(image_path, target_w, target_h) if log is not None: log.log( f"已生成匹配分辨率副本: {os.path.basename(send_path)}", level="info", ) except Exception as exc: messagebox.showerror("错误", f"自动缩放失败:\n{exc}") return if log is not None: log.log( f"发送 AI 图片到 UCD: {os.path.basename(send_path)}", level="info", ) if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("发送中…") def _worker(): err = None try: ok = send_image_pattern(ucd, send_path) except Exception as exc: ok = False err = str(exc) def _done(): if ok: if log is not None: log.log( f"图片已发送到 UCD: {os.path.basename(send_path)}", level="success", ) if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("已发送到 UCD") else: msg = f"UCD 发送失败: {err}" if err else "UCD 发送失败" if log is not None: log.log(msg, level="error") if hasattr(self, "ai_image_status_var"): self.ai_image_status_var.set("UCD 发送失败") try: self.root.after(0, _done) except Exception: pass threading.Thread(target=_worker, daemon=True).start() def _build_ucd_resized_image(image_path: str, target_w: int, target_h: int) -> str: """生成与 UCD 分辨率匹配的临时副本(缓存目录内)。""" base_dir = os.path.dirname(image_path) base_name = os.path.splitext(os.path.basename(image_path))[0] out_name = f"{base_name}_{target_w}x{target_h}_ucd.png" out_path = os.path.join(base_dir, out_name) with Image.open(image_path) as img: resized = img.convert("RGB").resize((target_w, target_h), Image.LANCZOS) resized.save(out_path, format="PNG") return out_path